Chapter 1 Spark Streaming Overview

1.1 What Is Spark Streaming?

Spark Streaming makes it easy to build scalable, fault-tolerant streaming applications. **Spark Streaming is used for processing streaming data.** It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets. Once ingested, the data can be processed with Spark's high-level primitives such as map, reduce, join, and window, and the results can be saved to many destinations, such as HDFS or a database.

Similar to Spark's RDD-based model, Spark Streaming uses a discretized stream, called a DStream, as its abstraction. A DStream is the sequence of data received over time. Internally, the data received in each time interval is stored as an RDD, and the DStream is the sequence of those RDDs (hence the name "discretized"). Simply put, a DStream is a wrapper around RDDs for real-time data processing scenarios.

1.2 Features of Spark Streaming

➢ Easy to use
➢ Fault tolerant
➢ Integrates easily with the Spark ecosystem

1.3 Spark Streaming Architecture

1.3.1 Architecture Diagrams

➢ Overall architecture diagram
➢ Spark Streaming architecture diagram

1.3.2 Backpressure Mechanism

Before Spark 1.5, a user who wanted to limit a Receiver's ingestion rate could set the static configuration parameter "spark.streaming.receiver.maxRate". Although capping the receive rate this way matches ingestion to the current processing capacity and prevents out-of-memory errors, it introduces other problems: if the producer generates data faster than maxRate and the cluster can also process faster than maxRate, resource utilization drops. To better coordinate the ingestion rate with the processing capacity, Spark Streaming 1.5 introduced dynamic control of the receive rate to match the cluster's processing capacity. This backpressure mechanism (Spark Streaming Backpressure) dynamically adjusts the Receiver's ingestion rate based on job execution feedback from the JobScheduler.
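A minimal sketch of enabling it (assuming the standard configuration keys, of which the text names only spark.streaming.receiver.maxRate):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("BackpressureDemo")
  // enable dynamic rate control (off by default)
  .set("spark.streaming.backpressure.enabled", "true")
  // optional cap on the rate of the first batches, before feedback is available
  .set("spark.streaming.backpressure.initialRate", "1000")
  // the legacy static cap still acts as an upper bound
  .set("spark.streaming.receiver.maxRate", "10000")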
Chapter 2 Getting Started with DStreams

2.1 WordCount in Practice

➢ Requirement: use the netcat tool to continuously send data to port 9999; read the port data with Spark Streaming and count the occurrences of each word.

1) Add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

2) Write the code:

package spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author Lucaslee
 * @create 2023-02-23 14:12
 */
object streaming_wordCount {
  def main(args: Array[String]): Unit = {
    // TODO Create the environment object
    // 1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    // 2. Initialize the StreamingContext
    // StreamingContext takes two parameters:
    // the first is the environment configuration,
    // the second is the batch interval (collection interval)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // TODO Processing logic
    // Read data from the port
    val line = ssc.socketTextStream("hadoop102", 9999)
    val word = line.flatMap(words => words.split(" "))
    val wordCount = word.map((_, 1)).reduceByKey(_ + _)
    wordCount.print()

    // TODO Shutting down the environment
    // The Spark Streaming receiver is a long-running task, so it must not be stopped directly here.
    // If the main method completed, the application would exit, so main must not be allowed to finish.
    //ssc.stop()
    // 1. Start the receiver
    ssc.start()
    // 2. Wait for the receiver to terminate
    ssc.awaitTermination()
  }
}
3) Start the program and send data with netcat:

nc -lk 9999
hello spark

2.2 WordCount Explained

Discretized Stream is the basic abstraction of Spark Streaming. It represents a continuous data stream, either the input stream or the result stream produced by transforming it with the various Spark primitives. Internally, a DStream is represented by a series of consecutive RDDs, each holding the data of one time interval. Operations on the data are carried out RDD by RDD, and the computation itself is performed by the Spark engine.

Chapter 3 Creating DStreams

3.1 RDD Queue

3.1.1 Usage

During testing, a DStream can be created with ssc.queueStream(queueOfRDDs): every RDD pushed into the queue is processed as part of the DStream.

3.1.2 Case in Practice

➢ Requirement: create several RDDs in a loop and put them into a queue; create a DStream from the queue with Spark Streaming and compute WordCount.

1) Write the code:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_Queue {
  def main(args: Array[String]): Unit = {
    // TODO Create the environment object
    // StreamingContext takes two parameters:
    // the first is the environment configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    // the second is the batch interval (collection interval)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 3. Create the RDD queue
    val rddQueue = new mutable.Queue[RDD[Int]]()

    // 4. Create the QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream = inputStream.map((_, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    // 7. Start the job
    ssc.start()

    // 8. Create RDDs in a loop and push them into the queue
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}
Result display:

-------------------------------------------
Time: 1539075280000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)
-------------------------------------------
Time: 1539075284000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)
-------------------------------------------
Time: 1539075288000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1539075292000 ms
-------------------------------------------

3.2 Custom Data Sources

3.2.1 Usage

Extend Receiver and implement the onStart and onStop methods to collect data from a custom source.

3.2.2 Case in Practice

Requirement: implement a custom data source that monitors a port and reads its content.

1) Define the custom data source:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Called when the receiver starts: read data and hand it to Spark
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  // Read data and hand it to Spark
  def receive(): Unit = {
    // Create a socket
    var socket: Socket = new Socket(host, port)
    // Variable that receives the data coming from the port
    var input: String = null
    // Create a BufferedReader to read the data arriving on the port
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    // Read a line
    input = reader.readLine()
    // While the receiver is not stopped and the input is not null, keep sending data to Spark
    while (!isStopped() && input != null) {
      store(input)
      input = reader.readLine()
    }
    // Loop finished: close resources
    reader.close()
    socket.close()
    // Restart the task
    restart("restart")
  }

  override def onStop(): Unit = {}
}

2) Use the custom data source to collect data:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStream {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    // 2. Initialize the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // 3. Create a stream from the custom receiver
    val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
    // 4. Split each line into words
    val wordStream = lineStream.flatMap(_.split("\t"))
    // 5. Map each word to a tuple (word, 1)
    val wordAndOneStream = wordStream.map((_, 1))
    // 6. Count the occurrences of each word
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
    // 7. Print
    wordAndCountStream.print()
    // 8. Start the StreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}
Exercise:

package spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util.Random

/**
 * @author Lucaslee
 * @create 2023-02-23 14:55
 */
object DIY_Receiver {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /*
    Custom data receiver:
    1. Extend Receiver, define the type parameter, pass constructor arguments
    2. Override the methods
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    private var flg = true

    // Called when the receiver starts: read data and hand it to Spark
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flg) {
            // Put the actual collection logic here
            val message = "collected data: " + new Random().nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flg = false
    }
  }
}
3.3 Kafka Data Source (a focus of interviews and development)

3.3.1 Choosing a Version

**ReceiverAPI**: a dedicated Executor receives the data and then ships it to other Executors for computation. The problem: the receiving Executor and the computing Executors run at different speeds. In particular, when the receiving Executor is faster than the computing ones, the computing nodes can run out of memory. This approach was offered in early versions and is not applicable in the current version.

**DirectAPI**: the computing Executors actively consume the Kafka data themselves, controlling the rate on their own.

3.3.2 Kafka 0-8 Receiver Mode (not applicable in the current version)

1) Requirement: read data from Kafka with Spark Streaming, perform a simple computation on it, and print the result to the console.

2) Add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

3) Write the code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverAPI {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
    // 2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // 3. Read the Kafka data and create the DStream (Receiver-based)
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
      ssc,
      "linux1:2181,linux2:2181,linux3:2181",
      "atguigu",
      Map[String, Int]("atguigu" -> 1))
    // 4. Compute WordCount
    kafkaDStream.map { case (_, value) => (value, 1) }
      .reduceByKey(_ + _)
      .print()
    // 5. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
3.3.3 Kafka 0-8 Direct Mode (not applicable in the current version)

1) Requirement: read data from Kafka with Spark Streaming, perform a simple computation on it, and print the result to the console.

2) Add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.4.5</version>
</dependency>

3) Write the code (automatically maintained offsets):

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectAPIAuto02 {

  val getSSC1: () => StreamingContext = () => {
    val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc
  }

  def getSSC: StreamingContext = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
    // 2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Set the checkpoint directory
    ssc.checkpoint("./ck2")
    // 3. Define the Kafka parameters
    val kafkaPara: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
    )
    // 4. Read the Kafka data
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaPara,
        Set("atguigu"))
    // 5. Compute WordCount
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // 6. Return the context
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Get the StreamingContext
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck2", () => getSSC)
    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
4) Write the code (manually maintained offsets):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectAPIHandler {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
    // 2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // 3. Kafka parameters
    val kafkaPara: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
    )
    // 4. Get the offsets saved at the end of the previous run, e.g. getOffset(MySQL)
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("atguigu", 0) -> 20
    )
    // 5. Read the Kafka data and create the DStream
    val kafkaDStream: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc,
        kafkaPara,
        fromOffsets,
        (m: MessageAndMetadata[String, String]) => m.message())
    // 6. Array holding the offset information of the currently consumed data
    var offsetRanges = Array.empty[OffsetRange]
    // 7. Capture the offset information of the currently consumed data
    val wordToCountDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    // 8. Print the offset information
    wordToCountDStream.foreachRDD(rdd => {
      for (o <- offsetRanges) {
        println(s"${o.topic}:${o.partition}:${o.fromOffset}:${o.untilOffset}")
      }
      rdd.foreach(println)
    })
    // 9. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
3.3.4 Kafka 0-10 Direct Mode (the one to use)

1) Requirement: read data from Kafka with Spark Streaming, perform a simple computation on it, and print the result to the console.

2) Add the dependencies:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>

3) Write the code:
package spark.streaming

import java.util.Random

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author Lucaslee
 * @create 2023-02-23 15:14
 */
object SparkStreaming04_Kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    // 2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // 3. Define the Kafka parameters
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    // The consumer needs a Kafka producer running in the cluster to generate data;
    // then watch the result printed to the console.
    // 4. Read the Kafka data and create the DStream
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
    // 5. Take the value of each message
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Start a Kafka producer process to generate data (a sample producer command is sketched after the check command below), then watch the console output. To check the Kafka consumption progress:

bin/kafka-consumer-groups.sh --describe --bootstrap-server linux1:9092 --group atguigu
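For the producer side, a sketch using Kafka's bundled console producer (the broker address mirrors the command above; newer Kafka versions accept --bootstrap-server in place of --broker-list):

bin/kafka-console-producer.sh --broker-list linux1:9092 --topic atguigu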
Chapter 4 DStream Transformations

Operations on DStreams are similar to those on RDDs and fall into two kinds: transformations and output operations. In addition, the transformations include some special primitives, such as updateStateByKey(), transform(), and the various window-related primitives.

4.1 Stateless Transformations

Stateless transformations simply apply ordinary RDD transformations to every batch, that is, to every RDD in the DStream. Some of them are listed in the table below. Note that for key-value DStream transformations (such as reduceByKey()), `import StreamingContext._` is required to use them in Scala. Keep in mind that although these functions look as if they operate on the whole stream, each DStream is internally composed of many RDDs (batches), and a stateless transformation is applied separately to each RDD. For example, reduceByKey() reduces the data within each time interval, but not across intervals.

4.1.1 Transform

Transform allows arbitrary RDD-to-RDD functions to be executed on a DStream. Even for functions not exposed in the DStream API, transform makes it easy to extend the Spark API. It is invoked once per batch; in effect, it applies a transformation to the RDDs inside the DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming06_State_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)

    // transform exposes the underlying RDD for direct manipulation. Use it when:
    // 1. DStream functionality is not sufficient
    // 2. code needs to run periodically
    // Code here: Driver side
    val newDS: DStream[String] = lines.transform(rdd => {
      // Code here: Driver side, executed once per batch
      rdd.map(str => {
        // Code here: Executor side
        str
      })
    })
    // Code here: Driver side
    val newDS1: DStream[String] = lines.map(data => {
      // Code here: Executor side
      data
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
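As a brief illustration of the point above (a sketch, not part of the original code): RDD operators that the DStream API does not expose, such as sortBy, can be applied to every batch through transform:

// assumes `lines` from the example above
val counts: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// sortBy exists on RDD but not on DStream; transform applies it per batch
val sortedDS: DStream[(String, Int)] = counts.transform(rdd => rdd.sortBy(_._2, ascending = false))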
4.1.2 join

Joining two streams requires the two streams to have the same batch interval, so that their computations are triggered at the same time. The computation joins the RDDs of the current batch from the two streams, exactly like a join of two RDDs.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object JoinTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")
    // 2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // 3. Create streams from two ports
    val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
    val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)
    // 4. Convert both streams to key-value form
    val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
    val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
    // 5. Join the streams
    val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
    // 6. Print
    joinDStream.print()
    // 7. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming06_State_Join {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val data9999 = ssc.socketTextStream("localhost", 9999)
    val data8888 = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = data9999.map((_, 9))
    val map8888: DStream[(String, Int)] = data8888.map((_, 8))

    // A DStream join is really just a join of the two underlying RDDs
    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4.2 Stateful Transformations

4.2.1 UpdateStateByKey

The updateStateByKey primitive is used for keeping history across batches. Sometimes state must be maintained across batches in a DStream (for example, accumulating a wordcount in a streaming computation). For such cases, updateStateByKey() gives access to a state variable for key-value DStreams. Given a DStream of (key, event) pairs and a function that specifies how to update each key's state from new events, it builds a new DStream whose contents are (key, state) pairs. The result of updateStateByKey() is a new DStream whose internal RDD sequence consists of the (key, state) pairs of each time interval.

updateStateByKey lets us keep arbitrary state while updating it with new information. To use this feature, two steps are needed:
1. Define the state: the state can be of any data type.
2. Define the state update function: this function specifies how to update the state from the previous state and the new values from the input stream.

Using updateStateByKey requires a configured checkpoint directory, since checkpoints are used to save the state.

An updated version of wordcount:

1) Write the code:

package spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_State {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Set the checkpoint directory
    ssc.checkpoint("cp")

    // Stateless operations only process the data within the current batch interval.
    // In some scenarios, results must be kept across batches (state) to aggregate data.
    // Stateful operations require a checkpoint directory.
    val datas = ssc.socketTextStream("hadoop102", 9999)
    val wordToOne = datas.map((_, 1))
    //val wordToCount = wordToOne.reduceByKey(_ + _)

    // updateStateByKey updates the state per key.
    // The function receives two parameters:
    // the first is the values of the same key in the current batch,
    // the second is the buffered value of the same key from previous batches
    val state = wordToOne.updateStateByKey((seq: Seq[Int], buff: Option[Int]) => {
      val newCount = buff.getOrElse(0) + seq.sum
      Option(newCount)
    })
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
2) Start the program and send data to port 9999:

nc -lk 9999
Hello World
Hello Scala

3) Result display:

-------------------------------------------
Time: 1504685175000 ms
-------------------------------------------
-------------------------------------------
Time: 1504685181000 ms
-------------------------------------------
(shi,1)
(shui,1)
(ni,1)
-------------------------------------------
Time: 1504685187000 ms
-------------------------------------------
(shi,1)
(ma,1)
(hao,1)
(shui,1)

4.2.2 Window Operations

Window operations let you set a window size and a sliding interval to dynamically obtain the current state of the stream. All window-based operations take two parameters: the window duration and the slide duration.
➢ Window duration: the time range of the data included in the computation
➢ Slide duration: how often the computation is triggered
Note: both must be integer multiples of the batch interval.

WordCount, third version: a batch every 3 seconds, a window of 12 seconds, a slide of 6 seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming06_State_Window {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_, 1))

    // The window range should be an integer multiple of the batch interval.
    // The window slides; by default it slides one batch interval at a time,
    // which can compute the same data repeatedly. To avoid that, change the slide duration.
    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))

    val wordToCount = windowDS.reduceByKey(_ + _)
    wordToCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
There are further window-related methods:

(1) window(windowLength, slideInterval): returns a new DStream computed from windowed batches of the source DStream.
(2) countByWindow(windowLength, slideInterval): returns a sliding-window count of the elements in the stream.
(3) reduceByWindow(func, windowLength, slideInterval): creates a new single-element stream by aggregating the elements of the stream over a sliding interval with a custom function.
(4) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): when called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values of each key are aggregated with the given reduce function over the batches in the sliding window.
(5) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): a variant of the above in which each window's reduce value is computed incrementally from the previous window's: new data entering the window is reduced in, and old data leaving the window is "reverse reduced" out. An example is "adding" and "subtracting" counts of keys as the window slides. As noted, this only applies to "invertible reduce functions", i.e. reduce functions that have a corresponding "inverse reduce" function (passed as the invFunc parameter). As with the previous function, the number of reduce tasks is configurable through an optional parameter.

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  { (x, y) => x + y },  // add the elements of the new batches entering the window
  { (x, y) => x - y },  // remove the elements of the old batches leaving the window
  Seconds(30),          // window duration
  Seconds(10))          // slide duration

countByWindow() and countByValueAndWindow() are shorthands for counting. countByWindow() returns a DStream of the number of elements in each window, while countByValueAndWindow() returns a DStream with the count of each value within the window.

Example:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming06_State_Window1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_, 1))

    // reduceByKeyAndWindow: when the window is large but the slide is small,
    // adding the entering data and removing the leaving data avoids
    // recomputing the whole window and improves performance.
    val windowDS: DStream[(String, Int)] = wordToOne.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y,
      (x: Int, y: Int) => x - y,
      Seconds(9), Seconds(3))

    windowDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Chapter 5 DStream Output

Output operations specify what to do with the data produced by the transformations on a stream (for example, pushing results to an external database or printing them to the screen). Similar to the lazy evaluation of RDDs, if no output operation is applied to a DStream or any DStream derived from it, none of those DStreams are ever evaluated. And if no output operation is set in the StreamingContext, the context will not start at all; with no output, it fails with an error.

Output operations:
➢ print(): prints the first 10 elements of every batch of the DStream on the driver node. Used for development and debugging. In the Python API the same operation is also called print().
➢ saveAsTextFiles(prefix, [suffix]): stores the DStream's contents as text files. The file name of each batch is based on prefix and suffix: "prefix-Time_IN_MS[.suffix]".
➢ saveAsObjectFiles(prefix, [suffix]): saves the stream's data as SequenceFiles of serialized Java objects. The file name of each batch is based on "prefix-TIME_IN_MS[.suffix]". Currently not available in Python.
➢ saveAsHadoopFiles(prefix, [suffix]): saves the stream's data as Hadoop files. The file name of each batch is based on "prefix-TIME_IN_MS[.suffix]". Currently not available in the Python API.
➢ foreachRDD(func): the most general output operation; applies the function func to every RDD produced from the stream. func should push the data of each RDD to an external system, for example saving the RDD to a file or writing it over the network to a database.

The generic output operation foreachRDD() runs arbitrary computations on the RDDs in a DStream. It is similar to transform() in that both give access to arbitrary RDDs, and all of the actions implemented in Spark can be reused inside it. A common use case is writing data to an external database such as MySQL. Note that the connection must not be created at the driver level (it cannot be serialized), and creating one per record inside foreach is far too costly; instead, create the connection inside foreachPartition, once per partition, as sketched below.
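A minimal sketch of that pattern, assuming a hypothetical createConnection() helper (not part of the original text) that returns a JDBC connection:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    // One connection per partition: created on the Executor, so there is no
    // serialization issue, and far fewer connections than one per record.
    val conn = createConnection() // hypothetical helper returning a java.sql.Connection
    iter.foreach(record => {
      // write the record using the shared connection
    })
    conn.close()
  })
})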
Chapter 6 Graceful Shutdown

A streaming job needs to run 24/7, but sometimes the program must be stopped deliberately, for example to upgrade the code. Since there is no way to kill a distributed program process by process, configuring a graceful shutdown becomes essential. The idea is to use an external file system to control the shutdown of the program.

➢ MonitorStop

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {
  override def run(): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
    while (true) {
      try
        Thread.sleep(5000)
      catch {
        case e: InterruptedException => e.printStackTrace()
      }
      val state: StreamingContextState = ssc.getState
      val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
      if (bool) {
        if (state == StreamingContextState.ACTIVE) {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SparkStreaming08_Close {
  def main(args: Array[String]): Unit = {
    /*
      Shutting down a thread:
      val thread = new Thread()
      thread.start()
      thread.stop() // forced stop
     */
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_, 1))
    wordToOne.print()

    ssc.start()

    // To stop the receiver, a new thread is needed,
    // and the shutdown flag should be kept in a third-party system.
    new Thread(new Runnable {
      override def run(): Unit = {
        // Graceful shutdown:
        // the computing nodes stop receiving new data, finish processing the
        // data they already have, and then shut down.
        // The flag can live in MySQL (Table(stopSpark), Row, data),
        // Redis (a key-value), ZooKeeper (/stopSpark), or HDFS (/stopSpark):
        /*
        while ( true ) {
          if (true) {
            // Check the state of the StreamingContext
            val state: StreamingContextState = ssc.getState()
            if ( state == StreamingContextState.ACTIVE ) {
              ssc.stop(true, true)
            }
          }
          Thread.sleep(5000)
        }
         */
        // Test code:
        Thread.sleep(5000)
        val state: StreamingContextState = ssc.getState()
        if (state == StreamingContextState.ACTIVE) {
          ssc.stop(true, true)
        }
        System.exit(0)
      }
    }).start()

    ssc.awaitTermination() // block the main thread
  }
}
➢ SparkTest (data recovery)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkTest {

  def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
    val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
      // Sum of the current batch
      val sum: Int = values.sum
      // State carried over from the previous batch
      val lastStatu: Int = status.getOrElse(0)
      Some(sum + lastStatu)
    }
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
    // Enable graceful shutdown
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("./ck")
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
    val word: DStream[String] = line.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
    wordAndCount.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }
}
package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SparkStreaming09_Resume {
  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getActiveOrCreate("cp", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
      val ssc = new StreamingContext(sparkConf, Seconds(3))
      val lines = ssc.socketTextStream("localhost", 9999)
      val wordToOne = lines.map((_, 1))
      wordToOne.print()
      ssc
    })
    ssc.checkpoint("cp")
    ssc.start()
    ssc.awaitTermination() // block the main thread
  }
}
Chapter 7 Spark Streaming Case Study

7.1 Environment Setup

7.1.1 pom File

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.1</version>
    </dependency>
</dependencies>

7.2 Real-Time Data Generation Module

➢ JDBCUtil (utility class)

package spark.util

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties
import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory

object JDBCUtil {
  // Initialize the connection pool
  var dataSource: DataSource = init()

  // Connection pool initialization
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark2023?useUnicode=true&characterEncoding=UTF-8")
    properties.setProperty("username", "root")
    properties.setProperty("password", "123456")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute a SQL statement: single-row insert
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  // Execute a SQL statement: batch insert
  def executeBatchUpdate(connection: Connection, sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = {
    var rtn: Array[Int] = null
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      for (params <- paramsList) {
        if (params != null && params.length > 0) {
          for (i <- params.indices) {
            pstmt.setObject(i + 1, params(i))
          }
          pstmt.addBatch()
        }
      }
      rtn = pstmt.executeBatch()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  // Check whether a row exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }

  // Fetch a single value from MySQL
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = {
    var result: Long = 0L
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      val resultSet: ResultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }
      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    result
  }

  // Main method, for testing the methods above
  def main(args: Array[String]): Unit = {
  }
}
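The main method above is left empty for testing; a minimal usage sketch (test_tab is a made-up table name, not part of the project schema) might look like:

val conn = JDBCUtil.getConnection
// insert one row; "test_tab" is a hypothetical table with columns (id INT, name VARCHAR)
JDBCUtil.executeUpdate(conn, "insert into test_tab (id, name) values (?, ?)", Array(1, "spark"))
val exists = JDBCUtil.isExist(conn, "select * from test_tab where id = ?", Array[Any](1))
println(exists)
conn.close()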
➢ MockData (Kafka data producer)

package spark.test

import java.util.{Properties, Random}

import scala.collection.mutable.ListBuffer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

/**
 * @author Lucaslee
 * @create 2023-02-24 11:31
 */
object SparkStreaming10_MockData {
  def main(args: Array[String]): Unit = {
    // Generate mock data (producer)
    // Format:  timestamp area city userid adid
    // Meaning: timestamp, region, city, user, ad
    // Pipeline: Application => Kafka => SparkStreaming => Analysis
    val prop = new Properties()
    // Add the configuration
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](prop)

    while (true) {
      mockdata().foreach(data => {
        // Send the data to Kafka
        val record = new ProducerRecord[String, String]("atguigu", data)
        producer.send(record)
        println(data)
      })
      Thread.sleep(2000)
    }
  }

  def mockdata() = {
    val list = ListBuffer[String]()
    val areaList = ListBuffer[String]("华北", "华东", "华南")
    val cityList = ListBuffer[String]("北京", "上海", "深圳")

    for (i <- 1 to new Random().nextInt(50)) {
      val area = areaList(new Random().nextInt(3))
      val city = cityList(new Random().nextInt(3))
      var userid = new Random().nextInt(6) + 1
      var adid = new Random().nextInt(6) + 1
      list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")
    }
    list
  }
}
➢ ConsumerData (Kafka data consumer)

package spark.test

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author Lucaslee
 * @create 2023-02-24 11:33
 */
object SparkStreaming11_Req1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Consume the data
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu11",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
7.3 Requirement 1: Ad Blacklist

Implement a real-time dynamic blacklist mechanism: blacklist any user who clicks a given ad more than 100 times in one day. Note: the blacklist is stored in MySQL.

7.3.1 Analysis

(1) After reading the Kafka data, check it against the blacklist stored in MySQL.
(2) If the check passes, increment the user's ad click count by one and store it in MySQL.
(3) After storing it in MySQL, check the data again; if the daily count exceeds 100, add the user to the blacklist.

7.3.2 Creating the MySQL Tables

Create the database spark2020.

1) Table holding the blacklisted users:

CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);

2) Table holding each user's daily click count per ad:

CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

7.3.3 Implementation

package spark.test
import java.sql.ResultSet
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.util.JDBCUtil

import scala.collection.mutable.ListBuffer

object SparkStreaming11_Req1_BlackList {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu2",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))

    val adClickData = kafkaDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    val ds = adClickData.transform(rdd => {
      // TODO Periodically fetch the blacklist via JDBC
      val blackList = ListBuffer[String]()
      val conn = JDBCUtil.getConnection
      val pstat = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // TODO Filter out clicks by blacklisted users
      val filterRDD = rdd.filter(data => {
        !blackList.contains(data.user)
      })

      // TODO For users not on the blacklist, count the clicks within this batch
      filterRDD.map(data => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day = sdf.format(new java.util.Date(data.ts.toLong))
        val user = data.user
        val ad = data.ad
        ((day, user, ad), 1) // (word, count)
      }).reduceByKey(_ + _)
    })

    ds.foreachRDD(rdd => {
      rdd.foreach { case ((day, user, ad), count) =>
        println(s"${day} ${user} ${ad} ${count}")
        if (count >= 30) {
          // TODO If the count exceeds the click threshold (30), add the user to the blacklist
          val conn = JDBCUtil.getConnection
          val pstat = conn.prepareStatement(
            """
              |insert into black_list (userid) values (?)
              |on DUPLICATE KEY
              |UPDATE userid = ?
            """.stripMargin)
          pstat.setString(1, user)
          pstat.setString(2, user)
          pstat.executeUpdate()
          pstat.close()
          conn.close()
        } else {
          // TODO Otherwise, update the user's ad click count for the day
          val conn = JDBCUtil.getConnection
          val pstat = conn.prepareStatement(
            """
              | select
              |   *
              | from user_ad_count
              | where dt = ? and userid = ? and adid = ?
            """.stripMargin)
          pstat.setString(1, day)
          pstat.setString(2, user)
          pstat.setString(3, ad)
          val rs = pstat.executeQuery()
          // Check the statistics table
          if (rs.next()) {
            // The row exists: update it
            val pstat1 = conn.prepareStatement(
              """
                | update user_ad_count
                | set count = count + ?
                | where dt = ? and userid = ? and adid = ?
              """.stripMargin)
            pstat1.setInt(1, count)
            pstat1.setString(2, day)
            pstat1.setString(3, user)
            pstat1.setString(4, ad)
            pstat1.executeUpdate()
            pstat1.close()
            // TODO Check whether the updated count exceeds the threshold; if so, blacklist the user
            val pstat2 = conn.prepareStatement(
              """
                |select
                |  *
                |from user_ad_count
                |where dt = ? and userid = ? and adid = ? and count >= 30
              """.stripMargin)
            pstat2.setString(1, day)
            pstat2.setString(2, user)
            pstat2.setString(3, ad)
            val rs2 = pstat2.executeQuery()
            if (rs2.next()) {
              val pstat3 = conn.prepareStatement(
                """
                  |insert into black_list (userid) values (?)
                  |on DUPLICATE KEY
                  |UPDATE userid = ?
                """.stripMargin)
              pstat3.setString(1, user)
              pstat3.setString(2, user)
              pstat3.executeUpdate()
              pstat3.close()
            }
            rs2.close()
            pstat2.close()
          } else {
            // The row does not exist: insert it
            val pstat1 = conn.prepareStatement(
              """
                | insert into user_ad_count ( dt, userid, adid, count ) values ( ?, ?, ?, ? )
              """.stripMargin)
            pstat1.setString(1, day)
            pstat1.setString(2, user)
            pstat1.setString(3, ad)
            pstat1.setInt(4, count)
            pstat1.executeUpdate()
            pstat1.close()
          }
          rs.close()
          pstat.close()
          conn.close()
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

Refactoring the JDBC operations:

package spark.test

import java.sql.ResultSet
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.util.JDBCUtil

import scala.collection.mutable.ListBuffer

object SparkStreaming11_Req1_BlackList1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu3",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))

    val adClickData = kafkaDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    val ds = adClickData.transform(rdd => {
      // TODO Periodically fetch the blacklist via JDBC
      val blackList = ListBuffer[String]()
      val conn = JDBCUtil.getConnection
      val pstat = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // TODO Filter out clicks by blacklisted users
      val filterRDD = rdd.filter(data => {
        !blackList.contains(data.user)
      })

      // TODO For users not on the blacklist, count the clicks within this batch
      filterRDD.map(data => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day = sdf.format(new java.util.Date(data.ts.toLong))
        val user = data.user
        val ad = data.ad
        ((day, user, ad), 1) // (word, count)
      }).reduceByKey(_ + _)
    })

    ds.foreachRDD(rdd => {
      // rdd.foreach would create a connection per record.
      // foreach is an RDD operator: code outside the operator runs on the Driver,
      // code inside it runs on the Executors. That involves a closure, so Driver-side
      // data must be serialized to reach the Executors, and a database connection
      // object cannot be serialized.
      // RDD provides an operator that improves efficiency: foreachPartition.
      // It creates one connection object per partition, which greatly reduces
      // the number of connections and improves performance.
      rdd.foreachPartition(iter => {
        val conn = JDBCUtil.getConnection
        iter.foreach { case ((day, user, ad), count) =>
        }
        conn.close()
      })

      rdd.foreach { case ((day, user, ad), count) =>
        println(s"${day} ${user} ${ad} ${count}")
        if (count >= 30) {
          // TODO If the count exceeds the click threshold (30), add the user to the blacklist
          val conn = JDBCUtil.getConnection
          val sql =
            """
              |insert into black_list (userid) values (?)
              |on DUPLICATE KEY
              |UPDATE userid = ?
            """.stripMargin
          JDBCUtil.executeUpdate(conn, sql, Array(user, user))
          conn.close()
        } else {
          // TODO Otherwise, update the user's ad click count for the day
          val conn = JDBCUtil.getConnection
          val sql =
            """
              | select
              |   *
              | from user_ad_count
              | where dt = ? and userid = ? and adid = ?
            """.stripMargin
          val flg = JDBCUtil.isExist(conn, sql, Array(day, user, ad))
          // Check the statistics table
          if (flg) {
            // The row exists: update it
            val sql1 =
              """
                | update user_ad_count
                | set count = count + ?
                | where dt = ? and userid = ? and adid = ?
              """.stripMargin
            JDBCUtil.executeUpdate(conn, sql1, Array(count, day, user, ad))
            // TODO Check whether the updated count exceeds the threshold; if so, blacklist the user
            val sql2 =
              """
                |select
                |  *
                |from user_ad_count
                |where dt = ? and userid = ? and adid = ? and count >= 30
              """.stripMargin
            val flg1 = JDBCUtil.isExist(conn, sql2, Array(day, user, ad))
            if (flg1) {
              val sql3 =
                """
                  |insert into black_list (userid) values (?)
                  |on DUPLICATE KEY
                  |UPDATE userid = ?
                """.stripMargin
              JDBCUtil.executeUpdate(conn, sql3, Array(user, user))
            }
          } else {
            val sql4 =
              """
                | insert into user_ad_count ( dt, userid, adid, count ) values ( ?, ?, ?, ? )
              """.stripMargin
            JDBCUtil.executeUpdate(conn, sql4, Array(day, user, ad, count))
          }
          conn.close()
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

7.4 Requirement 2: Real-Time Ad Click Statistics

Description: count, in real time, the total click traffic of each ad per day, region, and city, and store it in MySQL.

7.4.1 Analysis

(1) Within each batch, aggregate the data by day.
(2) Combine the MySQL data with the current batch to update the existing totals.

7.4.2 Creating the MySQL Table

CREATE TABLE area_city_ad_count (
dt VARCHAR(255),
area VARCHAR(255),
city VARCHAR(255),
adid VARCHAR(255),
count BIGINT,
PRIMARY KEY (dt,area,city,adid)
);

7.4.3 Implementation

package spark.test

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.util.JDBCUtil

import java.text.SimpleDateFormat

object SparkStreaming12_Req2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu4",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))

    val adClickData = kafkaDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    val reduceDS = adClickData.map(data => {
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val day = sdf.format(new java.util.Date(data.ts.toLong))
      val area = data.area
      val city = data.city
      val ad = data.ad
      ((day, area, city, ad), 1)
    }).reduceByKey(_ + _)

    reduceDS.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val conn = JDBCUtil.getConnection
        val pstat = conn.prepareStatement(
          """
            | insert into area_city_ad_count ( dt, area, city, adid, count )
            | values ( ?, ?, ?, ?, ? )
            | on DUPLICATE KEY
            | UPDATE count = count + ?
          """.stripMargin)
        iter.foreach { case ((day, area, city, ad), sum) =>
          pstat.setString(1, day)
          pstat.setString(2, area)
          pstat.setString(3, city)
          pstat.setString(4, ad)
          pstat.setInt(5, sum)
          pstat.setInt(6, sum)
          pstat.executeUpdate()
        }
        pstat.close()
        conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
7.5 Requirement 3: Ad Clicks in the Last Hour

Result display:

1: List [15:50-10, 15:51-25, 15:52-30]
2: List [15:50-10, 15:51-25, 15:52-30]
3: List [15:50-10, 15:51-25, 15:52-30]

7.5.1 Analysis

(1) Open a window to determine the time range.
(2) Within the window, transform the data into the structure ((adid, hm), count).
(3) Group by ad id and sort each group by hour and minute.

7.5.2 Implementation

➢ Implementation 1:

package spark.test

import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * @author Lucaslee
 * @create 2023-02-24 13:15
 */
object SparkStreaming13_Req3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu5",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))

    val adClickData = kafkaDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // The last minute, computed every 10 seconds.
    // Round timestamps down to 10-second buckets, e.g.
    // 12:01 => 12:00, 12:11 => 12:10, 12:19 => 12:10, 12:25 => 12:20, 12:59 => 12:50
    // 55 => 50, 49 => 40, 32 => 30:
    // 55 / 10 * 10 = 50
    // 49 / 10 * 10 = 40
    // 32 / 10 * 10 = 30
    // This uses a window computation.
    val reduceDS = adClickData.map(data => {
      val ts = data.ts.toLong
      val newTS = ts / 10000 * 10000
      (newTS, 1)
    }).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(10))

    reduceDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
➢ Implementation 2 (a small optimization):

package spark.test

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer
/**
 * @author Lucaslee
 * @create 2023-02-24 13:18
 */
object SparkStreaming13_Req31 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu6",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))

    val adClickData = kafkaDataDS.map(kafkaData => {
      val data = kafkaData.value()
      val datas = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // The last minute, computed every 10 seconds,
    // with the same 10-second bucketing as in implementation 1
    val reduceDS = adClickData.map(data => {
      val ts = data.ts.toLong
      val newTS = ts / 10000 * 10000
      (newTS, 1)
    }).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(10))

    //reduceDS.print()
    reduceDS.foreachRDD(rdd => {
      val list = ListBuffer[String]()
      val datas: Array[(Long, Int)] = rdd.sortByKey(true).collect()
      datas.foreach { case (time, cnt) =>
        val timeString = new SimpleDateFormat("mm:ss").format(new java.util.Date(time.toLong))
        list.append(s"""{"xtime":"${timeString}", "yval":"${cnt}"}""")
      }
      // Write the output file
      val out = new PrintWriter(new FileWriter(new File("E:\\java\\project\\sparkproject\\output\\adclick.json")))
      out.println("[" + list.mkString(",") + "]")
      out.flush()
      out.close()
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}