聊城优化网站建设,关于网站建设的论坛,做网站和做电脑软件差别大吗,企业建站有什么好处前言 首先很遗憾的告诉大家#xff0c;今天这篇分享要关注才可以看了。原因是穷啊#xff0c;现在基本都是要人民币玩家了#xff0c;就比如chatGPT、copilot#xff0c;这些AI虽然都是可以很好的辅助编码#xff0c;但是都是要钱。入驻CSDN有些年头了#xff0c;中间有几…前言 首先很遗憾的告诉大家今天这篇分享要关注才可以看了。原因是穷啊现在基本都是要人民币玩家了就比如chatGPT、copilot这些AI虽然都是可以很好的辅助编码但是都是要钱。入驻CSDN有些年头了中间有几年大学毕业失恋了没有写沉沦了几年。后面逐渐捡起来我们之间应该说是互相成就吧亦师亦友亦笔记。说实话其实CSDN之前有出一些插件我很欣慰也一直在用其实我一直希望CSDN能出个copilot采用AI辅助就好了。或者国内几大技术论坛能一起搞个也行其实大家都是有这方面的优势的至少代码、训练库是足够的。 言归正传今天要要分享的要是紧接之前的设计物联网设备流水入库TDengine改造方案这里是具体的实现过程。这个是TDengine可自动扩展列方案这个方案实现代码绝对是目前独家关注我你值得拥有。 一、整体思路 整体思路消费信息 》》 数据转换 》》组织sql 》》orm框架自动配备数据源》》执行入库TDengine》》异常处理扩展的核心》》DDL执行扩列》》再次执行入库。。。。 这里大家应该可以猜到具体做法了其实要不是因为这个列不固定实现起来可简单多了也可以用超级表而且性能也会好很多。更重要的是可以用ORM框架基本不用写啥sql。而且查询结果用实体接受数据不会出现VARCHAR字段不能正确显示字符串的问题我就是被这个坑了下。 其实也可以用flink等消费信息做入库处理当然这样处理可就不能用ORM框架了只能用经典的JDBC。 核心思路根据设备上报数据做插入数据转换sql执行入库处理异常根据异常做DDL操作实现自动扩列最后入库。上报的数据json串做数据转换数据值做反射获取类型转换为对应的扩列sql执行、组织入库sql。
二、实现流程图 我的整体环境SpringBoot3 mybatisPlus 双数据源mysql、TDengine 集成kafka 消费上游平台放入kafka的信息然后走以上流程目标执行入库TDengine。
三、核心代码
这里的整体框架我之前的博文有写并且是公开独家分享到csdn的gitCodehttps://gitcode.net/zwrlj527/data-trans.git
1.引入库
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency2.配置文件
spring:
#kafka配置kafka:#bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092#bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092bootstrap-servers: localhost:9092client-id: dc-device-flow-analyzeconsumer:group-id: dc-device-flow-analyze-consumer-groupmax-poll-records: 10#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 有三个选项 【latest, earliest, none】auto-offset-reset: earliest#是否开启自动提交enable-auto-commit: false#自动提交的时间间隔auto-commit-interval: 1000listener:ack-mode: MANUAL_IMMEDIATEconcurrency: 1 #推荐设置为topic的分区数type: BATCH #开启批量监听#消费topic配置
xiaotian:analyze:device:flow:topic:consumer: device-flow3.kafka消费监听
package com.xiaotian.datagenius.kafka;import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;/*** 消费者listener** author zhengwen**/
Slf4j
Component
public class KafkaListenConsumer {Autowiredprivate DataTransService dataTransService;/*** 设备流水listenner** param records 消费信息* param ack Ack机制*/KafkaListener(topics ${easylinkin.analyze.device.flow.topic.consumer})public void deviceFlowListen(ListConsumerRecord records, Acknowledgment ack) {log.debug(设备流水deviceFlowListen消费者接收信息);try {for (ConsumerRecord record : records) {log.debug(---开启线程解析设备流水数据:{}, record.toString());dataTransService.deviceFlowTransSave(record);}} catch (Exception e) {log.error(----设备流水数据消费者解析数据异常:{}, e.getMessage(), e);} finally {//手动提交偏移量ack.acknowledge();}}}
4.消息具体处理方法实现
package com.xiaotian.datagenius.service.impl;import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.xiaotian.datagenius.mapper.tdengine.DeviceFlowRecordMapper;
import com.xiaotian.datagenius.mapper.tdengine.TableOperateMapper;
import com.xiaotian.datagenius.service.DataTransService;
import com.xiaotian.datagenius.utils.TDengineDbUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.util.*;/*** author zhengwen*/
Slf4j
Service
public class DataTransServiceImpl implements DataTransService {/*** 专门记录业务错误日志*/private final static Logger logger LoggerFactory.getLogger(businessExp);Autowiredprivate KafkaTemplate kafkaTemplate;Autowiredprivate TableOperateMapper tableOperateMapper;Autowiredprivate DeviceFlowRecordMapper deviceFlowRecordMapper;Overridepublic void deviceFlowTransSave(ConsumerRecord record) {log.debug(----设备流水转换解析存储----);log.debug(String.format(offset %d, key %s, value %s%n \n, record.offset(), record.key(), record.value()));//字段不可控所以没有实体可言只能直接sql//先直接执行插入,try异常 - 如果是报字段不存在 - 执行校验字段 - dml创建字段//再执行插入String stableName device_flow_mater;String tableName device_flow_record;String recordStr record.value().toString();if (JSONUtil.isTypeJSON(recordStr)) {JSONObject recordJson JSONUtil.parseObj(recordStr);//初始化语句MapString, MapString, String columnData new HashMap();String insertSql initDataInsertSql(recordJson, tableName, columnData);//保存数据saveRecord(recordJson, insertSql, columnData, tableName);} else {logger.error(---设备上报数据推送信息格式异常无法解析---);}}/*** 初始化数据插入语句** param recordJson 记录json* param tableName 表名* param columnData 字段信息* return 数据插入语句*/private String initDataInsertSql(JSONObject recordJson, String tableName, MapString, MapString, String columnData) {//这里先转换成sql的字段、valueStringJoiner columnSj new StringJoiner(,);StringJoiner valueSj new StringJoiner(,);String insertSql transInitInsertSql(tableName, columnSj, valueSj, recordJson, columnData);if (StringUtils.isBlank(insertSql)) {logger.error(---上报数据转插入语句异常上报数据{}, JSONUtil.toJsonStr(recordJson));return null;}return insertSql;}/*** 保存记录** param recordJson 记录json对象* param insertSql 插入语句* param columnData 字段信息* param tableName 普通表或子表*/private void saveRecord(JSONObject recordJson, String insertSql, MapString, MapString, String columnData, String tableName) {try {//boolean insertRes SqlRunner.db(DeviceFlowMaterRecord.class).insert(insertSql, 1);int num deviceFlowRecordMapper.insert(insertSql);} catch (Exception e) {logger.error(Error inserting,{}, e.getMessage());Throwable throwable e.getCause();String msg throwable.getMessage();//报缺少字段、字段长度不够if (msg.contains(Invalid column name:) || msg.contains(Value too long for column/tag)) {transAddOrChangeColumnsSql(columnData, tableName, recordJson, insertSql);}}}/*** 转换扩展列** param columnData 上报数据字段信息map* param tableName 表名* param recordJson 上报数据json* param insertSql 插入语句*/private void transAddOrChangeColumnsSql(MapString, MapString, String columnData, String tableName, JSONObject recordJson, String insertSql) {String showColumnsSql desc tableName;ListMapString, Object columnLs tableOperateMapper.operateSql(showColumnsSql);if (CollectionUtil.isNotEmpty(columnLs)) {//StringBuffer sbf new StringBuffer();//sbf.append(ALTER TABLE ).append(tableName).append( ADD COLUMN );MapString, MapString, Object tableColumns new HashMap();columnLs.stream().forEach(c - {Object byBufferObj c.get(field);//获取字段String field TDengineDbUtil.getColumnInfoBy(byBufferObj);tableColumns.put(field, c);});columnData.entrySet().forEach(c - {String key c.getKey();MapString, String columnMp c.getValue();String length columnMp.get(length);if (tableColumns.containsKey(key)) {//包含字段比较数据类型长度MapString, Object tcMp tableColumns.get(key);Object byBufferObj tcMp.get(length);//获取字段长度String dbLength TDengineDbUtil.getColumnInfoBy(byBufferObj);if (dbLength ! null) {if (Integer.parseInt(length) Integer.parseInt(dbLength)) {String changeColumnSql TDengineDbUtil.getColumnChangeSql(tableName,length,key);tableOperateMapper.operateSql(changeColumnSql);}}} else {//不包含需要执行增加字段String addColumnSql TDengineDbUtil.getColumnAddSql(tableName,length,key);tableOperateMapper.operateSql(addColumnSql);}});//复调存储saveRecord(recordJson, insertSql, columnData, tableName);}}/*** 转换初始化插入语句sql** param tableName 表名* param columnSj 字段字符串* param valueSj 值字符串* param recordJson 上报数据json* param columnData 字段Map* return 插入语句sql*/private String transInitInsertSql(String tableName, StringJoiner columnSj, StringJoinervalueSj, JSONObject recordJson, MapString, MapString, String columnData) {StringBuffer sb new StringBuffer();//子表不能扩展列所以超级表思路走不通sb.append(insert into ).append(tableName);if (!JSONUtil.isNull(recordJson)) {JSONObject tmpRecordJson recordJson;JSONObject dataJson tmpRecordJson.getJSONObject(data);Date collectTime tmpRecordJson.getDate(collectTime);tmpRecordJson.remove(data);tmpRecordJson.entrySet().forEach(entry - {//TODO 这里要设置调整下数据库区分大小写后去掉//String key entry.getKey().toLowerCase();String key entry.getKey();columnSj.add( key );Object val entry.getValue();//TODO 校验字符串类型处理sqlint length 5;if (val ! null) {//TODO 几个时间字段传的是long是转时间类型还是改字段为字符串String valStr TDengineDbUtil.convertValByKey(val,key);valueSj.add(valStr);length valStr.length() 5;} else {valueSj.add(null).add(,);}//TODO 字段数据类型后面要优化处理MapString, String columnMp TDengineDbUtil.checkColumnType(key, val, length);columnData.put(key, columnMp);});if (!JSONUtil.isNull(dataJson)) {dataJson.entrySet().forEach(entry - {//TODO 这里要设置调整下数据库区分大小写后去掉String key entry.getKey();columnSj.add( key );Object val entry.getValue();int length 3;if (val ! null) {//TODO 几个时间字段传的是long是转时间类型还是改字段为字符串String valStr TDengineDbUtil.convertValByKey(val,key);valueSj.add(valStr);length valStr.length() 1;} else {valueSj.add(null).add(,);}//TODO 字段数据类型后面要优化处理MapString, String columnMp TDengineDbUtil.checkColumnType(key, val, length);columnData.put(key, columnMp);});}//Tags//sb.append( TAGS ().append(dataJson).append(,).append(deviceUnitCode).append(,).append(deviceCode).append() );//sb.append( TAGS ().append(JSONUtil.toJsonStr(dataJson)).append() );//ColumnscolumnSj.add(data_ts);sb.append(().append(columnSj.toString()).append() );//Values//valueSj.add( DateUtil.format(collectTime, DatePattern.NORM_DATETIME_MS_FORMAT) );//主键应该是时间不能是设备上报数据的时间因为设备上报数据万一相同就更新了valueSj.add(NOW);sb.append( VALUES ().append(valueSj.toString()).append());logger.debug(----插入语句sql, sb.toString());return sb.toString();}return null;}
}
5.工具类
package com.xiaotian.datagenius.utils;import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import io.micrometer.core.instrument.util.TimeUtils;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;/*** TDengine数据库工具类** author zhengwen*/
Slf4j
public class TDengineDbUtil {/*** orm框架执行ddl语句返回的字段是byte数组处理** param byBufferObj byte数组object对象* return*/public static String getColumnInfoBy(Object byBufferObj) {try {if (byBufferObj instanceof byte[]) {byte[] bytes (byte[]) byBufferObj;ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.write(bytes);oos.flush();String strRead new String(bytes);oos.close();bos.close();return strRead;}} catch (IOException e) {log.error(----字段异常{}, e.getMessage());}return null;}/*** 校验字段类型返回字段信息** param key 字段* param val 值* param length 长度* return 字段信息*/public static MapString, String checkColumnType(String key, Object val, int length) {MapString, String columnMp new HashMap();columnMp.put(type, String);columnMp.put(length, String.valueOf(length));return columnMp;}/*** param tableName* param length* param key* return*/public static String getColumnAddSql(String tableName, String length, String key) {String beforeSql ALTER TABLE tableName ADD COLUMN ;//TODO 处理字段类型String addColumnSql beforeSql key NCHAR( Integer.parseInt(length) );return addColumnSql;}/*** param tableName* param length* param key* return*/public static String getColumnChangeSql(String tableName, String length, String key) {String changeLengthSql ALTER TABLE tableName MODIFY COLUMN ;//TODO 处理字段类型String changeColumnSql changeLengthSql key NCHAR( length );return changeColumnSql;}/*** 根据字段、字段值对插入sql的字段值做处理** param val 字段原始值* param key 字段* return 字段转换后的值*/public static String convertValByKey(Object val, String key) {//其他全部当字符串处理String valStr val.toString() ;//TODO 根据字段处理转换后的字段值这里暂时对几个时间字段做特殊处理if (key.equals(collectTime) || key.equals(createTime) || key.equals(storageTime)) {if (val instanceof Long){LocalDateTime localDateTime LocalDateTimeUtil.of(Long.parseLong(val.toString()));valStr DateUtil.format(localDateTime,DatePattern.NORM_DATETIME_MS_PATTERN) ;}if (val instanceof Integer){LocalDateTime localDateTime LocalDateTimeUtil.of(Long.parseLong(val.toString() 100));valStr DateUtil.format(localDateTime,DatePattern.NORM_DATETIME_MS_PATTERN) ;}}return valStr;}
}
6.Mapper的重要方法
TableOperateMapper
?xml version1.0 encodingUTF-8?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtdmapper namespacecom.xiaotian.datagenius.mapper.tdengine.TableOperateMapperselect idoperateSql resultTypejava.util.Map${sql}/select
/mapper
DeviceFlowRecordMapper
?xml version1.0 encodingUTF-8?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd
mapper namespacecom.xiaotian.datagenius.mapper.tdengine.DeviceFlowRecordMapper!-- 通用查询映射结果 --resultMap idBaseResultMap typecom.xiaotian.datagenius.entity.DeviceFlowRecordid columndata_ts propertydataTs /result columndeviceUnitCode propertydeviceUnitCode /result columndeviceUnitName propertydeviceUnitName /result columndeviceCode propertydeviceCode /result columndeviceName propertydeviceName /result columndeviceTypeName propertydeviceTypeName /result columncollectTime propertycollectTime /result columncreateTime propertycreateTime /result columnstorageTime propertystorageTime /result columnprojectId propertyprojectId /result columncompanyId propertycompanyId /result columndata propertydata /result columnshowMessage propertyshowMessage //resultMapinsert idinsert${sql}/insertselect idselectPageMap resultTypejava.util.Map${sql}/selectselect idselectPageBy parameterTypecom.easylinkin.datagenius.vo.DeviceFlowRecordVo resultTypejava.util.Mapselect r.*from device_flow_record rwhere 1 1if testparam2 ! nullif testparam2.deviceCode ! null and param2.deviceCode ! and r.deviceCode #{param2.deviceCode}/ifif testparam2.startTime ! null![CDATA[ and r.collectTime #{param2.startTime} ]]/ifif testparam2.endTime ! null![CDATA[ and r.collectTime #{param2.endTime} ]]/if/iforder by r.data_ts desc/select
/mapper 核心点就以上这些位置了大家自行体会。 总结
TDengine还不错官方有交流群群里也有技术支持不过肯定不是每一个问题都有回复各方面都在支持它它的优化空间还很多我们用开源实际也是在帮忙测试。就从开年到现在我就整这玩意刚开始是3.0.2.3现在都迭代到3.0.2.5了ORM框架也在逐步支持但是官方支持明确跟我说了可能ORM框架会拖慢影响性能。里面坑还是很多的我就踩了乱码、返回字节码的问题 就写到这里希望能帮到大家有需要帮助的可以在CSDN发消息我。