几个免费建立网站的平台,软件开发公司怎么找客户,wordpress判断为空,如何联系百度推广highlight: arduino-light 服务端如何处理客户端新建连接 Netty 服务端完全启动后#xff0c;就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢#xff1f; 主要分为四步#xff1a; md Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件#xff… highlight: arduino-light 服务端如何处理客户端新建连接 Netty 服务端完全启动后就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢 主要分为四步 md Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件 构造 初始化Netty 客户端 NioSocketChannel 注册 Netty 客户端 NioSocketChannel 到 Worker 工作线程中 从 Worker group 选择一个 eventLoop 工作线程注册到选择的eventLoop的Selector 注册 OP_READ 事件到 NioSocketChannel 的事件集合。 下面我们对每个步骤逐一进行简单的介绍。 接收新连接 bossGroup的EventLoop是一个线程是一个线程是一个线程。所以等服务器端启动起来以后就会执行线程的run方法逻辑。 java protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy (selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 轮询 I/O 事件 if (wakenUp.get()) { selector.wakeup(); } default: } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue; } cancelledKeys 0; needsToSelectAgain false; final int ioRatio this.ioRatio; if (ioRatio 100) { try { // 处理 I/O 事件 processSelectedKeys(); } finally { runAllTasks(); // 处理所有任务 } } else { final long ioStartTime System.nanoTime(); try { processSelectedKeys(); // 处理 I/O 事件 } finally { final long ioTime System.nanoTime() - ioStartTime; // 处理完 I/O 事件再处理异步任务队列 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } NioEventLoop#processSelectedKeys java // processSelectedKeys private void processSelectedKeys() { if (selectedKeys ! null) { //不用JDK的selector.selectedKeys(), 性能更好(1%-2%)垃圾回收更少 processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } } 服务器端监听 OP_ACCEPT 事件读取消息 NioEventLoop#processSelectedKeysOptimized Netty 中 Boss NioEventLoop 专门负责接收新的连接关于 NioEventLoop 的核心源码我们下节课会着重介绍在这里我们只先了解基本的处理流程。当客户端有新连接接入服务端时Boss NioEventLoop 会监听到 OP_ACCEPT 事件源码如下所示 java private void processSelectedKeysOptimized() { for (int i 0; i selectedKeys.size; i) { final SelectionKey k selectedKeys.keys[i]; // null out entry in the array to allow to have it GCed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] null; //呼应于channel的register中的this: //selectionKey javaChannel().register(eventLoop()// .unwrappedSelector(), 0, this);final Object a k.attachment();//因为客户端和服务器端的都继承自AbstractNioChannelif (a instanceof AbstractNioChannel) {//会进入判断processSelectedKey(k, (AbstractNioChannel) a);} else {SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GCed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i 1);selectAgain();i -1;}}
} NioEventLoop#processSelectedKey java private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop ! this || eventLoop null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps k.readyOps();if ((readyOps SelectionKey.OP_CONNECT) ! 0) {int ops k.interestOps();ops ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps SelectionKey.OP_WRITE) ! 0) {ch.unsafe().forceFlush();}//处理读请求(断开连接)或接入连接if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT))! 0 || readyOps 0) {//开始处理请求 服务器端处理的是OP_ACCEPT 接收新连接 unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
} NioMessageUnsafe#read NioServerSocketChannel 所持有的 unsafe 是 NioMessageUnsafe 类型。 我们看下 NioMessageUnsafe.read() 方法中做了什么事。 java public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config config(); final ChannelPipeline pipeline pipeline(); final RecvByteBufAllocator.Handle allocHandle unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed false; Throwable exception null; try { try { do {//readBuf一开始是一个空List//while 循环不断读取 Buffer 中的数据//创建底层SocketChannel并封装为NioSocketChannel放到readBuf返回1int localRead doReadMessages(readBuf); //执行完上面的方法 readBuf放的是新创建的NioSocketChannelif (localRead 0) {break;}if (localRead 0) {closed true;break;}allocHandle.incMessagesRead(localRead);//是否需要继续读 因为是建立连接 所以总共读取的字节数是0 不会继续进入循环//这里一次最多处理16个连接} while (allocHandle.continueReading());} catch (Throwable t) {exception t;}int size readBuf.size();//readBuf放的是新创建的NioSocketChannelfor (int i 0; i size; i ) {readPending false;// 对于服务器端NioServerSocketChannel来说 // handler有// 1.head // 2.ClientLoggingHandler // 3.ServerBootstrapAcceptor// 4.tail// 接下来就是调用服务器端的每个handler的channelRead方法 传播读取事件// 比如ClientLoggingHandler的channelRead用于打印接收到的消息到日志// 比如 serverBootStrapAcceptor的channelRead //用于向客户端的SocketChannel的pipeline添加handler//就是把服务器端方法中的childHandler都添加到客户端的NioSocketChannel的pipeline//具体看serverBootStrapAcceptor的channelRead 方法pipeline.fireChannelRead(readBuf.get(i)); }readBuf.clear();allocHandle.readComplete();// 传播读取完毕事件pipeline.fireChannelReadComplete(); // 省略其他代码
} finally {if (!readPending !config.isAutoRead()) {removeReadOp();}
} } 可以看出 read() 方法的核心逻辑就是通过 while 循环不断读取数据然后放入 List 中这里的数据其实就是新连接。每次最多处理16个。 需要重点跟进一下 NioServerSocketChannel 的 doReadMessages() 方法。 接前面NioMessageUnsafe#read 继续接着NioMessageUnsafe#read看 1.NioServerSocketChannel #doReadMessages 接收创建初始化客户端连接 java protected int doReadMessages(List buf) throws Exception { //Netty 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel //想想这里 在NIO编程的时候 是做了判断 如果是OPACCEPT事件 //执行 SocketChannel sChannel ssChannel.accept(); //这里的accept方法返回的就是原生的SocketChannel SocketChannel ch SocketUtils.accept(javaChannel()); try { if (ch ! null) { //根据原生的 SocketChannel构造 Netty 客户端 NioSocketChannel //NioSocketChannel 的创建同样会完成几件事 //创建核心成员变量 id、unsafe、pipeline //注册 SelectionKey.OPREAD 事件 //设置 Channel 的为非阻塞模式 //新建客户端 Channel 的配置。 //this是NioServerSocketChannel //最后把NioSocketChannel添加到buf返回1 //super(parent, ch, SelectionKey.OP_READ); //这里不是注册读事件只是赋值 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn(Failed to create a new channel from an accepted socket., t); try { ch.close(); } catch (Throwable t2) { logger.warn(Failed to close a socket., t2); } } return 0; } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch ch; //设置读事件到NioSocketChannel this.readInterestOp readInterestOp; try { //非阻塞模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( Failed to close a partially initialized socket., e2); } throw new ChannelException(Failed to enter non-blocking mode., e);}
} public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction () { Override public SocketChannel run() throws IOException { // 非阻塞模式下没有连接请求时返回null return serverSocketChannel.accept(); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } } 这时就开始执行第二个步骤构造 Netty 客户端 NioSocketChannel。Netty 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel然后将它封装成 Netty 自己的 NioSocketChannel。 新建 Netty 的客户端 Channel 的实现原理与上文中我们讲到的创建服务端 Channel 的过程是类似的只是服务端 Channel 的类型是 NioServerSocketChannel而客户端 Channel 的类型是 NioSocketChannel。 NioSocketChannel 的创建同样会完成几件事创建核心成员变量 id、unsafe、pipeline 注册 SelectionKey.OP_READ 事件设置 Channel 的为非阻塞模式新建客户端 Channel 的配置。 成功构造客户端 NioSocketChannel 后接下来会通过 pipeline.fireChannelRead() 触发 channelRead 事件传播。对于服务端来说此时 Pipeline 的内部结构如下图所示。 2.pipeline.fireChannelRead 上文中我们提到了一种特殊的处理器 ServerBootstrapAcceptor在下面它就发挥了重要的作用。channelRead 事件会传播到 ServerBootstrapAcceptor.channelRead() 方法channelRead() 会将客户端 Channel 分配到工作线程组中去执行。具体实现如下 触发服务器端hanlder#channelRead ServerBootstrapAcceptor#channelRead java //ServerBootstrapAcceptor负责接收客户端连接 创建连接后对连接的初始化工作。 // ServerBootstrapAcceptor.channelRead() 方法 public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child (Channel) msg; //childHandler是我们自定义的EchoServer的代理类 child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { // 注册客户端 Channel到工作线程组 //1.MultithreadEventLoopGroup#register childGroup.register(child).addListener(new ChannelFutureListener() { Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ServerBootstrapAcceptor 开始就把 msg 强制转换为 Channel。难道不会有其他类型的数据吗 因为 ServerBootstrapAcceptor 是服务端 Channel 中一个特殊的处理器而服务端 Channel 的 channelRead 事件只会在新连接接入时触发所以这里拿到的数据都是客户端新连接。 register()注册客户端 Channel java //MultithreadEventLoopGroup#register //从workGroup中选择一个EventLoop注册到channel Override public ChannelFuture register(Channel channel) { return next().register(channel); } java //io.netty.channel.nio.AbstractChannel#register0 private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration neverRegistered; //绑定选择器 doRegister(); neverRegistered false; registered true; //给客户端添加处理器 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); //NioServerSocketChannel的注册不会走进下面if(isActive())//NioSocketChannel可以走进去if(isActive())。因为accept后就active了。if (isActive()) {if (firstRegistration) {//第一次注册需要触发pipeline上的hanlder的read事件//实际上就是注册OP_ACCEPT/OP_READ事件:创建连接或者读事件//首先会进入DefaultChannelPipeLine的read方法pipeline.fireChannelActive();} else if (config().isAutoRead()) {//第二次注册的时候beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}} 客户端SocketChannel绑定selector AbstractNioChannel#doRegister java //io.netty.channel.nio.AbstractNioChannel#doRegister Override protected void doRegister() throws Exception { boolean selected false; for (;;) { try { logger.info(initial register 0); //这里的事件类型仍然是0 //attachement是NioSocketChannel selectionKey javaChannel().register (eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the canceled //SelectionKey may still be // cached and not removed because no //Select.select(..) operation was called yet. eventLoop().selectNow(); selected true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } DefaultChannelPipeline.HeadContext#read java //io.netty.channel.DefaultChannelPipeline.HeadContext#read Override public void read(ChannelHandlerContext ctx) { //实际上就是注册OP_ACCEPT/OP_READ事件:创建连接或者读事件 unsafe.beginRead(); } java Override public final void beginRead() { assertEventLoop(); if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}} java Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey this.selectionKey; if (!selectionKey.isValid()) { return; } readPending true;final int interestOps selectionKey.interestOps();//super(parent, ch, SelectionKey.OP_READ);//假设之前没有监听readInterestOp则监听readInterestOpif ((interestOps readInterestOp) 0) {//NioServerSocketChannel: readInterestOp OP_ACCEPT 1 4 16logger.info(interest ops readInterestOp);selectionKey.interestOps(interestOps | readInterestOp);}
} ServerBootstrapAcceptor 通过 childGroup.register() 方法会完成第三和第四两个步骤. 1.将 NioSocketChannel 注册到 Worker 工作线程中 2.并注册 OP_READ 事件到 NioSocketChannel 的事件集合。 在注册过程中比较有意思的一点是它会调用 pipeline.fireChannelRegistered() 方法传播 channelRegistered 事件然后再调用 pipeline.fireChannelActive() 方法传播 channelActive 事件。 兜了一圈这又会回到之前我们介绍的 readIfIsAutoRead() 方法此时它会将 SelectionKey.OP_READ 事件注册到 Channel 的事件集合。 添加自定义handler到客户端SocketChannel pipeline.invokeHandlerAddedIfNeeded 总结 java •接受连接本质 selector.select()/selectNow()/select(timeoutMillis) 发现 OP_ACCEPT 事件处理 •SocketChannel socketChannel serverSocketChannel.accept() •selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this); •selectionKey.interestOps(OP_READ); 关于服务端如何处理客户端新建连接的具体源码我在此就不继续展开了。这里留一个小任务建议你亲自动手分析下 childGroup.register() 的相关源码从而加深对服务端启动以及新连接处理流程的理解。有了服务端启动源码分析的基础再去理解客户端新建连接的过程会相对容易很多。