您好,登录后才能下订单哦!
这篇文章主要介绍了Reactive反应式编程是什么及如何使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Reactive反应式编程是什么及如何使用文章都会有所收获,下面我们一起来看看吧。
Reactor是Reactive Programming范例的一个实现,可以概括为:
反应式编程是一种涉及数据流和变化传播的异步编程范例。这意味着可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。
作为反应式编程方向的第一步,Microsoft在.NET生态系统中创建了Reactive Extensions(Rx)库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过Reactive Streams工作出现了Java的标准化 ,这一规范定义了JVM上的反应库的一组接口和交互规则。它的接口已经集成到父Flow类下的Java 9中。
反应式编程范例通常以面向对象的语言呈现,作为Observer设计模式的扩展。人们还可以将主要的反应流模式与熟悉的迭代器设计模式进行比较,因为在所有这些库中对Iterable- Iterator对存在双重性 。一个主要的区别是,虽然迭代器是基于拉的,但是反应流是基于推的。
使用迭代器是一种命令式编程模式,即使访问值的方法完全由其负责Iterable。实际上,开发人员可以选择何时访问next()序列中的项目。在反应流中,相当于上述对Publisher-Subscriber。但是, 当它们出现时,Publisher它会通知订阅者新的可用值,而这一推动方面是被动反应的关键。此外,应用于推送值的操作以声明方式而非命令方式表示:程序员表达计算的逻辑而不是描述其精确的控制流。
除了推送值之外,还以明确定义的方式涵盖错误处理和完成方面。A Publisher可以将新值推送到Subscriber(通过调用onNext),但也可以发出错误(通过调用onError)或完成(通过调用onComplete)。错误和完成都会终止序列。这可以概括为:
onNext x 0..N [onError | onComplete]
这种方法非常灵活。该模式支持没有值,一个值或n值的用例(包括无限的值序列,例如时钟的连续滴答)。
但是我们首先考虑一下,为什么我们首先需要这样的异步反应库?
现代应用程序可以覆盖大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是一个关键问题。
人们可以通过两种方式来提高计划的绩效:
并行化:使用更多线程和更多硬件资源。
在现有资源的使用方式上寻求更高的效率。
通常,Java开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈,此时需要引入额外的线程,运行类似的阻塞代码。但是,资源利用率的这种扩展会很快引入争用和并发问题。
更糟糕的是,阻止浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是I / O,例如数据库请求或网络调用),资源就会被浪费,因为线程(或许多线程)现在处于空闲状态,等待数据。
所以并行化方法不是灵丹妙药。为了获得硬件的全部功能是必要的,但是理由也很复杂并且易受资源浪费的影响。
第二种方法(前面提到过),寻求更高的效率,可以解决资源浪费问题。通过编写异步,非阻塞代码,您可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前进程。
但是如何在JVM上生成异步代码?Java提供了两种异步编程模型:
回调:异步方法没有返回值,但需要额外的 callback参数(lambda或匿名类),在结果可用时调用它们。一个众所周知的例子是Swing的EventListener层次结构。
期货:异步方法Future立即返回。异步进程计算一个T值,但该Future对象包含对它的访问。该值不会立即可用,并且可以轮询对象,直到该值可用。例如,ExecutorService运行Callable任务使用Future对象。
这些技术是否足够好?不适用于所有用例,两种方法都有局限性。
回调难以组合在一起,很快导致难以阅读和维护的代码(称为“Callback Hell”)。
考虑一个示例:在用户界面上显示用户的前五个收藏夹,或者如果她没有收藏夹则提出建议。这通过三个服务(一个提供喜欢的ID,第二个提取喜欢的详细信息,第三个提供详细建议):
userService.getFavorites(userId, new Callback() { public void onSuccess(Listlist) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback() { public void onSuccess(Listlist) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
我们有基于回调的服务:一个Callback接口,其中包含在异步过程成功时调用的方法,以及在发生错误时调用的方法。
第一个服务使用喜欢的ID列表调用其回调。
如果列表为空,我们必须去suggestionService。
在suggestionService给出了一个List到第二个回调。
由于我们处理UI,我们需要确保我们的消费代码将在UI线程中运行。
我们使用Java 8 Stream将处理的建议数限制为五个,并在UI中的图形列表中显示它们。
在每个级别,我们以相同的方式处理错误:在弹出窗口中显示它们。
回到最喜欢的ID级别。如果服务返回完整列表,那么我们需要转到favoriteService获取详细Favorite对象。由于我们只需要五个,我们首先流式传输ID列表,将其限制为五个。
再一次,一个回调。这次我们得到一个完全成熟的Favorite对象,我们将其推送到UI线程内的UI。
这是很多代码,它有点难以遵循并且具有重复的部分。考虑它在Reactor中的等价物:
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
我们从最喜欢的ID流开始。
我们将它们异步转换为详细的Favorite对象(flatMap)。我们现在有一个流动Favorite。
如果流量Favorite是空的,我们会切换到后退 suggestionService。
我们最多只对最终流程中的五个元素感兴趣。
最后,我们想要处理UI线程中的每个数据。
我们通过描述如何处理数据的最终形式(在UI列表中显示)以及在出现错误(显示弹出窗口)时该怎么做来触发流程。
如果您想确保在不到800毫秒内检索到喜欢的ID,或者如果需要更长时间从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在Reactor中,它变得像timeout在链中添加运算符一样简单:
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
如果上面的部分发出的时间超过800毫秒,则传播错误。
如果出现错误,请回复cacheService。
链的其余部分与前面的示例类似。
尽管Java 8中带来了改进,但期货比回调要好一些,但它们在构图方面仍然表现不佳CompletableFuture。一起编排多个未来是可行但不容易的。此外,Future还有其他问题:Future通过调用get() 方法很容易结束对象的另一个阻塞情况,它们不支持延迟计算,并且它们不支持多个值和高级错误处理。
考虑另一个例子:我们得到一个ID列表,我们要从中获取一个名称和一个统计信息,然后将它们成对地组合在一起,所有这些都是异步的。
CompletableFutureids = ifhIds(); CompletableFutureresult = ids.thenComposeAsync(l -> { Streamzip = l.stream().map(i -> { CompletableFuturenameTask = ifhName(i); CompletableFuturestatTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); ListcombinationList = zip.collect(Collectors.toList()); CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFutureallDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); Listresults = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
我们从一个未来开始,它为我们提供了一个id要处理的值列表。
一旦得到列表,我们想要开始一些更深入的异步处理。
对于列表中的每个元素:
异步获取关联的名称。
异步获取相关任务。
结合两个结果。
我们现在有一个代表所有组合任务的期货清单。为了执行这些任务,我们需要将列表转换为数组。
将数组传递给CompletableFuture.allOf,输出Future完成所有任务后完成的数组。
棘手的一点是allOf返回CompletableFuture,所以我们重申了期货清单,通过收集结果join() (这里没有阻止,因为allOf确保期货全部完成)。
一旦触发了整个异步管道,我们就等待它被处理并返回我们可以断言的结果列表。
由于Reactor具有更多开箱即用的组合运算符,因此可以简化此过程:
Fluxids = ifhrIds(); Fluxcombinations = ids.flatMap(id -> { MononameTask = ifhrName(id); MonostatTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Monoresult = combinations.collectList(); Listresults = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
这一次,我们从异步提供的ids(a Flux)序列开始。
对于序列中的每个元素,我们异步处理它(在body函数内部flatMap)两次。
获取相关名称。
获取相关统计信息。
异步组合2个值。
在将值List变为可用时将值聚合为a 。
在生产中,我们将继续Flux通过进一步组合或订阅它来异步处理。最有可能的是,我们会回归result Mono。由于我们在测试中,我们阻塞,等待处理完成,然后直接返回聚合的值列表。
断言结果。
Callback和Future的这些风险是相似的,并且是反应式编程与该Publisher-Subscriber对的关系。
诸如Reactor之类的反应库旨在解决JVM上“经典”异步方法的这些缺点,同时还关注一些其他方面:
可组合性和可读性
数据作为一个用丰富的运算符词汇表操纵的流程
在您订阅之前没有任何事情发生
背压或消费者向生产者发出信号表明排放率过高的能力
高级但高价值的抽象,与并发无关
通过可组合性,我们指的是编排多个异步任务的能力,使用先前任务的结果将输入提供给后续任务或以fork-join方式执行多个任务,以及将异步任务重用为更高级别系统中的分立组件。
编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的进程,您需要从回调执行回调,本身嵌套在另一个回调中,依此类推。那个混乱被称为Callback Hell。正如你可以猜到的(或者从经验中得知),这样的代码很难回归并推理。
Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套最小化)。
您可以将响应式应用程序处理的数据视为在装配线中移动。反应器既是传送带又是工作站。原材料从原料(原始Publisher)中倒出,最终成为成品,准备推送给消费者(或Subscriber)。
原材料可以经历各种转换和其他中间步骤,或者是将中间件聚集在一起的较大装配线的一部分。如果在某一点出现毛刺或堵塞(也许装箱产品需要不成比例的长时间),受影响的工作站可向上游发出信号以限制原材料的流动。
在Reactor中,运算符是我们的汇编类比中的工作站。每个操作符都将行为添加到a Publisher并将上一步骤包装Publisher到新实例中。因此,整个链被链接,使得数据源自第一Publisher链并且向下移动链,由每个链转换。最终,Subscriber完成了整个过程。请记住,在Subscriber订阅a 之前没有任何事情发生Publisher,下面就会提到。
了解操作员创建新实例可以帮助您避免一个常见错误,该错误会导致您认为您的链中使用的操作员未被应用。看到这个项目的常见问题。
虽然Reactive Streams规范根本没有指定运算符,但Reactor等反应库的最佳附加值之一是它们提供的丰富的运算符。这些涉及很多方面,从简单的转换和过滤到复杂的编排和错误处理。
在Reactor中,当您编写Publisher链时,默认情况下数据不会启动。相反,您可以创建异步过程的抽象描述(这可以帮助重用和组合)。
通过订阅行为,您将Publishera 绑定到a Subscriber,从而触发整个链中的数据流。这是通过上游传播的单个request 信号在内部实现的Subscriber,一直传回源 Publisher。
上游传播信号也用于实现背压,我们在装配线中将其描述为当工作站比上游工作站处理速度慢时向线路发送的反馈信号。
Reactive Streams规范定义的真实机制非常接近于类比:订阅者可以在无限制模式下工作,让源以最快的速度推送所有数据,或者可以使用该request机制向源发送信号表明它已准备就绪处理最多的n元素。
中间操作员也可以在途中更改请求。想象一个buffer 运算符,它将元素分组为10个。如果订阅者请求1个缓冲区,则源可以生成10个元素。一些操作员还实施 预取策略,这避免了request(1)往返,并且如果在请求之前生成元素并不太昂贵,则是有益的。
这将推模型转换为推拉式混合动力,如果它们随时可用,下游可以从上游拉出n个元素。但是如果元素没有准备好,它们就会在生成时被上游推动。
在反应库的Rx家族中,人们可以区分两大类反应序列:热和冷。这种区别主要与反应流如何对订阅的用户做出反应有关:
冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。
而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。
关于“Reactive反应式编程是什么及如何使用”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Reactive反应式编程是什么及如何使用”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。