直播网站功能怎么做,企业it外包服务公司,企业网站小程序源码,用adsl做网站备案背景
netty框架中#xff0c;自定义解码器的起点是ByteBuf类型的消息, 自定义编码器的终点是ByteBuf类型。
1.解码器 业务解码器的起点是ByteBuf类型 netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelInboundHandlerAdap…背景
netty框架中自定义解码器的起点是ByteBuf类型的消息, 自定义编码器的终点是ByteBuf类型。
1.解码器 业务解码器的起点是ByteBuf类型 netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelInboundHandlerAdapterChannelInboundHandlerAdapter使用默认方式(不处理向下传递事件)实现了所有的Inbound接口。因此MessageToMessageEncoder只需要重写channelRead方法并在该方法中提取消息、转换消息、通过ChannelInvoker将转换后的消息以channelRead事件发向pipeline即可。 MessageToMessageEncoder抽象类的实现如下:
public abstract class MessageToMessageDecoderI extends ChannelInboundHandlerAdapter {private final TypeParameterMatcher matcher;protected MessageToMessageDecoder() {matcher TypeParameterMatcher.find(this, MessageToMessageDecoder.class, I);}protected MessageToMessageDecoder(Class? extends I inboundMessageType) {matcher TypeParameterMatcher.get(inboundMessageType);}public boolean acceptInboundMessage(Object msg) throws Exception {return matcher.match(msg);}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CodecOutputList out CodecOutputList.newInstance();try {if (acceptInboundMessage(msg)) {I cast (I) msg;try {decode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}} else {out.add(msg);}} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size out.size();for (int i 0; i size; i) {ctx.fireChannelRead(out.getUnsafe(i));}} finally {out.recycle();}}}protected abstract void decode(ChannelHandlerContext ctx, I msg, ListObject out) throws Exception;
}1.1 类型的匹配器
MessageToMessageDecoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher用于指定解码器可以处理的消息类型。可通过构造函数为其设置类型也可通过泛型指定:
// 使用泛型类型
protected MessageToMessageDecoder() {matcher TypeParameterMatcher.find(this, MessageToMessageDecoder.class, I);
}// 子类调用MessageToMessageDecoder构造器时传入类型
protected MessageToMessageDecoder(Class? extends I inboundMessageType) {matcher TypeParameterMatcher.get(inboundMessageType);
}一般通过泛型指定解码器处理的消息对象即使用MessageToMessageDecoder的无参构造函数。 acceptInboundMessage方法封装matcher的实现返回布尔值表示是否支持处理msg消息类型:
public boolean acceptInboundMessage(Object msg) throws Exception {return matcher.match(msg);
}根据matcher的match方法:
private static final class ReflectiveMatcher extends TypeParameterMatcher {private final Class? type;//...Overridepublic boolean match(Object msg) {// msg消息是否为type类型或者其子类型return type.isInstance(msg);}
}1.2 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 构造List列表对象存储解码后的对象CodecOutputList out CodecOutputList.newInstance();try {// 判断是否支持处理消息if (acceptInboundMessage(msg)) {I cast (I) msg;try {// 处理消息将cast对象解码后的结果存放到out列表中decode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}} else {// 不处理消息以原样保存out.add(msg);}} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size out.size();// 遍历列表依次向pipeline触发解码后的对象for (int i 0; i size; i) {ctx.fireChannelRead(out.getUnsafe(i));}} finally {out.recycle();}}
}逻辑较为清晰 [1] 构造列表对象out用于临时存放解码后的消息 [2] 判断当前解码器是否可以处理该消息不可以处理直接添加到out中可以处理调用decode方法解码消息解码结果都添加到out中; [3] 遍历out列表将消息以ChannelRead事件传递给向pipeline [4] out清理、回收再利用;
1.3 decode方法
decode方法是实际进行消息转换的逻辑由子类根据业务具体实现:
protected abstract void decode(ChannelHandlerContext ctx, I msg, ListObject out) throws Exception;将msg解码解码后的对象存放在out中由于out是数组因此可以从msg中解码出一个对象也可以解码出多个。如下所示:
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) {out.add(msg.toString(charset));
}将ByteBuf类型的msg消息转为一个String类型的对象
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) {String[] decodedMsgs msg.toString(charset).split(;);for (String decodedMsg: decodedMsgs) {out.add(decodedMsg);}
}将ByteBuf转为String并按照;分隔符进行拆分每个字符串作为一个消息对象。
2.解码器案例
案例的结构图如下所示消息流入解码器和流出时的消息类型会发生变化: 引入三个解码器和一个业务Handler: [1] 编码器1实现ByteBuf-String类型的转换; [2] 编码器2实现String-Message1类型的转换; [3] 编码器3实现Message1-Message2类型的转换; [4] 业务Handler打印消息类型和消息; 实现类依次为:
// MyMessageDecoder1
public class MyMessageDecoder1 extends MessageToMessageDecoderByteBuf {Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) {out.add(msg.toString(Charset.defaultCharset()));}
}// MyMessageDecoder2
class MyMessageDecoder2 extends MessageToMessageDecoderString {Overrideprotected void decode(ChannelHandlerContext ctx, String msg, ListObject out) {String[] decodedMsgs msg.split(;);for (String decodedMsg : decodedMsgs) {out.add(new Message1(decodedMsg));}}
}// MyMessageDecoder3
class MyMessageDecoder3 extends MessageToMessageDecoderMessage1 {Overrideprotected void decode(ChannelHandlerContext ctx, Message1 msg, ListObject out) {out.add(new Message2(msg.getContent()));}
}业务Handler定义如下:
private static class MyHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(msg);}
}Message1和Message2消息定义如下:
Data
RequiredArgsConstructor
pulic class Message1 {private final String content;
}Data
RequiredArgsConstructor
pulic class Message2 {private final String content;
}客户端发送消息:test1;test2;test3时
Microsoft Telnet send test1;test2;test3
发送字符串 test1;test2;test3
Microsoft Telnet服务器日志如下所示:
Message2(contenttest1)
Message2(contenttest2)
Message2(contenttest3)注意解码的顺序沿着pipeline进行因此需要注意调整netty解码器在pipeline中的位置。
如果将3和解码器2的顺序调整一下:
protected void initChannel(NioSocketChannel channel) {channel.pipeline().addLast(new MyMessageDecoder1());channel.pipeline().addLast(new MyMessageDecoder3());channel.pipeline().addLast(new MyMessageDecoder2());channel.pipeline().addLast(new MyHandler());
}重复上述操作服务器日志如下:
Message1(contenttest1)
Message1(contenttest2)
Message1(contenttest3)此时解码器1流出的数据为String类型流入解码器2时-类型校验不通过直接以流入的String类型流出流入解码器3时将String类型转为Message1类型流入业务Handler进行打印。
3.编码器 业务编码器的终点是ByteBuf类型 netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelOutboundHandlerAdapterChannelOutboundHandlerAdapter使用默认方式实现(不处理向前传递事件)了所有的Outbound接口。因此MessageToMessageEncoder只需要重写write方法并在该方法中编码消息、并通过ChannelInvoker将编码后的消息发送到pipeline即可。 MessageToMessageEncoder抽象类的实现如下:
public abstract class MessageToMessageEncoderI extends ChannelOutboundHandlerAdapter {private final TypeParameterMatcher matcher;protected MessageToMessageEncoder() {matcher TypeParameterMatcher.find(this, MessageToMessageEncoder.class, I);}protected MessageToMessageEncoder(Class? extends I outboundMessageType) {matcher TypeParameterMatcher.get(outboundMessageType);}public boolean acceptOutboundMessage(Object msg) throws Exception {return matcher.match(msg);}Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out null;try {if (acceptOutboundMessage(msg)) {out CodecOutputList.newInstance();SuppressWarnings(unchecked)I cast (I) msg;try {encode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}if (out.isEmpty()) {throw new EncoderException(StringUtil.simpleClassName(this) must produce at least one message.);}} else {ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);} finally {if (out ! null) {try {final int sizeMinusOne out.size() - 1;if (sizeMinusOne 0) {ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne 0) {if (promise ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {out.recycle();}}}}private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {final ChannelPromise voidPromise ctx.voidPromise();for (int i 0; i out.size(); i) {ctx.write(out.getUnsafe(i), voidPromise);}}private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {final PromiseCombiner combiner new PromiseCombiner(ctx.executor());for (int i 0; i out.size(); i) {combiner.add(ctx.write(out.getUnsafe(i)));}combiner.finish(promise);}protected abstract void encode(ChannelHandlerContext ctx, I msg, ListObject out) throws Exception;
}3.1 类型的匹配器
MessageToMessageEncoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher用于指定该编码器器可以处理的消息类型与解码器中的matcher作用完全相同不再赘述。
3.2 write方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out null;try {// 判断当前编码器是否可以编码消息if (acceptOutboundMessage(msg)) {out CodecOutputList.newInstance();SuppressWarnings(unchecked)I cast (I) msg;try {// 编码消息并将编码后的消息保存到out列表中encode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}if (out.isEmpty()) {throw new EncoderException(StringUtil.simpleClassName(this) must produce at least one message.);}} else {// 不能编码的消息不处理直接沿着pipeline向前传递ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);} finally {// 遍历out,依次调用ctx.write沿着pipeline向前传递if (out ! null) {try {final int sizeMinusOne out.size() - 1;if (sizeMinusOne 0) {ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne 0) {if (promise ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {// 清理out列表回收再利用out.recycle();}}}
}逻辑较为清晰 [1] 构造列表对象out用于临时存放编码后的消息 [2] 判断当前编码器是否可以处理该消息不可以处理直接通过ctx.write沿着pipeline向前传递可以处理调用encode方法编码消息编码结果添加到out中; [3] 遍历out列表将消息以write事件传递给向pipeline [4] out清理、回收再利用;
3.2 encode方法
encode方法是实际进行消息转换的逻辑由子类根据业务具体实现:
protected abstract void encode(ChannelHandlerContext ctx, I msg, ListObject out) throws Exception;将msg消息进行编码编码后的对象存放在out中由于out是数组因此可以从msg中编码出一个对象也可以编码出多个与解码器逻辑相同。
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) {out.add(msg.toString(charset));
}将ByteBuf类型的msg消息转为一个String类型的对象
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) {String[] decodedMsgs msg.toString(charset).split(;);for (String decodedMsg: decodedMsgs) {out.add(decodedMsg);}
}将ByteBuf转为String并按照;分隔符进行拆分每个字符串作为一个消息对象。 netty向外发送数据时一般经过业务Handler-编码器-HeadContext的流程。 向客户端发送消息的底层实现在HeadContext的unsafe对象(NioSocketChannel的unsafe对象)中而发送前有消息类型判断:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler{ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);}
}unsafe对象的write方法如下:
public final void write(Object msg, ChannelPromise promise) {//...msg filterOutboundMessage(msg);//...
}在真实写操作前通过filterOutboundMessage进行消息类型的判断:
Override
protected final Object filterOutboundMessage(Object msg) {// 要求消息必须时ByteBuf或者FileRegion类型或其子类型if (msg instanceof ByteBuf) {ByteBuf buf (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException(unsupported message type: StringUtil.simpleClassName(msg) EXPECTED_TYPES);
}由此编码器将消息传递给HeadContext前需要将消息最终编码为ByteBuf类型。
4.解码器案例
案例结构图如下所示:
在章节2中的案例基础上新增两个编码器并修改业务Handler: [1] 业务Handler接收客户端消息后响应相同消息; [2] 编码器1将Message2类型的消息转为String类型; [3] 编码器2: 将String类型消息转为ByteBuf类型; 代码实现如下: 修改业务Handler:
private static class MyHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(msg);// 新增逻辑“将消息对象发送给客户端ctx.write(msg);}
}添加编码器:
// 将Message2消息转为String消息
public class MyEncoder1 extends MessageToMessageEncoderMessage2 {Overrideprotected void encode(ChannelHandlerContext ctx, Message2 msg, ListObject out) throws Exception {out.add(msg.getContent());}
}// 将String消息转为ByteBuf消息
public class MyEncoder2 extends MessageToMessageEncoderString {Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ListObject out) {out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), Charset.defaultCharset()));}
}在MyHandler前依次添加解码器MyEncoder2和MyEncoder1:
protected void initChannel(NioSocketChannel channel) {channel.pipeline().addLast(new MyMessageDecoder1());channel.pipeline().addLast(new MyMessageDecoder2());channel.pipeline().addLast(new MyMessageDecoder3());channel.pipeline().addLast(new MyEncoder2());channel.pipeline().addLast(new MyEncoder1());channel.pipeline().addLast(new MyHandler());
}可以使用Netty写一个客户端, 也可用客户端工具模拟这里为了方便使用SocketTool.exe控制台日志如下:
14:36:15 发送数据test1;test2;test3[1次]
14:36:15 收到数据test1test2test3注意客户端收到了test1test2test3消息在客户端开来是一个消息但在服务器看来是连续发送的3个消息消息内容分别为test1和test2和test3。这是TCP的流传输模式导致可在业务层添加额外处理解决这个问题。将在下一篇文件介绍Netty如何处理粘包和分包问题。