当前位置: 首页 > news >正文

湖南中核建设工程公司官方网站重庆网站推广运营公司

湖南中核建设工程公司官方网站,重庆网站推广运营公司,怎样自己做网站推广,门户网站 需求虽然了解了整个内存池管理的细节#xff0c;包括它的内存分配的具体逻辑#xff0c;但是每次从NioSocketChannel中读取数据时#xff0c;应该分配多少内存去读呢#xff1f; 例如#xff0c;客户端发送的数据为1KB , 应该分配多少内存去读呢#xff1f; 例如#xff1a;…  虽然了解了整个内存池管理的细节包括它的内存分配的具体逻辑但是每次从NioSocketChannel中读取数据时应该分配多少内存去读呢 例如客户端发送的数据为1KB , 应该分配多少内存去读呢 例如 客户端发送的数据为1KB , 若每次都分配8KB的内存去读取数据则会导致内存大量浪费若分配16B的内存去读取数据那么需要64次才能全部读完 对性能的有很大的影响 那么对于 这个问题Netty是如何解决的呢 NioEventLoop线程在处理OP_READ事件进入NioByteUnsafe循环读取数据时使用了两个类来处理内存的分配一个是ByteBufAllocator PooledByteBufAllocator为它的默认实现类 另一个是RecvByteBufAllocatorAdaptiveRecvByteBufAllocator是它的默认实现类在DefaultChannelConfig初始化时设置 PooledByteBufAllocator主要用来处理内存分配并最终委托PoolArena去完成AdaptiveRecvByteBufAllocator主要用来计算每次读循环时应该分配多少内存NioByteUnsafe之所有需要循环读取主要是因为分配的初始ByteBuf不一定能够容纳读取到的所有数据NioByteUnsafe循环读取的核心代码解读如下 public final void read() {// 获取pipeline通道配置Channel管道final ChannelConfig config config();// socketChannel已经关闭if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline pipeline();// 获取内存分配器默认为PooledByteBufAllocatorfinal ByteBufAllocator allocator config.getAllocator();// 获取RecvByteBufAllocator内部的计算器Handlefinal RecvByteBufAllocator.Handle allocHandle recvBufAllocHandle();// 清空上一次读取的字节数每次读取时均重新计算// 字节buf分配器 并计算字节buf分配器HandlerallocHandle.reset(config);ByteBuf byteBuf null;boolean close false;try {//当对端发送一个超大的数据包时TCP会拆包。// OP_READ事件只会触发一次Netty需要循环读默认最多读16次,因此ChannelRead()可能会触发多次拿到的是半包数据。// 如果16次没把数据读完没有关系下次select()还会继续处理。// 对于Selector的可读事件如果你没有读完数据它会一直返回。do {// 分配内存 ,allocator根据计算器Handle计算此次需要分配多少内存并从内存池中分配// 分配一个ByteBuf大小能容纳可读数据又不过于浪费空间。byteBuf allocHandle.allocate(allocator);// 读取通道接收缓冲区的数据 设置最后一次分配内存大小加上每次读取的字节数// doReadBytes(byteBuf):ByteBuf内部有ByteBuffer底层还是调用了SocketChannel.read(ByteBuffer)// allocHandle.lastBytesRead()根据读取到的实际字节数自适应调整下次分配的缓冲区大小。allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() 0) {// nothing was read. release the buffer.// 若没有数据可读则释放内存byteBuf.release();byteBuf null;close allocHandle.lastBytesRead() 0;if (close) {// There is nothing left to read as we received an EOF.// 当读到-1时 表示Channel 通道已经关闭// 没有必要再继续readPending false;}break;}// 更新读取消息计数器, 递增已经读取的消息数量allocHandle.incMessagesRead(1);readPending false;// 通知通道处理读取数据触发Channel管道的fireChannelRead事件pipeline.fireChannelRead(byteBuf);byteBuf null;} while (allocHandle.continueReading());// 读取操作完毕 读结束后调用记录此次实际读取到的数据大小并预测下一次内存分配大小allocHandle.readComplete();// 触发Channel管道的fireChannelReadComplete事件pipeline.fireChannelReadComplete();if (close) {// 如果Socket通道关闭则关闭读操作closeOnRead(pipeline);}} catch (Throwable t) {// 处理读取异常handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending !config.isAutoRead()) {// 若操作完毕且没有配置自动读// 则从选择Key兴趣集中移除读操作事件removeReadOp();}}} } 每一次创建byteBuf分配内存大小是多大呢 这个由allocate()方法内部的guess()方法来决定 。 public ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess()); }如果是第一次 调用guess()方法默认分配1024B的内存空间 后面分配内存大小动态调节 。 // 实现doReadBytes()方法从SocketChannel中读取数据。 protected int doReadBytes(ByteBuf byteBuf) throws Exception {// 获取计算内存分配器Handlefinal RecvByteBufAllocator.Handle allocHandle unsafe().recvBufAllocHandle();// 设置尝试读取字节数组的buf的可写字节数allocHandle.attemptedBytesRead(byteBuf.writableBytes());// 从Channel中读取字节并写入到buf中返回读取的字节数return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }在这里我们需要明白byteBuf.writableBytes()这个方法writableBytes()方法的返回值为byteBuf中可写的字节数内部计算方法用byteBuf的容量- byteBuf的写索引得出而byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());这一行代码实际上就是将Channel中的数据写入到byteBuf中返回值为实际写入到ByteBuf中的字节数。   RecvByteBufAllocator的默认实现类AdaptiveRecvByteBufAllocator是实际的缓冲管理区这个类可以根据读取到的数据预测所需要的字节的多少从而自动增加或减少如果上一次读循环将缓冲区的写满了那么预测的字节数会变大如果连续两次循环都不能填满已经分配的缓冲区则预测字节数会变小。 public void lastBytesRead(int bytes) {// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.// This helps adjust more quickly when large amounts of data is pending and can avoid going back to// the selector to check for more data. Going back to the selector can add significant latency for large// data transfers.// 如果上 一次读循环将缓冲区填充满了那么预测的字节数会变大if (bytes attemptedBytesRead()) {// 如果此次读取将缓冲区填充满了增加一次记录的机会record(bytes);}super.lastBytesRead(bytes); }// 该方法的参数是一次读取操作中实际读取到的数据大小将其与nextReceiveBufferSize 进行比较如果实际字节数actualReadBytes大于等于该值则立即更新nextReceiveBufferSize // 其更新后的值与INDEX_INCREMENT有关。INDEX_INCREMENT为默认常量值为4。也就是说在扩容时会一次性增大多一些以保证下次有足够空间可以接收数据。而相对扩容的策略 // 缩容策略则实际保守些常量为INDEX_INCREMENT值为1同样也是进行对比 但不同的是若实际字节小于所用nextReceiveBufferSize并不会立马进行大小调整 // 而是先把 decreaseNow 设置为true如果下次仍然小于则才会减少nextReceiveBufferSize的大小 private void record(int actualReadBytes) {// 如果小了两个数量级则需要缩容if (actualReadBytes SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {if (decreaseNow) { // 若减少标识decreaseNow连续两次为true, 则说明下次读取字节数需要减少SIZE_TABLE下标减1index max(index - INDEX_DECREMENT, minIndex);nextReceiveBufferSize SIZE_TABLE[index];decreaseNow false;} else {decreaseNow true; // 第一次减少只做记录}} else if (actualReadBytes nextReceiveBufferSize) { // 实际读取的字节大小要大于或等于预测值index min(index INDEX_INCREMENT, maxIndex); // SIZE_TABLE 下标 4nextReceiveBufferSize SIZE_TABLE[index]; // 若当前缓存为512则变成 512 * 2 ^ 4decreaseNow false;} }public void lastBytesRead(int bytes) {// 设置最后读取的字节数lastBytesRead bytes;if (bytes 0) {// 总读取的字节数totalBytesRead bytes;} }上述过程中SIZE_TABLE是什么呢 请看AdaptiveRecvByteBufAllocator源码实现。 public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {static final int DEFAULT_MINIMUM 64; // 接收缓冲区的最小长度下限static final int DEFAULT_INITIAL 1024; // 接收缓冲区的最大长度上限static final int DEFAULT_MAXIMUM 65536; // 接收缓冲区最大长度上限// 在调整缓冲区大小时若是增加缓冲区容量那么增加的索引值。// 比如当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要增加容量大小// 则新缓冲区的大小为SIZE_TABLE[20 INDEX_INCREMENT]即SIZE_TABLE[24]private static final int INDEX_INCREMENT 4; // 扩容增长量// 在调整缓冲区大小时若是减少缓冲区容量那么减少的索引值。// 比如当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要减小容量大小// 则新缓冲区的大小为SIZE_TABLE[20 - INDEX_DECREMENT]即SIZE_TABLE[19]private static final int INDEX_DECREMENT 1; // 扩容减少量private static final int[] SIZE_TABLE;// 分配了一个int类型的数组并进行了数组的初始化处理 从实现来看该数组的长度是53前32位是16的倍数value值是从16开始的到512从33位开始值是前一位的// 两倍即从10242048 到最大值 1073741824 。static {ListInteger sizeTable new ArrayListInteger();for (int i 16; i 512; i 16) {sizeTable.add(i);}for (int i 512; i 0; i 1) {sizeTable.add(i);}SIZE_TABLE new int[sizeTable.size()];for (int i 0; i SIZE_TABLE.length; i) {SIZE_TABLE[i] sizeTable.get(i);}System.out.println();}/*** deprecated There is state for {link #maxMessagesPerRead()} which is typically based upon channel type.*/public static final AdaptiveRecvByteBufAllocator DEFAULT new AdaptiveRecvByteBufAllocator();// 入参是一个大小然后利用二分查找法对该数组进行size定位 目标是为了找出该size值在数组中的下标位置 主要是为了初始化maxIndex, maxIndex这两个参数private static int getSizeTableIndex(final int size) {for (int low 0, high SIZE_TABLE.length - 1; ; ) {if (high low) {return low;}if (high low) {return high;}int mid low high 1;int a SIZE_TABLE[mid];int b SIZE_TABLE[mid 1];if (size b) {low mid 1;} else if (size a) {high mid - 1;} else if (size a) {return mid;} else {return mid 1;}}}private final int minIndex;private final int maxIndex;private final int initial;/*** Creates a new predictor with the default parameters. With the default* parameters, the expected buffer size starts from {code 1024}, does not* go down below {code 64}, and does not go up above {code 65536}.*/public AdaptiveRecvByteBufAllocator() {this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);}/*** Creates a new predictor with the specified parameters.* param minimum the inclusive lower bound of the expected buffer size* param initial the initial buffer size when no feed back was received* param maximum the inclusive upper bound of the expected buffer size*/public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {checkPositive(minimum, minimum);if (initial minimum) {throw new IllegalArgumentException(initial: initial);}if (maximum initial) {throw new IllegalArgumentException(maximum: maximum);}int minIndex getSizeTableIndex(minimum);if (SIZE_TABLE[minIndex] minimum) {this.minIndex minIndex 1;} else {this.minIndex minIndex;}int maxIndex getSizeTableIndex(maximum);if (SIZE_TABLE[maxIndex] maximum) {this.maxIndex maxIndex - 1;} else {this.maxIndex maxIndex;}this.initial initial;}Overridepublic Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);}Overridepublic AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {super.respectMaybeMoreData(respectMaybeMoreData);return this;}}SIZE_TABLE由上述加粗代码进行初始化 。 AdaptiveRecvByteBufAllocator内部维护了一个SIZE_TABLE数组记录了不同的内存的内存块大小按照分配需要寻找最合适的内存块SIZE_TABLE数组中的值为2^n,这样便于软硬件进行处理SIZE_TABLE数组的初始化与PoolArena中的normalizeCapacity的初始化类似当需要的内存很小时 增长的幅度不大 当需要的内存较大时 增长的幅度比较大因此在[16,512]区间每次增加16直到512而从512起每次翻一倍 直到int的最大值 。 那size的具体大小值是什么呢 SIZE_TABLE 数组的toString()打印如下 [16B, 32B, 48B, 64B, 80B, 96B, 112B, 128B, 144B, 160B, 176B, 192B, 208B, 224B, 240B, 256B, 272B, 288B, 304B, 320B, 336B, 352B, 368B, 384B, 400B, 416B, 432B, 448B, 464B, 480B, 496B, 512B, 1k, 2k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M, 4M, 8M, 16M, 32M, 64M, 128M, 256M, 512M, 1G] 当对内部计算器Handle的具体实现类HandleImpl进行初始化时可根据AdaptiveRecvByteBufAllocator的getSizeTableIndex()二分查找方法获取SIZE_TABLE的下标index并保存通过SIZE_TABLE[index]获取下次需要分配的缓冲区大小nextReceiveBufferSize并记录缓冲区的最小容量属性对SIZE_TABLE中的下标为minIndex的值 最大容量属性对应的SIZE_TABLE中的下标为maxIndex的值及bool类型标识属性decreaseNow 这三个属性用于判断下一次创建缓冲区是否需要减少 。   NioByteUnsafe每次循环完成后会根据实际读取到的字节数和当前缓冲区的大小重新设置下次需要分配的缓冲区的大小。 具体代码如下 。 // 循环读取完后被调用 public void readComplete() {record(totalBytesRead()); }//返回已经读取的字节个数若‘totalBytesRead 0’则说明已经读取的字节数已经操作了’Integer.MAX_VALUE’则返回Integer.MAX_VALUE否则返回真实的已经读取的字节数。 protected final int totalBytesRead() {return totalBytesRead 0 ? Integer.MAX_VALUE : totalBytesRead; }可以模拟NioByteUnsafe的read()方法在每次循环开始时 一定要先重置totalMessages与totalByteRead清零,读取完成后 readComplete会计算并调整下次预计需要分配的缓冲区的大小 具体代码如下 public static void main(String[] args) throws Exception {AdaptiveRecvByteBufAllocator allocator new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle allocator.newHandle();System.out.println(开始 I/O 读事件模拟);// 读取循环开始前先重置将读取的次数和字节数设置为0 将totalMessages与totalBytesRead设置为0handle.reset(null);System.out.println(String.format(第一次模拟读需要分配大小 %d, handle.guess()));handle.lastBytesRead(256);// 调整下次预测值handle.readComplete();// 在每次读取数据时都需要重置totalMessage 与totalBytesReadhandle.reset(null);System.out.println(String.format(第2次花枝招展读需要分配大小%d , handle.guess()));handle.lastBytesRead(256);handle.readComplete();System.out.println(连续2次读取的字节数小于默认分配的字节数 );handle.reset(null);System.out.println(String.format(第3次模拟读需要分配大小 %d, handle.guess()));handle.lastBytesRead(512);// 调整下次预测值预测值应该增加到512 * 2 ^ 4handle.readComplete();System.out.println(读取的字节数变大 );handle.reset(null);// 读循环中缓冲区的大小System.out.println(String.format(第4次模拟读需要分配的大小为:%d , handle.guess())); }结果输出 当然啦如果觉得自己已经很明白了可以看看下面这个例子。 public class Test2 {public static void main(String[] args) {AdaptiveRecvByteBufAllocator allocator new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle allocator.newHandle();System.out.println(开始 I/O 读事件模拟);// 读取循环开始前先重置将读取的次数和字节数设置为0 将totalMessages与totalBytesRead设置为0handle.reset(null);System.out.println(String.format(第一次模拟读需要分配大小 %d, handle.guess()));handle.lastBytesRead(512);// 调整下次预测值handle.readComplete();// 在每次读取数据时都需要重置totalMessage 与totalBytesReadhandle.reset(null);System.out.println(String.format(第2次花枝招展读需要分配大小%d , handle.guess()));handle.lastBytesRead(512);handle.readComplete();System.out.println(连续2次读取的字节数小于默认分配的字节数 );handle.reset(null);System.out.println(String.format(第3次模拟读需要分配大小 %d, handle.guess()));} }最后一次结果输出为1024并没有缩容源码读到这里我相信对输出结果已经没有什么意外了。 接下来看一个例子。 Netty服务端代码 public class NettyServer {public static void main(String[] args) {// 创建两个线程组bossGroup 和workerGroup 含有的子线程NioEventLoop 的个数默认为CPU 核数的两倍// BossGroup只是处理连接请求真正的和客户端业务处理会交给workerGroup完成EventLoopGroup bossGroup new NioEventLoopGroup(1);EventLoopGroup workerGroup new NioEventLoopGroup();try {// 创建服务端的启动对象ServerBootstrap bootstrap new ServerBootstrap();// 使用链式编程来配置参数bootstrap.group(bossGroup, workerGroup)//设置两个线程组.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel 作为服务器的通道实现// 初始化服务器连接队列大小服务端处理客户端连接请求是顺序处理的所以同一时间只能处理一个客户端连接多个客户端同时来的时候// 服务端将不能处理的客户端连接请求放在队列中等待处理.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {//ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056));// 对workerGroup 的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println(netty server start ....);// 绑定一个商品并且同步生成一个ChannelFuture异步对象通过isDone()等方法可以判断异步事件的执行情况// 启动服务器并绑定端口bind是异步操作sync方法是等待异步操作执行完毕ChannelFuture cf bootstrap.bind(9000).sync();// 给注册监听器监听我们关心的事件cf.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println(监听端口9000成功);} else {System.out.println(监听端口9000失败);}}});// 对通道关闭进行监听closeFuture是异步操作监听通道关闭// 通过sync方法同步等待通道关闭处理完毕这里会阻塞等待通道关闭完成cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }// 自定义Handler需要继承netty 规定好的某个HandlerAdapter(规范) public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 读取客户端发送的数据** param ctx 上下文对象含有通道channel 管道 pipeline* param msg 就是客户端发送的数据* throws Exception*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(服务器读取的线程 Thread.currentThread().getName());ByteBuf buf (ByteBuf) msg;System.out.println(客户端发送的消息是 buf.toString(CharsetUtil.UTF_8));}/*** 数据读取完毕处理方法* param ctx* throws Exception*/Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println(channelReadComplete);ByteBuf buf Unpooled.copiedBuffer(Hello Client, CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}// 处理异常一般需要关闭通道Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }Netty客户端代码 public class NettyClient {public static void main(String[] args) {// 客户端需要一个事件循环组EventLoopGroup group new NioEventLoopGroup();try {// 创建客户端启动对象// 注意客户端使用的不是ServerBootstrap 而是BootstrapBootstrap bootstrap new Bootstrap();// 设置相关的参数bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler());}});System.out.println(netty client start );// 启动客户端去连接服务器端ChannelFuture channelFuture bootstrap.connect(127.0.0.1,9000).sync();// 对关闭通道进行监听channelFuture.channel().closeFuture().sync();}catch (Exception e ){e.printStackTrace();}finally {group.shutdownGracefully();}} }public class NettyClientHandler extends ChannelInboundHandlerAdapter {// 当客户端连接服务器完成就会触发这个方法Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {StringBuffer sb new StringBuffer();for(int i 0 ;i 1023;i ){sb.append(a);}sb.append(中);sb.append(bbbb);ByteBuf buf Unpooled.copiedBuffer(sb.toString(), CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}// 当通道在读取事件时会触发即服务端发送数据给客户端Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;System.out.println( 收到服务端的消息 buf.toString(CharsetUtil.UTF_8));System.out.println(服务端的地址 ctx.channel().remoteAddress());}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} } 在NettyClientHandler的channelActive()方法中先for循环写了1023个字节 然后写了一个中 字utf-8编码一个中文占3个字节再写了4个bbbb因此最终写入到ByteBuf中是1030个字节 。   我们之前也分析过第一次读取时ByteBuf默认容量为1024因此在NioSocketChannel的read()方法中while()循环中会循环两遍。 如下图所示 。   而刚好在ByteBuf的[1024,1025,1026]这三个字节中被中的中字占用因此ByteBuf取0~1023个字节时“中”字被截断了 。   最终在服务端代码中打印了两次ByteBuf字符串信息发现打印的信息中文乱码。 这个问题怎样解决呢 解决方案一 如果说 Netty 默认提供了一个可变的缓冲区大小分配方案那么我们可不可以改变这个策略呢从AdaptiveRecvByteBufAllocator开始向上找到根类型可以最终找到 RecvByteBufAllocator 接口上查看这个接口的子类应该会有其他缓冲区大小分配方案。 这里有一个固定的接收数组空间分配器现在只要想办法把默认的 AdaptiveRecvByteBufAllocator换成 FixedRecvByteBufAllocator 就可以解决问题了。 首先调用 config方法然后调用getRecvByteBufAllocator来创建这个allocHandle。既然有getRecvByteBufAllocator()方法那肯定有setRecvByteBufAllocator()方法。   因此只需要调用config()的setRecvByteBufAllocator()方法即可。    ByteBuf一次就打印完了并没有出现中文乱码。   对于这个问题 还有另外一种解决方案。 方案二 客户端代码修改 在内容前面添加内容的长度 。 在initChannel()方法中添加ch.pipeline().addLast(new NettyServerHandler2()) 一行代码。   的具体代码如下 public class NettyServerHandler2 extends ByteToMessageDecoder {private int alreadyReadLength ;private int sumByteLength ;private ByteBuf buf ;Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {System.out.println(NettyServerHandler2 invoke);if (sumByteLength 0) {sumByteLength in.readInt();buf Unpooled.copiedBuffer(, CharsetUtil.UTF_8);}int readableBytes in.readableBytes();alreadyReadLength readableBytes;byte[] data new byte[readableBytes];in.readBytes(data);buf.writeBytes(data);if (alreadyReadLength sumByteLength) {sumByteLength 0;byte[] outData new byte[buf.readableBytes()];buf.readBytes(outData);out.add(new String(outData,utf-8));buf.release();buf null;}} }写一个Handler继承ByteToMessageDecoder而在这个类的内部定义了三个属性alreadyReadLength记录已经读取的字节数 sumByteLength本次客户端发送过来的总字节数 buf 临时存储客户端传递过来的字节当alreadyReadLength和sumByteLength相等时则表示字节已经读取完全 。 此时可以将数据写回到out中。因此NettyServerHandler2的主要作用就是合并客户端传递过来的字节从而避免客户端数据还没有读取完就时行业务处理。 没有出现乱码问题了个人觉得第二种方案比第一种方案更加好因为在第一种方案中调用了ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056)) 方法指定了RecvByteBufAllocator 为FixedRecvByteBufAllocator并且初始化ByteBuf的容量为2056如果此次用户发送的byte长度是1030这是已知的如果用户第一次请求的字节长度是3000是不是又要修改FixedRecvByteBufAllocator中的bufferSize的值为3000又要重启Netty服务器显然不适用于生产环境 。 而第二种方案基本上适用于所有的情况 当然啦第二种情况在NettyServerHandler2定义了三个局部变量alreadyReadLengthsumByteLengthbuf 那会不会存在并发问题呢 这个不得而知因为我自己对Netty也是在不断的学习中 具体的情况我会在下一篇博客去求证但这里也给我们提供了一种解决问题的思路希望给读者有借鉴意义 。 第三种解决方案 当然对于之前提到问题还有第三种解决方案我们利用LengthFieldBasedFrameDecoder来解决 。 在NettyServer中 在ChannelInitializer的initChannel方法中添加自定义handler NettyServerHandler2 这个类继承LengthFieldBasedFrameDecoder实现了decode()方法 。 在NettyServerHandler2的构造方法中传递了3个参数这3个参数的含义为 maxFrameLength : 发送的数据包最大长度 发送数据包的最大长度例如1024表示一个数据包最多可发送1024个字节lengthFieldOffset: 长度字段的偏移量 指的是长度字段位于数据包内部字节数组中的下标值engthFieldLength: 长度字段自己占用的字节数如果长度字段是一个int整数则为4如果长度字段是一个short整数则为2lengthAdjustment: 长度字段的偏移量矫正 这个参数最为难懂在传输协议比较复杂的情况下例如包含了长度字段协议版本号 魔数等 那么解码时就需要进行长度字段的矫正长度矫正值的计算公式为内容字段偏移量 - 长度字段偏移量 - 长度字段的字节数 写一个NettyServerHandler2继承LengthFieldBasedFrameDecoder类 public class NettyServerHandler2 extends LengthFieldBasedFrameDecoder {public NettyServerHandler2(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength);}Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {in (ByteBuf) super.decode(ctx,in);if(in null){return null;}if(in.readableBytes()4){throw new Exception(字节数不足);}//读取length字段int length in.readInt();if(in.readableBytes()!length){throw new Exception(标记的长度不符合实际长度);}//content内容byte []bytes new byte[length];in.readBytes(bytes);return new String(bytes,UTF-8);} }看输出结果   从结果输出来看也是一次性打印出所有数据并没有出现中文乱码。 这是第三种解决方案当然关于这一块的源码在下一篇博客 Netty 源码解析中 我会做详细的分析这里就不再赘述了。 到这里这篇博客就告一段落下一篇博客见。 本文对应github地址为 https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git
http://www.w-s-a.com/news/451038/

