当前位置: 首页 > news >正文

网站建设信息发布wordpress文件下载站

网站建设信息发布,wordpress文件下载站,山西省建设工程信息网,acg wordpress模板1、启动Flink SQL 首先启动Flink的集群#xff0c;选择独立集群模式或者是session的模式。此处选择是时session的模式#xff1a;yarn-session.sh -d 在启动Flink SQL的client#xff1a; sql-client.sh 2、kafka SQL 连接器 在使用kafka作为数据源的时候需要上传jar包到…1、启动Flink SQL 首先启动Flink的集群选择独立集群模式或者是session的模式。此处选择是时session的模式yarn-session.sh -d 在启动Flink SQL的client sql-client.sh 2、kafka SQL 连接器 在使用kafka作为数据源的时候需要上传jar包到flnik的lib下/usr/local/soft/flink-1.15.2/lib可以去官网找对应的版本下载上传。1、创建表再流上定义表 再flink中创建表相当于创建一个视图视图中不存数据只有查询视图时才会去原表中读取数据CREATE TABLE students (sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector kafka,topic student,properties.bootstrap.servers master:9092,node1:9092,node2:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,format csv )2、查询数据(连续查询)select clazz,count(1) as c from students group by clazz; 3、客户端为维护和可视化结果提供了三种的模式 1、表格模式默认使用的模式table mode在内存中实体化结果并将结果用规则的分页表格可视化展示出来 SET sql-client.execution.result-mode table; 2、变更日志模式changelog mode不会实体化和可视化结果而是由插入和撤销-组成的持续查询产生结果流。 SET sql-client.execution.result-mode changelog; 3、Tableau模式tableau mode更接近传统的数据库会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type) SET sql-client.execution.result-mode tableau; 4、 Flink SQL流批一体 1、流处理 a、流处理即可以处理有界流也可以处理无界流 b、流处理的输出的结果是连续的结果 c、流处理的底层是持续流的模型上游的Task和下游的Task同时启动等待数据的到达 SET execution.runtime-mode streaming; 2、批处理 a、批处理只能用于处理有界流 b、输出的是最终的结果 c、批处理的底层是MapReduce模型会先执行上游的Task在执行下游的Task  SET execution.runtime-mode batch; Flink做批处理读取一个文件-- 创建一个有界流的表 CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/spark/stu/students.txt, -- 必选指定路径format csv -- 必选文件系统连接器指定 format );select clazz,count(1) as c from students_hdfs group by clazz 5、Flink SQL的连接器 1、kafka SQL 连接器 对于一些参数需要从官网进行了解。 1、kafka source  -- 创建kafka 表 CREATE TABLE students_kafka (offset BIGINT METADATA VIRTUAL, -- 偏移量event_time TIMESTAMP(3) METADATA FROM timestamp, --数据进入kafka的时间可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector kafka,topic students, -- 数据的topicproperties.bootstrap.servers master:9092,node1:9092,node2:9092, -- broker 列表properties.group.id testGroup, -- 消费者组scan.startup.mode earliest-offset, -- 读取数据的位置earliest-offset latest-offsetformat csv -- 读取数据的格式 ); 2、kafka sink  -- 创建kafka 表 CREATE TABLE students_kafka_sink (sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector kafka,topic students_sink, -- 数据的topicproperties.bootstrap.servers master:9092,node1:9092,node2:9092, -- broker 列表properties.group.id testGroup, -- 消费者组scan.startup.mode earliest-offset, -- 读取数据的位置earliest-offset latest-offsetformat csv -- 读取数据的格式 );-- 将查询结果保存到kafka中 insert into students_kafka_sink select * from students_hdfs;kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink 3、将更新的流写入到kafka中  因为在Kafka是一个消息队列是不会去重的。所以只需要将读取数据的格式改成canal-json。当数据被读取回来还是原来的流模式。 CREATE TABLE clazz_num_kafka (clazz STRING,num BIGINT ) WITH (connector kafka,topic clazz_num, -- 数据的topicproperties.bootstrap.servers master:9092,node1:9092,node2:9092, -- broker 列表properties.group.id testGroup, -- 消费者组scan.startup.mode earliest-offset, -- 读取数据的位置earliest-offset latest-offsetformat canal-json -- 读取数据的格式 );-- 将更新的数据写入kafka需要使用canal-json格式数据中会带上操作类型 {data:[{clazz:文科一班,num:71}],type:INSERT} {data:[{clazz:理科三班,num:67}],type:DELETE}insert into clazz_num_kafka select clazz,count(1) as num from students group by clazz;kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num 2、 hdfs SQL 连接器 1、hdfs source Flink读取文件可以使用有界流的方式也可以是无界流方式。 -- 有界流 CREATE TABLE students_hdfs_batch (sid STRING,name STRING,age INT,sex STRING,clazz STRING )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/student, -- 必选指定路径format csv -- 必选文件系统连接器指定 format );select * from students_hdfs_batch;-- 无界流 -- 基于hdfs做流处理读取数据是以文件为单位延迟比kafka大 CREATE TABLE students_hdfs_stream (sid STRING,name STRING,age INT,sex STRING,clazz STRING )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/student, -- 必选指定路径format csv , -- 必选文件系统连接器指定 formatsource.monitor-interval 5000 -- 每隔一段时间扫描目录生成一个无界流 );select * from students_hdfs_stream; 2、hdfs sink -- 1、批处理模式(使用方式和底层原理和hive类似) SET execution.runtime-mode batch;-- 创建表 CREATE TABLE clazz_num_hdfs (clazz STRING,num BIGINT )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/clazz_num, -- 必选指定路径format csv -- 必选文件系统连接器指定 format ); -- 将查询结果保存到表中 insert into clazz_num_hdfs select clazz,count(1) as num from students_hdfs_batch group by clazz;-- 2、流处理模式 SET execution.runtime-mode streaming; -- 创建表如果查询数据返回的十更新更改的流需要使用canal-json格式 CREATE TABLE clazz_num_hdfs_canal_json (clazz STRING,num BIGINT )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/clazz_num_canal_json, -- 必选指定路径format canal-json -- 必选文件系统连接器指定 format );insert into clazz_num_hdfs_canal_json select clazz,count(1) as num from students_hdfs_stream group by clazz; 3、MySQL SQL 连接器 1、整合 # 1、上传依赖包到flink 的lib目录下/usr/local/soft/flink-1.15.2/lib flink-connector-jdbc-1.15.2.jar mysql-connector-java-5.1.49.jar# 2、需要重启flink集群 yarn application -kill [appid] yarn-session.sh -d# 3、重新进入sql命令行 sql-client.sh 2、mysql   source  -- 有界流 -- flink中表的字段类型和字段名需要和mysql保持一致 CREATE TABLE students_jdbc (id BIGINT,name STRING,age BIGINT,gender STRING,clazz STRING,PRIMARY KEY (id) NOT ENFORCED -- 主键 ) WITH (connector jdbc,url jdbc:mysql://master:3306/student,table-name students,username root,password 123456 );select * from students_jdbc; 3、mysql sink  -- 创建kafka 表 CREATE TABLE students_kafka (offset BIGINT METADATA VIRTUAL, -- 偏移量event_time TIMESTAMP(3) METADATA FROM timestamp, --数据进入kafka的时间可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector kafka,topic students, -- 数据的topicproperties.bootstrap.servers master:9092,node1:9092,node2:9092, -- broker 列表properties.group.id testGroup, -- 消费者组scan.startup.mode earliest-offset, -- 读取数据的位置earliest-offset latest-offsetformat csv -- 读取数据的格式 );-- 创建mysql sink表 CREATE TABLE clazz_num_mysql (clazz STRING,num BIGINT,PRIMARY KEY (clazz) NOT ENFORCED -- 主键 ) WITH (connector jdbc,url jdbc:mysql://master:3306/student,table-name clazz_num,username root,password 123456 );--- 再mysql创建接收表 CREATE TABLE clazz_num (clazz varchar(10),num BIGINT,PRIMARY KEY (clazz) -- 主键 ) ;-- 将sql查询结果实时写入mysql -- 将更新更改的流写入mysql,flink会自动按照主键更新数据 insert into clazz_num_mysql select clazz, count(1) as num from students_kafka group by clazz;kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students 插入一条数据 4、DataGen用于生成随机数据一般用在高性能测试上 -- 创建包只能用于source表 CREATE TABLE students_datagen (sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector datagen,rows-per-second5, -- 每秒随机生成的数据量fields.age.min1,fields.age.max100,fields.sid.length10,fields.name.length2,fields.sex.length1,fields.clazz.length4 ); 5、print用于高性能测试 只能用于sink表 CREATE TABLE print_table (sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector print );insert into print_table select * from students_datagen;结果需要在提交的任务中查看。 6、BlackHole 是用于高性能测试使用在后面可以用于Flink的反压的测试。 CREATE TABLE blackhole_table (sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector blackhole );insert into blackhole_table select * from students_datagen; 6、SQL 语法 1、Hints 用于提示执行在Flink中可以动态的修改表中的属性在Spark中可以用于广播。在修改动态表中属性后不需要在重新建表就可以读取修改后的需求。 CREATE TABLE students_kafka (offset BIGINT METADATA VIRTUAL, -- 偏移量event_time TIMESTAMP(3) METADATA FROM timestamp, --数据进入kafka的时间可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING ) WITH (connector kafka,topic students, -- 数据的topicproperties.bootstrap.servers master:9092,node1:9092,node2:9092, -- broker 列表properties.group.id testGroup, -- 消费者组scan.startup.mode latest-offset, -- 读取数据的位置earliest-offset latest-offsetformat csv -- 读取数据的格式 );-- 动态修改表属性可以在查询数据时修改读取kafka数据的位置不需要重新创建表 select * from students_kafka /* OPTIONS(scan.startup.mode earliest-offset) */;-- 有界流 CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/student, -- 必选指定路径format csv -- 必选文件系统连接器指定 format );-- 可以在查询hdfs时不需要再重新的创建表就可以动态改成无界流 select * from students_hdfs /* OPTIONS(source.monitor-interval 5000 ) */; 2、WITH: 当一段SQL语句在被多次使用的时候就将通过with给这个SQL起一个别名类似于封装起来就是为这个SQL创建一个临时的视图并不是真正的视图方便下次使用。 CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING )WITH (connector filesystem, -- 必选指定连接器类型path hdfs://master:9000/data/student, -- 必选指定路径format csv -- 必选文件系统连接器指定 format );-- 可以在查询hdfs时不需要再重新的创建表就可以动态改成无界流 select * from students_hdfs /* OPTIONS(source.monitor-interval 5000 ) */;-- tmp别名代表的时子查询的sql,可以在后面的sql中多次使用 with tmp as (select * from students_hdfs /* OPTIONS(source.monitor-interval 5000 ) */where clazz文科一班 ) select * from tmp union all select * from tmp; 3、DISTINCT: 在flink 的流处理中使用distinctflink需要将之前的数据保存在状态中如果数据一直增加状态会越来越大 状态越来越大checkpoint时间会增加最终会导致flink任务出问题 select count(distinct sid) from students_kafka /* OPTIONS(scan.startup.mode earliest-offset) */;select count(sid) from (select distinct *from students_kafka /* OPTIONS(scan.startup.mode earliest-offset) */ ); 注意事项 1、 当Flink Client客户端退出来以后里面创建的动态表就不存在了。这些表结构是元数据是存储在内存中的。 2、当在进行where过滤的时候字符串会出现三种情况空的字符串、空格字符串、null的字符串三者是有区别的 这三者是不同的概念在进行where过滤的时候过滤的条件是不同的。 1、过滤空的字符串where s ‘空字符串’2、过滤空格字符串where s ‘空格’3、过滤null字符串where s null Flink SQL中常见的函数from_unixtime 以字符串格式 string 返回数字参数 numberic 的表示形式默认为 ‘yyyy-MM-dd HH:mm:ss’to_timestamp 将格式为 string2默认为‘yyyy-MM-dd HH:mm:ss’的字符串 string1 转换为 timestamp
http://www.w-s-a.com/news/695674/

