当前位置: 首页 > news >正文

平台门户网站建设泉州模板开发建站

平台门户网站建设,泉州模板开发建站,电商网站商品详情页,门户网站 页面集成版本说明 Flink和kafka的版本号有一定的匹配关系#xff0c;操作成功的版本#xff1a; Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnreposi…版本说明 Flink和kafka的版本号有一定的匹配关系操作成功的版本 Flink1.17.1kafka_2.12-3.3.1 添加kafka连接器依赖 将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下 下载flink-sql-connector-kafka连接器jar包 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1 上传到flink的lib目录下 [hadoopnode2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib 分发flink-connector-kafka-1.17.1.jar xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar 启动yarn-session [hadoopnode2 ~]$ myhadoop.sh start [hadoopnode2 ~]$ yarn-session.sh -d启动kafka集群 [hadoopnode2 ~]$ zk.sh start [hadoopnode2 ~]$ kf.sh start 创建kafka主题 查看主题 [hadoopnode2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list ​ 如果没有ws1,则创建 [hadoopnode2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1 ​ 普通Kafka表 connector kafka 进入Flink SQL客户端 [hadoopnode2 ~]$ sql-client.sh embedded -s yarn-session ... 省略若干日志输出 ... Flink SQL 创建Kafka的映射表 CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL, id int, ts bigint , vc int ) WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test, -- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并行度只写往kafka一个分区 sink.partitioner fixed,topic ws1,format json ); 可以往kafka读数据也可以往kafka写数据。 插入数据到Kafka表 如果没有source表先创建source表如果source表存在则不需要再创建。 CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100 ); 把source表插入t1表 insert into t1(id,ts,vc) select * from source; 如果报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer 依然同样错误还不行把kafka libs目录下的kafka-clients-3.3.1.jar把jar包发到Flink的lib目录同时也注意重启sql-client、yarn-session也要重启重要 cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib 查看是否复制成功 $ ls $FLINK_HOME/lib 重启sql-client重新操作成功如下 Flink SQL CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL,id int, ts bigint , vc int )WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test,-- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并度只写往kafka一个分区sink.partitioner fixed,topic ws1,format json); [INFO] Execute statement succeed. ​ Flink SQL CREATE TABLE source ( id INT, ts BIGINT, vc INT) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100); [INFO] Execute statement succeed. ​ Flink SQL insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node3:41749 of application application_1718331886020_0001. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: b1765f969c3ae637bd4c8100efbb0c4e ​ 查询Kafka表 select * from t1; 报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord​ 重启yarn session重新操作成功如下 Flink SQL CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL,id int, ts bigint , vc int )WITH (connector kafka,properties.bootstrap.servers node2:9092,node3:9092,node4:9094,properties.group.id test,-- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并??度只写往kafka一个分区sink.partitioner fixed,topic ws1,format json); [INFO] Execute statement succeed. ​ Flink SQL CREATE TABLE source ( id INT, ts BIGINT, vc INT) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100); [INFO] Execute statement succeed. ​ Flink SQL insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application application_1718331886020_0002. insert into t1(id,ts,vc) select * from source; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 84292f84d1fce4756ccd8ae294b6163a ​ ​ Flink SQL select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil       [] - The configuration directory (/home/hadoop/soft/flink-1.17.1/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy                       [] - Connecting to ResourceManager at node3/192.168.193.143:8032 2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface node4:38788 of application application_1718331886020_0002. select * from t1; [INFO] Result retrieval cancelled. ​ Flink SQL ​ upsert-kafka表 connector upsert-kafka 如果当前表存在更新操作那么普通的kafka连接器将无法满足此时可以使用Upsert Kafka连接器。 创建upsert-kafka的映射表(必须定义主键) CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED ) WITH (connector upsert-kafka,properties.bootstrap.servers node2:9092,topic ws2,key.format json,value.format json ); 如果没有kafka名为ws2的topic将自动被创建。 插入upsert-kafka表 insert into t2 select id,sum(vc) sumVC from source group by id; 查询upsert-kafka表 upsert-kafka 无法从指定的偏移量读取只会从主题的源读取。如此才知道整个数据的更新过程。并且通过 -UUI 等符号来显示数据的变化过程。 设置显示模式 SET sql-client.execution.result-modetableau; 查询t2表数据 select * from t2; 如果发现没有输出数据原因是之前的source表已经生成到end1000000就不再生成数据了。 进入Flink Web UIcancel掉所有running job重新操作成功如下 删除表 Flink SQL show tables; ------------ | table name | ------------ |     source | |         t1 | |         t2 | ------------ 3 rows in set ​ Flink SQL drop table source; Flink SQL drop table t1; Flink SQL drop table t2; 创建表 CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100 ); CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED ) WITH (connector upsert-kafka,properties.bootstrap.servers node2:9092,topic ws2,key.format json,value.format json ); 设置显示模式 SET sql-client.execution.result-modetableau; 查询表 select * from t2; 完成enjoy it!
http://www.w-s-a.com/news/630366/

相关文章:

  • 购物网站销售管理合肥网络推广平台
  • 网站建设规划书txt微盘注册帐号
  • 小说网站开发实训报告企业网盘收费标准
  • mvc网站开发医疗医院网站建设
  • 天津市建设厅官方网站wordpress设置404
  • 贵阳好的网站建设免费正能量网站下载ww
  • 免费学习的网站平台自建站seo如何做
  • 海南三亚做网站公众号版面设计创意
  • 学校网站建设目的与意义合肥网页定制
  • 网站查询地址网站建设与维护费用
  • 做网站哪些软件比较好合肥外贸网站建设公司
  • 建网站需要哪些条件专业网站设计报价
  • 定制网站开发技术化妆品的网站布局设计图片大全
  • 网站模糊设计发布产品的免费平台有哪些
  • 网站建站什么目录桂林网站建设内容
  • 光明新区城市建设局网站长沙营销型网站制作费用
  • 网站建设制度制定wordpress主题哥
  • 门户网站的种类php网站开发实训心得
  • 流程图制作网页网络优化seo
  • 个人公益网站怎么制作wordpress flat theme
  • 做营销型网站的公司篇高端网站愿建设
  • 五莲网站建设维护推广凡科做网站的方法
  • 山东省住房建设厅网站首页网站文章更新怎么通知搜索引擎
  • 商务网站的可行性分析包括大流量网站 优化
  • 推广网站有效的方法网站数据统计
  • 自建视频网站WordPress数据库添加管理员
  • 新民电商网站建设价格咨询网站建设高效解决之道
  • 做网站需要哪些步骤网站设计介绍
  • 物流网站制作目的国外中文网站排行榜单
  • 苏州网站建设招标网站ftp的所有权归谁