相关文章:

  • 网页与网站的区别是什么2023年8月份新冠
  • 唐山网站建设外包公司安卓手机怎么搭建网页
  • 国内做网站最大的公司计量检测网站平台建设方案
  • 重庆沛宣网站建设网页制作初学者
  • php网站漂浮广告代码网络营销跟网站推广有啥区别
  • wordpress调用图片优化型网站建设的基本要求
  • 郑州模板网站建设策划公司做网站怎么赚钱滑县电
  • 东昌府聊城网站优化秦皇岛市妇幼保健院
  • 做网站能赚钱吗网页升级访问通知天天更新
  • 做网站使用什么软件的免费招聘网
  • 宁波网站建设公司推荐哪家淄博网站制作公司服务
  • 做网站网页挣钱不免费主题wordpress
  • 如何提高你的网站的粘性手机网站整站模板下载
  • 学校网站建设制度网站相关推荐怎么做
  • 昌图网站wordpress 视频外链
  • 企业网站要怎么建设重庆住房城乡建设部网站
  • html5网站特点seo教程培训班
  • 深圳网站建设哪个最好网站 多语
  • 互联网工具型网站创意网络广告
  • 影视公司网站建设网页界面设计分辨率是多少dpi
  • 免费的做微博的网站模板wordpress 页面 首页
  • 摄影图片网站网站辅导运营与托管公司
  • 做课件的网站长春免费建站模板
  • 响应式网站模板下载免费wordpress 小工具移动
  • 网站标签title在线app制作平台
  • 做电器推广的网站简洁大方的网站模板
  • 网站开发的平台100个详情页设计图
  • wordpress淘宝客建站教程视频知名的设计公司网站
  • 批量做单页网站怎么直接用代码做网站
  • 百度收录较好的网站办公室装修设计方案