您好,登录后才能下订单哦!
本篇内容主要讲解“netty服务端处理请求联合pipeline源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“netty服务端处理请求联合pipeline源码分析”吧!
在客户端接入的时候, NioMessageUnsafe
的read
方法中pipeline.fireChannelRead(readBuf.get(i))
为什么会调用到ServerBootstrap
的内部类ServerBootstrapAcceptor
中的channelRead()
方法。
客户端handler
是什么时候被添加的?
先分析第一个问题。回到netty处理客户端请求分析_1中服务端接收到accpet
事件后,进行读取的方法NioMessageUnsafe.read()
public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //创建jdk底层的channel //readBuf用于临时承载读到链接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的链接进行计数 allocHandle.incMessagesRead(localRead); //连接数是否超过最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端连接 for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将创建NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略 } finally { //代码省略 } }
重点看pipeline.fireChannelRead(readBuf.get(i))
首先, 这里pipeline
是服务端channel
的pipeline
, 也就是NioServerSocketChannel
的pipeline
我们学习过pipeline
之后, 对这种写法并不陌生, 就是传递channelRead
事件, 这里通过传递channelRead
事件走到了ServerBootstrapAcceptor
的channelRead()
方法, 说明在这步之前, ServerBootstrapAcceptor
作为一个handler
添加到了服务端channel
的pipeline
中, 那么这个handler
什么时候添加的呢?
我们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法 回顾下ServerBootstrap.init
的调用链路:
ServerBootstrap.bind(8899)
---> AbstractBootstrap.doBind(final SocketAddress localAddress)
---> AbstractBootstrap.initAndRegister()
---> ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception { //获取用户定义的选项(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work线程组(4) final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我们重点关注第8步, 添加服务端channel
, 这里的pipeline
, 是服务服务端channel
的pipeline
, 也就是NioServerSocketChannel
绑定的pipeline
, 这里添加了一个ChannelInitializer
类型的handler
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { //省略类体 }
我们看到其继承了ChannelInboundHandlerAdapter
, 说明是一个inbound
类型的handler
这里我们可能会想到, 添加完handler
会执行handlerAdded
, 然后在handlerAdded
方法中做了添加ServerBootstrapAcceptor
这个handler
但是, 实际上并不是这样的, 当程序执行到这里, 并没有马上执行handlerAdded
, 我们紧跟addLast
方法
最后执行到DefualtChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判断handler是否被重复添加(1) checkMultiplicity(handler); //创建一个HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注册 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回调用户事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回调添加事件(4) callHandlerAdded0(newCtx); return this; }
首先完成了handler
的添加, 但是并没有马上执行回调
这里我们重点关注if (!registered)
这个条件判断, 其实在注册完成, registered
会变成true
, 但是走到这一步的时候NioServerSockeChannel
并没有完成注册(可以回顾第一章看注册在哪一步), 所以会进到if里并返回自身
DefualtChannelPipeline.callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //判断是否已添加, 未添加, 进行添加, 已添加进行删除 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); //获取第一个Callback任务 PendingHandlerCallback pending = pendingHandlerCallbackHead; //如果第一个Callback任务为空 if (pending == null) { //将第一个任务设置为刚创建的任务 pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }
因我们调用这个方法的时候added
传的true
, 所以PendingHandlerCallback task
赋值为new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask
这个类, 我们从名字可以看出, 这是一个handler
添加的延迟任务, 用于执行handler
延迟添加的操作, 同样也对应一个名字为PendingHandlerRemovedTask
的类, 用于执行延迟删除handler
的操作, 这两个类都继承抽象类PendingHandlerCallback
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); }
进入super(ctx)
PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; }
在父类中, 保存了要添加的context
, 也就是ChannelInitializer
类型的包装类
PendingHandlerCallback pending = pendingHandlerCallbackHead;
这表示获取第一个PendingHandlerCallback
的任务, 其实PendingHandlerCallback
是一个单向链表, 自身维护一个PendingHandlerCallback
类型的next
, 指向下一个任务, 在DefaultChannelPipeline
这个类中, 定义了个PendingHandlerCallback
类型的引用pendingHandlerCallbackHead
, 用来指向延迟回调任务的中的第一个任务。
之后判断这个任务是为空, 如果是第一次添加handler
, 那么这里就是空, 所以将第一个任务赋值为我们刚创建的添加任务。
如果不是第一次添加handler
, 则将我们新创建的任务添加到链表的尾部, 因为这里我们是第一次添加, 所以第一个回调任务就指向了我们创建的添加handler
的任务。
完成这一系列操作之后, addLast
方法返归, 此时并没有完成添加操作。
而什么时候完成添加操作的呢?
回到在服务端channel注册时候的会走到AbstractChannel.register0方法 回顾下AbstractChannel.register0
的调用链路:
ServerBootstrap.bind(8899)
---> AbstractBootstrap.doBind(final SocketAddress localAddress)
---> AbstractBootstrap.initAndRegister()
---> config().group().register(channel)
---> SingleThreadEventLoop.register(final ChannelPromise promise)
---> AbstractChannel.register(EventLoop eventLoop, final ChannelPromise promise)
---> AbstractChannel.register0(ChannelPromise promise)
private void register0(ChannelPromise promise) { try { //做实际的注册(1) doRegister(); neverRegistered = false; registered = true; //触发事件(2) pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //触发注册成功事件(3) pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //传播active事件(4) pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //省略代码 } }
重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经通过doRegister()方法完成了实际的注册, 我们跟到该方法中
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
这里会判断是否第一次注册, 这里返回true
, 然后会执行callHandlerAddedForAllHandlers()
方法, 我们跟进去
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } //获取task PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //执行添加handler方法 task.execute(); task = task.next; } }
这里拿到第一个延迟执行handler
添加的task
其实就是我们之前剖析过的, 延迟执行handler
添加的task
, 就是PendingHandlerAddedTask
对象
在while
循环中, 通过执行execute()
方法将handler
添加
void execute() { //获取当前eventLoop线程 EventExecutor executor = ctx.executor(); //是当前执行的线程 if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { //添加到队列 executor.execute(this); } catch (RejectedExecutionException e) { //代码省略 } } }
再进入callHandlerAdded0
方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { //省略... } }
终于在这里, 我们看到了执行回调的方法
void init(Channel channel) throws Exception { //获取用户定义的选项(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work线程组(4) final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我们继续看第8步添加服务端handler
因为这里的handler
是ChannelInitializer
, 所以完成添加之后会调用ChannelInitializer
的handlerAdded
方法
跟到handlerAdded
方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //默认情况下, 会返回true if (ctx.channel().isRegistered()) { initChannel(ctx); } }
因为执行到这步服务端channel
已经完成注册, 所以会执行到initChannel
方法
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { //这段代码是否被执行过 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { //调用之后会删除当前节点 remove(ctx); } return true; } return false; }
我们关注initChannel
这个方法, 这个方法是在ChannelInitializer
的匿名内部来实现的, 这里我们注意, 在initChannel
方法执行完毕之后会调用remove(ctx)
删除当前节点
public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
这里首先添加用户自定义的handler
, 这里如果用户没有定义, 则添加不成功, 然后, 会调用addLast
将ServerBootstrapAcceptor
这个handler
添加了进去, 同样这个handler
也继承了ChannelInboundHandlerAdapter
, 在这个handler
中, 重写了channelRead
方法, 所以, 这就是第一个问题的答案
紧接着我们看第二个问题:客户端handler是什么时候被添加的?
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer child.pipeline().addLast(childHandler); //代码省略 try { //work线程注册channel childGroup.register(child).addListener(new ChannelFutureListener() { //代码省略 }); } catch (Throwable t) { forceClose(child, t); } }
这里真相可以大白了, 服务端再创建完客户端channel
之后, 将新创建的NioSocketChannel
作为参数触发channelRead
事件(可以回顾NioMessageUnsafe.read
方法, 代码这里就不贴了), 所以这里的参数msg
就是NioSocketChannel
拿到channel
时候再将客户端的handler
添加进去, 我们回顾客户端handler
的添加过程:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
和服务端channel
的逻辑一样, 首先会添加ChannelInitializer
这个handler
但是没有注册所以没有执行添加handler
的回调, 将任务保存到一个延迟回调的task
中
等客户端channel
注册完毕, 会将执行添加handler
的回调, 也就是handlerAdded
方法, 在回调中执行initChannel
方法将客户端handler
添加进去, 然后删除ChannelInitializer
这个handler
因为在服务端channel
中这块逻辑已经进行了详细的剖析, 所以这边就不在赘述, 同学们可以自己跟进去走一遍流程
这里注意, 因为每创建一个NioSoeketChannel
都会调用服务端ServerBootstrapAcceptor
的channelRead
方法, 所以这里会将每一个NioSocketChannel
的handler
进行添加。
到此,相信大家对“netty服务端处理请求联合pipeline源码分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。