当前位置: 首页 > news >正文

万江做网站的公司怎样建设个人手机网站

万江做网站的公司,怎样建设个人手机网站,传奇游戏在线玩网页版,前端开发常用网站Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分#xff0c;比如术语、架构、编程模型、编程指南、基本的…Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分和实际的生产应用联系更为密切以及有一定开发难度的内容。 5、Flink 监控系列 本部分和实际的运维、监控工作相关。 二、Flink 示例专栏 Flink 示例专栏是 Flink 专栏的辅助说明一般不会介绍知识点的信息更多的是提供一个一个可以具体使用的示例。本专栏不再分目录通过链接即可看出介绍的内容。 两专栏的所有文章入口点击Flink 系列文章汇总索引 文章目录 Flink 系列文章一、maven依赖二、时态表的join1、统计需求对应的SQL2、Without connnector 实现代码3、With connnector 实现代码 本文通过两个示例介绍了时态表TemporalTableFunction的join操作。 如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。 本文除了maven依赖外没有其他依赖。 本文更详细的内容可参考文章 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 本专题分为以下几篇文章 【flink番外篇】9、Flink Table API 支持的操作示例1-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例2- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例3- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例4- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例5- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例6- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例7- 表的join操作内联接、外联接以及联接自定义函数等 【flink番外篇】9、Flink Table API 支持的操作示例8- 时态表的joinscala版本 【flink番外篇】9、Flink Table API 支持的操作示例9- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例10- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例11- Group Windowstumbling、sliding和session操作 【flink番外篇】9、Flink Table API 支持的操作示例12- Over Windows有界和无界的over window操作 【flink番外篇】9、Flink Table API 支持的操作示例13- Row-basedmap、flatmap、aggregate、group window aggregate等操作 【flink番外篇】9、Flink Table API 支持的操作示例14- 时态表的joinjava版本 【flink番外篇】9、Flink Table API 支持的操作示例1-完整版 【flink番外篇】9、Flink Table API 支持的操作示例2-完整版 一、maven依赖 本文maven依赖参考文章【flink番外篇】9、Flink Table API 支持的操作示例1-通过Table API和SQL创建表 中的依赖为节省篇幅不再赘述。 二、时态表的join 假设有一张订单表Orders和一张汇率表Rates那么订单来自于不同的地区所以支付的币种各不一样那么假设需要统计每个订单在下单时候Yen币种对应的金额。 1、统计需求对应的SQL SELECT o.currency, o.amount, r.rateo.amount * r.rate AS yen_amount FROMOrders AS o,LATERAL TABLE (Rates(o.rowtime)) AS r WHERE r.currency o.currency2、Without connnector 实现代码 就是使用静态数据实现其验证结果在代码中的注释部分。 /** Author: alanchan* LastEditors: alanchan* Description: */import static org.apache.flink.table.api.Expressions.$;import java.time.Duration; import java.util.Arrays; import java.util.List;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.TemporalTableFunction; import org.apache.flink.types.Row;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;public class TestTemporalTableFunctionDemo {// 维表DataNoArgsConstructorAllArgsConstructorpublic static class Rate {private String currency;private Integer rate;private Long rate_time;}// 事实表DataNoArgsConstructorAllArgsConstructorpublic static class Order {private Long total;private String currency;private Long order_time;}final static ListRate rateList Arrays.asList(new Rate(US Dollar, 102, 1L),new Rate(Euro, 114, 1L),new Rate(Yen, 1, 1L),new Rate(Euro, 116, 5L),new Rate(Euro, 119, 7L));final static ListOrder orderList Arrays.asList(new Order(2L, Euro, 2L),new Order(1L, US Dollar, 3L),new Order(50L, Yen, 4L),new Order(3L, Euro, 5L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// order 实时流 事实表DataStreamOrder orderDs env.fromCollection(orderList).assignTimestampsAndWatermarks(WatermarkStrategy.OrderforBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((order, rTimeStamp) - order.getOrder_time()));// rate 实时流 维度表DataStreamRate rateDs env.fromCollection(rateList).assignTimestampsAndWatermarks(WatermarkStrategy.RateforBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((rate, rTimeStamp) - rate.getRate_time()));// 转变为TableTable orderTable tenv.fromDataStream(orderDs, $(total), $(currency), $(order_time).rowtime());Table rateTable tenv.fromDataStream(rateDs, $(currency), $(rate), $(rate_time).rowtime());tenv.createTemporaryView(alan_orderTable, orderTable);tenv.createTemporaryView(alan_rateTable, rateTable);// 定义一个TemporalTableFunctionTemporalTableFunction rateDim rateTable.createTemporalTableFunction($(rate_time), $(currency));// 注册表函数// tenv.registerFunction(alan_rateDim, rateDim);tenv.createTemporarySystemFunction(alan_rateDim, rateDim);String sql select o.*,r.rate from alan_orderTable as o,Lateral table (alan_rateDim(o.order_time)) r where r.currency o.currency ;// 关联查询Table result tenv.sqlQuery(sql);// 打印输出DataStream resultDs tenv.toAppendStream(result, Row.class);resultDs.print();// rate 流数据(维度表)// rateList// order 流数据// orderList// 控制台输出// 2 I[2, Euro, 1970-01-01T00:00:00.002, 114]// 5 I[50, Yen, 1970-01-01T00:00:00.004, 1]// 16 I[1, US Dollar, 1970-01-01T00:00:00.003, 102]// 2 I[3, Euro, 1970-01-01T00:00:00.005, 116]env.execute();}}3、With connnector 实现代码 本处使用的是kafka作为数据源来实现。其验证结果在代码中的注释部分。 /** Author: alanchan* LastEditors: alanchan* Description: */ package org.tablesql.join;import static org.apache.flink.table.api.Expressions.$;import java.time.Duration; import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.TemporalTableFunction; import org.apache.flink.types.Row; import org.tablesql.join.bean.CityInfo; import org.tablesql.join.bean.CityInfoSchema; import org.tablesql.join.bean.UserInfo; import org.tablesql.join.bean.UserInfoSchema;public class TestJoinDimByKafkaEventTimeDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// Kafka的ip和要消费的topic,//Kafka设置Properties props new Properties();props.setProperty(bootstrap.servers, 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092);props.setProperty(group.id, group.cyb.2);// 读取用户信息KafkaFlinkKafkaConsumerUserInfo userConsumer new FlinkKafkaConsumerUserInfo(user, new UserInfoSchema(),props);userConsumer.setStartFromEarliest();userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.UserInfoforBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((user, rTimeStamp) - user.getTs()) // 该句如果不加则是默认为kafka的事件时间);// 读取城市维度信息KafkaFlinkKafkaConsumerCityInfo cityConsumer new FlinkKafkaConsumerCityInfo(city, new CityInfoSchema(), props);cityConsumer.setStartFromEarliest();cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.CityInfoforBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((city, rTimeStamp) - city.getTs()) // 该句如果不加则是默认为kafka的事件时间);Table userTable tableEnv.fromDataStream(env.addSource(userConsumer), $(userName), $(cityId), $(ts).rowtime());Table cityTable tableEnv.fromDataStream(env.addSource(cityConsumer), $(cityId), $(cityName),$(ts).rowtime());tableEnv.createTemporaryView(userTable, userTable);tableEnv.createTemporaryView(cityTable, cityTable);// 定义一个TemporalTableFunctionTemporalTableFunction dimCity cityTable.createTemporalTableFunction($(ts), $(cityId));// 注册表函数// tableEnv.registerFunction(dimCity, dimCity);tableEnv.createTemporarySystemFunction(dimCity, dimCity);Table u tableEnv.sqlQuery(select * from userTable);// u.printSchema();tableEnv.toAppendStream(u, Row.class).print(user流接收到);Table c tableEnv.sqlQuery(select * from cityTable);// c.printSchema();tableEnv.toAppendStream(c, Row.class).print(city流接收到);// 关联查询Table result tableEnv.sqlQuery(select u.userName,u.cityId,d.cityName,u.ts from userTable as u , Lateral table (dimCity(u.ts)) d where u.cityIdd.cityId);// 打印输出DataStream resultDs tableEnv.toAppendStream(result, Row.class);resultDs.print(\t关联输出);// 用户信息格式// {userName:user1,cityId:1,ts:0}// {userName:user1,cityId:1,ts:1}// {userName:user1,cityId:1,ts:4}// {userName:user1,cityId:1,ts:5}// {userName:user1,cityId:1,ts:7}// {userName:user1,cityId:1,ts:9}// {userName:user1,cityId:1,ts:11}// kafka-console-producer.sh --broker-list server1:9092 --topic user// 城市维度格式// {cityId:1,cityName:nanjing,ts:15}// {cityId:1,cityName:beijing,ts:1}// {cityId:1,cityName:shanghai,ts:5}// {cityId:1,cityName:shanghai,ts:7}// {cityId:1,cityName:wuhan,ts:10}// kafka-console-producer.sh --broker-list server1:9092 --topic city// 输出// city流接收到:6 I[1, beijing, 1970-01-01T00:00:00.001]// user流接收到:6 I[user1, 1, 1970-01-01T00:00:00.004]// city流接收到:6 I[1, shanghai, 1970-01-01T00:00:00.005]// user流接收到:6 I[user1, 1, 1970-01-01T00:00:00.005]// city流接收到:6 I[1, shanghai, 1970-01-01T00:00:00.007]// user流接收到:6 I[user1, 1, 1970-01-01T00:00:00.007]// city流接收到:6 I[1, wuhan, 1970-01-01T00:00:00.010]// user流接收到:6 I[user1, 1, 1970-01-01T00:00:00.009]// user流接收到:6 I[user1, 1, 1970-01-01T00:00:00.011]// 关联输出:12 I[user1, 1, beijing, 1970-01-01T00:00:00.001]// 关联输出:12 I[user1, 1, beijing, 1970-01-01T00:00:00.004]// 关联输出:12 I[user1, 1, shanghai, 1970-01-01T00:00:00.005]// 关联输出:12 I[user1, 1, shanghai, 1970-01-01T00:00:00.007]// 关联输出:12 I[user1, 1, shanghai, 1970-01-01T00:00:00.009]env.execute(joinDemo);}} 以上本文通过两个示例介绍了时态表TemporalTableFunction的join操作。 如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。 本文更详细的内容可参考文章 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 本专题分为以下几篇文章 【flink番外篇】9、Flink Table API 支持的操作示例1-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例2- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例3- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例4- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例5- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例6- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例7- 表的join操作内联接、外联接以及联接自定义函数等 【flink番外篇】9、Flink Table API 支持的操作示例8- 时态表的joinscala版本 【flink番外篇】9、Flink Table API 支持的操作示例9- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例10- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例11- Group Windowstumbling、sliding和session操作 【flink番外篇】9、Flink Table API 支持的操作示例12- Over Windows有界和无界的over window操作 【flink番外篇】9、Flink Table API 支持的操作示例13- Row-basedmap、flatmap、aggregate、group window aggregate等操作 【flink番外篇】9、Flink Table API 支持的操作示例14- 时态表的joinjava版本 【flink番外篇】9、Flink Table API 支持的操作示例1-完整版 【flink番外篇】9、Flink Table API 支持的操作示例2-完整版
http://www.w-s-a.com/news/230193/

