建设网站具备的知识,网站登录人太多进不去怎么办,高明网站设计,公司内部网站页面设计大数据系列之#xff1a;Flink Doris Connector#xff0c;实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、… 大数据系列之Flink Doris Connector实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、使用FlinkSQL通过CDC访问并实现部分列更新的示例十、使用FlinkCDC访问多个表或整个数据库支持MySQL、Oracle、PostgreSQL、SQLServer十一、使用FlinkCDC更新Key列十二、使用Flink根据指定的列删除数据十三、最佳实践应用场景十四、常见问题解答 可以通过Flink操作读取、插入、修改、删除支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。
注意
修改和删除仅支持唯一键模型。当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法您需要自行实现。
一、版本兼容性 二、使用
Maven
添加 flink-doris-connector
!-- flink-doris-connector --
dependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.16/artifactIdversion1.6.0/version
/dependency请根据不同的Flink版本替换相应的Connector和Flink依赖版本。也可以从这里下载相关版本的jar包。
flink-doris-connector下载地址
https://repo.maven.apache.org/maven2/org/apache/doris/
编译
编译时直接运行sh build.sh即可。编译成功后会在dist目录下生成目标jar包如flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如Flink 运行在 Local 模式则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下将此文件放入预部署包中。
三、Flink SQL
read
-- doris source
CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH (connector doris,fenodes FE_IP:HTTP_PORT,table.identifier database.table,username root,password password
);write
--enable checkpoint
SET execution.checkpointing.interval 10s;-- doris sink
CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH (connector doris,fenodes FE_IP:HTTP_PORT,table.identifier db.table,username root,password password,sink.label-prefix doris_label
);-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source四、DataStream
read
DorisOptions.Builder builder DorisOptions.builder().setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);DorisSourceList? dorisSource DorisSource.List?builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), doris source).print();write
DorisSink通过StreamLoad向Doris写入数据DataStream写入时支持不同的序列化方式
字符串数据流SimpleStringSerializer
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.BuilderString builder DorisSink.builder();
DorisOptions.Builder dorisBuilder DorisOptions.builder();
dorisBuilder.setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);Properties properties new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty(format, json);
//properties.setProperty(read_json_by_line, true);
DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(label-doris) //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source
ListTuple2String, Integer data new ArrayList();
data.add(new Tuple2(doris,1));
DataStreamSourceTuple2String, Integer source env. fromCollection(data);source.map((MapFunctionTuple2String, Integer, String) t - t.f0 \t t.f1).sinkTo(builder.build());//mock json string source
//env.fromElements({\name\:\zhangsan\,\age\:1}).sinkTo(builder.build());RowData数据流RowDataSerializer
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option
DorisSink.BuilderRowData builder DorisSink.builder();
DorisOptions.Builder dorisBuilder DorisOptions.builder();
dorisBuilder.setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);// json format to streamload
Properties properties new Properties();
properties.setProperty(format, json);
properties.setProperty(read_json_by_line, true);
DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(label-doris) //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdatas schema
String[] fields {city, longitude, latitude, destroy_date};
DataType[] types {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType(json) //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source
DataStreamRowData source env. fromElements().map(new MapFunctionString, RowData() {Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData new GenericRowData(4);genericRowData.setField(0, StringData.fromString(beijing));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());SchemaChange数据流JsonDebeziumSchemaSerializer
// enable checkpoint
env.enableCheckpointing(10000);Properties props new Properties();
props. setProperty(format, json);
props.setProperty(read_json_by_line, true);
DorisOptions dorisOptions DorisOptions. builder().setFenodes(127.0.0.1:8030).setTableIdentifier(test.t1).setUsername(root).setPassword().build();DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(label-prefix).setStreamLoadProp(props).setDeletable(true);DorisSink.BuilderString builder DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source).sinkTo(builder.build());五、Lookup Join
CREATE TABLE fact_table (id BIGINT,name STRING,city STRING,process_time as proctime()
) WITH (connector kafka,...
);create table dim_city(city STRING,level INT ,province STRING,country STRING
) WITH (connector doris,fenodes 127.0.0.1:8030,jdbc-url jdbc:mysql://127.0.0.1:9030,table.identifier dim.dim_city,username root,password
);SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city c.city这个命令是用于创建两个表和一个查询语句。第一个表是名为fact_table的表它有四个列分别为id、“name”、“city和process_time”。其中id列是BIGINT类型name和city列是STRING类型process_time列是一个基于当前系统时间的计算列它使用proctime()函数实现。第二个表是名为dim_city的表它有四个列分别为city、“level”、“province和country”。其中“city”、“province和country列是STRING类型“level列是INT类型。该表使用Doris作为存储引擎连接器为connector”并且需要指定连接器的其他参数如fenodes”、“jdbc-url”、“table.identifier”、username和password等。最后一个命令是一个查询语句它使用LEFT JOIN将fact_table和dim_city两个表进行连接并使用FOR SYSTEM_TIME AS OF来指定连接时的时间戳这里使用了process_time列的值。查询结果包括id、“name”、“city”、“province”、country和level这些列。
六、配置
通用配置项
fenodes
Doris FE http地址支持多个地址用逗号分隔
benodes
Doris BE http地址支持多个地址以逗号分隔。
jdbc-url
jdbc连接信息如jdbc:mysql://127.0.0.1:9030
table.identifier
Doris表名如db.tbl
auto-redirect
默认值true是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入不再显示BE信息。
doris.request.retries
默认值3向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms
默认值30000向 Doris 发送请求的连接超时
doris.request.read.timeout.ms
默认值30000读取向 Doris 发送请求的超时
源配置项
doris.request.query.timeout.s
默认值3600查询Doris的超时时间默认1小时-1表示无超时限制
doris.request.tablet.size
默认值Integer. MAX_VALUE一个Partition对应的Doris Tablet数量。该值设置得越小生成的Partition就越多。这提高了 Flink 端的并行度但同时也给 Doris 带来了更大的压力。
doris.batch.size
默认值1024一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。
doris.exec.mem.limit
默认值2147483648单个查询的内存限制。默认为2GB以字节为单位
doris.deserialize.arrow.async
默认值FALSE是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch
doris.deserialize.queue.size
默认值64Arrow格式的内部处理队列的异步转换当doris.deserialize.arrow.async为true时有效
doris.read.field
读取Doris表的列名列表以逗号分隔
doris.filter.query
过滤读取数据的表达式这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄18。
接收器配置项
sink.label-prefix
Stream加载导入使用的标签前缀。在2pc场景下需要全局唯一性来保证Flink的EOS语义。
sink.properties.*
导入流负载参数。 例如 ‘sink.properties.column_separator’ , ’ 定义列分隔符 ‘sink.properties.escape_delimiters’ ‘true’ 特殊字符作为分隔符 ‘\x01’ 将转换为二进制 0x01JSON格式导入 ‘sink.properties.format’ ‘json’ ‘sink.properties.按行读取 json’ ‘true’ 详细参数请参考这里。
sink.enable-delete
默认值TRUE是否启用删除。该选项需要Doris表开启批量删除功能Doris 0.15版本默认开启且仅支持Unique模型。
sink.enable-2pc
默认值TRUE是否启用两阶段提交2pc默认为true以保证Exactly-Once语义。
sink.buffer-size
默认值1MB写入数据缓存缓冲区的大小以字节为单位。不建议修改默认配置即可
sink.buffer-count
默认值3写入数据缓冲区的数量。不建议修改默认配置即可
sink.max-retries
默认值3Commit失败后最大重试次数默认3
sink.use-cache
默认值false发生异常时是否使用内存缓存进行恢复。启用后Checkpoint 期间的数据将保留在缓存中。
sink.enable.batch-mode
默认值false是否使用批处理模式写入Doris。使能后写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。 同时开启后Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。
sink.flush.queue-size
默认值2在批处理模式下缓存的列大小。
sink.buffer-flush.max-rows
默认值50000批处理模式下单批写入的最大数据行数。
sink.buffer-flush.max-bytes
默认值10MB在批处理模式下单批写入的最大字节数。
sink.buffer-flush.interval
默认值10s批处理模式下异步刷新缓存的时间间隔
sink.ignore.update-before
默认值true是否忽略update-before事件默认忽略。
查找Join配置项
lookup.cache.max-rows
默认值-1查找缓存的最大行数默认值为-1不启用缓存
lookup.cache.ttl
默认值10s查找缓存的最大时间默认10s
lookup.max-retries
默认值1查找查询失败后重试的次数
lookup.jdbc.async
默认值false是否启用异步查找默认为false
lookup.jdbc.read.batch.size
默认值128异步查找下每个查询的最大批量大小
lookup.jdbc.read.batch.queue-size
默认值256异步查找时中间缓冲队列的大小
lookup.jdbc.read.thread-size
默认值3每个任务中用于查找的jdbc线程数
七、Doris 和 Flink 列类型映射
Doris类型Flink类型NULL_TYPENULLBOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEDATEDATEDATETIMETIMESTAMPDECIMALDECIMALCHARSTRINGLARGEINTSTRINGVARCHARSTRINGSTRINGSTRINGDECIMALV2DECIMALARRAYARRAYMAPMAPJSONSTRINGVARIANTSTRINGIPV4STRINGIPV6STRING
从connector-1.6.1版本开始增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。
八、使用Flink CDC访问Doris的示例
SET execution.checkpointing.interval 10s;
CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED
) WITH (connector mysql-cdc,hostname 127.0.0.1,port 3306,username root,password password,database-name database,table-name table
);-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password ,sink.properties.format json,sink.properties.read_json_by_line true,sink.enable-delete true, -- Synchronize delete eventssink.label-prefix doris_label
);insert into doris_sink select id,name from cdc_mysql_source;九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
-- enable checkpoint
SET execution.checkpointing.interval 10s;CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED
) WITH (connector mysql-cdc,hostname 127.0.0.1,port 3306,username root,password password,database-name database,table-name table
);CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int
)
WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password ,sink.properties.format json,sink.properties.read_json_by_line true,sink.properties.columns id,name,bank,age,sink.properties.partial_columns true --Enable partial column updates
);insert into doris_sink select id,name,bank,age from cdc_mysql_source;
十、使用FlinkCDC访问多个表或整个数据库支持MySQL、Oracle、PostgreSQL、SQLServer
MySQL同步示例
FLINK_HOMEbin/flink run \-Dexecution.checkpointing.interval10s\-Dparallelism.default1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname127.0.0.1 \--mysql-conf port3306 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namemysql_db \--including-tables tbl1|test.* \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password123456 \--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1Oracle同步示例
FLINK_HOMEbin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname127.0.0.1 \--oracle-conf port1521 \--oracle-conf usernameadmin \--oracle-conf passwordpassword \--oracle-conf database-nameXE \--oracle-conf schema-nameADMIN \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1PostgreSQL 同步示例
FLINK_HOME/bin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname127.0.0.1 \--postgres-conf port5432 \--postgres-conf usernamepostgres \--postgres-conf password123456 \--postgres-conf database-namepostgres \--postgres-conf schema-namepublic \--postgres-conf slot.nametest \--postgres-conf decoding.plugin.namepgoutput \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1SQLServer同步示例
FLINK_HOME/bin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname127.0.0.1 \--sqlserver-conf port1433 \--sqlserver-conf usernamesa \--sqlserver-conf password123456 \--sqlserver-conf database-nameCDC_DB \--sqlserver-conf schema-namedbo \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1十一、使用FlinkCDC更新Key列
一般来说在业务数据库中数字被用作表的主键例如学生表中使用数字id作为主键但随着业务的发展与数据对应的数字可能会发生变化。在这种情况下使用FlinkCDC Doris Connector进行数据同步可以自动更新Doris主键列中的数据。
原理
Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作op字段的值为c、u、d和r分别对应创建、更新、删除和读取。对于主键列的更新FlinkCDC将向下游发送DELETE和INSERT事件在数据同步到Doris后会自动更新主键列的数据。
示例
Flink程序可以参考上述CDC同步示例。任务成功提交后在MySQL端执行更新主键列语句update student set id ‘1002’ where id ‘1001’以修改Doris中的数据。
十二、使用Flink根据指定的列删除数据
通常Kafka中的消息使用特定的字段来标记操作类型例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据希望删除op_typedelete的数据。
默认情况下DorisSink将根据RowKind来区分事件类型。通常在cdc的情况下可以直接获取事件类型并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的而Kafka需要基于业务逻辑进行判断显示传递给隐藏列的值。
-- Such as upstream data: {op_type:delete,data:{id:1,name:zhangsan}}
CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING
) WITH (connector kafka,...
);CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT
) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier db.table,username root,password ,sink.enable-delete false, -- false means not to get the event type from RowKindsink.properties.columns id, name, __DORIS_DELETE_SIGN__ -- Display the import column of the specified streamload
);INSERT INTO DORIS_SINK
SELECT json_value(data,$.id) as id,
json_value(data,$.name) as name,
if(op_typedelete,1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;这段代码是一个示例演示了如何使用Flink从Kafka源表读取数据并将其写入Doris目标表。具体来说如果源表中的数据op_type字段的值为delete则希望在Doris目标表中删除相应的数据。首先在Kafka源表的定义中我们有一个data字段用于存储源数据的JSON字符串以及一个op_type字段用于标识操作类型。然后在Doris目标表的定义中我们有一个id字段和一个name字段来存储数据的具体内容还有一个名为__DORIS_DELETE_SIGN__的隐藏列用于标识是否要进行删除操作。在INSERT INTO语句中我们将从Kafka源表中选择data字段的id和name并使用json_value函数提取相应的值。同时我们使用if函数将op_type字段的值与delete进行比较如果相等则将__DORIS_DELETE_SIGN__赋值为1否则赋值为0。最后将处理后的数据插入到Doris目标表中。总之这段代码的作用是根据源表中的op_type字段值将对应的数据删除或写入到Doris目标表中。
十三、最佳实践应用场景
使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到DorisMysql、Oracle、PostgreSQL中然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。
其他注意事项
Flink Doris Connector主要依赖于Checkpoint进行流式写入因此Checkpoint之间的时间间隔就是数据的可见延迟时间。为了确保Flink的Exactly Once语义Flink Doris Connector默认启用两阶段提交Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。
十四、常见问题解答
Doris Source读取数据后为什么流会结束
目前Doris Source是有界流不支持CDC读取。
Flink能否读取Doris并执行条件下推
通过配置doris.filter.query参数可以实现。
如何写入位图类型
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier test.bitmap_test,username root,password ,sink.label-prefix doris_label,sink.properties.columns dt,page,user_id,user_idto_bitmap(user_id)
)errCode 2, detailMessage Label [label_0_1] has already been used, relate to txn [19650]
在Exactly-Once场景中Flink Job必须从最新的Checkpoint/Savepoint重新启动否则会报告上述错误。当不需要Exactly-Once时可以通过关闭2PC提交sink.enable-2pcfalse或更改不同的sink.label-prefix来解决。
errCode 2, detailMessage transaction [19650] not found
发生在Commit阶段Checkpoint中记录的事务ID在FE端已过期在此时再次提交时会出现上述错误。此时无法从Checkpoint启动可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间默认为12小时。
errCode 2, detailMessage current running txns on db 10006 is 100, larger than limit 100
这是因为同一库的并发导入超过了100可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息请参考max_running_txn_num_per_db。同时如果一个任务频繁修改标签并重新启动也可能导致此错误发生。在2pc场景重复/聚合模型中每个任务的标签需要唯一在从Checkpoint重新启动时Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止占用事务。在Unique模型下也可以关闭2pc实现幂等写入。
当Flink向Uniq模型写入一批数据时如何确保数据的顺序
您可以添加序列列的配置来确保顺序。
Flink任务没有报错但数据无法同步
在Connector1.1.0之前数据是批量写入的并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后它依赖于Checkpoint并且必须启用Checkpoint才能进行写入。
tablet writer write failed, tablet_id190958, txn_id3505530, err-235
通常发生在Connector1.1.0之前这是因为写入频率过快导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。
源表和Doris表应如何对应
在使用Flink Connector导入数据时需要注意两个方面。第一源表的列和类型应与Flink SQL中的列和类型对应第二Flink SQL中的列和类型必须与Doris表的列和类型匹配。
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3
这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。
DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc
您可以在TaskManager中搜索日志中止事务响应并根据HTTP返回码判断是客户端问题还是服务器问题。
org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx
这个问题主要是由于条件中的varchar/string类型需要进行引号转义。正确的写法是xxx ‘‘xxx’’。这样Flink SQL解析器会将连续的两个单引号解释为一个单引号字符而不是字符串的结束并将拼接的字符串作为属性的值。例如t1 ‘2024-01-01’ 可以写为 ‘doris.filter.query’ ‘t1 ’‘2024-01-01’。
Failed to connect to backend: http://host:webserver_port, and BE is still alive
这个问题可能是由于配置了be的IP地址而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时be的地址是通过fe进行解析的。例如如果将be地址添加为’127.0.0.1’那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’并且Flink将连接到该地址。当出现这种问题时可以通过将be的实际对应的外部IP地址添加到with属性中来解决‘benodes’“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步可以使用以下属性–sink-conf benodesbe_ip:webserver,be_ip:webserver…。
当使用Flink-connector将MySQL数据同步到Doris时时间戳之间存在几小时的时间差。
Flink Connector默认使用UTC8时区从MySQL同步整个数据库。如果您的数据位于不同的时区您可以使用以下配置进行调整例如–mysql-conf debezium.date.format.timestamp.zone“UTC3”。