您好,登录后才能下订单哦!
接上一篇 ServerBootstrap的初始化
https://blog.51cto.com/483181/2119149
先看下调用的源代码
public void bind(int port) throws Exception {
        ...
        try {
            ...
            ChannelFuture f = b.bind(port).sync(); //bind过程
            ...
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
}
从上面代码可以看出几点:
继续看doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1. init和register
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) { 
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
上面这一段代码包含的东西就比较多了,先来看 initAndRegister
顾名思义,这个方法包含初始化和注册两个步骤,代码如下:
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
从上面代码,我们可以看到几点:
public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
        }
    }   
}       
它的newChannel方法也是非常的简单,直接实例化传入的channel对象,也就是NioServerSocketChannel (可以看上一篇初始化的分析)
代码如下:
ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
我们先看看NioServerSocketChannel的实现
先看下NioServerSocketChannel的继承关系
NioServerSocketChannel提供了一个无参构造函数,然后分别有SelectorProvider,ServerSocketChannel的构造函数,如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
        }
    }
    private final ServerSocketChannelConfig config;
    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
无参构造函数里面调用newSocket(xx),参数是SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
先看看SelectorProvider.provider()
private static SelectorProvider provider = null;
public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
                ...
        }
    }
可以看到provider是个单例,不知道大家是否记得上上一篇文章(NioEventLoopGroup实例化)分析的时候也有provider,类型是KQueueSelectorProvider
具体可以看: https://blog.51cto.com/483181/2118817
回到newSocket里面,调用的是provider.openServerSocketChannel()
代码是SelectorProviderImpl里面,返回的是 ServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }
得到ServerSocketChannel之后,继续调用构造函数
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
这个构造方法里面做了两件事
看它的父类构造函数是怎么实现的
首先是AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
        }
    }
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
然后继续看AbstractNioChannel的父类构造方法,也就是AbstractChannel
private final ChannelId id;
protected abstract AbstractUnsafe newUnsafe();
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
}
可以看到这几点:
先看unsafe的初始化
在AbstractChannel里面,它是一个抽象类
protected abstract AbstractUnsafe newUnsafe();
实现类在子类AbstractNioMessageChannel里面,如下,类型是NioMessageUnsafe
@Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }
NioMessageUnsafe代码后面再看。
继续看pipeline的初始化,初始化了一个 DefaultChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
在DefaultChannelPipeline里面初始化了一个head和tail,分别是HeadContext和TailConext类型,而且head和tail组成双向链表。
head和tail的区别之一就是inbound和outbound值是相反的,如下:
| 节点 | inbound | outbound | 
|---|---|---|
| head | false | true | 
| tail | true | false | 
HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }               
借一张图显示下ChannelInBound和ChannelOutBound,如下。head是发送出去的入口,tail是接收消息的入口。

另外我们来看一下添加一个ChannelHandler的流程,比如addLast
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            ...
                    return this;
    }
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }       
private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }       
首先它初始化了一个DefaultChannelHandlerContext对象,里面封装了要add的channelHandler,这个很重要,在Netty的pipeLine里面,都是通过ChannelHandlerContext来描述的,不是直接添加channelHandler。
那,我们来总结下NioServerSocketChannel的初始化过程:
1. NioServerSocketChannel提供了一个无参构造函数,里面SelectorProvider DEFAULT_SELECTOR_PROVIDER,它是一个单例,类型是KQueueSelectorProvider。
2. 我们调用KQueueSelectorProvider.openServerSocketChannel()方法,得到一个ServerSocketChannel
3. 我们用生成的ServerSocketChannel对象创建了一个ServerSocketChannelConfig config,具体是NioServerSocketChannelConfig对象,存在NioServerSocketChannel里面
4. 我们用生成的ServerSocketChannel调用它的父类构造函数,先来到了AbstractNioChannel
5. 在AbstractNioChannel会把ServerSocketChannel存起来,变量是ch,然后把channel设置成非阻塞。
6. AbstractNioChannel还会把readInterestOp存起来,类型是SelectionKey.OP_ACCEPT
7. 继续调用父类构造函数,来到AbstractChannel
8. AbstractChannel里面的parent设置成null
9. AbstractChannel初始化channel id
10. AbstractChannel初始化unsafe,类型是NioMessageUnsafe.
11. AbstractChannel初始化pipeline,类型是DefaultChannelPipeline, 每个Channel都有一个自己的Pipeline
看完NioServerSocketChannel的实例化方法后,我们继续往下看init
abstract void init(Channel channel) throws Exception;
AbstractBootstrap里面的init(channel)方法是一个抽象方法,参数是Channel类型,其实就是上一步实例化好的NioServerSocketChannel对象。
具体实现方法在它的子类ServerBootstrap和Bootstrap(给客户端启动使用的),那我们是分析服务端的代码,所以看ServerBootstrap里面的实现。
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) { //1. 设置options
            setChannelOptions(channel, options, logger);
        }
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { //设置attr属性
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
先来看设置options
final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) { //1. 设置options
            setChannelOptions(channel, options, logger);
        }