相关文章:

  • 单位建设一个网站的费用正规刷手机单做任务网站
  • 北京定制网站价格开网店怎么卖到外国
  • 做网站 后端是谁来做的工程建设指挥部网站
  • wordpress建站 云打印昆明 网站设计
  • 太原网站建设设计网站建设策划书(建设前的市场分析)
  • 哪里有制作网站电商新手入门知识
  • 制作网站的后台文昌网站建设 myvodo
  • 网站 购买移动网站制作
  • 南京网站网站建设学校英山做网站多少钱
  • 珠海网站建设网如何注册公司公众号
  • 手机网站页面制作网站怎么做快照
  • asp网站怎么仿站推广软件下载平台
  • 电子商务网站建设期末试题08答案互联网怎么做
  • 规范门户网站的建设和管理办法微信网站开发公司电话
  • 免费行情网站凡客的官网
  • 做网站运营的女生多吗海淀企业网站建设
  • 网站运行环境配置网站建设个一般需要花费多少钱
  • 广西平台网站建设报价wordpress 免费 企业 主题
  • 四川省建设厅职称查询网站辽宁省住房和城乡建设部网站
  • 公司网站后台登陆网站放到云服务器上怎么做
  • 济南 网站定制做网站购买域名
  • 代理分佣后台网站开发怎么用源码做网站视频
  • 天津网站建设招标wordpress七牛图片插件
  • 建设合同施工合同示范文本汕头市网络优化推广平台
  • 网站关键词修改老王搜索引擎入口
  • 那个网站做搬家推广比较好建设部网站办事大厅栏目
  • 做企业销售分析的网站广州网站设计建设
  • 建站流程wordpress怎么开伪静态
  • 服务器不是自己的做违法网站videopro wordpress
  • 北京建网站的公司哪个比较好网站开通告知书