西安网站建设云阔网络,排名优化的公司,近期国际新闻事件,2022年新闻大事文章目录 一、操作系统底层IO原理1. 简介2. 操作系统进行IO的流程 二、BIO底层原理1. 什么是Socket2. JDK原生编程的BIO 三、Java原生编程的NIO1. 简介2. NIO和BIO的主要区别3. Reactor模式4. NIO的三大核心组件5. NIO核心源码分析 一、操作系统底层IO原理
1. 简介
IO#x… 文章目录 一、操作系统底层IO原理1. 简介2. 操作系统进行IO的流程 二、BIO底层原理1. 什么是Socket2. JDK原生编程的BIO 三、Java原生编程的NIO1. 简介2. NIO和BIO的主要区别3. Reactor模式4. NIO的三大核心组件5. NIO核心源码分析 一、操作系统底层IO原理
1. 简介
IO即Input/Output指的是输入和输出。在计算机科学中IO描述的是数据在内部存储器和外部存储器或其他周边设备之间的流动过程既包括数据从外部复制到内存输入也包括数据从内存复制到外部输出。IO是计算机与外界交互的过程涉及到的对象可以是人或其他设备如文件、管道、网络、命令行、信号等更广义地讲I/O指代任何操作系统理解为“文件”的事务。此外IO也是操作系统中的一个核心概念在各种系统中都有重要地位例如在本机、传统的单体应用、分布式系统中。IO操作可以有多种方式如DIO(Direct I/O)、AIO(Asynchronous I/O,异步I/O)、Memory-Mapped I/O(内存映射I/O)等不同的I/O方式有不同的实现方式和性能适用于不同的应用场景
2. 操作系统进行IO的流程
首先我们需要了机计算机网络的协议栈这里有两种分别是OSI参考模型和TCP/IP五层模型在实际中通常使用到的只有TCP/IP五层模型因为OSI参考模型实现过于复杂。 现在我们大致看一下数据是如何从一个计算机传递到另一个计算机的假如张三向李四发送了一条你好的消息大致会经过一下过程
首先应用程序会进行编码处理将字符消息转化为二进制流然后交给传输层此时产生的数据包类型为报文TCP 根据应用的指示负责建立连接、发送数据以及断开连接。TCP 提供将应用层发来的数据顺利发送至对端的可靠传输。为了实现这一功能需要将应用层数据封装为报文段 (segment)并附加一个 TCP 首部然后交给下面的 IP 层。IP 将 TCP 传过来的 TCP 首部和 TCP 数据合起来当做自己的数据并在 TCP 首部的前端 加上自己的 IP 首部生成 IP 数据报(datagram)然后交给下面的数据链路层。从 IP 传过来的 IP 包对于数据链路层来说就是数据。给这些数据附加上链路层首部封装为链路层帧(frame)生成的链路层帧(frame)将通过物理层传输给接收端。然后到了李四的计算机就会逆向的进行上面的过程将消息最后传输给应用程序这样李四就收到了张三的消息。
上面就是整个计算机网络基于TCP通信的大致过程那么现在的问题是操作系统内部是如何进行IO的
我们知道IO无非就是两个核心点读数据和写数据我们的应用程序是工作在操作系统的用户态时当应用程序要执行IO时用户态需要通过系统调用从用户态切换到核心态。如果应用程序现在在执行读操作那么操作系统首先会将接收到的网络IO数据存储在内核缓冲然后将内核缓存准备好的数据拷贝到用户缓存区然后应用程序就可以处理接收到的数据了。如果应用程序正在执行写操作那么操作系统需要将应用程序准备好的数据从用户缓存拷贝到内核缓存接着发送出去下图就展示了大致的细节。 可以发现上面的过程经过了多次的操作系统用户态到内核态的切换这是很耗时的可以使用0拷贝等相关技术进行优化这里就不详细分析了。 下面我们更加深入
读数据 ①首先在网络的网卡上或本地存储设备中准备数据然后调用read()函数。 ②调用read()函数后由内核将网络/本地数据读取到内核缓冲区中。 ③读取完成后向CPU发送一个中断信号通知CPU对数据进行后续处理。 ④CPU将内核中的数据写入到对应的程序缓冲区或网络Socket接收缓冲区中。 ⑤数据全部写入到缓冲区后应用程序开始对数据开始实际的处理。 程序中试图利用IO机制读写数据时仅仅只是调用了内核提供的接口函数而已本质上真正的IO操作还是由内核自己去完成的。Linux 系统为了提高 IO 效率会在用户空间和内核空间都加入缓冲区缓冲区可以减少频繁的系统 IO 调 用。系统调用需要保存之前的进程数据和状态等信息而结束调用之后回来还需要恢复之前的信息为 了减少这种损耗时间、也损耗性能的系统调用于是出现了缓冲区 写数据 ①应用程序准备要写入的数据可能是从用户输入、其他应用程序输出或者本地文件等获取的数据。 ②当应用程序调用write()函数时数据被写入到应用程序的内核缓冲区。 ③CPU处理写操作内核在写入数据到内核缓冲区后向CPU发送一个中断信号通知CPU有数据需要写入到指定的目的地例如硬盘或网络。 ④发送完成通知当数据全部写入到目标设备或网络中时系统可能会向应用程序发送一个写入完成的通知。
二、BIO底层原理
1. 什么是Socket
Socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层它是一组接口一般由操作系统提供。在设计模式中Socket 其实就是一个门面模式它把复杂的 TCP/IP 协议处理和通信缓存管理等等都隐藏在 Socket 接口后面对用户来说使用一组简单的接口就能进行网络应用编程让 Socket 去组织数据以符合指定的协议。主机 A 的应用程序要能和主机 B 的 应用程序通信必须通过 Socket 建立连接。客户端连接上一个服务端就会在客户端中产生一个 socket 接口实例服务端每接受 一个客户端连接就会产生一个 socket 接口实例和客户端的 socket 进行通信有多个客户端连接自然就有多个 socket 接口实例。 2. JDK原生编程的BIO
BIO也就是阻塞式IO。在 BIO 中类 ServerSocket 负责绑定IP地址启动监听端口等待客户连接客户端 Socket 类的实例发起连接操作ServerSocket 接受连接后产生一个新的服务端 socket 实例负责和客户端 socket 实例通过输入和输出流进行通信。 BIO阻塞的含义体现在两个方面
若一个服务器启动就绪那么主线程就一直在等待着客户端的连接这个等待过程中主线程就一直在阻塞。在连接建立之后在读取到 socket 信息之前客户端线程也是一直在等待一直处于阻塞的状态下的。
我们看一个java实现的BIO通信模式的案例的代码首先是服务端 public static void main(String[] args) throws IOException {//服务端启动必备ServerSocket serverSocket new ServerSocket();//表示服务端在哪个端口上监听serverSocket.bind(new InetSocketAddress(10001));System.out.println(Start Server ....);try{while(true){new Thread(new ServerTask(serverSocket.accept())).start();}}finally {serverSocket.close();}}//每个和客户端的通信都会打包成一个任务交个一个线程来执行private static class ServerTask implements Runnable{private Socket socket null;public ServerTask(Socket socket){this.socket socket;}Overridepublic void run() {//实例化与客户端通信的输入输出流try(ObjectInputStream inputStream new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream new ObjectOutputStream(socket.getOutputStream())){//接收客户端的输出也就是服务器的输入String userName inputStream.readUTF();System.out.println(Accept client message:userName);//服务器的输出也就是客户端的输入outputStream.writeUTF(Hello,userName);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}首先定义了一个ServerSocket方法并调用accept方法去监听10001端口当然上面代码是创建了一个新的线程来专门监听10001端口我们看看accept方法底层到底在做什么 public Socket accept() throws IOException {if (isClosed())throw new SocketException(Socket is closed);if (!isBound())throw new SocketException(Socket is not bound yet);Socket s new Socket((SocketImpl) null);implAccept(s);return s;}首先它会调用isClosed()方法判断当前的ServerSocket是否已经关闭了ServerSocket声明了一个closed变量来维护ServerSocekt的状态。 private boolean closed false;
public boolean isClosed() {synchronized(closeLock) {return closed;}}下面代码用于判断当前的SocketServer是否已经与端口绑定了ServerSocekt底层同样是有一个bound成员变量来维护当前ServerSocket的绑定状态。
if (!isBound())throw new SocketException(Socket is not bound yet);上面代码中我们调用了bind方法来将ServerSocket与指定的端口进行绑定下面我们看看绑定的时候底层在做什么
public void bind(SocketAddress endpoint, int backlog) throws IOException {if (isClosed())throw new SocketException(Socket is closed);if (!oldImpl isBound())throw new SocketException(Already bound);if (endpoint null)endpoint new InetSocketAddress(0);if (!(endpoint instanceof InetSocketAddress))throw new IllegalArgumentException(Unsupported address type);InetSocketAddress epoint (InetSocketAddress) endpoint;if (epoint.isUnresolved())throw new SocketException(Unresolved address);if (backlog 1)backlog 50;try {SecurityManager security System.getSecurityManager();if (security ! null)security.checkListen(epoint.getPort());getImpl().bind(epoint.getAddress(), epoint.getPort());getImpl().listen(backlog);bound true;} catch(SecurityException e) {bound false;throw e;} catch(IOException e) {bound false;throw e;}}其实核心的就下面两句代码一个是绑定动作一个是监听动作监听动作底层调用了socketListen这一个native方法。
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);回到accept方法接着它创建了一个Socket对象
Socket s new Socket((SocketImpl) null);然后调用了implAccept(s)方法参数是上面我们创建的Socekt我们进入该方法。
protected final void implAccept(Socket s) throws IOException {SocketImpl si null;try {if (s.impl null)//用 setImpl() 方法该方法用于设置 Socket 对象的底层实现。s.setImpl();else {//调用 reset() 方法该方法用于重置 Socket 对象的底层实现。s.impl.reset();}si s.impl;s.impl null;si.address new InetAddress();//指定文件描述符si.fd new FileDescriptor();//这个accept底层也是调用的socketListen这个native方法getImpl().accept(si);SecurityManager security System.getSecurityManager();if (security ! null) {security.checkAccept(si.getInetAddress().getHostAddress(),si.getPort());}} catch (IOException e) {if (si ! null)si.reset();s.impl si;throw e;} catch (SecurityException e) {if (si ! null)si.reset();s.impl si;throw e;}s.impl si;s.postAccept();}socketListen 方法通常是在底层操作系统或网络库中实现的用于启动套接字的监听过程。这个方法在大多数情况下是阻塞的因为它需要等待客户端的连接请求到达。当调用 socketListen 方法后套接字会进入监听状态等待客户端连接请求。在这个过程中如果没有客户端连接请求到达socketListen 方法会一直阻塞直到有新的连接请求到达或者发生超时。 上面方法我们会阻塞在 getImpl().accept(si);然后一旦客户端有连接来就会立即返回accept方法并将新创建的Socket返回重新回顾整个过程服务端程序一直阻塞等待如果客户端有连接来了就会创建一个新的Socket用于与该连接通信上面有个疑问的地方就是bind和accept方法好像都有一个socketListen那么意味bind方法执行后是否就可以处理客户端连接了我的个人理解是前者主要是用来建立TCP连接的参考[这篇博客]。
上面就是传统的BIO的通信模型采用 BIO 通信模型的服务端通常由一个独立的 Acceptor 线程负责监听客户端的连接它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理上面的案例我没有使用多线程处理而是服务端就一个线程处理完成后通过输出流返回应答给客户端线程销毁。即典型的一请求一应答模型同时数据的读取写入也必须阻塞在一个线程内等待其完成。 该模型最大的问题就是缺乏弹性伸缩能力当客户端并发访问量增加后服务端的线程 个数和客户端并发访问数呈 1:1 的正比关系Java 中的线程也是比较宝贵的系统资源线程数量快速膨胀后系统的性能将急剧下降随着访问量的继续增大系统最终就死掉了。
为了改进这种一连接一线程的模型我们可以使用线程池来管理这些线程实现 1 个或 多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞 I/O)通常被称为“伪异 步 I/O 模型”。我们知道如果使用 CachedThreadPool 线程池(不限制线程数量如果不清楚请参考 文首提供的文章)其实除了能自动帮我们管理线程(复用)看起来也就像是 1:1 的客户 端:线程数模型而使用 FixedThreadPool 我们就有效的控制了线程的最大数量保证了系统有限的资源的控制实现了 N:M 的伪异步 I/O 模型。 但是正因为限制了线程数量如果发生读取数据较慢时(比如数据量大、网络传输慢等)大量并发的情况下其他接入的消息只能一直等待这就是最大的弊端。
三、Java原生编程的NIO
1. 简介
NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 BIO 的不足它在标准 Java 代码中提供了高速的、面向块的 I/O。NIO 被称为 no-blocking io 或者 new io 都说得通。
2. NIO和BIO的主要区别
面向流和面向缓冲
Java NIO 和 BIO 之间第一个最大的区别是BIO 是面向流的NIO 是面向缓冲区的。 Java BIO 面向流意味着每次从流中读一个或多个字节直至读取所有字节它们没有被缓存在任何地 方。此外它不能前后移动流中的数据。如果需要前后移动从流中读取的数据需要先将它缓存到一个缓冲区。 Java NIO 的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是还需要检查是否该缓冲区中包含所有需要处理的数据。而且需确保当更多的数据读入缓冲区时不要覆盖缓冲区里尚未处理的数据。
阻塞和非阻塞IO
Java IO 的各种流是阻塞的。这意味着当一个线程调用 read() 或 write()时该线程被阻塞直到有一些数据被读取或数据完全写入。该线程在此期间不能再干任何事情了。Java NIO 的非阻塞模式使一个线程从某通道发送请求读取数据但是它仅能得到目前可用的数据如果目前没有数据可用时就什么都不会获取。而不是保持线程阻塞所以直至数据变的可以读取之前该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道但不需要等待它完全写入这个线程同时可以去做别的事情。 线程通常将非阻塞 IO 的空闲时间用于在其它通道上执行 IO 操作所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
3. Reactor模式
Reator模式可以看[这篇博客]。
4. NIO的三大核心组件
Selector
Selector 的英文含义是“选择器”也可以称为为“轮询代理器”、“事件订阅器”、“channel 容器管理机”都行。 Java NIO 的选择器允许一个单独的线程来监视多个输入通道你可以注册多个通道使用一个选择器(Selectors)然后使用一个单独的线程来操作这个选择器进而“选择”通道这些通道里已经有可以处理的输入或者选择已准备写入的通道。这种选择机制使得一个单独的线程很容易来管理多个通道。应用程序将向 Selector 对象注册需要它关注的 Channel以及具体的某一个 Channel 会对哪些 IO 事件感兴趣。Selector 中也会维护一个“已经注册的 Channel”的容器。
Channels
通道被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递那么说明应用程序可以通过通道读取数据也可以通过通道向操作系统写数据而且可以同时进行读写。 • 所有被 Selector(选择器)注册的通道只能是继承了SelectableChannel 类的子类。 • ServerSocketChannel应用服务器程序的监听通道。只有通过这个通道应用程序才能向操作系统注册支持“多路复用 IO”的端口监听。同时支持 UDP 协议和 TCP 协议。 • ScoketChannelTCP Socket套接字的监听通道一个Socket套接字对应了一个客户端IP端口 到服务器 IP端口的通信连接。 通道中的数据总是要先读到一个 Buffer或者总是要从一个 Buffer 中写入。
Buffer
我们前面说过 JDK NIO 是面向缓冲的。Buffer 就是这个缓冲用于和 NIO 通道进行交互。 数据是从通道读入缓冲区从缓冲区写入到通道中的。以写为例应用程序都是将数据写入缓冲再通过通道把缓冲的数据发送出去读也是一样数据总是先从通道读到缓冲应用 程序再读缓冲的数据。缓冲区本质上是一块可以写入数据然后可以从中读取数据的内存(其实就是数组)。 这块内存被包装成 NIO Buffer 对象并提供了一组方法用来方便的访问该块内存。 5. NIO核心源码分析
首先我们给出服务端的实现
public class NioServer {private static NioServerHandle nioServerHandle;public static void main(String[] args){nioServerHandle new NioServerHandle(DEFAULT_PORT);new Thread(nioServerHandle,Server).start();}}public class NioServerHandle implements Runnable{private volatile boolean started;private ServerSocketChannel serverSocketChannel;private Selector selector;/*** 构造方法* param port 指定要监听的端口号*/public NioServerHandle(int port) {try {/*创建选择器的实例*/selector Selector.open();/*创建ServerSocketChannel的实例*/serverSocketChannel ServerSocketChannel.open();/*设置通道为非阻塞模式*/serverSocketChannel.configureBlocking(false);/*绑定端口*/serverSocketChannel.socket().bind(new InetSocketAddress(port));/*注册事件表示关心客户端连接*/serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);started true;System.out.println(服务器已启动端口号port);} catch (IOException e) {e.printStackTrace();}}Overridepublic void run() {while(started){try {/*获取当前有哪些事件*/selector.select(1000);/*获取事件的集合*/SetSelectionKey selectionKeys selector.selectedKeys();IteratorSelectionKey iterator selectionKeys.iterator();while(iterator.hasNext()){SelectionKey key iterator.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键那么它仍然会在主集合中以一个激活的键出现这会导致我们尝试再次处理它。*/iterator.remove();handleInput(key);}} catch (IOException e) {e.printStackTrace();}}}/*处理事件的发生*/private void handleInput(SelectionKey key) throws IOException {if(key.isValid()){/*处理新接入的客户端的请求*/if(key.isAcceptable()){/*获取关心当前事件的Channel*/ServerSocketChannel ssc (ServerSocketChannel) key.channel();/*接受连接*/SocketChannel sc ssc.accept();System.out.println(建立连接);sc.configureBlocking(false);/*关注读事件*/sc.register(selector,SelectionKey.OP_READ);}/*处理对端的发送的数据*/if(key.isReadable()){SocketChannel sc (SocketChannel) key.channel();/*创建ByteBuffer开辟一个缓冲区*/ByteBuffer buffer ByteBuffer.allocate(1024);/*从通道里读取数据然后写入buffer*/int readBytes sc.read(buffer);if(readBytes0){/*将缓冲区当前的limit设置为position,position0用于后续对缓冲区的读取操作*/buffer.flip();/*根据缓冲区可读字节数创建字节数组*/byte[] bytes new byte[buffer.remaining()];/*将缓冲区可读字节数组复制到新建的数组中*/buffer.get(bytes);String message new String(bytes,UTF-8);System.out.println(服务器收到消息message);/*处理数据*/String result Const.response(message);/*发送应答消息*/doWrite(sc,result);}else if(readBytes0){/*取消特定的注册关系*/key.cancel();/*关闭通道*/sc.close();}}}}/*发送应答消息*/private void doWrite(SocketChannel sc,String response) throws IOException {byte[] bytes response.getBytes();ByteBuffer buffer ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}public void stop(){started false;}}
首先NioServerHandle构成方法接受一个参数port也就是socket要绑定的本地端口首先它创建了一个选择器实例Selector就是IO多路复用中的多路复用器Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的并且使用选择器来更新通道的就绪状态。当这么做的时候可以选择将被激发的线程挂起直到有就绪的的通道。
selector Selector.open();
public static Selector open() throws IOException {return SelectorProvider.provider().openSelector();
}这个provider()本质是SelectorProviderl类是一个抽象类它定义了创建Selector和Channel实例的方法。不同的操作系统可能有不同的I/O机制和系统调用因此SelectorProvider的实现类会根据当前平台的特性提供相应的Selector和Channel实例。创建Selector调用了抽象类Selector中的静态方法open方法。这个方法的返回值是操作系统对应的选择器这个与你虚拟机所在的系统相关这里我们就不深纠了。创建好选择器之后就执行下面代码创建了一个ServerSocketChannel对象。
serverSocketChannel ServerSocketChannel.open();我们看看这个open()静态方法底层做了一些什么事 public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}它同样调用了SelectorProviderl的provider方法返回了一个适合本系统的Channel实现。然后下面就开始绑定端口了
serverSocketChannel.socket().bind(new InetSocketAddress(port));public abstract class ServerSocketChannelextends AbstractSelectableChannelimplements NetworkChannel
{public abstract ServerSocket socket();}根据上面代码我们知道serverSocketChannel内部是封装了ServerSocket的实现了的所以通道的本质上就是在Socket的基础上封装了更多的操作。下面就是NIO特别的地方了它将Channel注册到了Selector中
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);register方法有两个参数第一个是通道要注册的选择器第二个参数就是选择器所关心的通道操作。这个是SelectionKey中定义的四个事件之一也就是连接事件。它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。如果 Selector 对通道的多操作类型感兴趣可以用“位或”操作符来实现SelectionKey.OP_READ|SelectionKey.OP_WRITE;。 public static final int OP_READ 1 0;public static final int OP_WRITE 1 2;public static final int OP_CONNECT 1 3;public static final int OP_ACCEPT 1 4;注意一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册 到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set。我们进入SelectionKey类 public abstract int interestOps();public abstract int readyOps();
interestOps可以判断 Selector 是否对 Channel 的某种事件感兴趣 readyOps()来获取相关通道已经就绪的操作。然后还有下面两个方法 public abstract SelectableChannel channel();abstract Selector selector();public abstract void cancel();通过上面方法我们可以获取这个 SelectionKey 所关联的 Selector 和 Channel。 如果我们要取消关联关系SelectionKey 对象的 cancel()方法来取消特定的注册关系。
上面我们服务端的ServerSocketChannel就创建完了通过上面我们知道上面的核心关键就是创建了Selector并将ServerSoceketChannel关联的SelectionKey注册到了Seletctor中了。下面回到NioServer类下面就是创建一个新的线程来开启服务端。 new Thread(nioServerHandle,Server).start();NioServerHandle本身就是实现了Runable接口的所以在上面创建的线程执行run方法的时候会间接调用到NioServerHandle的run方法我们进入该方法。 Overridepublic void run() {while(started){try {/*获取当前有哪些事件*/selector.select(1000);/*获取事件的集合*/SetSelectionKey selectionKeys selector.selectedKeys();IteratorSelectionKey iterator selectionKeys.iterator();while(iterator.hasNext()){SelectionKey key iterator.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键那么它仍然会在主集合中以一个激活的键出现这会导致我们尝试再次处理它。*/iterator.remove();handleInput(key);}} catch (IOException e) {e.printStackTrace();}}}首先执行了selector.select(1000)方法该方法是一个阻塞方法它会等待一段时间以毫秒为单位直到有一个或多个通道准备好进行 I/O 操作、超时时间到达或者当前线程被中断。底层实现会查询注册在 Selector 上的所有通道检查它们是否处于就绪状态。就绪状态表示通道可以执行某种 I/O 操作比如读取或写入数据。 当有通道处于就绪状态时select() 方法会返回对应的通道数量并且可以通过调用 selector.selectedKeys() 方法获取到这些就绪的 SelectionKey 集合。而在超时时间到达之前如果没有通道处于就绪状态或者当前线程被中断select() 方法也会提前返回返回值为 0。 public int select(long var1) throws IOException {if (var1 0L) {throw new IllegalArgumentException(Negative timeout);} else {return this.lockAndDoSelect(var1 0L ? -1L : var1);}}select最底层也是调用的本地方法而且它是线程安全的。我们这里只需要知道它会返回就绪的通道的数量。然后调用下面方法来获取所有的就绪的SelectionKey的集合
SetSelectionKey selectionKeys selector.selectedKeys();public SetSelectionKey selectedKeys() {if (!this.isOpen() !Util.atBugLevel(1.4)) {throw new ClosedSelectorException();} else {return this.publicSelectedKeys;}}然后就迭代就绪的SelectionKey然后将该事件从集合中删除表示这个事件已经被处理了然后就调用了handleInput来开始具体的处理。 private void handleInput(SelectionKey key) throws IOException {if(key.isValid()){/*处理新接入的客户端的请求*/if(key.isAcceptable()){/*获取关心当前事件的Channel*/ServerSocketChannel ssc (ServerSocketChannel) key.channel();/*接受连接*/SocketChannel sc ssc.accept();System.out.println(建立连接);sc.configureBlocking(false);/*关注读事件*/sc.register(selector,SelectionKey.OP_READ);}/*处理对端的发送的数据*/if(key.isReadable()){SocketChannel sc (SocketChannel) key.channel();/*创建ByteBuffer开辟一个缓冲区*/ByteBuffer buffer ByteBuffer.allocate(1024);/*从通道里读取数据然后写入buffer*/int readBytes sc.read(buffer);if(readBytes0){/*将缓冲区当前的limit设置为position,position0用于后续对缓冲区的读取操作*/buffer.flip();/*根据缓冲区可读字节数创建字节数组*/byte[] bytes new byte[buffer.remaining()];/*将缓冲区可读字节数组复制到新建的数组中*/buffer.get(bytes);String message new String(bytes,UTF-8);System.out.println(服务器收到消息message);/*处理数据*/String result Const.response(message);/*发送应答消息*/doWrite(sc,result);}else if(readBytes0){/*取消特定的注册关系*/key.cancel();/*关闭通道*/sc.close();}}}}
/*发送应答消息*/private void doWrite(SocketChannel sc,String response) throws IOException {byte[] bytes response.getBytes();ByteBuffer buffer ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}然后它会获得这个SelectionKey所绑定的通道。首先我们可以发现有两个if判断这里就是该SelectionKey绑定的事件是读事件、写事件还是连接事件注意连接事件是客户端的如果是连接事件就获得ServerSocketChannel对象
ServerSocketChannel ssc (ServerSocketChannel) key.channel();然后就可以执行下面代码来处理连接了可以发现也是调用的accept方法因为前面说过通道的底层是封装了Socket了的。
SocketChannel sc ssc.accept();sc.register(selector,SelectionKey.OP_READ);可以发现ssc.accept()也就是说一旦有连接接入就会创建一个新的SocketChannel对象然后这个通道也要注册到selector中绑定事件为读事件这样就可以接受客户端发来的数据了。
如果SelectionKey绑定的事件是读事件说明现在已经接受到了用户的数据了我们可以进行处理了首先我们仍然是从SelectionKey获取对应的通道。 SocketChannel sc (SocketChannel) key.channel();然后从channel中读取数据注意这里就是和BIO很大的不同的地方它不是以流的形式读完所有数据而是读到了一个buffer缓冲中。 ByteBuffer buffer ByteBuffer.allocate(1024);注意此时只是将数据读到了一个缓冲中应用程序还没有处理数据现在有了这个缓冲我们就可以很方便的处理接受到的数据了注意此时buffer要调用flip方法来切换模式。flip 方法将 Buffer 从写模式切换到读模式。调用 flip()方法会将 position 设回 0并将 limit设置成之前的position。 buffer.flip();最后服务端向客户端发出回应
doWrite(sc,result);以上就是服务端的大致工作中原理下面我们看看客户端又是怎么工作的。
public class NioClient {private static NioClientHandle nioClientHandle;public static void start(){nioClientHandle new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);//nioClientHandle new NioClientHandle(DEFAULT_SERVER_IP,8888);new Thread(nioClientHandle,client).start();}//向服务器发送消息public static boolean sendMsg(String msg) throws Exception{nioClientHandle.sendMsg(msg);return true;}public static void main(String[] args) throws Exception {start();Scanner scanner new Scanner(System.in);while(NioClient.sendMsg(scanner.next()));}}public class NioClientHandle implements Runnable{private String host;private int port;private volatile boolean started;private Selector selector;private SocketChannel socketChannel;public NioClientHandle(String ip, int port) {this.host ip;this.port port;try {/*创建选择器的实例*/selector Selector.open();/*创建ServerSocketChannel的实例*/socketChannel SocketChannel.open();/*设置通道为非阻塞模式*/socketChannel.configureBlocking(false);started true;} catch (IOException e) {e.printStackTrace();}}public void stop(){started false;}Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循环遍历selectorwhile(started){try{//无论是否有读写事件发生selector每隔1s被唤醒一次selector.select(1000);//获取当前有哪些事件可以使用SetSelectionKey keys selector.selectedKeys();//转换为迭代器IteratorSelectionKey it keys.iterator();SelectionKey key null;while(it.hasNext()){key it.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键那么它仍然会在主集合中以一个激活的键出现这会导致我们尝试再次处理它。*/it.remove();try{handleInput(key);}catch(Exception e){if(key ! null){key.cancel();if(key.channel() ! null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector关闭后会自动释放里面管理的资源if(selector ! null)try{selector.close();}catch (Exception e) {e.printStackTrace();}}//具体的事件处理方法private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){//获得关心当前事件的channelSocketChannel sc (SocketChannel) key.channel();//连接事件if(key.isConnectable()){if(sc.finishConnect()){socketChannel.register(selector,SelectionKey.OP_READ);}else System.exit(1);}//有数据可读事件if(key.isReadable()){//创建ByteBuffer并开辟一个1M的缓冲区ByteBuffer buffer ByteBuffer.allocate(1024);//读取请求码流返回读取到的字节数int readBytes sc.read(buffer);//读取到字节对字节进行编解码if(readBytes0){//将缓冲区当前的limit设置为position,position0// 用于后续对缓冲区的读取操作buffer.flip();//根据缓冲区可读字节数创建字节数组byte[] bytes new byte[buffer.remaining()];//将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String result new String(bytes,UTF-8);System.out.println(客户端收到消息 result);}//链路已经关闭释放资源else if(readBytes0){key.cancel();sc.close();}}}}private void doWrite(SocketChannel channel,String request)throws IOException {//将消息编码为字节数组byte[] bytes request.getBytes();//根据数组容量创建ByteBufferByteBuffer writeBuffer ByteBuffer.allocate(bytes.length);//将字节数组复制到缓冲区writeBuffer.put(bytes);//flip操作writeBuffer.flip();//发送缓冲区的字节数组/*关心事件和读写网络并不冲突*/channel.write(writeBuffer);}private void doConnect() throws IOException{/*非阻塞的连接*/if(socketChannel.connect(new InetSocketAddress(host,port))){socketChannel.register(selector,SelectionKey.OP_READ);}else{socketChannel.register(selector,SelectionKey.OP_CONNECT);}}//写数据对外暴露的APIpublic void sendMsg(String msg) throws Exception{doWrite(socketChannel, msg);}}其实有了上面的基础客户端就很简单了只是多了一个连接事件我们看看这部分
if(socketChannel.connect(new InetSocketAddress(host,port))){socketChannel.register(selector,SelectionKey.OP_READ);}else{socketChannel.register(selector,SelectionKey.OP_CONNECT);}这个if判断是很关键的我们知道我们调用connect方法后底层需要进行TCP的三次握手如果网络状况不好的话这个connect方法执行完毕后可能连接并没有执行完毕socketChannel.connect(new InetSocketAddress(host,port))如果为false就说明连接没有建立完所以需要创建一个通道来处理连接事件否则我们就可以注册读事件通道来等待服务端回应。