当前位置: 首页 > news >正文

建设网站具备的知识网站登录人太多进不去怎么办

建设网站具备的知识,网站登录人太多进不去怎么办,高明网站设计,公司内部网站页面设计大数据系列之#xff1a;Flink Doris Connector#xff0c;实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、… 大数据系列之Flink Doris Connector实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、使用FlinkSQL通过CDC访问并实现部分列更新的示例十、使用FlinkCDC访问多个表或整个数据库支持MySQL、Oracle、PostgreSQL、SQLServer十一、使用FlinkCDC更新Key列十二、使用Flink根据指定的列删除数据十三、最佳实践应用场景十四、常见问题解答 可以通过Flink操作读取、插入、修改、删除支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。 注意 修改和删除仅支持唯一键模型。当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法您需要自行实现。 一、版本兼容性 二、使用 Maven 添加 flink-doris-connector !-- flink-doris-connector -- dependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.16/artifactIdversion1.6.0/version /dependency请根据不同的Flink版本替换相应的Connector和Flink依赖版本。也可以从这里下载相关版本的jar包。 flink-doris-connector下载地址 https://repo.maven.apache.org/maven2/org/apache/doris/ 编译 编译时直接运行sh build.sh即可。编译成功后会在dist目录下生成目标jar包如flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如Flink 运行在 Local 模式则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下将此文件放入预部署包中。 三、Flink SQL read -- doris source CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH (connector doris,fenodes FE_IP:HTTP_PORT,table.identifier database.table,username root,password password );write --enable checkpoint SET execution.checkpointing.interval 10s;-- doris sink CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH (connector doris,fenodes FE_IP:HTTP_PORT,table.identifier db.table,username root,password password,sink.label-prefix doris_label );-- submit insert job INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source四、DataStream read DorisOptions.Builder builder DorisOptions.builder().setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);DorisSourceList? dorisSource DorisSource.List?builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), doris source).print();write DorisSink通过StreamLoad向Doris写入数据DataStream写入时支持不同的序列化方式 字符串数据流SimpleStringSerializer // enable checkpoint env.enableCheckpointing(10000); // using batch mode for bounded data env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.BuilderString builder DorisSink.builder(); DorisOptions.Builder dorisBuilder DorisOptions.builder(); dorisBuilder.setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);Properties properties new Properties(); // When the upstream is writing json, the configuration needs to be enabled. //properties.setProperty(format, json); //properties.setProperty(read_json_by_line, true); DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix(label-doris) //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source ListTuple2String, Integer data new ArrayList(); data.add(new Tuple2(doris,1)); DataStreamSourceTuple2String, Integer source env. fromCollection(data);source.map((MapFunctionTuple2String, Integer, String) t - t.f0 \t t.f1).sinkTo(builder.build());//mock json string source //env.fromElements({\name\:\zhangsan\,\age\:1}).sinkTo(builder.build());RowData数据流RowDataSerializer // enable checkpoint env.enableCheckpointing(10000); // using batch mode for bounded data env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option DorisSink.BuilderRowData builder DorisSink.builder(); DorisOptions.Builder dorisBuilder DorisOptions.builder(); dorisBuilder.setFenodes(FE_IP:HTTP_PORT).setTableIdentifier(db.table).setUsername(root).setPassword(password);// json format to streamload Properties properties new Properties(); properties.setProperty(format, json); properties.setProperty(read_json_by_line, true); DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix(label-doris) //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdatas schema String[] fields {city, longitude, latitude, destroy_date}; DataType[] types {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType(json) //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source DataStreamRowData source env. fromElements().map(new MapFunctionString, RowData() {Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData new GenericRowData(4);genericRowData.setField(0, StringData.fromString(beijing));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());SchemaChange数据流JsonDebeziumSchemaSerializer // enable checkpoint env.enableCheckpointing(10000);Properties props new Properties(); props. setProperty(format, json); props.setProperty(read_json_by_line, true); DorisOptions dorisOptions DorisOptions. builder().setFenodes(127.0.0.1:8030).setTableIdentifier(test.t1).setUsername(root).setPassword().build();DorisExecutionOptions.Builder executionBuilder DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix(label-prefix).setStreamLoadProp(props).setDeletable(true);DorisSink.BuilderString builder DorisSink.builder(); builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source).sinkTo(builder.build());五、Lookup Join CREATE TABLE fact_table (id BIGINT,name STRING,city STRING,process_time as proctime() ) WITH (connector kafka,... );create table dim_city(city STRING,level INT ,province STRING,country STRING ) WITH (connector doris,fenodes 127.0.0.1:8030,jdbc-url jdbc:mysql://127.0.0.1:9030,table.identifier dim.dim_city,username root,password );SELECT a.id, a.name, a.city, c.province, c.country,c.level FROM fact_table a LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c ON a.city c.city这个命令是用于创建两个表和一个查询语句。第一个表是名为fact_table的表它有四个列分别为id、“name”、“city和process_time”。其中id列是BIGINT类型name和city列是STRING类型process_time列是一个基于当前系统时间的计算列它使用proctime()函数实现。第二个表是名为dim_city的表它有四个列分别为city、“level”、“province和country”。其中“city”、“province和country列是STRING类型“level列是INT类型。该表使用Doris作为存储引擎连接器为connector”并且需要指定连接器的其他参数如fenodes”、“jdbc-url”、“table.identifier”、username和password等。最后一个命令是一个查询语句它使用LEFT JOIN将fact_table和dim_city两个表进行连接并使用FOR SYSTEM_TIME AS OF来指定连接时的时间戳这里使用了process_time列的值。查询结果包括id、“name”、“city”、“province”、country和level这些列。 六、配置 通用配置项 fenodes Doris FE http地址支持多个地址用逗号分隔 benodes Doris BE http地址支持多个地址以逗号分隔。 jdbc-url jdbc连接信息如jdbc:mysql://127.0.0.1:9030 table.identifier Doris表名如db.tbl auto-redirect 默认值true是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入不再显示BE信息。 doris.request.retries 默认值3向 Doris 发送请求的重试次数 doris.request.connect.timeout.ms 默认值30000向 Doris 发送请求的连接超时 doris.request.read.timeout.ms 默认值30000读取向 Doris 发送请求的超时 源配置项 doris.request.query.timeout.s 默认值3600查询Doris的超时时间默认1小时-1表示无超时限制 doris.request.tablet.size 默认值Integer. MAX_VALUE一个Partition对应的Doris Tablet数量。该值设置得越小生成的Partition就越多。这提高了 Flink 端的并行度但同时也给 Doris 带来了更大的压力。 doris.batch.size 默认值1024一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。 doris.exec.mem.limit 默认值2147483648单个查询的内存限制。默认为2GB以字节为单位 doris.deserialize.arrow.async 默认值FALSE是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch doris.deserialize.queue.size 默认值64Arrow格式的内部处理队列的异步转换当doris.deserialize.arrow.async为true时有效 doris.read.field 读取Doris表的列名列表以逗号分隔 doris.filter.query 过滤读取数据的表达式这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄18。 接收器配置项 sink.label-prefix Stream加载导入使用的标签前缀。在2pc场景下需要全局唯一性来保证Flink的EOS语义。 sink.properties.* 导入流负载参数。 例如 ‘sink.properties.column_separator’ , ’ 定义列分隔符 ‘sink.properties.escape_delimiters’ ‘true’ 特殊字符作为分隔符 ‘\x01’ 将转换为二进制 0x01JSON格式导入 ‘sink.properties.format’ ‘json’ ‘sink.properties.按行读取 json’ ‘true’ 详细参数请参考这里。 sink.enable-delete 默认值TRUE是否启用删除。该选项需要Doris表开启批量删除功能Doris 0.15版本默认开启且仅支持Unique模型。 sink.enable-2pc 默认值TRUE是否启用两阶段提交2pc默认为true以保证Exactly-Once语义。 sink.buffer-size 默认值1MB写入数据缓存缓冲区的大小以字节为单位。不建议修改默认配置即可 sink.buffer-count 默认值3写入数据缓冲区的数量。不建议修改默认配置即可 sink.max-retries 默认值3Commit失败后最大重试次数默认3 sink.use-cache 默认值false发生异常时是否使用内存缓存进行恢复。启用后Checkpoint 期间的数据将保留在缓存中。 sink.enable.batch-mode 默认值false是否使用批处理模式写入Doris。使能后写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。 同时开启后Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。 sink.flush.queue-size 默认值2在批处理模式下缓存的列大小。 sink.buffer-flush.max-rows 默认值50000批处理模式下单批写入的最大数据行数。 sink.buffer-flush.max-bytes 默认值10MB在批处理模式下单批写入的最大字节数。 sink.buffer-flush.interval 默认值10s批处理模式下异步刷新缓存的时间间隔 sink.ignore.update-before 默认值true是否忽略update-before事件默认忽略。 查找Join配置项 lookup.cache.max-rows 默认值-1查找缓存的最大行数默认值为-1不启用缓存 lookup.cache.ttl 默认值10s查找缓存的最大时间默认10s lookup.max-retries 默认值1查找查询失败后重试的次数 lookup.jdbc.async 默认值false是否启用异步查找默认为false lookup.jdbc.read.batch.size 默认值128异步查找下每个查询的最大批量大小 lookup.jdbc.read.batch.queue-size 默认值256异步查找时中间缓冲队列的大小 lookup.jdbc.read.thread-size 默认值3每个任务中用于查找的jdbc线程数 七、Doris 和 Flink 列类型映射 Doris类型Flink类型NULL_TYPENULLBOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEDATEDATEDATETIMETIMESTAMPDECIMALDECIMALCHARSTRINGLARGEINTSTRINGVARCHARSTRINGSTRINGSTRINGDECIMALV2DECIMALARRAYARRAYMAPMAPJSONSTRINGVARIANTSTRINGIPV4STRINGIPV6STRING 从connector-1.6.1版本开始增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。 八、使用Flink CDC访问Doris的示例 SET execution.checkpointing.interval 10s; CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector mysql-cdc,hostname 127.0.0.1,port 3306,username root,password password,database-name database,table-name table );-- Support synchronous insert/update/delete events CREATE TABLE doris_sink ( id INT, name STRING ) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password ,sink.properties.format json,sink.properties.read_json_by_line true,sink.enable-delete true, -- Synchronize delete eventssink.label-prefix doris_label );insert into doris_sink select id,name from cdc_mysql_source;九、使用FlinkSQL通过CDC访问并实现部分列更新的示例 -- enable checkpoint SET execution.checkpointing.interval 10s;CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector mysql-cdc,hostname 127.0.0.1,port 3306,username root,password password,database-name database,table-name table );CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int ) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password ,sink.properties.format json,sink.properties.read_json_by_line true,sink.properties.columns id,name,bank,age,sink.properties.partial_columns true --Enable partial column updates );insert into doris_sink select id,name,bank,age from cdc_mysql_source; 十、使用FlinkCDC访问多个表或整个数据库支持MySQL、Oracle、PostgreSQL、SQLServer MySQL同步示例 FLINK_HOMEbin/flink run \-Dexecution.checkpointing.interval10s\-Dparallelism.default1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname127.0.0.1 \--mysql-conf port3306 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namemysql_db \--including-tables tbl1|test.* \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password123456 \--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1Oracle同步示例 FLINK_HOMEbin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname127.0.0.1 \--oracle-conf port1521 \--oracle-conf usernameadmin \--oracle-conf passwordpassword \--oracle-conf database-nameXE \--oracle-conf schema-nameADMIN \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1PostgreSQL 同步示例 FLINK_HOME/bin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname127.0.0.1 \--postgres-conf port5432 \--postgres-conf usernamepostgres \--postgres-conf password123456 \--postgres-conf database-namepostgres \--postgres-conf schema-namepublic \--postgres-conf slot.nametest \--postgres-conf decoding.plugin.namepgoutput \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1SQLServer同步示例 FLINK_HOME/bin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname127.0.0.1 \--sqlserver-conf port1433 \--sqlserver-conf usernamesa \--sqlserver-conf password123456 \--sqlserver-conf database-nameCDC_DB \--sqlserver-conf schema-namedbo \--including-tables tbl1|tbl2 \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password\--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel \--table-conf replication_num1十一、使用FlinkCDC更新Key列 一般来说在业务数据库中数字被用作表的主键例如学生表中使用数字id作为主键但随着业务的发展与数据对应的数字可能会发生变化。在这种情况下使用FlinkCDC Doris Connector进行数据同步可以自动更新Doris主键列中的数据。 原理 Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作op字段的值为c、u、d和r分别对应创建、更新、删除和读取。对于主键列的更新FlinkCDC将向下游发送DELETE和INSERT事件在数据同步到Doris后会自动更新主键列的数据。 示例 Flink程序可以参考上述CDC同步示例。任务成功提交后在MySQL端执行更新主键列语句update student set id ‘1002’ where id ‘1001’以修改Doris中的数据。 十二、使用Flink根据指定的列删除数据 通常Kafka中的消息使用特定的字段来标记操作类型例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据希望删除op_typedelete的数据。 默认情况下DorisSink将根据RowKind来区分事件类型。通常在cdc的情况下可以直接获取事件类型并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的而Kafka需要基于业务逻辑进行判断显示传递给隐藏列的值。 -- Such as upstream data: {op_type:delete,data:{id:1,name:zhangsan}} CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING ) WITH (connector kafka,... );CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT ) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier db.table,username root,password ,sink.enable-delete false, -- false means not to get the event type from RowKindsink.properties.columns id, name, __DORIS_DELETE_SIGN__ -- Display the import column of the specified streamload );INSERT INTO DORIS_SINK SELECT json_value(data,$.id) as id, json_value(data,$.name) as name, if(op_typedelete,1,0) as __DORIS_DELETE_SIGN__ from KAFKA_SOURCE;这段代码是一个示例演示了如何使用Flink从Kafka源表读取数据并将其写入Doris目标表。具体来说如果源表中的数据op_type字段的值为delete则希望在Doris目标表中删除相应的数据。首先在Kafka源表的定义中我们有一个data字段用于存储源数据的JSON字符串以及一个op_type字段用于标识操作类型。然后在Doris目标表的定义中我们有一个id字段和一个name字段来存储数据的具体内容还有一个名为__DORIS_DELETE_SIGN__的隐藏列用于标识是否要进行删除操作。在INSERT INTO语句中我们将从Kafka源表中选择data字段的id和name并使用json_value函数提取相应的值。同时我们使用if函数将op_type字段的值与delete进行比较如果相等则将__DORIS_DELETE_SIGN__赋值为1否则赋值为0。最后将处理后的数据插入到Doris目标表中。总之这段代码的作用是根据源表中的op_type字段值将对应的数据删除或写入到Doris目标表中。 十三、最佳实践应用场景 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到DorisMysql、Oracle、PostgreSQL中然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。 其他注意事项 Flink Doris Connector主要依赖于Checkpoint进行流式写入因此Checkpoint之间的时间间隔就是数据的可见延迟时间。为了确保Flink的Exactly Once语义Flink Doris Connector默认启用两阶段提交Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。 十四、常见问题解答 Doris Source读取数据后为什么流会结束 目前Doris Source是有界流不支持CDC读取。 Flink能否读取Doris并执行条件下推 通过配置doris.filter.query参数可以实现。 如何写入位图类型 CREATE TABLE bitmap_sink ( dt int, page string, user_id int ) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier test.bitmap_test,username root,password ,sink.label-prefix doris_label,sink.properties.columns dt,page,user_id,user_idto_bitmap(user_id) )errCode 2, detailMessage Label [label_0_1] has already been used, relate to txn [19650] 在Exactly-Once场景中Flink Job必须从最新的Checkpoint/Savepoint重新启动否则会报告上述错误。当不需要Exactly-Once时可以通过关闭2PC提交sink.enable-2pcfalse或更改不同的sink.label-prefix来解决。 errCode 2, detailMessage transaction [19650] not found 发生在Commit阶段Checkpoint中记录的事务ID在FE端已过期在此时再次提交时会出现上述错误。此时无法从Checkpoint启动可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间默认为12小时。 errCode 2, detailMessage current running txns on db 10006 is 100, larger than limit 100 这是因为同一库的并发导入超过了100可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息请参考max_running_txn_num_per_db。同时如果一个任务频繁修改标签并重新启动也可能导致此错误发生。在2pc场景重复/聚合模型中每个任务的标签需要唯一在从Checkpoint重新启动时Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止占用事务。在Unique模型下也可以关闭2pc实现幂等写入。 当Flink向Uniq模型写入一批数据时如何确保数据的顺序 您可以添加序列列的配置来确保顺序。 Flink任务没有报错但数据无法同步 在Connector1.1.0之前数据是批量写入的并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后它依赖于Checkpoint并且必须启用Checkpoint才能进行写入。 tablet writer write failed, tablet_id190958, txn_id3505530, err-235 通常发生在Connector1.1.0之前这是因为写入频率过快导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。 源表和Doris表应如何对应 在使用Flink Connector导入数据时需要注意两个方面。第一源表的列和类型应与Flink SQL中的列和类型对应第二Flink SQL中的列和类型必须与Doris表的列和类型匹配。 TApplicationException: get_next failed: out of sequence response: expected 4 but got 3 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。 DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc 您可以在TaskManager中搜索日志中止事务响应并根据HTTP返回码判断是客户端问题还是服务器问题。 org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx 这个问题主要是由于条件中的varchar/string类型需要进行引号转义。正确的写法是xxx ‘‘xxx’’。这样Flink SQL解析器会将连续的两个单引号解释为一个单引号字符而不是字符串的结束并将拼接的字符串作为属性的值。例如t1 ‍ ‘2024-01-01’ 可以写为 ‘doris.filter.query’ ‘t1 ‍’‘2024-01-01’。 Failed to connect to backend: http://host:webserver_port, and BE is still alive 这个问题可能是由于配置了be的IP地址而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时be的地址是通过fe进行解析的。例如如果将be地址添加为’127.0.0.1’那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’并且Flink将连接到该地址。当出现这种问题时可以通过将be的实际对应的外部IP地址添加到with属性中来解决‘benodes’“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步可以使用以下属性–sink-conf benodesbe_ip:webserver,be_ip:webserver…。 当使用Flink-connector将MySQL数据同步到Doris时时间戳之间存在几小时的时间差。 Flink Connector默认使用UTC8时区从MySQL同步整个数据库。如果您的数据位于不同的时区您可以使用以下配置进行调整例如–mysql-conf debezium.date.format.timestamp.zone“UTC3”。
http://www.w-s-a.com/news/152194/