static void setChannelOptions(
            Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
        for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }
private static void setChannelOption(
            Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
        try {
            if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
            }
        } catch (Throwable t) {}
    }       
这段代码我们这样看
 b.xxxx.
    .option(ChannelOption.SO_BACKLOG, 100)
`
@Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);
        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            return super.setOption(option, value);
        }
        return true;
    }
父类 DefaultChannelConfig.java
public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);
        if (option == CONNECT_TIMEOUT_MILLIS) {
            setConnectTimeoutMillis((Integer) value);
        } else if (option == MAX_MESSAGES_PER_READ) {
            setMaxMessagesPerRead((Integer) value);
        } else if (option == WRITE_SPIN_COUNT) {
            setWriteSpinCount((Integer) value);
        } else if (option == ALLOCATOR) {
            setAllocator((ByteBufAllocator) value);
        } else if (option == RCVBUF_ALLOCATOR) {
            setRecvByteBufAllocator((RecvByteBufAllocator) value);
        } else if (option == AUTO_READ) {
            setAutoRead((Boolean) value);
        } else if (option == AUTO_CLOSE) {
            setAutoClose((Boolean) value);
        } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
            setWriteBufferHighWaterMark((Integer) value);
        } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
            setWriteBufferLowWaterMark((Integer) value);
        } else if (option == WRITE_BUFFER_WATER_MARK) {
            setWriteBufferWaterMark((WriteBufferWaterMark) value);
        } else if (option == MESSAGE_SIZE_ESTIMATOR) {
            setMessageSizeEstimator((MessageSizeEstimator) value);
        } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
            setPinEventExecutorPerGroup((Boolean) value);
        } else {
            return false;
        }
        return true;
    }       
根据传入的属性不行,用不同的方法进行设置,这些属性的值大家可以去单独百度,可能不同的环境配置不同的值对服务器性能有好处。
那继续往下面看,设置attr
setAttr是封装了一个Attribute的类,然后存储key,value,大家具体要看的话,可以看DefaultAttributeMap.java
继续往下看
p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
上面这段代码,我们一步步看
b..handler(new LoggingHandler(LogLevel.INFO));
所以 pipeline.addLast(handler); 就是把我们设置的handler添加到pipeline里面。
ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
ServerBootstrapAcceptor是把客户端连接的channel从bossGroup转移到workGroup,代码如下:
ServerBootstrap.java
@Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            setChannelOptions(child, childOptions, logger);
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
上面这段代码把客户端的channel读进来转换成一个channel类型,然后调用childGroup,然后把channel注册进去,这样workGroup就接手了channel后面的事情。
那init就看完了,总结一下init做的事情
其实看到这里,我们会发现init还只是初始化参数,把handler添加到pipeLine里面,做好一切准备,并没有bind服务器端口。
那我们继续看
ChannelFuture regFuture = config().group().register(channel);
先继续贴一下initAndRegister的代码,因为上面讲的东西有点多,大家可能忘记initAndRegister里面的代码了。
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //1. NioServerSocketChannel的初始化已经讲了
            init(channel); //2. init过程已经讲了
        } catch (Throwable t) {
        }
        ChannelFuture regFuture = config().group().register(channel); //3. 现在讲register
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
如同上面的注释,我们讲register过程
EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
由于NioEventLoopGroup继承自MultithreadEventLoopGroup,所以调用的是MultithreadEventLoopGroup的register(channel)方法,如下:
public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
@Override
    public EventExecutor next() {
        return chooser.next();
    }       
那next()又是什么呢?在上篇 NioEventLoopGroup实例化 里面我们分析了,NioEventLoopGroup里面初始化了跟传入线程数目相同的NioEventLoop对象,而next()方法有两种算法选出下一个NioEventLoop对象是什么。
这两种算法是PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,所以我们就可以知道继续会调用NioEventLoop对象的register(channel)对象。
而NioEventLoop类并没有实现register(channel)方法,它继承自SingleThreadEventLoop,它里面有实现register(channel)方法,如下:
public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
这个方法里面实例化了一个DefaultChannelPromise对象,它其实就是保存channel和当前的NioEventLoop对象,做了一层封装而已,如下:
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }
public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }       
所以我们可以暂时不管它,继续往下面走.
@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
调用的是unsafe.register(this, promise)
那unsafe是什么对象呢?从上面2.6可以看到unsafe()初始化的是NioMessageUnsafe对象
protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }
由于NioMessageUnsafe并没有重写register(EventLoop eventLoop,  ChannelPromise promise)方法,所以追踪它的父类,最后在AbstractUnsafe里面看到了register(EventLoop eventLoop,  ChannelPromise promise),如下:
先附上NioMessageUnsafe的继承关系图:
AbstractUnsafe.java
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }
都会走到register0(promise)这个方法里面,继续看register0(promise)
private void register0(ChannelPromise promise) {
            try {
                ...
                boolean firstRegistration = neverRegistered;
                doRegister(); //1. 
                neverRegistered = false;
                registered = true;
                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                ...
            }
        }
先看doRegister
这个方法在AbstractChannel里面,是个空实现
/**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }
在AbstractNioChannel里面有重写
@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
protected SelectableChannel javaChannel() {
        return ch;
}       
public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
}
ServerSocketChannelImpl是JDK提供的类,那javaChannel().register(xxx)就是调用JDK nio的方法实现register,那就不继续深入下去了。
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

那我们可以这样理解,上面这段代码是把一个Selector对象注册到Java的 Channel里面,这个Channel和我们上面讲的Netty Channel不是一个东西。
继续看register0()
private void register0(ChannelPromise promise) {
            try {
                ...
                doRegister(); //1. 把selector注册到Java channel, ops = 0
                ...
                pipeline.fireChannelRegistered(); //2. 通知handler channel已经注册
                                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
                ...
            } catch (Throwable t) {
                ...
            }
        }
pipeline里面维护channelHandler的列表,通过链表的方法,如DefaultChannelPipeline.java里面
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
然后通知channel registered,如果channelHandler有重写channelRegitstered(ChannelHandlerContext ctx)的话,就会被回调。如LoggingHandler就会打印

然后判断isActive(),isActive()是一个多态方法,对于服务器,它是判断监听是否启动;
NioServerSocketChannle.java
@Override
    public boolean isActive() {
        return javaChannel().socket().isBound();
    }
对于客户端,它是判断TCP连接是否完成
NioSocketChannel.java
@Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
我们这里直讲服务器,如果isActive(),那么就会调用 pipeline.fireChannelActive(); 通知channelHander已经active,这样就会回调他们的channelActive方法。
继续看pipeline.fireChannelActive();
DefaultChannelPipeline.java
@Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
AbstractChannelHandlerContext.invokeChannelActive方法就不看了,就是调用参数的channelActive。由于参数是head,那么我们去看channelActive方法。
DefaultChannelPipeline.java
@Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            readIfIsAutoRead();
        }
private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }               
调用的是channel.read(),channel是NioServerSocketChannel,它的实现是在父类AbstractChannel.java里面
@Override
    public Channel read() {
        pipeline.read();
        return this;
    }
DefaultChannelPipeline.java
@Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
AbstractChannelHandlerContext.java
@Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            ...
        }
        return this;
    }
private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }       
首先要寻找findContextOutbound,由于head的inbound=false,outbound=true,所以next=head,那么就是调用head的read方法,如下:
DefaultChannelPipeline.java
@Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
AbstractChannel.java
@Override
        public final void beginRead() {
            assertEventLoop();
            if (!isActive()) {
                return;
            }
            try {
                doBeginRead();
            } catch (final Exception e) {
                ...
            }
        }
直接看doBeginRead()
AbstractNioChannel.java
@Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
还记得我们初始化NioServerSocketChannel的时候,我们传给父类的readInterestOp吗?没错,就是SelectionKey.OP_ACCEPT,如下:
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
                ...
}               
上面doReadBegin就是把我们设置的readInterestOp重新设置到java selector上面,代表我们监听的类型是SelectionKey.OP_ACCEPT,不在是最开始的0了。
到这里,initAndRegister方法就基本讲完了,再贴一次它的代码,加深下印象。
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //1. 实例化NioServerSocketChannel
            init(channel); //2. 初始化
        } catch (Throwable t) {
        }
        ChannelFuture regFuture = config().group().register(channel); //3. 注册selector到Java channel上面,注册类型是0
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
我们再来回忆一下initAndRegister方法
1. 实例化NioServerSocketChannel对象,channelFactory.newChannel()
a. 传入父类的ops是SelectionKey.OP_ACCEPT
b. 它的父类AbstractNioChannel把channel设置成非阻塞,然后把SelectionKey.OP_ACCEPT存起来
c. 父类AbstractChannel初始化了ChannelId
d. AbstractChannel初始化了unsafe,类型是NioMessageUnsafe。
e. AbstractChannel初始化了pipeline,类型是DefaultChannelPipeline,每个channel都有自己的pipleline,它维护了channelHandler列表,如果有事件发生,那么pipeline就负责把事件从头传到尾。
2. init方法     
a. 它是在子类ServerBootstrap里面实现,子类Bootstrap实现的是客户端的。
b. setOptions()设置属性,类型有很多,不同的业务场景可以设置不同的属性。
c. addLast把我们设置的channelHandler添加到pipeline
d. 实例化了一个ServerBootstrapAcceptor,里面封装了childChannel,也添加到pipeline里面
3. register
a.  register调用的是bossGroup NioEventLoopGroup的register方法,NioEventLoopGroup  regitster方法调用的next().regitster,next()调用chooser.next.
b. chooser有两种PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,它们负责选择NioEventLoopGroup里面下一个NioEventLoop(NioEventLoopGroup里面有nThreads个NioEventLoop,nThreads表示线程数,默认是cpu*2)
c. NioEventLoop.register调用的是它的父类SingleThreadEventLoop.register,所以它调用的是unsafe.register。从上面的初始化就可以知道,unsafe指的是NioMessageUnsafe,所以调用的是NioMessageUnsafe.register
d. NioMessageUnsafe并没有实现register,所以调用的是它的父类AbstractUnsafe.regitster,然后调用register0
e. 在doRegitster里面把selector注册到Java的channel,key=0
f.  调用pipeline.fireChannelRegistered(),通知pipeline维护的channelHander,channel已经注册了,回调了它们的channelRegitstered方法。
那initAndRegister就讲完了,bind过程还没有结束,因为篇幅有点多了,下一篇继续介绍doBind0:
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1. 这一篇的内容
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise); //2. 下一篇讲doBind0()
            return promise;
        } else {
            ...
            });
            return promise;
        }
    }
													免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。