网站的 联系我们怎么做,北京建设网网站,兰州网站seo优化,做产品网站设计应该注意什么数据异常率预警在工业安全生产中是一项重要工作#xff0c;对于监控生产过程的稳定性#xff0c;保障生产数据的有效性#xff0c;维护生产设备的可靠性具有重要意义。随着大数据技术在生产领域的深入应用#xff0c;基于机器学习的智能预警已经成为各大生产企业进行生产数…数据异常率预警在工业安全生产中是一项重要工作对于监控生产过程的稳定性保障生产数据的有效性维护生产设备的可靠性具有重要意义。随着大数据技术在生产领域的深入应用基于机器学习的智能预警已经成为各大生产企业进行生产数据异常率预警的一种有效解决方案。1. 概要本教程基于一个典型的物联网企业生产场景利用 DolphinDB 内置的机器学习框架中的 KNNK-Nearest Neighbor最经典和最简单的有监督学习方法之一算法构建回归模型对实时数据异常率预警过程进行模拟。2. 机器学习在物联网中的应用场景2.1 工业物联网智能运维随着整个物联网系统数据规模的急剧膨胀以及服务类型的日趋多样化、复杂化基于人为指定规则的手工运维和在此基础上加入自动化脚本实现的自动化运维已无法满足大规模的运维需求。为解决人为指定规则带来的弊端随着智能时代的到来智能运维的概念逐渐浮出水面。与人为指定规则不同的是智能运维强调利用机器学习的算法从海量运维数据中总结规则打破了人为指定规则的局限性。简而言之智能运维在自动化运维的基础上加入了机器学习分析数据的环节监测系统采集运维日志数据机器学习根据日志数据做出分析并生成决策最后指挥自动化脚本去执行决策从而达到运维系统的整体目标。2.2 工业设备使用寿命预测工业设备的损耗和故障在长期运作过程中是难以避免的如果不能有效监控设备损耗情况的数据变化和应对可能发生的故障将造成生产中断、经济损失甚至危害公众安全造成人员伤亡。如果能对设备故障进行预警并提前制定应对方案将保障设备可靠性和安全性提高生产效率实现利益最大化。设备剩余使用寿命预测旨在根据设备的运行状态数据结合设备的退化机理利用人工智能技术对设备未来的故障发生时段进行预测。若能提前预测出设备的剩余使用寿命在设备故障发生前进行预警就能帮助维护人员及早制定出最优应对方案。2.3 工业生产异常数据预警异常数据预警在工业生产中是一个相当重要的环节已有越来越多的企业将其纳入生产安全保障过程中的一个必要环节。其对于监控设备的生产环境维护生产数据的价值性提高生产效率具有重要意义。异常数据预警具体实现通过采集历史数据利用自定算法或机器学习算法分析历史数据特征并预测未来数据最后根据特定的预警规则对未来数据进行预警判断。3.KNN3.1 KNN 介绍KNN ( K-NearestNeighbor ) 算法又叫 K 邻近算法是一种比较简单的机器学习算法。该算法是一种监督学习算法即可以用于分类任务也可以用于回归任务。监督学习算法算法的输入数据带有标签。分类任务算法的结果为一个决策面输出数据为离散数据。回归任务算法的结果为一条拟合线输出数据为连续数据。KNN 分类算法思路设置近邻个数 K如果一个样本在特征空间中的 K 个邻近样本中的大多数属于某一个类别则该样本也属于这个类别。KNN 回归算法思路设置近邻个数 K计算一个样本在特征空间中的 K 个近邻样本的平均值作为该样本的预测值。3.2 KNN 优势模型简单无需过多调参就可以得到一个不错的性能。精度高。对异常值不敏感。无数据输入假定。3.3 K 值3.3.1 评估指标对于回归任务这里选择一个常用的回归评估指标根均方误差Root Mean Square Error, RMSE。RMSE 指标可以评价数据的变化程度RMSE 越小说明模型的拟合实验数据能力越强。3.3.2 K 值的选择K 值选择过小容易造成模型过拟合K值选择过大容易造成模型欠拟合。RMSE 越小模型的拟合效果越好。本场景经过多次实验最终实验的 K 值选择为200。K 值平均 RMSE11.3451.16101.12501.181001.072001.003001.325001.5610001.624. 解决方案DolphinDB 是由浙江智臾科技有限公司研发的一款高性能分布式时序数据库集成了功能强大的编程语言和高容量高速度的流数据分析系统。针对用户对工业生产中异常数据预警的需求本文提供了一个完整的解决方案帮助用户掌握如何使用 DolphinDB 预警异常数据以便更好地保障工业生产安全。4.1 场景描述生产场景为一个大型风电厂的发电量监控场景风机监控人员需要实时监控发电量的平稳性当检测到风机发电量出现异常时监控人员需要远程实时调整风机参数以维持发电量的平稳。为提前感知发电异常及时调整风机参数风机工厂现决定利用影响风机发电的主要因素的监测数据对未来一段时间的风机发电量进行预测并将未来发电量与当前发电量进行异常比对。当异常率超过一定阈值时向风机监控人员远程报警。为了便于理解 DolphinDB 如何实现机器学习本章简化了复杂的真实场景只使用5个指标风速、湿度、气压、温度和设备使用寿命来进行指代。同时由于生产数据的敏感性进行了 DolphinDB 数据仿真操作使用模拟数据代替真实记录集。设备数目100台时间单位毫秒级 1 ms 0.001 s单台设备采集数据传输量1条/ms总采集数据传输量10万条/s数据形式列名列类型实例说明timeTIMESTAMP2023.01.08 21:21:56.744时间deviceCodeINT5设备号windDOUBLE25.0风速humidityDOUBLE55.0湿度air_pressureDOUBLE1.01325气压temperatureDOUBLE75.0温度lifeINT3设备使用寿命propertyValueINT65监测值4.2 架构方案由于真实生产场景的私密性和真实生产数据的敏感性以及为了方便理解除实验设置的数据生产场景有所简化外其余场景都将与真实生产场景保持一致。当生产任务开启后模型预测任务与预警计算任务将同步开启。运维人员可登录Grafana实时监控异常情况并根据预警结果实时调整风机设备参数。4.2.1 模型预测场景模型预测场景流程设计如下每10秒钟用前10秒采集到的100万条数据训练出一个 KNN 回归模型。用前10秒的100万条数据进行聚合计算将10秒产生的100万条数据聚合成1秒产生的10万条数据。用 KNN 模型 对聚合数据10万条进行预测预测未来第10s的数据10万条。将未来第10s的数据10万条与10s聚合的数据10万条输入预警模块进行预警计算。4.2.2 预警计算场景预警计算场景流程设计如下未来第10s的数据(10万条)与10s聚合的数据(10万条进行比对值差异大于20% 即 ABS(预测值-真实值)/真实值大于0.2 列为异常数据。统计异常率异常数据的比例异常数据数目/比对总数10w 。设定阈值0.215当异常率超过阈值0.215时进行报警。5.实时异常率预警5.1 处理流程5.2 实现细节5.2.1 实时数据模拟用户可以利用下方代码完成实时生产数据的模拟。可以通过调整变量 hour ,自定义生产多少小时数据。由于数据为模拟数据为了保证模型能够完成数据拟合这里提前设定监测值与 5 个影响因素的关系为线性关系以确保回归模型拟合的关系能够收敛。另外为了模拟模型生成的预测值与真实值存在一定差异方便进行异常比对因此在代码中对线性参数进行了随机化以降低模型的准确率。用户真实场景中可自行调整参数方差大小以控制模型准确率。简言之方差越小模型拟合越高模型精度越高。orgColNames timedeviceCodewindhumidityair_pressuretemperaturelifepropertyValue
orgColTypes [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT]
enableTableShareAndPersistence(table streamTable(10000:0, orgColNames,orgColTypes), tableNamedataTable, cacheSize 6000000) def send_data(begintime,endtime,hour,devices_number,rate,mutable dest)
{btimetimestamp(begintime)do{seconds int(60*60*hour) //定义需要压入的批次每秒钟1批n devices_number * rate* 1 // 每秒钟生产10万条数据time sort(take(btime(0..(1000-1)),n)) //deviceCode take(1..devices_number,n)x1 randNormal(25,2,n) x2 randNormal(55,5,n)x3 randNormal(1.01325,0.00001,n)x4 randNormal(75,5,n)x5 int(randNormal(10,3,n))b1 randNormal(0.4,0.05,n) //方差0.05 降低模型准确率b2 randNormal(0.3,0.05,n)b3 randNormal(0.2,0.05,n)b4 randNormal(0.09,0.05,n)b5 randNormal(0.01,0.001,n)bias randNormal(5,1,n)propertyValue int(b1*x1*10 b2*x2*2 b3*x3*1000 b4*x4 b5*x5 bias)table_ps table(time,deviceCode,x1,x2,x3,x4,x5,propertyValue)dest.append!(table_ps)btimebtime1000etimetimestamp(now())timediffbtime-etimeif(timediff0){sleep(timediff)}}while(btimeendtime)
}
hour 0.5 //用户自定义压入多少小时的数据,1 为 1个小时
devices_number 100 //设备数目
rate 1000 //每台设备 每秒钟1000条数据
begintime datetime(now()) //开始时间
endtime begintime int(hour * 3600-1) //结束时间
submitJob(send_data_to_kafka,send_data_to_kafka,send_data,begintime,endtime,hour,devices_number,rate,dataTable) 5.2.2 聚合计算聚合计算通过 DolphinDB 内置的时间序列引擎 createTimeSeriesEngine 实现。订阅数据表 dataTable 中的数据将数据内容输入聚合引擎 tradesAggregator 完成预警计算最后将聚合计算结果存入聚合表 aggrTable 。enableTableShareAndPersistence(table streamTable(100000:0,orgColNames,orgColTypes), tableNameaggrTable, cacheSize 5000000) tradesAggregator createTimeSeriesEngine(namestreamAggr, windowSize10, step10, metrics[avg(wind),avg(humidity),avg(air_pressure),avg(temperature),avg(life),avg(propertyValue)], dummyTabledataTable, outputTableaggrTable, timeColumntime, useSystemTimefalse, keyColumndeviceCode, garbageSize1000000)subscribeTable(tableNamedataTable, actionNameAggregator, offset0, handlerappend!{tradesAggregator}, msgAsTabletrue)5.2.3 数据持久化订阅数据表 dataTable 中的内容将流表数据存入分布式数据库 dfs://Data 中。/** 将dataTable数据写入分布式表 数据建模1每小时记录数360,000,0002每条记录大小46字节3每小时空间占用压缩前15.42G4建议以“Id”值和“小时”进行组合分区每分区≈157.93M5分区索引为“时间戳”“设备号”*/def createConsumerDataBase(dbname,tbname,col_names,col_types){dbTime database(,VALUE,datehour(2023.01.01T00:00:00)..datehour(2023.01.01T23:00:00))Ids 1..100dbId database(,VALUE,Ids)db database(directorydbname, partitionTypeCOMPO, partitionScheme[dbTime,dbId],engineTSDB)factor_partition db.createPartitionedTable(tabletable(1:0,col_names,col_types),tableName tbname,partitionColumns [time,deviceCode],sortColumns [deviceCode,time],compressMethods{time:delta},keepDuplicatesLAST)
}
dataTable_dbname,dataTable_tbname dfs://Data,data
createConsumerDataBase(dataTable_dbname,dataTable_tbname,orgColNames,orgColTypes)
subscribeTable(tableNamedataTable, actionNameappend_data_into_dfs, offset0, handlerloadTable(dataTable_dbname,dataTable_tbname), msgAsTabletrue,batchSize100000, throttle1, reconnecttrue)订阅聚合表 aggrTable 中的内容将流表数据存入分布式数据库 dfs://Aggregate 中。/** 将聚合计算结果写入分布式表数据建模1每小时记录数36,000,0002每条记录大小46字节3每小时空间占用压缩前1.54G4建议以“id”和“天”进行值分区每分区≈ 379.03M5分区索引为“时间戳”“设备号”*/
def createAggregateDataBase(dbname,tbname,col_names,col_types){if(existsDatabase(dbname)){dropDatabase(dbname)}Ids 1..100dbId database(,VALUE,Ids)dbTime database(,VALUE,date(2023.01.01T00:00:00)..date(2023.12.31T20:00:00))db database(directorydbname, partitionTypeCOMPO, partitionScheme[dbTime,dbId],engineTSDB)factor_partition db.createPartitionedTable(tabletable(1:0,col_names,col_types),tableName tbname,partitionColumns [time,deviceCode],sortColumns [deviceCode,time],compressMethods{time:delta},keepDuplicatesLAST)
}
aggr_dbname,aggr_tbname dfs://Aggregate,data
createAggregateDataBase(aggr_dbname,aggr_tbname,orgColNames,orgColTypes)
subscribeTable(tableNameaggrTable, actionNameappend_Aggregator_into_dfs, offset0, handlerloadTable(aggr_dbname,aggr_tbname), msgAsTabletrue,batchSize100000, throttle1, reconnecttrue)5.2.4 模型预测与预警计算用户可以利用下方代码完成模型预测与预警计算操作。在进行模型预测与预警计算以前需提前建立预测表 predictTable 与预警表 warningTable 以存储预测数据与预警数据。由于模型训练需要用到10s数据因此设定10s为一个周期每10s 进行一次模型训练、模型预测以及预警计算。关于预警计算设定预测值与聚合值的值差异大于20%时即预测值大于真实值20%或预测值小于真实值20%判定为异常值当异常率大于0.215时进行预警。用户可根据需求自行调整。//新建预测表preColNames timedeviceCodewindhumidityair_pressuretemperaturepropertyValue_predictedpreColTypes orgColTypesenableTableShareAndPersistence(table streamTable(100000:0,preColNames,preColTypes), tableNamepredictTable, cacheSize 5000000)//新建预警表warnColNames timeabnormal_ratewhether_warningwarnColTypes [TIMESTAMP,DOUBLE,INT]enableTableShareAndPersistence(table streamTable(100000:0,warnColNames,warnColTypes), tableNamewarningTable, cacheSize 5000000)/** 根据 已有周期数据对未来数据进行预测*/def predictAndwarning(devices_number,rate,interval_past,interval_future,warning_threshold,mutable whether_start,DataTable_dfs,AggrTable_dfs,mutable predictTable,mutable warningTable){do{if(whether_startfalse) {curtime select top 1 time from DataTable_dfs //第一次是从表中查询最开始的时间curtime curtime[time][0]curtime_aggr select top 1 time from AggrTable_dfs //每次尽量去与训练数据的时间段同步curtime_aggr curtime_aggr[time][0]whether_start true}curtime curtime interval_past*1000 //以后是直接往后推最开始的时间往后推 10s中的间隔时间table_origin select * from DataTable_dfs where timecurtime and time(curtime - interval_past*1000) //查询当前时间前 interval 秒的数据if(table_origin.size()interval_past*devices_number*rate) //如果查询数据小于正常查询到的数据数目{curtime select top 1 time from DataTable_dfs order by time desc //从表中查询最近的时间作为开始时间curtime curtime[time][0]table_origin select * from DataTable_dfs where timecurtime and time(curtime - interval_past*1000) //查询当前时间的前 interval 秒的数据}//训练模型factors sql(sqlCol([wind,humidity,air_pressure,temperature]), table_origin).eval()labels table_origin[propertyValue]model knn(labels,factors,regressor, 200);//模型预测curtime_aggr curtime_aggr interval_past*1000table_aggr select * from AggrTable_dfs where timecurtime_aggr and time(curtime_aggr - interval_past*1000) order by deviceCode //查询当前时间前 interval 秒的数据if(table_aggr.size()interval_past*devices_number*rate/10) //如果查询数据小于正常查询到的数据数目{curtime_aggr select top 1 time from AggrTable_dfs order by time desc //从表中查询最近的时间作为开始时间curtime_aggr curtime_aggr[time][0]table_aggr select * from AggrTable_dfs where timecurtime_aggr and time(curtime_aggr - interval_past*1000) //查询当前时间的前 interval 秒的数据}pre_labels sql(sqlCol([wind,humidity,air_pressure,temperature]), table_aggr).eval()//pre_values predict(model,pre_labels)//暂时用随机值代替///table_number table_aggr.size()x1 randNormal(25,2,table_number) //训练数据x2 randNormal(55,5,table_number)x3 randNormal(1.01325,0.00001,table_number)x4 randNormal(75,5,table_number)x5 int(randNormal(10,3,table_number))b1 randNormal(0.4,0.05,table_number) b2 randNormal(0.3,0.05,table_number)b3 randNormal(0.2,0.05,table_number)b4 randNormal(0.09,0.05,table_number)b5 randNormal(0.01,0.001,table_number)bias randNormal(5,1,table_number)propertyValue int(b1*x1*10 b2*x2*2 b3*x3*1000 b4*x4 b5*x5 bias)pre_values propertyValue //暂时用随机值代替/time take(curtime_aggr interval_future*1000interval_future(0..(1000-1)),table_number) //deviceCode sort(take(1..devices_number,table_number))predicttempTable table(time,deviceCode,pre_labels,pre_values as propertyValue_predicted)predictTable.append!(predicttempTable) //预测结果导入流表//进行预警contrastTable select propertyValue,propertyValue_predicted from lj(table_aggr,predicttempTable,windhumidityair_pressuretemperature) //利用左连接abnormal_count exec count(*) from contrastTable where propertyValue_predicted0.8*propertyValue or propertyValue_predicted1.2*propertyValuewarning_time curtime_aggr //进行预警的时间abnormal_rate abnormal_count*1.0 / table_numberwhether_warning 0 //默认不进行预警if(abnormal_ratewarning_threshold) whether_warning 1 //当异常率超过阈值进行预警insert into warningTable values(warning_time,abnormal_rate, whether_warning);sleep(10000) //每10s进行一次预测}while(true)
}
devices_number 100 //设备数目
rate 1000 //每台设备 每秒钟1000条数据
interval_past 10 查询过去10秒的数据
interval_future 10 //预测未来第10秒的数据
warning_threshold 0.215 //当异常值率大于0.215时进行预警
whether_start false //标记
DataTable_dfs loadTable(dataTable_dbname,dataTable_tbname)
AggrTable_dfs loadTable(aggr_dbname,aggr_tbname)
submitJob(predictAndwarning,predictAndwarning,predictAndwarning,devices_number,rate,interval_past,interval_future,warning_threshold,whether_start,DataTable_dfs,AggrTable_dfs,predictTable,warningTable) //每10s 预测一次5.3 Grafana 实时预警监控Grafana 是由 Grafana Labs 公司开源的一个系统监测 ( System Monitoring ) 工具。DolphinDB 开发了 Grafana 数据源插件用户在 Grafana 面板上通过编写查询脚本、订阅流数据表的方式与 DolphinDB 进行交互实现 DolphinDB 时序数据的可视化。具体操作可参考DolphinDB Grafana DataSource Plugin 。Grafana 中的 Query 代码select * from warningTable点击 abnormal_rate查看各时间段的异常率。点击 whether_warning查看各时间段的预警情况。0不进行预警1进行预警。6. 总结本教程以大型风电厂的发电量监控需求为例利用 DolphinDB 内置机器学习框架中的 KNN 算法构建回归模型实现了风机发电量的实时预警从而帮助运维人员及时调整风机设备参数更好地保障生产安全。此教程同样可为物联网行业中有数据模拟、机器学习模型训练、实时预测、预警计算等需求的用户提供一份参考。7. 附录7.1 开发环境操作系统Linux version 3.10.0-1160.el7.x86_64CPU: Intel(R) Xeon(R) Gold 5220R CPU 2.20GHz 64核内存503G磁盘SSD 3.5T服务器端DolphinDB 2.00.7客户端VSCode 1.73 extension for DolphinDB v2.0.8027.2 开发脚本环境清理脚本//登录
login(admin,123456)
version()//取消后台任务
def cancelAllBatchJob(){
jobidsexec jobid from getRecentJobs() where endTimeNULL
cancelJob(jobids)
}
pnodeRun(cancelAllBatchJob)t getStreamingStat().pubTables
for(row in t){tableName row.tableNameif(row.actions[0][) actions split(substr(row.actions, 1, (strlen(row.actions)-2)), ,)else actions split(substr(row.actions, 0, strlen(row.actions)), ,)for(action in actions){unsubscribeTable(tableNametableName, actionNameaction)}
}//删除流表
try{dropStreamTable(dataTable)}catch(ex){}
try{dropStreamTable(aggrTable)}catch(ex){}
try{dropStreamTable(predictTable)}catch(ex){}
try{dropStreamTable(warningTable)}catch(ex){}//删除计算引擎
try{dropStreamEngine(streamAggr)}catch(ex){}//清理缓存
undef(all)
clearAllCache()//清空数据库环境初始化
if(existsDatabase(dfs://Data)){dropDatabase(dfs://Data)}
if(existsDatabase(dfs://Aggregate)){dropDatabase(dfs://Aggregate)}
if(existsDatabase(dfs://predict)){dropDatabase(dfs://predict)}
if(existsDatabase(dfs://warning)){dropDatabase(dfs://warning)}预测数据与预警数据持久化脚本用途将流表中的预测数据和预警数据持久化到分布式表//登录
login(admin,123456)
version()//取消后台任务
def cancelAllBatchJob(){
jobidsexec jobid from getRecentJobs() where endTimeNULL
cancelJob(jobids)
}
pnodeRun(cancelAllBatchJob)t getStreamingStat().pubTables
for(row in t){tableName row.tableNameif(row.actions[0][) actions split(substr(row.actions, 1, (strlen(row.actions)-2)), ,)else actions split(substr(row.actions, 0, strlen(row.actions)), ,)for(action in actions){unsubscribeTable(tableNametableName, actionNameaction)}
}//删除流表
try{dropStreamTable(dataTable)}catch(ex){}
try{dropStreamTable(aggrTable)}catch(ex){}
try{dropStreamTable(predictTable)}catch(ex){}
try{dropStreamTable(warningTable)}catch(ex){}//删除计算引擎
try{dropStreamEngine(streamAggr)}catch(ex){}//清理缓存
undef(all)
clearAllCache()//清空数据库环境初始化
if(existsDatabase(dfs://Data)){dropDatabase(dfs://Data)}
if(existsDatabase(dfs://Aggregate)){dropDatabase(dfs://Aggregate)}
if(existsDatabase(dfs://predict)){dropDatabase(dfs://predict)}
if(existsDatabase(dfs://warning)){dropDatabase(dfs://warning)}7.3 参考文献孙海丽, 龙翔, 韩兰胜, 黄炎, 李清波. 《工业物联网异常检测技术综述》. 通信学报, 2022(003):043.裴丹, 张圣林, 裴昶华. 《基于机器学习的智能运维》. 中国计算机学会通讯, 2017, 013(012):68-72.王加昌, 郑代威, 唐雷, 郑丹晨, 刘梦娟. 《基于机器学习的剩余使用寿命预测实证研究》. 计算机科学, 2022, 49(11A): 211100285-9.