优化网站打开速度,台州做网站建设,培训前端开发,企业公司网站建设前言
flink是实时计算的重要集成组件#xff0c;这里演示如何集成#xff0c;并且使用一个小例子。例子是kafka输入消息#xff0c;用逗号隔开#xff0c;统计每个相同单词出现的次数#xff0c;这么一个功能。 一、kafka环境准备
1.1 启动kafka
这里我使用的kafka版本…
前言
flink是实时计算的重要集成组件这里演示如何集成并且使用一个小例子。例子是kafka输入消息用逗号隔开统计每个相同单词出现的次数这么一个功能。 一、kafka环境准备
1.1 启动kafka
这里我使用的kafka版本是3.2.0部署的方法可以参考 kafka部署
cd kafka_2.13-3.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties启动后查看java进程是否存在存在后执行下一步。
1.2 新建topic
新建一个专门用于flink消费topic
bin/kafka-topics.sh --create --topic flinkTest --bootstrap-server 192.168.184.129:90921.3 测试生产消费是否正常
生产端
bin/kafka-console-producer.sh --topic flinkTest --bootstrap-server 192.168.184.129:9092客户端
bin/kafka-console-consumer.sh --topic flinkTest --from-beginning --bootstrap-server 192.168.184.129:90921.4 测试生产消费
在生产端输入aaa 查看客户端是否能消费到 可以看到客户端已经消费成功了kafka环境准备好了。
二、flink集成kafka
2.1 pom文件修改
pom文件修改之前先看看官网的指导依赖是什么样的 这里我们使用的是datastream api去做 flink1.17.0官方文档 这里说明了相关的依赖需要引入的依赖包的版本还有使用kafka消费的时候需要引入的连接包版本 完整的pom引入依赖如下
?xml version1.0 encodingUTF-8?project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.wh.flink/groupIdartifactIdflink/artifactIdversion1.0-SNAPSHOT/versionnameflink/name!-- FIXME change it to the projects website --urlhttp://www.example.com/urlpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetflink.version1.17.1/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency!-- Flink 依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependency!-- Flink Kafka连接器的依赖 --
!-- dependency--
!-- groupIdorg.apache.flink/groupId--
!-- artifactIdflink-connector-kafka-0.11_2.11/artifactId--
!-- version${flink.version}/version--
!-- /dependency--dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.11/versionscopetest/scope/dependency!-- Flink 开发Scala需要导入以下依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version!--scopeprovided/scope--/dependency!--dependency--!--groupIdorg.scala-lang/groupId--!--artifactIdscala-library/artifactId--!--version2.11.12/version--!--/dependency--!-- log4j 和slf4j 包,如果在控制台不想看到日志可以将下面的包注释掉--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-log4j12/artifactId--!--version1.7.25/version--!--scopetest/scope--!--/dependency--!--dependency--!--groupIdlog4j/groupId--!--artifactIdlog4j/artifactId--!--version1.2.17/version--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-api/artifactId--!--version1.7.25/version--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-nop/artifactId--!--version1.7.25/version--!--scopetest/scope--!--/dependency--!--dependency--!--groupIdorg.slf4j/groupId--!--artifactIdslf4j-simple/artifactId--!--version1.7.5/version--!--/dependency--/dependenciesbuildplugins!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 --
!-- plugin--
!-- groupIdorg.scala-tools/groupId--
!-- artifactIdmaven-scala-plugin/artifactId--
!-- version2.15.2/version--
!-- executions--
!-- execution--
!-- goals--
!-- goalcompile/goal--
!-- goaltestCompile/goal--
!-- /goals--
!-- /execution--
!-- /executions--
!-- /plugin--pluginartifactIdmaven-assembly-plugin/artifactIdversion2.4/versionconfiguration!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --!--appendAssemblyIdfalse/appendAssemblyId--archivemanifestmainClasscom.hadoop.demo.service.flinkDemo.FlinkDemo/mainClass/manifest/archivedescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalassembly/goal/goals/execution/executions/plugin/plugins/build
/project项目结构如图
2.2 代码编写
package com.hadoop.demo.service.flinkDemo;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.Iterator;public class FlinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//新建kafka连接KafkaSourceString kfkSource KafkaSource.Stringbuilder().setBootstrapServers(192.168.184.129:9092).setGroupId(flink).setTopics(flinkTest).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();//添加到flink环境DataStreamSourceString lines env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), kafka source);//根据逗号分组SingleOutputStreamOperatorTuple2String, Integer map lines.flatMap(new FlatMapIteratorString, String() {Overridepublic IteratorString flatMap(String s) throws Exception {return Arrays.asList(s.split(,)).iterator();}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String s) throws Exception {return new Tuple2(s, 1);}});//统计每个单词的数量SingleOutputStreamOperatorTuple2String, Integer sum map.keyBy(0).sum(1);sum.print();//System.out.println(sum.get);env.execute();}}
2.3 maven打包
点击打包按钮这里注意要选择带依赖的jar包否则会出现以下错误。
NoClassDefFoundError: org/apache/flink/connector/kafka/source/KafkaSource三、测试
3.1启动 hadoop集群启动flink集群
这里如果不知道怎么搭建这两个集群可以看我其他文章 hadoop集成flink
./hadoop.sh start
./bin/yarn-session.sh --detached3.2 上传jar包到flink集群 上传后填写主类类名点击提交
3.3 测试
点击后可以看到执行job这里能看到在运行的job 点击运行的task 点击输出 这里可以看到输出内容 在kafka消费端输入内容 这里的jbs出现了4次看下输出控制台 可以看到这里依次累加了四次说明统计生效了。 总结
这里只是做了一个简单的消费kafka的flink例子消费成功后还可以通过sink发送出去还可以用transform进行转换这里后面再演示如果不对的可以指出。