Reactive-MongoDB如何异步Java Driver

发布时间:2021-09-29 09:27:09 作者:柒染
来源:亿速云 阅读:204

Reactive-MongoDB如何异步Java Driver,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Reactive-MongoDB如何异步Java Driver

 一、关于 异步驱动

从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。

但实质上,使用同步驱动(Java Sync  Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 MongoDB 版本。

无论如何,由于 Reactive 的发展,未来使用异步驱动应该是一个趋势。

在使用 Async Driver 之前,需要对 Reactive 的概念有一些熟悉。

二、理解 Reactive (响应式)

响应式(Reactive)是一种异步的、面向数据流的开发方式,最早是来自于.NET 平台上的 Reactive Extensions  库,随后被扩展为各种编程语言的实现。

在著名的 Reactive Manifesto(响应式宣言) 中,对 Reactive 定义了四个特征:

Reactive-MongoDB如何异步Java Driver

在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,于是乎就有了 2013年发起的 响应式流规范(Reactive Stream  Specification)。

https://www.reactive-streams.org/

其中,对于响应式流的处理环节又做了如下定义:

Java 平台则是在 JDK 9 版本上发布了对 Reactive Streams 的支持。

下面介绍响应式流的几个关键接口:

Publisher 是数据的发布者。Publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 Subscriber。

Subscriber 是数据的订阅者。Subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其  onSubscribe(Subscription s) 方法会被调用。

Subscription 表示的是当前的订阅关系。

当订阅成功后,可以使用 Subscription 的 request(long n) 方法来请求发布者发布 n  条数据。发布者可能产生3种不同的消息通知,分别对应 Subscriber 的另外3个回调方法。

数据通知:对应 onNext 方法,表示发布者产生的数据。

错误通知:对应 onError 方法,表示发布者产生了错误。

结束通知:对应 onComplete 方法,表示发布者已经完成了所有数据的发布。

在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知之后,不会再有其他通知产生。

Subscription 表示的是一个订阅关系。除了之前提到的 request 方法之外,还有 cancel 方法用来取消订阅。需要注意的是,在  cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消。

这几个接口的关系如下图所示:

Reactive-MongoDB如何异步Java Driver

图片出处:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

MongoDB 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 Reactive Stream  的上述接口。

> 除了 reactivestream 之外,MongoDB 的异步驱动还包含 RxJava 等风格的版本,有兴趣的读者可以进一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下来,通过一个简单的例子来演示一下 Reactive 方式的代码风格:

A. 引入依赖

org.mongodb    mongodb-driver-reactivestreams    1.11.0

> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson,  mongodb-driver-async组件

B. 连接数据库

//服务器实例表List servers =newArrayList(); servers.add(newServerAddress("localhost",27018));//配置构建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//传入服务器实例 settingsBuilder.applyToClusterSettings(         builder -> builder.hosts(servers));//构建 Client 实例MongoClient mongoClient =MongoClients.create(settingsBuilder.build());

C. 实现文档查询

//获得数据库对象MongoDatabase database = client.getDatabase(databaseName);//获得集合MongoCollection collection = database.getCollection(collectionName);//异步返回PublisherFindPublisher publisher = collection.find();//订阅实现 publisher.subscribe(newSubscriber(){     @Override     publicvoid onSubscribe(Subscription s){         System.out.println("start...");         //执行请求         s.request(Integer.MAX_VALUE);      }     @Override     publicvoid onNext(Document document){         //获得文档         System.out.println("Document:"+ document.toJson());     }      @Override     publicvoid onError(Throwable t){         System.out.println("error occurs.");     }      @Override     publicvoid onComplete(){         System.out.println("finished.");     }});

注意到,与使用同步驱动不同的是,collection.find()方法返回的不是 Cursor,而是一个  FindPublisher对象,这是Publisher接口的一层扩展。

而且,在返回 Publisher 对象时,此时并没有产生真正的数据库IO请求。真正发起请求需要通过调用  Subscription.request()方法。

在上面的代码中,为了读取由 Publisher 产生的结果,通过自定义一个Subscriber,在onSubscribe 事件触发时就执行  数据库的请求,之后分别对 onNext、onError、onComplete进行处理。

尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想如果对于每个数据库操作都要完成一个Subscriber 逻辑,那么开发的工作量是巨大的。

为了尽可能复用重复的逻辑,可以对Subscriber的逻辑做一层封装,包含如下功能:

代码如下:

publicclassObservableSubscriberimplementsSubscriber{      //响应数据     privatefinalList received;     //错误信息     privatefinalList errors;     //等待对象     privatefinalCountDownLatch latch;     //订阅器     privatevolatileSubscription subscription;     //是否完成     privatevolatileboolean completed;      publicObservableSubscriber(){         this.received =newArrayList();         this.errors =newArrayList();         this.latch =newCountDownLatch(1);     }      @Override     publicvoid onSubscribe(finalSubscription s){         subscription = s;     }      @Override     publicvoid onNext(final T t){         received.add(t);     }      @Override     publicvoid onError(finalThrowable t){         errors.add(t);         onComplete();     }      @Override     publicvoid onComplete(){         completed =true;         latch.countDown();     }      publicSubscription getSubscription(){         return subscription;     }      publicList getReceived(){         return received;     }      publicThrowable getError(){         if(errors.size()>0){             return errors.get(0);         }         returnnull;     }      publicboolean isCompleted(){         return completed;     }      /**      * 阻塞一定时间等待结果      *      * @param timeout      * @param unit      * @return      * @throws Throwable      */     publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{         return await(timeout, unit).getReceived();     }      /**      * 一直阻塞等待请求完成      *      * @return      * @throws Throwable      */     publicObservableSubscriber await()throwsThrowable{         return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS);     }      /**      * 阻塞一定时间等待完成      *      * @param timeout      * @param unit      * @return      * @throws Throwable      */     publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{         subscription.request(Integer.MAX_VALUE);         if(!latch.await(timeout, unit)){             thrownewMongoTimeoutException("Publisher onComplete timed out");         }         if(!errors.isEmpty()){             throw errors.get(0);         }         returnthis;     }}

借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。

比如对于文档查询的操作可以改造如下:

ObservableSubscriber subscriber =newObservableSubscriber(); collection.find().subscribe(subscriber);//结果处理 subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{     System.out.println("Document:"+ d.toJson());});

当然,这个例子还有可以继续完善,比如使用 List 作为缓存,则要考虑数据量的问题,避免将全部(或超量) 的文档一次性转入内存。

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

推荐阅读:
  1. java连接mysql 8.0的问题
  2. java读取properties文件

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mongodb

上一篇:如何使用PHP global全局变量

下一篇:如何编写YII2框架扩展插件yii2-gird

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》