手机网站自适应布局怎么做,买卖网交易平台,wordpress 多媒体图片显示不了,wordpress云典目录 1.协议说明
2.类的实现
3.Decoder工作流程
4.源码解析
4.1 ByteToMessageDecoder#channelRead
4.2 累加器Cumulator 4.3 解码过程
4.4 Decoder实现举例
5. 如何开发自己的Decoder 1.协议说明
Netty框架是基于Java NIO框架#xff0c;性能彪悍#xff0c;支持的协…目录 1.协议说明
2.类的实现
3.Decoder工作流程
4.源码解析
4.1 ByteToMessageDecoder#channelRead
4.2 累加器Cumulator 4.3 解码过程
4.4 Decoder实现举例
5. 如何开发自己的Decoder 1.协议说明
Netty框架是基于Java NIO框架性能彪悍支持的协议丰富广受Java爱好者亲莱支持如下协议
TCP/UDPNetty提供了基于NIO的TCP和UDP编程框架可以用来构建高性能、高可用性的网络应用。HTTP/HTTPSNetty提供了HTTP/HTTPS编程框架可以用来开发Web服务器和客户端。WebSocketNetty提供了WebSocket编程框架可以用来实现双向通信应用程序如聊天室等。SPDY/HTTP2Netty提供了SPDY和HTTP2编程框架可以用来实现高效的Web应用程序。MQTT/CoAPNetty提供了MQTT和CoAP编程框架可以用来构建IoT应用程序。
我们在基于Netty框架开发过程中往往需要自定义私有协议如端到端的通信协议端到平台数据通信协议我们需要根据业务的特点自定义数据报文格式举例如下
数据报文格式定义TCP 帧头版本命令标识符序列号设备编码帧长正文校验码1byte1byte1byte2byte4byte 4byteN个byte2byte
假如我们定义了上述私有协议的TCP报文通过netty框架发送和解析
发送端某类通信设备client
接收端Java应用服务Server
本节我主要分析一下server端解析报文的一个过程client当然也很重要尤其在建立TCP连接和关闭连接需要严格控制否则服务端会发现大量的CLOSE_WAIT被动关闭连接,甚至大量TIME_WAIT主动关闭连接关于这个处理之前的文章有讲解。
本节Server端是基于Netty版本netty-all-4.1.30.Final
本节源码分析需求就是要解析一个自定义TCP协议的数据报文进行解码关于编码解码熟悉网络编程的同学都明白不清楚的可以稍微查阅一下资料有助于学习为什么要解码以及如何解码。本节不会对具体报文的解析做具体讲解只对Netty提供的解码器基类ByteToMessageDecoder做一下源码分解以及如何使用ByteToMessageDecoder开发属于自己的Decoder接下来我们看看ByteToMessageDecoder的定义。
#继承ChannelInboundHandlerAdapter
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
}
2.类的实现
解码器的ByteToMessageDecoder 该类继承了ChannelInboundHandlerAdapter ChannelInboundHandlerAdapter继承ChannelHandlerAdapter
ChannelInboundHandlerAdapter实现ChannelInboundHandler接口也就是说ChannelInboundHandler定义了解码器需要处理的工作方法
ChannelInboundHandlerAdapter是一个适配器模式负责Decoder的扩展。它的实现有很多简单列举一下 HeartBeatHandler MessageToMessageDecoder SimpleChannelInboundHandler抽象了方法channelRead0 ByteToMessageDecoder 。。。。。。
以上都是比较常用的Decoder或Handler基于这些基类还定义了很多handler有兴趣的同学可以跟代码查阅。
3.Decoder工作流程
每当数据到达Server端时SocketServer通过Reactor模型分配具体的worker线程进行处理数据处理数据就需要我们的事先定义好的Decoder以及handler假如我们定义了以下两个对象
MyDecoder extends ByteToMessageDecoder{} 作为解码器MyHandler extends SimpleChannelInboundHandler{} 作为解码后的业务处理器
worker线程——〉MyDecoder#channelRead实际就是调用ByteToMessageDecoder#channelRead——〉Cumulator累加器处理——〉
解码器decode处理MyDecoder需要实现decode方法——〉Myhandler#channelRead0处理具体的数据msg
4.源码解析 4.1 ByteToMessageDecoder#channelRead Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//如果是设置在ServerBootstrap的childHandler那么msg的对象类型就是ByteBuf否则就执行elseif (msg instanceof ByteBuf) {//CodecOutputList对象可以查阅文档https://www.freesion.com/article/4800509769///这个out对象随着callDecode方法进行传递解码后的数据保存在out中CodecOutputList out CodecOutputList.newInstance();try {ByteBuf data (ByteBuf) msg;//1.cumulation是累加器处理tcp半包与粘包问题first cumulation null;if (first) {//2.第一次收到数据累加器为nullcumulation data;} else {//3.第二次收到数据累加器需要评估ByteBuf的capacity够用则追加到cumulationcapacity不够则进行扩容cumulation cumulator.cumulate(ctx.alloc(), cumulation, data);}//4.调用callDecode进行解码//5.CodecOutputList out对象保存解码后的数据它的实现是基于AbstractList,//重新定义了add(),set(),remove()等方法其中add()方法实现对Array数组中//进行insert没有直接拷贝而是通过对象引用将对象指向数据索引的index是性能的一个提升。callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {//6.如果累加器cumulation中的数据被解码器读完了则可以完全释放累加器cumulationif (cumulation ! null !cumulation.isReadable()) {numReads 0;cumulation.release();cumulation null;} else if ( numReads discardAfterReads) {// We did enough reads already try to discard some bytes so we not risk to see a OOME.// See https://github.com/netty/netty/issues/4275//7.释放累加器cumulation里面的已读数据防止cumulation无限制增长numReads 0;discardSomeReadBytes();}int size out.size();decodeWasNull !out.insertSinceRecycled();//8.解码完成后需要触发事先定义好的handler的channelRead()方法处理解码后的out数据fireChannelRead(ctx, out, size);//9.最终需要回收out对象out.recycle();}} else {//10.非ByteBuf直接向后触发传递ctx.fireChannelRead(msg);}}4.2 累加器Cumulator
累加器的作用是解决tcp数据包中出现半包和粘包问题。
半包接收到的byte字节不足一个完整的数据包
半包处理办法不足一个完整的数据包先放入累加器不做解码等待续传的数据包
粘包接收到的byte字节数据包中包括其他数据包的数据靠数据包协议中定义的帧头帧尾标识来识别多于1个以上的帧头或帧尾数据包为粘包数据
粘包处理办法按照数据包帧结构定义去解析需要结合累加器解析完一个数据包交给handler去处理剩下的不足一个数据包长度的字节保存在累加器等待续传的数据包收到之后继续解码。
ByteToMessageDecoder内部定义了Cumulator接口 /*** Cumulate {link ByteBuf}s.*/public interface Cumulator {/*** Cumulate the given {link ByteBuf}s and return the {link ByteBuf} that holds the cumulated bytes.* The implementation is responsible to correctly handle the life-cycle of the given {link ByteBuf}s and so* call {link ByteBuf#release()} if a {link ByteBuf} is fully consumed.*/ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);}
其中在类最开始的时候构建了两个对象分别是MERGE_CUMULATORCOMPOSITE_CUMULATOR代码如下 /*** Cumulate {link ByteBuf}s by merge them into one {link ByteBuf}s, using memory copies.*/public static final Cumulator MERGE_CUMULATOR new Cumulator() {Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {try {final ByteBuf buffer;//1.如果累加器ByteBuf 剩余可写的capacity不满足当前需要写入的ByteBuf(in)长度则进行扩容累加器ByteBuf容量执行expandCumulation方法if (cumulation.writerIndex() cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() 1 || cumulation.isReadOnly()) {buffer expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer cumulation;}//2.写入累加器并返回更新后的cumulationbuffer.writeBytes(in);return buffer;} finally {// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw// for whatever release (for example because of OutOfMemoryError)//3.由于是对in的拷贝所以需要releasein.release();}}};//通过对CompositeByteBuf的累加器的实现CompositeByteBuf内部使用ComponentList//实现对ByteBuf进行追加//ComponentList是ArrayList的实现所以每次Add操作都是一次内存拷贝。public static final Cumulator COMPOSITE_CUMULATOR new Cumulator() {Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;try {if (cumulation.refCnt() 1) {buffer expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {composite (CompositeByteBuf) cumulation;} else {composite alloc.compositeBuffer(Integer.MAX_VALUE);composite.addComponent(true, cumulation);}composite.addComponent(true, in);in null;buffer composite;}return buffer;} finally {if (in ! null) {//因为是对ByteBuf in的拷贝所以需要释放in.release();}}}};4.3 解码过程
在ByteToMessageDecoder#channelRead看到了将累加器交给callDecoder方法
//这里ByteBuf in 就是累加器对象cumulaction
// ListObject out解码后的对象
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) {try {//1. 循环读取累加器对象的bytewhile (in.isReadable()) {int outSize out.size();//2.如果解码后out对象中产生数据则触发后边的handlerMyHandler处理数据if (outSize 0) {fireChannelRead(ctx, out, outSize);out.clear();if (ctx.isRemoved()) {break;}outSize 0;}//3.继续解析累加器传递过来的byteint oldInputLength in.readableBytes();//4.注意out对象是从channelRead()方法传递过来继续传递下去decodeRemovalReentryProtection(ctx, in, out);if (ctx.isRemoved()) {break;}//4.如果这次解码没有获得任何消息if (outSize out.size()) {//5.如果解码器decode没有消费累加器 in 任何字节结束循环 if (oldInputLength in.readableBytes()) {break;//6.否则继续循环调用解码器decode} else {continue;}}//7.如果累加器ByteBuf in中可读字节数依然没有变化说明实现的解码器decode()方法有问题需要检查自身代码问题if (oldInputLength in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) .decode() did not read anything but decoded a message.);}//8.是否设定每次调用解码器一次如果是则结束本次解码if (isSingleDecode()) {break;} }} catch (DecoderException e) {} catch (Exception cause) {}}继续查看ByteToMessageDecoder#decodeRemovalReentryProtection方法 //1.此方法不允许重写final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, ListObject out)throws Exception {decodeState STATE_CALLING_CHILD_DECODE;try {//2.核心方法decode这是一个抽象方法没有实现需要在自定义的DecoderMydecoder进行实现//3.自定义Decoder需要将解码后的数据放入到out对象中decode(ctx, in, out);} finally {boolean removePending decodeState STATE_HANDLER_REMOVED_PENDING;decodeState STATE_INIT;if (removePending) {handlerRemoved(ctx);}}}//解码decode方法需要子类自定义的实现类去实现该方法最终将解码后的数据放入ListObject outprotected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception;
4.4 Decoder实现举例
基于ByteToMessageDecoder的实现很多简单列举一下
JsonObjectDecoderRedisDecoderXmlDecoderMqttDecoderReplayingDecoderSslDecoderDelimiterBasedFrameDecoderFixedLengthFrameDecoderLengthFieldBasedFrameDecoder ....
我们拿JsonObjectDecoder举例如下 Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {// 省略代码。。。。。。int idx this.idx;int wrtIdx in.writerIndex();//省略代码。。。。。。。for (/* use current idx */; idx wrtIdx; idx) {byte c in.getByte(idx);if (state ST_DECODING_NORMAL) {decodeByte(c, in, idx);if (openBraces 0) {ByteBuf json extractObject(ctx, in, in.readerIndex(), idx 1 - in.readerIndex());//1.解析后的对象加入out中if (json ! null) {out.add(json);}in.readerIndex(idx 1);reset();}} else if (state ST_DECODING_ARRAY_STREAM) {//2.自身实现解析json格式的方法decodeByte(c, in, idx);if (!insideString (openBraces 1 c , || openBraces 0 c ])) {for (int i in.readerIndex(); Character.isWhitespace(in.getByte(i)); i) {in.skipBytes(1);}// skip trailing spaces.int idxNoSpaces idx - 1;while (idxNoSpaces in.readerIndex() Character.isWhitespace(in.getByte(idxNoSpaces))) {idxNoSpaces--;}ByteBuf json extractObject(ctx, in, in.readerIndex(), idxNoSpaces 1 - in.readerIndex());//3.解析后的对象加入out中if (json ! null) {out.add(json);}in.readerIndex(idx 1);if (c ]) {reset();}}} //省略代码。。。。。。}if (in.readableBytes() 0) {this.idx 0;} else {this.idx idx;}this.lastReaderIndex in.readerIndex();}
5. 如何开发自己的Decoder
读了ByteToMessageDecoder的部分源码以及它的实现JsonObjectDecoder那么如果我们自己实现一个Decoder该如何实现这里提供三个思路给大家有时间再补充代码。
基于ByteToMessageDecoder实现MyDecoder extends ByteToMessageDecoder{实现decode()方法}可参考RedisDecoder、XmlDecoder等实现。基于ChannelInboundHandlerAdapter实现这个时候需要自己负责解决TCP报文半包和粘包问题重写其中的channelRead()方法。直接使用已经实现ByteToMessageDecoder的解码器如FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。 注意事项 * Be aware that sub-classes of {link ByteToMessageDecoder} strongMUST NOT/strong * annotated with {link Sharable}. ByteToMessageDecoder的子类不能使用Sharable注解修饰因为解码器只能单独为一个Channel进行解码也就是说每个worker线程需要独立的Decoder。 * p * Some methods such as {link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer * is not released or added to the ttout/tt {link List}. Use derived buffers like {link ByteBuf#readSlice(int)} * to avoid leaking memory. 如果基于ChannelInboundHandlerAdapter自己实现Decoder#channelRead()方法时注意内存泄露问题ByteBuf#readBytes(int)方法会产生一个新的ByteBuf,需要手动释放。 或者 基于ByteToMessageDecoder实现decode()方法时将解析后的对象放入out对象中上面源码分析中有提示 或者 使用派生的ByteBuf,如调用ByteBuf#readSlice(int)方法返回的ByteBuf与原有ByteBuf共享内存不会产生新的Reference count可以避免内存泄露。 Netty Project官网也有说明 Reference counted objects ByteBuf.duplicate(), ByteBuf.slice() and ByteBuf.order(ByteOrder) create a derived buffer which shares the memory region of the parent buffer. A derived buffer does not have its own reference count, but shares the reference count of the parent buffer.