Background
Problem description
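After running for some time, a real-time job built on Flink-CDC and Flink SQL suddenly began failing to insert its results into the database. The inserts failed on rows with a null in a primary-key column, and from then on none of the subsequent results could be inserted either.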
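Why one bad row blocks every later write can be sketched with a small model of a buffered, batched sink. This is a minimal illustration under stated assumptions, not the Flink JDBC connector's code: `execute_batch`, `flush`, `MAX_RETRIES` and `KEY_COLUMNS` are hypothetical names, the batch is assumed to be all-or-nothing, and the buffer is assumed to be cleared only after a successful write. The assumed key columns are the ones the ON DUPLICATE KEY UPDATE clause in the error log does not update. Under these assumptions every retry resends the same failing batch until the retry limit is used up.

```python
# Simplified model of a buffered, batched JDBC-style sink.
# NOT the Flink connector's code; all names here are hypothetical.

MAX_RETRIES = 3  # hypothetical retry limit for a failed flush
# Assumed primary key: the columns the ON DUPLICATE KEY UPDATE clause leaves alone.
KEY_COLUMNS = ("op_date", "campus_name", "school_name", "depart_name")


class BatchAborted(Exception):
    """Raised when the database rejects a whole batch."""


def execute_batch(table, rows):
    # All-or-nothing: one NOT NULL violation aborts the entire batch.
    for i, row in enumerate(rows):
        if row.get("depart_name") is None:
            raise BatchAborted(f"Batch entry {i}: null value in column depart_name")
    for row in rows:
        table[tuple(row[c] for c in KEY_COLUMNS)] = row  # upsert by key


def flush(table, buffer):
    """Write the buffer; return how many retries were needed."""
    last_error = None
    for attempt in range(MAX_RETRIES + 1):
        try:
            execute_batch(table, buffer)
        except BatchAborted as exc:
            last_error = exc
            continue  # same buffer, same failure on the next attempt
        buffer.clear()  # only reached after a successful batch
        return attempt
    raise RuntimeError(f"flush failed after {MAX_RETRIES} retries") from last_error


table = {}
buffer = [
    {"op_date": "2024-03-11", "campus_name": "c", "school_name": "s",
     "depart_name": "d", "total_opfare": 1},
    {"op_date": "2024-03-11", "campus_name": "c", "school_name": "s",
     "depart_name": None, "total_opfare": 2},
]
try:
    flush(table, buffer)
except RuntimeError as err:
    print(err)                  # flush failed after 3 retries
print(len(table), len(buffer))  # 0 2
```

In this model the valid first row is never written because it shares a batch with the null row, which would also be consistent with the log reporting batch entry 0 as aborted even though its depart_name is not null.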
Once the number of failed attempts exceeded the retry limit, the job failed with the following error:
```
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES (2024-03-11 00:00:00+08, 2023, xxxx, xxxx学校, xxxx小学部, 203333300000, 57, 2024-03-21 09:31:08.47+08) ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column depart_name violates not-null constraint  Call getNextException to see other errors in the batch.
    at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]
    ... 1 more
Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column depart_name violates not-null constraint
    at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]
```
Diagnosis
Diagnostic approach
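1. Hypothesis 1: the database insert itself introduced the null during processing, i.e. the data was not null, yet the inserted row ended up with a null.
2. Hypothesis 2: the data Flink SQL consumed from Kafka already contained nulls, so the computed results contained nulls as well.
Diagnostic process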
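Tracing the problem on the database side is cumbersome, and the column is already defined as a primary-key attribute in the database, so it seemed unlikely that a value became null during the insert itself. We therefore started with the simpler option of querying the data in Flink SQL.
Method 1: query the records in which the field is null or empty. After the job had run, no matching records were found:
```sql
select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
```
Because Flink-CDC is used for change data capture, the resulting update stream contains -U, +U, -D and +I records, so a record with a null value can be emitted at some point in the stream even if it no longer appears in the final result. We therefore first defined the wide-table transformation and joins as a view and then filtered that view for null values, as follows: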
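```sql
-- a and b stand for the source tables joined into the wide table
create view view_prd as
select a.*, b.* from a join b on a.id = b.id;

select * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
```
This query did return a record with a null value. Tracing it back to the source showed that the field had originally been null and was later filled in by an update; the resulting -U record carried the old null value, which made the inserts fail over and over.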
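Why the first query on the result table found nothing while the view exposed the null can be illustrated by replaying a tiny changelog. This is a hedged sketch: the RowKind strings follow Flink's +I/-U/+U/-D notation, but `materialize`, `is_blank`, the `id` key and the rows are hypothetical. Applying the changelog leaves only the final +U row, while scanning the stream itself still finds the earlier rows that carry the null.

```python
# Illustrative replay of a Flink-style changelog into a keyed table.
# RowKind strings follow Flink's notation; rows and the "id" key are hypothetical.


def is_blank(value):
    # Same intent as: depart_name IS NULL OR char_length(depart_name) = 0
    return value is None or len(value) == 0


def materialize(changelog):
    """Apply +I/+U as upserts and -U/-D as removals, keyed by id."""
    table = {}
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            table[row["id"]] = row
        elif kind in ("-U", "-D"):
            table.pop(row["id"], None)
    return table


changelog = [
    ("+I", {"id": 1, "depart_name": None}),      # field still empty upstream
    ("-U", {"id": 1, "depart_name": None}),      # retraction carries the old value
    ("+U", {"id": 1, "depart_name": "dept A"}),  # field filled in later
]

# Query on the final result (like the first query): nothing is blank.
final_nulls = [r for r in materialize(changelog).values() if is_blank(r["depart_name"])]
# Scan of the stream itself (like querying the view): blank rows show up.
stream_nulls = [kind for kind, r in changelog if is_blank(r["depart_name"])]
print(len(final_nulls), stream_nulls)  # 0 ['+I', '-U']
```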
Root cause
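Flink SQL consumes its input from a Kafka topic, and Flink writes that topic with the upsert-kafka connector. The changelog stream therefore contains a -U/+U pair for every update, with each row identified by its key. A -U record whose value is empty also ends up in Kafka, and that is where the null value comes from.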
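Our reading of the documented upsert-kafka behavior is that the sink writes INSERT/UPDATE_AFTER rows as normal keyed messages and DELETE rows as tombstones (null values), and that a reader rebuilds a full changelog from the latest value per key. The model below is an assumption-based sketch of that round trip, not connector code; `encode`, `decode` and the `id` key are hypothetical. It shows that a -U produced on the read side carries the previous value for the key, so if the field was null before it was filled in, that -U row has a null depart_name.

```python
# Assumption-based model of an upsert-kafka round trip; not connector code.
# Rows are dicts keyed by a hypothetical "id" column.


def encode(changelog):
    """Write side: +I/+U become (key, value) messages, -D becomes a
    tombstone (key, None); -U is not written."""
    messages = []
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            messages.append((row["id"], row))
        elif kind == "-D":
            messages.append((row["id"], None))
    return messages


def decode(messages):
    """Read side: keep the latest value per key and re-derive a changelog,
    emitting -U with the previous value before each +U."""
    state, out = {}, []
    for key, value in messages:
        old = state.get(key)
        if value is None:  # tombstone
            if old is not None:
                out.append(("-D", old))
                del state[key]
        elif old is None:
            out.append(("+I", value))
            state[key] = value
        else:
            out.append(("-U", old))  # retraction of the previous row
            out.append(("+U", value))
            state[key] = value
    return out


upstream = [
    ("+I", {"id": 1, "depart_name": None}),      # field still empty
    ("-U", {"id": 1, "depart_name": None}),
    ("+U", {"id": 1, "depart_name": "dept A"}),  # field filled in later
]
downstream = decode(encode(upstream))
print(downstream[1])  # ('-U', {'id': 1, 'depart_name': None})
```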
Solution
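We added a view layer for the DWS wide table (as shown above) and filtered out null values of this field before the data is written to the Kafka topic that feeds the DWS wide table. This keeps records with null values out of the metric calculations.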
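The fix amounts to a per-record filter placed in front of the sink. A minimal sketch, assuming changelog records modeled as (kind, row) pairs with a hypothetical `id` key; in Flink SQL the corresponding predicate on the view would be something like `depart_name IS NOT NULL AND char_length(depart_name) > 0`. Because a stateless filter is evaluated on every record regardless of its kind, both the +I and the -U of the blank row are dropped.

```python
# Sketch of the fix: drop changelog records whose depart_name is blank
# before they reach the sink. Row layout and "id" key are hypothetical.


def is_blank(value):
    # Same intent as: depart_name IS NULL OR char_length(depart_name) = 0
    return value is None or len(value) == 0


def filter_changelog(changelog):
    # The predicate is applied to every record whatever its kind,
    # so +I, -U, +U and -D rows with a blank value are all removed.
    return [(kind, row) for kind, row in changelog
            if not is_blank(row["depart_name"])]


upstream = [
    ("+I", {"id": 1, "depart_name": None}),
    ("-U", {"id": 1, "depart_name": None}),
    ("+U", {"id": 1, "depart_name": "dept A"}),
]
print(filter_changelog(upstream))
# [('+U', {'id': 1, 'depart_name': 'dept A'})]
```

Only the +U with the filled-in value survives; an upsert sink would then treat it as an insert-or-update for its key.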