您好,登录后才能下订单哦!
这篇文章主要介绍“java异步并发请求和请求合并举例分析”,在日常操作中,相信很多人在java异步并发请求和请求合并举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java异步并发请求和请求合并举例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在做业务系统需求开发中,经常需要从其他服务获取数据,拼接数据,然后返回数据给前端使用;常见的服务调用就是通过http接口调用,而对于http,通常一个请求会分配一个线程执行,在同步调用接口的情况下,整个线程是一直被占用或者阻塞的;如果有大量的这种请求,整个系统的吞吐量就比较低,而在依赖的服务响应时间比较低的情况下,我们希望先让出cpu,让其他请求先执行,等依赖的服务请求返回结果时再继续往下执行,这时我们会考虑将请求异步化,或者将相同的请求合并,从而达到提高系统执行效率和吞吐量的目的。
目前常见的几种调用方式是同步调用,线程池+future,异步回调completableFuture;协程也是异步调用的解决方式,但java目前不支持协程;对于future方式,只能用get或者while(!isDone)轮询这种阻塞的方式直到线程执行完成,这也不是我们希望的异步执行方式,jdk8提供的completableFuture其实也不是异步的方式,只是对依赖多服务的Callback调用结果处理做结果编排,来弥补Callback的不足,从而实现异步链式调用的目的,这也是比较推荐的方式。
RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); // 假设rpc1耗时10ms Map<String, String> rpcResult1=rpcService.getRpcResult(); // 假设rpc2耗时20ms Integer rpcResult2 = httpService.getHttpResult(); // 则线程总耗时:30ms
ExecutorService executor = Executors.newFixedThreadPool(2); RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); future1 = executor.submit(() -> rpcService.getRpcResult()); future2 = executor.submit(() -> httpService.getHttpResult()); //rpc1耗时10ms Map<String, String> rpcResult1 = future1.get(300, TimeUnit.MILLISECONDS); //rpc2耗时20ms Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS); //则线程总耗时20ms
/** * 场景:两个接口并发异步调用,返回CompletableFuture,不阻塞主线程 * 两个服务也是异步非阻塞调用 **/ CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50"); CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50"); CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> { //处理业务.... return f1 + "," + f2; }).exceptionally(e -> { return ""; });
CompletableFuture使用ForkJoinPool执行线程,在ForkJoinPool类注册ForkJoinWorkerThread线程时可以看到,ForkJoinPool里面的线程都是daemon线程(垃圾回收线程就是一个典型的deamon线程),
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; #注册守护线程 wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
当主线程执行完后,jvm就会退出,所以需要考虑主线程执行完成时间和fork出去的线程执行时间,也需要考虑线程池的大小,默认为当前cpu的核数-1,可以参考下其他系统的故障记录:CompletableFuture线程池问题。
当系统遇到瞬间产生大量请求时,可以考虑将相同的请求合并,最大化利用系统IO,提高系统的吞吐量。
设计时,可以将符合条件的url请求,先收集起来,直到满足以下条件之一时进行合并发送:
收集到的请求数超过预设的最大请求数。
距离上次请求发送时长超过预设的最大时长。
实现方案有自行使用阻塞队列方式:并发环境下的请求合并,也可以考虑Hystrix:Hystrix实现请求合并/请求缓存
目前公司网关组件janus也是通过合并auth请求的方式减少网络开销,提高cpu的利用率和系统吞吐量的。
nginx同样有合并请求模块nginx-http-concat用来减少请求io,参考:nginx 合并多个js/css请求为一个请求
赖泽坤@vipshop.com
到此,关于“java异步并发请求和请求合并举例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。