重庆网络推广网站推广,国家出台建设工程政策的网站,网络推广策划书,html制作旅游网页实例Spark Streaming集成Kafka是生产上最多的方式#xff0c;其中集成Kafka 0.10是较为简单的#xff0c;即#xff1a;Kafka分区和Spark分区之间是1:1的对应关系#xff0c;以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整#xff0c;下面我们… Spark Streaming集成Kafka是生产上最多的方式其中集成Kafka 0.10是较为简单的即Kafka分区和Spark分区之间是1:1的对应关系以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整下面我们一起来看看吧。
一、创建一个Direct Stream
导入相关maven依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.5.3/version
/dependency import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeclass KafkaDriectStream {def main(args: Array[String]): Unit {// 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。val conf new SparkConf().setMaster(local[2]).setAppName(KafkaDriectStream)val ssc new StreamingContext(conf, Seconds(1))val kafkaParams Map[String, Object](bootstrap.servers - cdh1:9092,cdh2:9092,cdh3:9092,key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer],group.id - use_a_separate_group_id_for_each_stream,auto.offset.reset - latest,enable.auto.commit - (false: java.lang.Boolean))val topics Array(topicA, topicB)val inputDStream :InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))inputDStream.map(record (record.key, record.value))}
}
如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间30秒请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理这将需要更改代理上的group.max.session.timeout.ms。
二、executor选择适合分区处理
新的Kafka Consumer API会将消息预取到缓冲区中。因此出于性能原因Spark集成Kafka时最好将缓存的Consumer 保留在executor上而不是为每个批次重新创建它们。
在大多数情况下应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上则使用PreferBrokers这将在该分区的Kafka leader上安排分区。最后如果分区之间的负载严重偏差请使用PreferFixed。这允许指定分区到主机的显式映射任何未指定的分区都将使用一致的位置。
Consumer 缓存的默认最大大小为64。如果处理超过64个executor数量的Kafka分区可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。
如果想禁用Consumer 的缓存可以将spark.streaming.kafka.consumer.cache.enabled 设置成false。
缓存由topic分区和group.id控制因此对createDirectStream的每次调用使用单独的 group.id
三、根据topic、partition、offset创建RDD
// 导入依赖关系并创建kafka-params例如第一步创建Direct Streamval offsetRanges Array(// topic, partition, 包含起始offset, 不包含结束offsetOffsetRange(test, 0, 0, 100),OffsetRange(test, 1, 0, 100)
)//根据kafka TopicPartition 中的一段数据来创建一个RDD这是不是为了实现微批来提供支持呢
val rdd KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
请注意这里不能指定broker来消费因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker需要将其与元数据绑定到一起。
四、获取offset
stream.foreachRDD { rdd val offsetRanges rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter val o: OffsetRange offsetRanges(TaskContext.get.partitionId)println(s${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset})}
}
请注意HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功而不是在后面的方法链中完成。因为一旦发生shuffle和重分区RDD分区和Kafka分区之间的一对一关系就会遭到破坏。
五、存储offset
在kafka中为了实现精确一次的语义必须把结果处理和offset放到一个事务中去处理在与spark streaming集成时也不例外。必须在幂等输出之后存储offset或者将offset与输出一起存储在原子事务中。
offset可以存储在spark的checkpoint中也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是无论应用程序代码发生什么变化Kafka都是一个持久的存储。但是Kafka不是事务性的程序的输出必须仍然是幂等的。注意在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。
stream.foreachRDD { rdd val offsetRanges rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 一段时间后在输出完成之后,提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
与HasOffsetRanges一样只有在createDirectStream的结果上调用时才能成功得到CanCommitOffsets 而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。
// 从提交到数据库的偏移量开始
val fromOffsets selectOffsetsFromYourDatabase.map { resultSet new TopicPartition(resultSet.string(topic), resultSet.int(partition)) - resultSet.long(offset)
}.toMapval stream KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)stream.foreachRDD { rdd val offsetRanges rdd.asInstanceOf[HasOffsetRanges].offsetRangesval results yourCalculation(rdd)// 开启事务// 更新结果// 更新offset// 结束事务
}
六、官方例子
object DirectKafkaWordCount {def main(args: Array[String]): Unit {if (args.length 3) {System.err.println(s|Usage: DirectKafkaWordCount brokers groupId topics| brokers is a list of one or more Kafka brokers| groupId is a consumer group name to consume from topics| topics is a list of one or more kafka topics to consume from|.stripMargin)System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(brokers, groupId, topics) args// 以2秒的批处理间隔创建上下文val sparkConf new SparkConf().setAppName(DirectKafkaWordCount)val ssc new StreamingContext(sparkConf, Seconds(2))//指定kafka、topic信息创建direct kafka streamval topicsSet topics.split(,).toSetval kafkaParams Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - brokers,ConsumerConfig.GROUP_ID_CONFIG - groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer])val messages KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))// 获取一行数据并进行分割、统计、打印val lines messages.map(_.value)val words lines.flatMap(_.split( ))val wordCounts words.map(x (x, 1L)).reduceByKey(_ _)wordCounts.print()//启动计算ssc.start()ssc.awaitTermination()}
}
该例子消费Kafka中一个或多个topic的消息并进行单词统计需要三个参数1、Kafka broker的列表2、消费者组3、以逗号分隔的topic列表
1、创建2个topic kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2 kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2 2、启动程序 cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/ bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2 3、向topic推送数据 kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092 kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092 4、查看结果 大多数高校硕博生毕业要求需要参加学术会议发表EI或者SCI检索的学术论文会议论文 可访问艾思科蓝官网浏览即将召开的学术会议列表。会议如下 第四届大数据、信息与计算机网络国际学术会议BDICN 2025
广州https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议EIBDCT 2025
青岛https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议ICBDIE 2025
苏州https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
南京https://ais.cn/u/vUNva2