python电商网站开发,怎么提高seo排名,企业电子商务网站建设和一般,上海网站域名注册价格文章目录 单线程Reactor模式多线程Reactor模式Reactor模式中IO事件的处理流程Netty中的通道ChannelNetty中的反应器ReactorNetty中的处理器HandlerNetty中的通道Channel和处理器Handler的协作组件Pipeline Reactor(反应器)模式是高性能网络编程在设计和架构方面的基础模式.Doug… 文章目录 单线程Reactor模式多线程Reactor模式Reactor模式中IO事件的处理流程Netty中的通道ChannelNetty中的反应器ReactorNetty中的处理器HandlerNetty中的通道Channel和处理器Handler的协作组件Pipeline Reactor(反应器)模式是高性能网络编程在设计和架构方面的基础模式.Doug Lea大师在文章“Scalable IOin Java”中对Reactor模式的定义 Reactor 模式是由Reactor线程、Handlers处理器两大角色组成两大角色的职责分别如下 1 Reactor 线程的职责负责响应IO事件并且分发到Handlers处理器。 2Handlers处理器的职责与IO事件或者选择键绑定负责IO事件的处理完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。 我觉得Reactor模式类似事件驱动模式当有事件触发时事件源会将事件分发到Handler处理器由Handler负责事件处理。Reactor模式中的反应器角色类似于事件驱动模式中的事件分发器Dispatcher角色。
单线程Reactor模式
单线程Reactor模式是Reactor和Handlers 处于一个处于一个线程中执行。如图所示
在单线程Reactor模式中需要将attach和attachment结合使用 在选择键注册完成之后调用attach()方法将Handler实例绑定到选择键当IO事件发生时调用attachment()方法可以从选择键取出Handler实例将事件分发到Handler处理器中完成业务处理。
单线程Reactor模式逻辑示例EchoServerReactor代码如下
public class EchoServerReactor implements Runnable{Selector selector;ServerSocketChannel serverSocket;public EchoServerReactor() throws IOException {//Reactor初始化selector Selector.open();serverSocket ServerSocketChannel.open();InetSocketAddress address new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);//非阻塞serverSocket.configureBlocking(false);//分步处理,第一步,接收accept事件SelectionKey sk serverSocket.register(selector,0,new AcceptorHandler());// SelectionKey.OP_ACCEPTserverSocket.socket().bind(address);Logger.info(服务端已经开始监听address);sk.interestOps(SelectionKey.OP_ACCEPT);}//轮询和分发事件Overridepublic void run() {try{while (!Thread.interrupted()){selector.select();SetSelectionKey selected selector.selectedKeys();IteratorSelectionKey it selected.iterator();while (it.hasNext()){//反应器负责dispatch收到的事件SelectionKey sk it.next();dispathch(sk);}selected.clear();}} catch (IOException e) {e.printStackTrace();}}void dispathch(SelectionKey key){Runnable r (Runnable) key.attachment();//调用之前绑定的选择键的处理器对象if (r!null){r.run();}}/*** 新连接处理器完成新连接的接收工作为新连接创建一个负责数据传输的Handler*/class AcceptorHandler implements Runnable{Overridepublic void run() {//接受新连接try {SocketChannel channel (SocketChannel) serverSocket .accept();//需要为新连接创建一个输入输出的Handlerif(channel!null){new EchoHandler(selector,channel);}} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {new Thread(new EchoServerReactor()).start();}
}
EchoHandler 代码如下
public class EchoHandler implements Runnable{final SocketChannel channel;final SelectionKey sk;final ByteBuffer byteBuffer ByteBuffer.allocate(1024);//处理器实例的状态发送和接收一个连接对应一个处理器实例static final int RECIEVING 0, SENDING 1;int state RECIEVING;EchoHandler(Selector selector, java.nio.channels.SocketChannel c) throws IOException {channel c;//设置为非阻塞模式c.configureBlocking(false);//仅仅取得选择键绑定事件处理器// 后设置感兴趣的IO事件sk channel.register(selector, 0);//将Handler作为选择键的附件sk.attach(this);//第二步,注册Read就绪事件sk.interestOps(SelectionKey.OP_READ);// 唤醒事件查询线程在单线程模式下这里没啥意义selector.wakeup();}Overridepublic void run() {try{if(state SENDING){//发送状态把数据写入连接通道channel.write(byteBuffer);//byteBuffer切换成写模式写完后就准备开始从通道度byteBuffer.clear();//注册read就绪时间开始接收客户端数据sk.interestOps(SelectionKey.OP_READ);//修改状态进入接收状态state RECIEVING;}else if (state RECIEVING){//接收状态从通道读取数据int length 0;while ((length channel.read(byteBuffer))0){Logger.info(new String(byteBuffer.array(),0,length));}//读完后翻转byteBuffer的读写模式byteBuffer.flip();//准备写数据到通道注册write就绪事件sk.interestOps(SelectionKey.OP_WRITE);//注册完成后进入发送状态state SENDING;}//处理结束了这里不能关闭select key 需要重复使用功能sk.cancel} catch (IOException e) {e.printStackTrace();}}
}NioDemoConfig 代码如下 public class NioDemoConfig extends ConfigProperties {static ConfigProperties singleton new NioDemoConfig(system.properties);private NioDemoConfig(String fileName){super(fileName);super.loadFromFile();}public static final String SOCKET_SERVER_IP singleton.getValue(socket.server.ip);public static final int SOCKET_SERVER_PORT singleton.getIntValue(socket.server.port);}单线程Reactor模式是基于Java的NIO实现的Reactor和Handler都在同一条线程中执行。这样带来了一个问题当其中某个Handler阻塞时会导致其他所有的Handler都得不到执行。在这种场景下被阻塞的Handler不仅仅负责输入和输出处理的传输处理器还包括负责新连接监听的AcceptorHandler处理器可能导致服务器无响应。这是一个非常严重的缺陷导致单线程反应器模型在生产场景中使用得比较少。
多线程Reactor模式
多线程版本的Reator模式如下 1将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样业务处理线程与负责新连接监听的反应器线程就能相互隔离避免服务器的连接监听受到阻塞。 2如果服务器为多核的CPU可以将反应器线程拆分为多个子反应器SubReactor线程同时引入多个选择器并且为每一个SubReactor引入一个线程一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力也大大提升了反应器管理大量连接或者监听大量传输通道的能力。
多线程Reactor模式的逻辑模型如下
核心代码MultiThreadEchoServerReactor 如下
public class MultiThreadEchoServerReactor {ServerSocketChannel serverSocket;AtomicInteger next new AtomicInteger(0);Selector bossSelector null;Reactor bossReactor null;//selectors集合,引入多个selector选择器Selector[] workSelectors new Selector[2];//引入多个子反应器Reactor[] workReactors null;MultiThreadEchoServerReactor() throws IOException {//初始化多个selector选择器bossSelector Selector.open();// 用于监听新连接事件workSelectors[0] Selector.open(); // 用于监听read、write事件workSelectors[1] Selector.open(); // 用于监听read、write事件serverSocket ServerSocketChannel.open();InetSocketAddress address new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);serverSocket.socket().bind(address);serverSocket.configureBlocking(false);//非阻塞//bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelectorSelectionKey sk serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);//绑定Handler新连接监控handler绑定到SelectionKey选择键sk.attach(new AcceptorHandler());//bossReactor反应器处理新连接的bossSelectorbossReactor new Reactor(bossSelector);//第一个子反应器一子反应器负责一个worker选择器Reactor workReactor1 new Reactor(workSelectors[0]);//第二个子反应器一子反应器负责一个worker选择器Reactor workReactor2 new Reactor(workSelectors[1]);workReactors new Reactor[]{workReactor1, workReactor2};}private void startService() {// 一子反应器对应一条线程new Thread(bossReactor).start();new Thread(workReactors[0]).start();new Thread(workReactors[1]).start();}//反应器class Reactor implements Runnable {//每条线程负责一个选择器的查询final Selector selector;public Reactor(Selector selector) {this.selector selector;}Overridepublic void run() {try {while (!Thread.interrupted()) {//单位为毫秒selector.select(1000);SetSelectionKey selectedKeys selector.selectedKeys();if (null selectedKeys || selectedKeys.size() 0) {continue;}IteratorSelectionKey it selectedKeys.iterator();while (it.hasNext()) {//Reactor负责dispatch收到的事件SelectionKey sk it.next();dispatch(sk);}selectedKeys.clear();}} catch (IOException ex) {ex.printStackTrace();}}void dispatch(SelectionKey sk) {Runnable handler (Runnable) sk.attachment();//调用之前attach绑定到选择键的handler处理器对象if (handler ! null) {handler.run();}}}// Handler:新连接处理器class AcceptorHandler implements Runnable {Overridepublic void run() {try {SocketChannel channel serverSocket.accept();Logger.info(接收到一个新的连接);if (channel ! null) {int index next.get();Logger.info(选择器的编号 index);Selector selector workSelectors[index];new MultiThreadEchoHandler(selector, channel);}} catch (IOException e) {e.printStackTrace();}if (next.incrementAndGet() workSelectors.length) {next.set(0);}}}public static void main(String[] args) throws IOException {MultiThreadEchoServerReactor server new MultiThreadEchoServerReactor();server.startService();}
}MultiThreadEchoHandler 代码如下
public class MultiThreadEchoHandler implements Runnable {final SocketChannel channel;final SelectionKey sk;final ByteBuffer byteBuffer ByteBuffer.allocate(1024);static final int RECIEVING 0, SENDING 1;int state RECIEVING;//引入线程池static ExecutorService pool Executors.newFixedThreadPool(4);MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {channel c;channel.configureBlocking(false);channel.setOption(StandardSocketOptions.TCP_NODELAY, true);//仅仅取得选择键后设置感兴趣的IO事件sk channel.register(selector, 0);//将本Handler作为sk选择键的附件方便事件dispatchsk.attach(this);//向sk选择键注册Read就绪事件sk.interestOps(SelectionKey.OP_READ);//唤醒 查询线程使得OP_READ生效selector.wakeup();Logger.info(新的连接 注册完成);}Overridepublic void run() {//异步任务在独立的线程池中执行//提交数据传输任务到线程池//使得IO处理不在IO事件轮询线程中执行在独立的线程池中执行pool.execute(new AsyncTask());}//异步任务不在Reactor线程中执行//数据传输与业务处理任务不在IO事件轮询线程中执行在独立的线程池中执行public synchronized void asyncRun() {try {if (state SENDING) {//写入通道channel.write(byteBuffer);//写完后,准备开始从通道读,byteBuffer切换成写模式byteBuffer.clear();//写完后,注册read就绪事件sk.interestOps(SelectionKey.OP_READ);//写完后,进入接收的状态state RECIEVING;} else if (state RECIEVING) {//从通道读int length 0;while ((length channel.read(byteBuffer)) 0) {Logger.info(new String(byteBuffer.array(), 0, length));}//读完后准备开始写入通道,byteBuffer切换成读模式byteBuffer.flip();//读完后注册write就绪事件sk.interestOps(SelectionKey.OP_WRITE);//读完后,进入发送的状态state SENDING;}//处理结束了, 这里不能关闭select key需要重复使用//sk.cancel();} catch (IOException ex) {ex.printStackTrace();}}//异步任务的内部类class AsyncTask implements Runnable {Overridepublic void run() {MultiThreadEchoHandler.this.asyncRun();}}}Reactor模式中IO事件的处理流程
一个IO事件从操作系统底层产生后在Reactor模式中的处理流程如下所示
Reactor模式中IO事件的处理流程大致分为4步具体如下 step1通道注册。IO事件源于通道ChannelIO是和通道对应于底层连接而言强相关的。一个IO事件一定属于某个通道。如果要查询通道的事件首先就要将通道注册到选择器。 step2查询事件。在Reactor模式中一个线程会负责一个反应器或者SubReactor子反应器不断地轮询查询选择器中的IO事件选择键。 step3事件分发。如果查询到IO事件则分发给与IO事件有绑定关系的Handler业务处理器。 step4完成真正的IO操作和业务处理这一步由Handler业务处理器负责。
其中step1和step2是java NIO的功能。
Netty中的通道Channel
Reactor模式和通道紧密相关反应器的查询和分发的IO事件都来自于Channel组件 而Channel组件也是Netty中非常重要的组件Netty中不直接使用java NIO的Channel组件对Channel组件进行了自己封装对于每一种通信连接协议netty都实现了自己的通道每一种协议基本上偶有NIO和OIO两个版本。
对于不同协议Netty中常见的通道类型如下
NioSocketChannel异步非阻塞TCP Socket传输通道。OioSocketChannel同步阻塞式TCP Socket传输通道。NioServerSocketChannel异步非阻塞TCPSocket服务端监听通道。OioServerSocketChannel同步阻塞式TCPSocket服务端监听通道。NioDatagramChannel异步非阻塞的UDP传输通道。OioDatagramChannel同步阻塞式UDP传输通道。NioSctpChannel异步非阻塞Sctp传输通道。OioSctpChannel同步阻塞式Sctp传输通道。NioSctpServerChannel异步非阻塞Sctp服务端监听通道。OioSctpServerChannel同步阻塞式Sctp服务端监听通道。
在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员通过对该内部的Java NIO通道的封装对Netty的NioSocketChannel通道上的所有IO操作最终都会落地到Java NIO的SelectableChannel底层通道。NioSocketChannel的类结构图如下 Netty中的反应器Reactor
Netty中的反应器组件有多个实现类这些实现类与其通道类型相互匹配。对应于NioSocketChannel通道Netty的反应器类为NioEventLoopNIO事件轮询。 NioEventLoop类有两个重要的成员属性一个是Thread线程类的成员一个是Java NIO选择器的成员属性。NioEventLoop的继承关系和主要成员属性如下图:
从上图可知一个NioEventLoop拥有一个线程负责一个Java NIO选择器的IO事件轮询。理论上来说一个EventLoop反应器和NettyChannel通道是一对多的关系一个反应器可以注册成千上万的通道如下图所示 Netty 是通过使用EventLoopGroup 完成多线程版本的Reactor模式的,多个EventLoop线程放在一起可以组成一个EventLoopGroup。 EventLoopGroup的构造函数有一个参数用于指定内部的线程数。在构造器初始化时会按照传入的线程数量在内部构造多个线程和多个EventLoop子反应器一个线程对应一个EventLoop子反应器进行多线程的IO事件查询和分发。如果使用EventLoopGroup的无参数构造函数没有传入线程数量或者传入的数量为0默认的EventLoopGroup内部线程数量为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU那么在内部会启动8个EventLoop线程相当于8个子反应器实例。
Netty中的处理器Handler
Netty的Handler分为两大类第一类是ChannelInboundHandler入站处理器第二类是ChannelOutboundHandler出站处理器二者都继承了ChannelHandler处理器接口。
以底层的Java NIO中的OP_READ输入事件为例剖析Netty入站处理流程在通道中发生了OP_READ事件后会被EventLoop查询到然后分发给ChannelInboundHandler入站处理器调用对应的入站处理的read()方法。在ChannelInboundHandler入站处理器内部的read()方法具体实现中可以从通道中读取数据。Netty中的入站处理触发的方向为从通道触发ChannelInboundHandler入站处理器负责接收或者执行。
Netty中的出站处理指的是从ChannelOutboundHandler出站处理器到通道的某次IO操作。无论是入站还是出站Netty都提供了各自的默认适配器实现ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter入站处理适配器。ChannelOutboundHandler的默认实现为ChannelOutBoundHandlerAdapter出站处理适配器。这两个默认的通道处理适配器分别实现了基本的入站操作和出站操作功能。如果要实现自己的业务处理器不需要从零开始去实现处理器的接口只需要继承通道处理适配器即可。
Netty中的通道Channel和处理器Handler的协作组件Pipeline
Netty设计了ChannelPipeline通道流水线组件。它像一条管道将绑定到一个通道的多个Handler处理器实例串联在一起形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成双向链表的节点被加入到ChannelPipeline中。
以入站处理为例每一个来自通道的IO事件都会进入一次ChannelPipeline。在进入第一个Handler处理器后这个IO事件将按照既定的从前往后次序在流水线上不断地向后流动流向下一个Handler处理器。
在向后流动的过程中会出现3种情况 1如果后面还有其他Handler入站处理器那么IO事件可以交给下一个Handler处理器向后流动。 2如果后面没有其他的入站处理器就意味着这个IO事件在此次流水线中的处理结束了。 3如果在中间需要终止流动可以选择不将IO事件交给下一个Handler处理器流水线的执行也被终止了。
Netty的通道流水线是双向的并规定入站处理器的执行次序是从前到后出站处理器的执行次序是从后到前。流水线上入站处理器和出站处理器的执行次序可以用下图表示 其实netty在为了方便开发者进行开发N提供了一系列辅助类其中引导类把上面的三个组件快速组装起来完成一个Netty应用服务端的引导类叫作ServerBootstrap类客户端的引导类叫作Bootstrap类。我们将在一篇博文中介绍Bootstrap类。