相关文章:

  • 光谷网站建设请检查网络
  • 申请建设网站的报告书商务网站开发课程体会
  • 网站开发实训总结致谢群晖wordpress设置
  • 关于酒店网站建设的摘要天津市建设工程信息网官网首页
  • 网站alexa排名查询手机网站制作器
  • 建设小辣猫的网站电子毕业设计网站建设
  • 询广西南宁网站运营礼品定制
  • 建筑公司网站作用免费查看招标信息的网站
  • 建筑设计公司名字起名大全html网站 怎么做seo
  • 网站群建设模板迁移原站迁移pc巩义网站建设案例课堂
  • 烟台高端网站开发wordpress 设置权限
  • 中小企业网站制作流程网站开发和设计人员的岗位要求
  • 公司网站建设多少费用河北城乡建设官网站
  • 国科联创网站建设广告传媒公司招聘信息
  • 网站后台文章删了 怎么前台还有一级做爰片软件网站
  • 辽宁省建设注册中心网站wordpress 博客插件
  • 做电商看的网站有哪些网站建设需求策划书
  • 关于网站建设交易流程的描述一句话哪些网站用户体验好
  • 男女做暖暖的网站大全深圳平台网站建设外包
  • 凯里展示型网站设计抖音代运营收费详细价格
  • 外包网站会自己做原型吗网站制作怎样盈利
  • 为什么在百度搜不到我的网站电商网站开发过程
  • 什么是网站反链网页设计页面链接
  • 佛山企业网站制作韩国seocaso
  • 微信公司网站vue做社区网站
  • 蒙阴网站优化五核网站建设
  • 企业微商城网站建设wordpress新闻是哪个表
  • 重庆网站开发培训机构电商网站创办过程
  • 企业建网站得多少钱长沙财优化公司
  • 网站开发api平台扒完网站代码之后怎么做模板