相关文章:

  • 宁波网站建设电话网络推广外包一年多少钱
  • 检索标准的网站怎么制作企业网站
  • 下列关于网站开发中网页发布wordpress 粘帖图片
  • 网站建设遇到的问题及对策宁波网站建设营销推广
  • 各大招聘网站常州百度快速优化
  • 做网站线稿软件有哪些做门户网站需要注册公司吗
  • 建设企业网站模板下载优化方案怎么写
  • 做像淘宝网的网站网站单页面制作
  • 网站建设流程表龙岩网站建设较好的公司
  • 龙岗建站费用手机免费建立网站吗
  • 江门高端网站建设怎样制作wordpress手机主题
  • 淘宝网站如何在邮件里做超链接wordpress图片投票插件
  • 镇平哪家网站做的好招聘网站如何建设
  • 建网站一般多少钱幸福里wordpress怎么可视化构建页面
  • 广东网站建设建站模板主机托管公司
  • 网站开发师是做什么的网站域名在哪里备案
  • 什么是网站国内高速空间国外做3d模型的网站
  • 效果建网站的公凡科网登陆
  • 网站域名续费多少钱在线制作图片软件
  • 济南城乡住房建设厅网站中国会议营销网站
  • 展示类网站cms网站seo方法
  • 莒县做网站的公司设计师网站模版
  • 顺德顺的网站建设备份的网站建设方案书
  • 如何做网站广告山东电商网站建设
  • 新手建什么网站赚钱吗WordPress搜狗不收录
  • 石家庄招聘哪个网站做的好网站设计建设公司服务商
  • 建设公司网站大概需要多少钱建站平台和网站开发的区别
  • 淄川区住房和城乡建设局网站门户网站模板源码下载
  • 室内设计公司 网站建设建站塔山双喜
  • 网站建设属于什么经营范围销售网站开发业务