当前位置: 首页 > news >正文

优化网站打开速度台州做网站建设

优化网站打开速度,台州做网站建设,培训前端开发,企业公司网站建设前言 flink是实时计算的重要集成组件#xff0c;这里演示如何集成#xff0c;并且使用一个小例子。例子是kafka输入消息#xff0c;用逗号隔开#xff0c;统计每个相同单词出现的次数#xff0c;这么一个功能。 一、kafka环境准备 1.1 启动kafka 这里我使用的kafka版本… 前言 flink是实时计算的重要集成组件这里演示如何集成并且使用一个小例子。例子是kafka输入消息用逗号隔开统计每个相同单词出现的次数这么一个功能。 一、kafka环境准备 1.1 启动kafka 这里我使用的kafka版本是3.2.0部署的方法可以参考 kafka部署 cd kafka_2.13-3.2.0 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties启动后查看java进程是否存在存在后执行下一步。 1.2 新建topic 新建一个专门用于flink消费topic bin/kafka-topics.sh --create --topic flinkTest --bootstrap-server 192.168.184.129:90921.3 测试生产消费是否正常 生产端 bin/kafka-console-producer.sh --topic flinkTest --bootstrap-server 192.168.184.129:9092客户端 bin/kafka-console-consumer.sh --topic flinkTest --from-beginning --bootstrap-server 192.168.184.129:90921.4 测试生产消费 在生产端输入aaa 查看客户端是否能消费到 可以看到客户端已经消费成功了kafka环境准备好了。 二、flink集成kafka 2.1 pom文件修改 pom文件修改之前先看看官网的指导依赖是什么样的 这里我们使用的是datastream api去做 flink1.17.0官方文档 这里说明了相关的依赖需要引入的依赖包的版本还有使用kafka消费的时候需要引入的连接包版本 完整的pom引入依赖如下 ?xml version1.0 encodingUTF-8?project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.wh.flink/groupIdartifactIdflink/artifactIdversion1.0-SNAPSHOT/versionnameflink/name!-- FIXME change it to the projects website --urlhttp://www.example.com/urlpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetflink.version1.17.1/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency!-- Flink 依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependency!-- Flink Kafka连接器的依赖 -- !-- dependency-- !-- groupIdorg.apache.flink/groupId-- !-- artifactIdflink-connector-kafka-0.11_2.11/artifactId-- !-- version${flink.version}/version-- !-- /dependency--dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.11/versionscopetest/scope/dependency!-- Flink 开发Scala需要导入以下依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependency!--dependency--!--groupIdorg.scala-lang/groupId--!--artifactIdscala-library/artifactId--!--version2.11.12/version--!--/dependency--!-- log4j 和slf4j 包,如果在控制台不想看到日志可以将下面的包注释掉--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-log4j12/artifactId--!--version1.7.25/version--!--scopetest/scope--!--/dependency--!--dependency--!--groupIdlog4j/groupId--!--artifactIdlog4j/artifactId--!--version1.2.17/version--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-api/artifactId--!--version1.7.25/version--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-nop/artifactId--!--version1.7.25/version--!--scopetest/scope--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-simple/artifactId--!--version1.7.5/version--!--/dependency--/dependenciesbuildplugins!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -- !-- plugin-- !-- groupIdorg.scala-tools/groupId-- !-- artifactIdmaven-scala-plugin/artifactId-- !-- version2.15.2/version-- !-- executions-- !-- execution-- !-- goals-- !-- goalcompile/goal-- !-- goaltestCompile/goal-- !-- /goals-- !-- /execution-- !-- /executions-- !-- /plugin--pluginartifactIdmaven-assembly-plugin/artifactIdversion2.4/versionconfiguration!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --!--appendAssemblyIdfalse/appendAssemblyId--archivemanifestmainClasscom.hadoop.demo.service.flinkDemo.FlinkDemo/mainClass/manifest/archivedescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalassembly/goal/goals/execution/executions/plugin/plugins/build /project项目结构如图 2.2 代码编写 package com.hadoop.demo.service.flinkDemo;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.FlatMapIterator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays; import java.util.Iterator;public class FlinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//新建kafka连接KafkaSourceString kfkSource KafkaSource.Stringbuilder().setBootstrapServers(192.168.184.129:9092).setGroupId(flink).setTopics(flinkTest).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();//添加到flink环境DataStreamSourceString lines env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), kafka source);//根据逗号分组SingleOutputStreamOperatorTuple2String, Integer map lines.flatMap(new FlatMapIteratorString, String() {Overridepublic IteratorString flatMap(String s) throws Exception {return Arrays.asList(s.split(,)).iterator();}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String s) throws Exception {return new Tuple2(s, 1);}});//统计每个单词的数量SingleOutputStreamOperatorTuple2String, Integer sum map.keyBy(0).sum(1);sum.print();//System.out.println(sum.get);env.execute();}} 2.3 maven打包 点击打包按钮这里注意要选择带依赖的jar包否则会出现以下错误。 NoClassDefFoundError: org/apache/flink/connector/kafka/source/KafkaSource三、测试 3.1启动 hadoop集群启动flink集群 这里如果不知道怎么搭建这两个集群可以看我其他文章 hadoop集成flink ./hadoop.sh start ./bin/yarn-session.sh --detached3.2 上传jar包到flink集群 上传后填写主类类名点击提交 3.3 测试 点击后可以看到执行job这里能看到在运行的job 点击运行的task 点击输出 这里可以看到输出内容 在kafka消费端输入内容 这里的jbs出现了4次看下输出控制台 可以看到这里依次累加了四次说明统计生效了。 总结 这里只是做了一个简单的消费kafka的flink例子消费成功后还可以通过sink发送出去还可以用transform进行转换这里后面再演示如果不对的可以指出。
http://www.w-s-a.com/news/65699/

相关文章:

  • 自助网站建设怎么建设房地产的最新政策
  • 企业网站 生成html网站侵权怎么做公证或证据保存
  • php 手机网站cms系统购物网站制作流程
  • 网络公司网站开发河北省城乡住房和建设厅网站
  • 做网站配置wordpress 中文api
  • 怎样把网站做的好看县蒙文网站建设汇报
  • 网站的优化什么做广西桂林新闻最新消息
  • 做网站准备什么软件搜索引擎广告推广
  • 网站开发地图板块浮动网页设计与制作的模板
  • 中国建设招聘信息网站昆明做网站建设的公司排名
  • 那些网站可以做自媒体wordpress 分类seo
  • 淮安市盱眙县建设局网站北京西站到八达岭长城最快路线
  • 在线免费网站企业查查官网入口官网
  • 天津网站优化公司哪家专业超融合系统
  • 邹平网站建设公司报价网站建设备案多长时间
  • 三合一网站开发教程wordpress主题汉化中文版
  • 广州网站建设高端全网营销图片
  • 措勤网站建设罗定城乡建设局网站
  • 苏州建网站流程wordpress不显示内容你
  • 网站流量数据golang建设网站
  • 2020电商网站排行榜如何开设网站
  • 绍兴seo网站管理创新的网站建站
  • 做网站需要的图片网站的视频怎么下载
  • 教人做家务的网站滕州网站建设网站行吗
  • 湖北专业的网瘾学校哪家口碑好seo百度百科
  • 保定网站制作软件网页制作工具程
  • o2o网站建设教程计算机培训班培训费用
  • 赤峰网站制作php智能建站系统
  • 做高防鞋 哪个网站能上架net网站开发net网站开发
  • 做网站公司郑州推广计划步骤