为什么要做响应式网站,凡科网是做什么的,盈科互动网站建设制作公司,微信官方网站注册NIO编程模型 Selector监听客户端不同的zhuangtai不同客户端触发不同的状态后#xff0c;交由相应的handles处理Selector和对应的处理handles都是在同一线程上实现的
I/O多路复用
在Java中#xff0c;I/O多路复用是一种技术#xff0c;它允许单个线程处理多个输入/输出交由相应的handles处理Selector和对应的处理handles都是在同一线程上实现的
I/O多路复用
在Java中I/O多路复用是一种技术它允许单个线程处理多个输入/输出I/O源而不需要为每个I/O源创建一个线程。这种技术可以显著提高性能因为它减少了线程创建和上下文切换的开销。I/O多路复用的核心思想是使用一个机制来监控多个I/O通道一旦某个通道有数据可读或可写就通知应用程序进行相应的操作。
NIO模型 Selector监听通道 经典的I/O多路复用
同步式I/O和异步I/O概念及分类
概念
同步式I/OSynchronous I/O 定义在同步I/O模型中当一个线程发起一个I/O请求时它会阻塞直到I/O操作完成。也就是说线程会一直等待直到数据被读取或写入完毕。 特点阻塞性线程在I/O操作完成之前不能执行其他任务。 资源消耗每个I/O操作都需要一个线程或进程可能导致资源消耗较大特别是在高并发场景下。异步I/OAsynchronous I/O 定义在异步I/O模型中当一个线程发起一个I/O请求后它不会被阻塞而是可以继续执行其他任务。I/O操作在后台进行当操作完成时系统会通知发起请求的线程。 特点非阻塞性线程不需要等待I/O操作完成可以继续执行其他任务。 并发性可以提高系统的并发处理能力适用于高并发场景。
分类
BIOBlocking I/O 类型同步I/O。 特点在BIO模型中当线程执行I/O操作时如果数据还没有准备好它会一直等待直到数据准备完成。在这个过程中线程被阻塞不能执行其他任务。NIONon-blocking I/O 类型非阻塞I/O可以用于同步或异步操作。 特点NIO模型中的I/O操作是非阻塞的这意味着当数据没有准备好时线程可以立即返回去做其他事情。NIO本身提供了非阻塞的能力但是它既可以用于同步编程通过在while循环中检查并处理I/O事件也可以与异步I/O如Java 7引入的NIO.2也称为Asynchronous I/O结合使用。I/O多路复用I/O Multiplexing 类型同步I/O。 特点I/O多路复用模型允许单个线程监控多个I/O通道但是当线程执行I/O操作时如果数据没有准备好线程仍然会被阻塞。最常见的I/O多路复用技术是select/poll系统调用。在Java中可以通过Selector和Channel实现I/O多路复用。
总结
NIO模型Selector实现的I/O多路复用是同步式I/O因为服务器端需要多次调用selector.select()来查看是否有新的事件发生。如果服务器端不通过多次调用selector.select()也没有其他线程会通知主线程有新的事件发生主线程就会持续阻塞。AIO异步I/O则是当主线程查看发现没有新事件发生时立刻返回处理其他事件当有新事件发生时主线程会被通知并来处理。
ChatServer实现
package server;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;public class ChatServer {private static final int DEFAULT_PORT 8888;private static final String QUIT quit;private static final int BUFFER 1024;private ServerSocketChannel server;private Selector selector;private ByteBuffer rBuffer ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer ByteBuffer.allocate(BUFFER);//统一编码解码方法private Charset charset Charset.forName(UTF-8);//可以自定义服务器端的端口private int port;public ChatServer() {this(DEFAULT_PORT);}public ChatServer(int port) {this.port port;}private void start() {try {server ServerSocketChannel.open();server.configureBlocking(false);server.socket().bind(new InetSocketAddress(port));selector Selector.open();//将ServerSocketChannel的Accept事件注册到selector上//一旦ServerSocketChannel接收到了客户端的连接请求selector就会得知server.register(selector, SelectionKey.OP_ACCEPT);System.out.println(启动服务器 监听端口 port ...);while (true) {//有事件被触发了select()函数才会有返回selector.select();SetSelectionKey selectionKeys selector.selectedKeys();for (SelectionKey key : selectionKeys) {// 处理被触发的事件handles(key);}//清空集合防止重复处理selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// ACCEPT事件 - 和客户端建立了连接if (key.isAcceptable()) {ServerSocketChannel server (ServerSocketChannel) key.channel();SocketChannel client server.accept();client.configureBlocking(false);// 在selector上注册可能发生的Read事件client.register(selector, SelectionKey.OP_READ);System.out.println(getClientName(client) 已连接);}// READ事件 - 客户端发送了消息else if (key.isReadable()) {SocketChannel client (SocketChannel) key.channel();String fwdMsg receive(client);if (fwdMsg.isEmpty()) {// 客户端异常// 取消掉key的注册以后不再响应Read的事件// selector的key注销掉以后通常搭配selector.wakeup(); (是个好习惯)立刻唤醒selector判断当前发生的事件key.cancel();selector.wakeup();} else {System.out.println(getClientName(client) : fwdMsg);forwardMessage(client, fwdMsg);// 检查用户是否退出if (readyToQuit(fwdMsg)) {key.cancel();selector.wakeup();System.out.println(getClientName(client) 已断开);}}}}private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {for (SelectionKey key: selector.keys()) {Channel connectedClient key.channel();//如果遍历到了服务器的监听Socket则跳过(不需要将消息转发给服务器)if (connectedClient instanceof ServerSocketChannel) {continue;}// key是否有效(key对应的Channel和selector都在运行没有关闭) 不是发消息的客户端他自己if (key.isValid() !client.equals(connectedClient)) {// 写Buffer前先将Buffer清空wBuffer.clear();// 写入Buffer消息wBuffer.put(charset.encode(getClientName(client) : fwdMsg));// 将Buffer从写状态反转成读状态wBuffer.flip();// 将Buffer中的数据写入通道中while (wBuffer.hasRemaining()) {((SocketChannel)connectedClient).write(wBuffer);}}}}private String receive(SocketChannel client) throws IOException {// 在每次新的读取前先把buffer清空rBuffer.clear();// 将channel中的信息读入rBuffer中直到读不出文件while(client.read(rBuffer) 0);// 将rBuffer从写模式从转换成读模式rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}private String getClientName(SocketChannel client) {return 客户端[ client.socket().getPort() ];}private boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable ! null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) {ChatServer chatServer new ChatServer(7777);chatServer.start();}
}
将Channel中的事件注册在Selector用Selector监控事件的发生实现了在同一线程处理多个客户端输入极大提高了线程的使用效率使得服务器端能够处理大量的客户端连接
实现ChatClient
ChatClient
package client;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;public class ChatClient {private static final String DEFAULT_SERVER_HOST 127.0.0.1;private static final int DEFAULT_SERVER_PORT 8888;private static final String QUIT quit;private static final int BUFFER 1024;private String host;private int port;private SocketChannel client;private ByteBuffer rBuffer ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer ByteBuffer.allocate(BUFFER);private Selector selector;private Charset charset Charset.forName(UTF-8);public ChatClient() {this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);}public ChatClient(String host, int port) {this.host host;this.port port;}public boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable ! null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start() {try {client SocketChannel.open();client.configureBlocking(false);selector Selector.open();client.register(selector, SelectionKey.OP_CONNECT);client.connect(new InetSocketAddress(host, port));while (true) {selector.select();SetSelectionKey selectionKeys selector.selectedKeys();for (SelectionKey key : selectionKeys) {handles(key);}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException e) {// 用户正常退出} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// CONNECT事件 - 连接就绪事件if (key.isConnectable()) {SocketChannel client (SocketChannel) key.channel();// 判断连接是否建立完全if (client.isConnectionPending()) {// 调用finishConnect方法正式建立连接client.finishConnect();// 创建一个新的线程来处理用户的输入new Thread(new UserInputHandler(this)).start();}// 将Read事件注册在selector上面client.register(selector, SelectionKey.OP_READ);}// READ事件 - 服务器转发消息else if (key.isReadable()) {SocketChannel client (SocketChannel) key.channel();String msg receive(client);if (msg.isEmpty()) {// 服务器异常close(selector);} else {System.out.println(msg);}}}public void send(String msg) throws IOException {if (msg.isEmpty()) {return;}wBuffer.clear();wBuffer.put(charset.encode(msg));wBuffer.flip();while (wBuffer.hasRemaining()) {client.write(wBuffer);}// 检查用户是否准备退出if (readyToQuit(msg)) {close(selector);}}private String receive(SocketChannel client) throws IOException {rBuffer.clear();while (client.read(rBuffer) 0);rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}public static void main(String[] args) {ChatClient client new ChatClient(127.0.0.1, 7777);client.start();}
}
ChatClient仍然需要通过创建新的线程来处理用户输入
UserInputHanlder
package client;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class UserInputHandler implements Runnable {private ChatClient chatClient;public UserInputHandler(ChatClient chatClient) {this.chatClient chatClient;}Overridepublic void run() {try {// 等待用户输入消息BufferedReader consoleReader new BufferedReader(new InputStreamReader(System.in));while (true) {String input consoleReader.readLine();// 向服务器发送消息chatClient.send(input);// 检查用户是否准备退出if (chatClient.readyToQuit(input)) {break;}}} catch (IOException e) {e.printStackTrace();}}
}UserInputHandler仍然需要阻塞式的等待用户的输入用户的输入延迟应非常小所以线程必须时刻等待着用户的输入以便第一时间处理
总结
与用BIO模型实现的多人聊天室有什么区别
使用Channel代替Stream使用Selector监控多条Channel可以在一个线程里处理多个Channel I/O