当前位置: 首页 > news >正文

创建自己网站的步骤吸引人的微信软文

创建自己网站的步骤,吸引人的微信软文,找郴州一家做网站的公司电话,做局域网网站教程#x1f680; 作者 #xff1a;“大数据小禅” #x1f680; 文章简介 #xff1a;【Flink实战】玩转Flink里面核心的Source Operator实战 #x1f680; 欢迎小伙伴们 点赞#x1f44d;、收藏⭐、留言#x1f4ac; 目录导航 Flink 的API层级介绍Source Operator速览Flin… 作者 “大数据小禅” 文章简介 【Flink实战】玩转Flink里面核心的Source Operator实战 欢迎小伙伴们 点赞、收藏⭐、留言 目录导航 Flink 的API层级介绍Source Operator速览Flink 预定义的Source 数据源 案例实战Flink自定义的Source 数据源案例-订单来源实战 Flink 的API层级介绍Source Operator速览 Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象 第一层是最底层的抽象为有状态实时流处理抽象实现是 Process Function用于底层处理 第二层抽象是 Core APIs许多应用程序不需要使用到上述最底层抽象的 API而是使用 Core APIs 进行开发 例如各种形式的用户自定义转换transformations、联接joins、聚合aggregations、窗口windows和状态state操作等此层 API 中处理的数据类型在每种编程语言中都有其对应的类。 第三层抽象是 Table API。 是以表Table为中心的声明式编程APITable API 使用起来很简洁但是表达能力差 类似数据库中关系模型中的操作比如 select、project、join、group-by 和 aggregate 等允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API但是其程序实现都是 SQL 查询表达式 SQL 抽象与 Table API 抽象之间的关联是非常紧密的 注意Table和SQL层变动多还在持续发展中大致知道即可核心是第一和第二层 Flink编程模型 Source来源 元素集合 env.fromElementsenv.fromColletionenv.fromSequence(start,end); 文件/文件系统 env.readTextFile(本地文件);env.readTextFile(HDFS文件); 基于Socket env.socketTextStream(“ip”, 8888) 自定义Source实现接口自定义数据源rich相关的api更丰富 并行度为1 SourceFunctionRichSourceFunction 并行度大于1 ParallelSourceFunctionRichParallelSourceFunction Connectors与第三方系统进行对接用于source或者sink都可以 Flink本身提供Connector例如kafka、RabbitMQ、ES等注意Flink程序打包一定要将相应的connetor相关类打包进去不然就会失败 Apache Bahir连接器 里面也有kafka、RabbitMQ、ES的连接器更多 总结 和外部系统进行读取写入的 第一种 Flink 里面预定义的 source 和 sink。第二种 Flink 内部也提供部分 Boundled connectors。第三种是第三方 Apache Bahir 项目中的连接器。第四种是通过异步 IO 方式 异步I/O是Flink提供的非常底层的与外部系统交互 Flink 预定义的Source 数据源 案例实战 Source来源 元素集合 env.fromElementsenv.fromColletionenv.fromSequence(start,end); public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//相同类型元素的数据流 sourceDataStreamString stringDS1 env.fromElements(java,SpringBoot, spring cloud,redis, kafka,小滴课堂);stringDS1.print(stringDS1);DataStreamString stringDS2 env.fromCollection(Arrays.asList(微服务项目大课,java,alibabacloud,rabbitmq,hadoop,hbase));stringDS2.print(stringDS2);DataStreamSourceLong longDS3 env.fromSequence(0,10);longDS3.print(longDS3);//DataStream需要调用execute,可以取个名称env.execute(xdclass job);} 文件/文件系统 env.readTextFile(本地文件);env.readTextFile(HDFS文件); public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamString textDS env.readTextFile(/Users/xdclass/Desktop/xdclass_access.log);//DataStreamString textDS env.readTextFile(hdfs://xdclass_node:8010/file/log/words.txt);textDS.print();env.execute(xdclass job); }基于Socket env.socketTextStream(“ip”, 8888) public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamString stringDataStream env.socketTextStream(127.0.0.1,8888);stringDataStream.print();env.execute( job); }Flink自定义的Source 数据源案例-订单来源实战 自定义Source实现接口自定义数据源 并行度为1 SourceFunctionRichSourceFunction 并行度大于1 ParallelSourceFunctionRichParallelSourceFunction Rich相关的api更丰富多了Open、Close方法用于初始化连接等 创建接口 Data AllArgsConstructor NoArgsConstructor public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunctionVideoOrder {private volatile Boolean flag true;private Random random new Random();private static ListString list new ArrayList();static {list.add(spring boot2.x课程);list.add(微服务SpringCloud课程);list.add(RabbitMQ消息队列);list.add(Kafka课程);list.add(Flink流式技术课程);list.add(工业级微服务项目大课训练营);list.add(Linux课程);}Overridepublic void run(SourceContextVideoOrder ctx) throws Exception {while (flag){Thread.sleep(1000);String id UUID.randomUUID().toString();int userId random.nextInt(10);int money random.nextInt(100);int videoNum random.nextInt(list.size());String title list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任务*/Overridepublic void cancel() {flag false;} }案例 public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamVideoOrder videoOrderDataStream env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要调用execute,可以取个名称env.execute(custom source job);} 不断产生很多订单
http://www.w-s-a.com/news/148505/

相关文章:

  • 网站建设与网页设计论述题软件开发公司在哪里
  • 二级网站建设方案模板亚马逊网站建设案例
  • 网站开发兼职团队门户网站如何制作
  • 高州市网站建设开发区招聘信息
  • 上海专业网站制作设计公司企业邮箱怎样注册
  • 网站建设在商标第几类网站建设 设计创意
  • 做一网站APP多少钱重庆中色十二冶金建设有限公司网站
  • 网上做效果图网站有哪些软件徐州泉山区建设局网站
  • 凯里网站制作网站篡改搜索引擎js
  • 如何使用凡科建设网站武安城乡建设网站
  • 网站建设网站及上传wordpress火车头发布
  • 有没有做网站的团队电脑版传奇网站
  • 建立企业网站公司医疗创意小产品设计
  • 深圳 做网站 车公庙免费的招标网有哪些
  • 网站在那里备案成都成华区网站建设
  • 做网站选哪家好搜索引擎优化的目标体系包括哪些
  • 做数据可视化的网站ppt2016是制作网页的软件
  • 济宁市建设工程质量监督站网站徐州网站优化推广
  • 北京网站设计多少钱php做商品网站
  • 能打开的网站你了解的彩票网站开发dadi163
  • 手机做网站价格优秀企业网站建设价格
  • 电商网站建设企业做网站的客户多吗
  • 有做思维图的网站吗西安建设市场诚信信息平台网站
  • 网站建设求职具备什么30岁学网站开发
  • 官方网站minecraft北京低价做网站
  • 网站建设报价兴田德润机械加工网络接单
  • 免费的推广网站安卓app制作平台
  • 长春火车站附近美食建设信用卡银行积分兑换商城网站
  • 网站提交网址如何备份wordpress网页
  • 龙腾盛世网站建设医院管理系统