可以做国外购物的网站有哪些,网上电子商城系统,最新中国企业500强名单,wordpress去谷歌插件在项目开发 有需求 需要跟硬件通信 也没有mqtt 作为桥接 也不能http 请求 api 所以也不能 json字符串这么爽传输
所以要用tcp 请求 进行数据交互 数据还是16进制的 写法 有帧头 什么的 对于这种物联网的这种对接 我的理解就是 我们做的工作就像翻译 把这些看不懂的 字节流 变成…在项目开发 有需求 需要跟硬件通信 也没有mqtt 作为桥接 也不能http 请求 api 所以也不能 json字符串这么爽传输
所以要用tcp 请求 进行数据交互 数据还是16进制的 写法 有帧头 什么的 对于这种物联网的这种对接 我的理解就是 我们做的工作就像翻译 把这些看不懂的 字节流 变成 java 认识的数据
就了解了一下 netty 这个依赖 在springboot 项目种使用这个依赖进行 连接
但是这个依赖 如果不熟悉 使用不当 有可能有内存溢出的风险 所以大家如果自己用这个netty 依赖 做物联网的调用 要小心 最好能找一下 物联网中间件 基于netty封装的
这是我后续 找的一个中间件 也不算很成熟 如果大家有更好用的 欢迎留言 一起分享
http://www.iteaj.com/#/course
但是我发现还是要根据 对接的物联网硬件文档 写自定义解码器 或者编码器 这篇文章主要分享一下 我了解到的 netty 下的 四个解码器 使用方案
概念 拆包和沾包 是典型的拆包和沾包问题俗话说就是两端通信一端发送一端接收接收的那一端怎么知道是否已经完整的接收了数据
假设服务端连续发送了两条消息hello world! / hello client!
由于客户端不知道怎么才算一条消息怎么才算两条消息所以读取会有以下几种情况
1.分两次读取消息第一次是hello world!第二次是hello client! 这是正常情况
2.一次就读取完成hello world!hello client! 这种情况就叫沾包
3.分两次读取消息第一次是hello 第二次是world!hello client! 这第一次读取就是拆包第二次就是沾包
总之就是读取到的信息不完整就是拆包读取到的信息有额外多的信息就是沾包
解码器就可以来解决这个问题
依赖引入就不说了 先分享代码
netty配置类
package com.netty.server.tpcServer;import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** User:Json* Date: 2024/4/12**/
Component
Slf4j
public class NettyServer {private static final Logger logger LoggerFactory.getLogger(NettyServer.class);// 服务端NIO线程组private final EventLoopGroup bossGroup new NioEventLoopGroup();private final EventLoopGroup workGroup new NioEventLoopGroup();public ChannelFuture start(String host, int port) {ChannelFuture channelFuture null;try {ServerBootstrap bootstrap new ServerBootstrap();bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 1024 就是规定传入的 字节长度的大小// 对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解//第一种LineBasedFrameDecoder传入的参数是消息最大长度发送消息的大小必须小于设置值// 行分隔符解码器(结尾根据 “\n” 作为结束标识)//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));//第二种DelimiterBasedFrameDecoder 自定义分割器解码器结尾根据什么作为结束标识可以自定义 比如用 |// socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(|.getBytes())));// 第三种 FixedLengthFrameDecoder // 固定长度解码器发送的消息需要定长// socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(7));// 上面三个解码器明显的看到不够灵活 所以还有第四种//第四种LengthFieldBasedFrameDecoder基于长度的自定义解码器比较灵活 这个// 这个 解码器 一共有 5个参数
// maxFrameLength最大帧长度。也就是可以接收的数据的最大长度。如果超过此次数据会被丢弃
// lengthFieldOffset长度域偏移量。存储数据长度的一个偏移量
// lengthFieldLength长度域字节数。存储数据长度的一个大小
// lengthAdjustment数据长度修正。因为长度既可以代表data的长度也可以是整个消息的长度
// initialBytesToStrip跳过的字节数。可以选择舍弃一部分数据// 可以理解为 我们给数据定义传输规则 就像http请求 定义规则 比如 请求头 数据结构 json 等等// 所以我们作为接收数据的服务端 可以在开发接收数据的时候 定义规则 然后让客户端按照规则来传输数据// 比如 发送一个消息我们定义的结构为消息头数据长度数据// 我们怎么定义规则呢 比如// 整体字节为 15个字节// 要发送的数据 Message 长度为 7个字节//我们规定 maxFrameLength 1024 接收的最大数据// lengthFieldOffset长度域偏移量 一般用来定义消息头 4个字节、// lengthFieldLength长度域字节数 用于定义数据长度length的【值】的大小 4个字节// lengthAdjustment : 0 数据长度修正// 假设我设置的数据长度是20代表了整个消息体的长度但是我数据却只有12个字节这往后读20个字节无疑是错的所以我们需要修正怎么修正 减8 就行// 所以如果你需要修正你的 数据长度那么lengthAdjustment就是用来修正的。// initialBytesToStrip 8 ,如果在我们业务层只需要 消息体 像消息头 和 数据长度都不需要 那么 initialBytesToStrip就是用来跳过的字节数。// 就比如 消息头规定 4个字节 数据长度 4个字节 所以我们如果只要 消息体里的内容 就需要跳过 8个字节 所以设置8 即可// 获取全部数据// socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,0));// 只获取消息体里的数据socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,8));// 自定义服务处理 所以再进行服务处理之前 需要先经历 解码器 既然有解码器 也有编码器// 如果我们作为服务端 那就需要编码器 如果我们作为客户端 那就需要解码器// 解码器主要用于 将接收到的字节数据转换成有意义的业务对象。比如java 认识的 实体类对象// 在网络编程中数据在传输过程中通常是以字节数组的形式进行传递的// 而业务处理通常需要对这些字节数组进行解析转换为具体的业务对象。解码器正是完成这一工作的组件// 而且解码器 还需要 处理拆包和沾包 等 所以我们在使用 tcp 进行通信的时候 到业务层处理 的时候 需要先经过解码器socketChannel.pipeline().addLast(new ServerHandler());}});// 绑定端口并同步等待channelFuture bootstrap.bind(host, port).sync();log.info(Start Up Success!);} catch (Exception e) {e.printStackTrace();}return channelFuture;}public void close() {workGroup.shutdownGracefully();bossGroup.shutdownGracefully();log.info(Shutdown Netty Server Success!);}//测试 mainpublic static void main(String[] args) throws Exception {EventLoopGroup group new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}});ChannelFuture channelFuture bootstrap.connect(192.168.0.209, 17070).sync();Channel channel channelFuture.channel();// 连续发送多条消息for (int i 0; i 100; i) {//System.getProperty(line.separator) 获取系统换行符 测试一种情况// String message Message i System.getProperty(line.separator);// String message Message i|; //测试第二种情况// String message Message; // 第三种 固定长度情况// channel.writeAndFlush(message);//第四种 自定义解码器//[4个字节 4个字节 数据]// 我们传输数据 就要这样拼接数据 消息头 数据长度数据String message Messagei;//消息头 4个字节ByteBuf byteBuf Unpooled.buffer();byteBuf.writeInt(102); //【lengthFieldOffset】 4个字节// 102 测试随便写的 只要是占4个字节的数字就行 为啥写个102就4个字节 因为102 是int类型 int类型的字节数是4个字节// 这个应该是 数据类型基础 int string long 等等 占几个字节的问题//数据长度 4个字节 长度也是 int型 所以跟消息头 一样 所以在设置解码器长度参数的时候 【lengthFieldLength】 4个字节就够用byteBuf.writeInt(message.getBytes().length);//发送的数据byteBuf.writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));channel.writeAndFlush(byteBuf);// Thread.sleep(100); // 可能需要调整间隔时间}channel.closeFuture().sync();} finally {group.shutdownGracefully();}}
}
回调类
package com.netty.server.tpcServer;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;import java.text.SimpleDateFormat;
import java.util.Date;/*** User:Json* Date: 2024/4/12**/
public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 客户端数据到来时触发*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;//第一种到第三种解码器测试
// System.out.println(client request: buf.toString(CharsetUtil.UTF_8));
// SimpleDateFormat sf new SimpleDateFormat(yyyy/MM/dd HH:mm:ss);
// String callback sf.format(new Date());
// ctx.write(Unpooled.copiedBuffer(callback.getBytes()));//第四种 自定义解码器测试System.out.println();// 获取全部数据
// System.out.println(消息头buf.readInt());
// int i buf.readInt();
// System.out.println(数据长度i);
// System.out.println(消息体buf.readBytes(i).toString(CharsetUtil.UTF_8));//只获取消息体里的数据System.out.println(只获取消息体: buf.toString(CharsetUtil.UTF_8));}Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 将发送缓冲区的消息全部写到SocketChannel中ctx.flush();}/*** 发生异常时触发*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getMessage());cause.printStackTrace();// 释放与ChannelHandlerContext相关联的资源ctx.close();}
}
springboot 启动时运行
package com.netty.server.init;import com.netty.server.tpcServer.NettyServer;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** User:Json* Date: 2024/4/12**/
Component
Slf4j
Order(2)
public class NettyServerInit implements CommandLineRunner {Resourceprivate NettyServer nettyServer;// 这个 线程 后续优化 比如关闭 等等Overridepublic void run(String... args) throws Exception {// 开启服务ChannelFuture future nettyServer.start(192.168.0.209, 17070);// 在JVM销毁前关闭服务Runtime.getRuntime().addShutdownHook(new Thread() {Overridepublic void run() {nettyServer.close();}});future.channel().closeFuture().sync();}
}
核心测试代码 在 NettyServer 类里
下面逐个分享一下解码器的 意思 第一种
new LineBasedFrameDecoder(1024)1024 就是规定传入的 字节长度的大小 对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解 LineBasedFrameDecoder传入的参数是消息最大长度发送消息的大小必须小于设置值 行分隔符解码器(结尾根据 “\n” 作为结束标识)
第二种
new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(|.getBytes()))DelimiterBasedFrameDecoder 自定义分割器解码器结尾根据什么作为结束标识可以自定义 比如用 |
第三种
new FixedLengthFrameDecoder(7)// 固定长度解码器发送的消息需要定长
第四种重点
new LengthFieldBasedFrameDecoder(1024,4,4,0,0)第四种LengthFieldBasedFrameDecoder基于长度的自定义解码器比较灵活 这个 这个 解码器 一共有 5个参数 maxFrameLength最大帧长度。也就是可以接收的数据的最大长度。如果超过此次数据会被丢弃 lengthFieldOffset长度域偏移量。存储数据长度的一个偏移量 lengthFieldLength长度域字节数。存储数据长度的一个大小 lengthAdjustment数据长度修正。因为长度既可以代表data的长度也可以是整个消息的长度 initialBytesToStrip跳过的字节数。可以选择舍弃一部分数据 可以理解为 我们给数据定义传输规则 就像http请求 定义规则 比如 请求头 数据结构 json 等等 所以我们作为接收数据的服务端 可以在开发接收数据的时候 定义规则 然后让客户端按照规则来传输数据 比如 发送一个消息我们定义的结构为消息头数据长度数据
对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解
我们怎么定义规则呢 比如 整体字节为 15个字节 要发送的数据 “Message” 长度为 7个字节 我们规定 maxFrameLength 1024 接收的最大数据 lengthFieldOffset长度域偏移量 一般用来定义消息头 4个字节、 lengthFieldLength长度域字节数 用于定义数据长度length的【值】的大小 4个字节 lengthAdjustment : 0 数据长度修正 假设我设置的数据长度是20代表了整个消息体的长度但是我数据却只有12个字节这往后读20个字节无疑是错的所以我们需要修正怎么修正 减8 就行 所以如果你需要修正你的 数据长度那么lengthAdjustment就是用来修正的。 initialBytesToStrip 8 ,如果在我们业务层只需要 消息体 像消息头 和 数据长度都不需要 那么 initialBytesToStrip就是用来跳过的字节数。 就比如 消息头规定 4个字节 数据长度 4个字节 所以我们如果只要 消息体里的内容 就需要跳过 8个字节 所以设置8 即可 结合发送端和 接收数据端 一个例子 整体
接收数据端的 定义 socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,8));
发送端 //第四种 自定义解码器//[4个字节 4个字节 数据]// 我们传输数据 就要这样拼接数据 消息头 数据长度数据String message Messagei;//消息头 4个字节ByteBuf byteBuf Unpooled.buffer();byteBuf.writeInt(102); //【lengthFieldOffset】 4个字节// 102 测试随便写的 只要是占4个字节的数字就行 为啥写个102就4个字节 因为102 是int类型 int类型的字节数是4个字节// 这个应该是 数据类型基础 int string long 等等 占几个字节的问题//数据长度 4个字节 长度也是 int型 所以跟消息头 一样 所以在设置解码器长度参数的时候 【lengthFieldLength】 4个字节就够用byteBuf.writeInt(message.getBytes().length);//发送的数据byteBuf.writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));channel.writeAndFlush(byteBuf);测试结果 没有解码器的情况
有了之后