网站建设费 科研 设备费,网页qq登录网址,建立平台的目的,wordpress游戏代练主题1. 背景
公司最近在新建集群#xff0c;全部采用开源的大数据框架#xff0c;并且将之前使用的阿里云的所有服务进行下线#xff0c;其中就涉及到了旧任务的迁移。
2. 任务
2.1. 简述
我接手到一个之前的 spark 任务#xff0c;是读取阿里 LogStore 数据#xff0c;然…1. 背景
公司最近在新建集群全部采用开源的大数据框架并且将之前使用的阿里云的所有服务进行下线其中就涉及到了旧任务的迁移。
2. 任务
2.1. 简述
我接手到一个之前的 spark 任务是读取阿里 LogStore 数据然后使用 spark streaming将接收到的 LogStore 数据注册为表之后运行 spark sql 进行分批处理每 2 分钟一批最后写入时序数据库。
2.2. 处理逻辑
spark sql 首先计算接收到的 2 分钟数据对维度字段进行 group by,指标字段进行 sum、count 之类的聚合操作然后将这两分钟的结果和之前从当天 0 点开始累积到上个 2 分钟的结果进行 union all最后再次进行 group by 以及 sum、count 操作最后将结果写出。
整体需求是计算当天 0 点到每个 2 分钟的累加结果类似于 flink sql 中的渐进式或叫累计窗口。
3. 改造方案
去掉从阿里的 LogStore 接收数据而是从 kafka 接收数据后面所有的处理逻辑都一样。
4. 出现的问题
将改造、重构后的代码部署到新建的大数据集群上运行结果发现计算的结果总是比之前的环境中大一些。
然后我们就开始进行代码级别的排查一直以为是代码哪儿写错了。之前的代码接收 LogStore 的数据而且是只接收了一个流的数据但是改造之后需要接收三个 kafka 主题的数据在 spark 代码中就变成了三个 InputDStream然后分别将三个流注册为三张不同的表最后再进行一个大的 sql 处理示例代码见下面。
case class Table1(BeanProperty var goods1: String, BeanProperty var price1: Int) extends Serializable
case class Table2(BeanProperty var goods2: String, BeanProperty var price2: Int) extends Serializable
case class Table3(BeanProperty var goods3: String, BeanProperty var price3: Int) extends Serializableobject Stream extends Serializable {def main(args: Array[String]): Unit {val masterUri sys.props.getOrElse(spark.master, local[4])// 获取 spark 环境val conf new SparkConf()val spark: SparkSession SparkSession.builder().config(conf).master(masterUri).getOrCreate()val sparkContext spark.sparkContextval ssc: StreamingContext new StreamingContext(sparkContext, Seconds(120))val sqlContext spark.sqlContext// ------------------------------------------------------------------------------------------------------------------------------------------------------------val kafkaParams: mutable.Map[String, Object] mutable.Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - kafka01:9092,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer].getCanonicalName,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer].getCanonicalName,ConsumerConfig.GROUP_ID_CONFIG - test-1)// 保存 offset最后手动提交val offsetRangesList mutable.ListBuffer[Array[OffsetRange]]()val topic1 Array(topic1)val tableName1 table1val inputDS1: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic1, kafkaParams))inputDS1.foreachRDD(rdd {offsetRangesList rdd.asInstanceOf[HasOffsetRanges].offsetRanges})inputDS1.map(_.value()).map(x JSONUtil.toBean(x, classOf[Table1])).foreachRDD((rdd: RDD[Table1]) {spark.createDataFrame(rdd).createOrReplaceTempView(tableName1)})val topic2 Array(topic2)val tableName2 table2val inputDS2: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic2, kafkaParams))inputDS2.foreachRDD(rdd {offsetRangesList rdd.asInstanceOf[HasOffsetRanges].offsetRanges})inputDS2.map(_.value()).map(x JSONUtil.toBean(x, classOf[Table2])).foreachRDD((rdd: RDD[Table2]) {spark.createDataFrame(rdd).createOrReplaceTempView(tableName2)})val topic3 Array(topic3)val tableName3 table3val inputDS3: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic3, kafkaParams))inputDS3.foreachRDD(rdd {offsetRangesList rdd.asInstanceOf[HasOffsetRanges].offsetRanges})// 所有计算和结果写出都在下面维护inputDS3.map(_.value()).map(x JSONUtil.toBean(x, classOf[Table3])).foreachRDD(foreachFunc (rdd: RDD[Table3]) {spark.createDataFrame(rdd).createOrReplaceTempView(tableName3)// 从 hdfs 读取上一批次的计算结果val lastDataDF: DataFrame sqlContext.read.format(csv).option(header, true).load(hdfs:///spark/latest-data)lastDataDF.createOrReplaceTempView(last_result)// 计算最新的结果val resultDF: DataFrame spark.sql(真正要执行的 sql 语句)// 将结算结果进行输出这里简单调用 show 只是为了演示resultDF.show(10)// 将本批次结果写入 hdfs供下次计算前初始化使用resultDF.write.option(header, true).mode(SaveMode.Overwrite).csv(hdfs:///spark/latest-data)// 手动提交 offsetfor (offsetRanges - offsetRangesList) {inputDS3.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}offsetRangesList.clear()// 清理掉内存中的结果数据resultDF.unpersist()})ssc.start()ssc.awaitTermination()}}我的做法是将三个主题对应的流分别处理然后各自注册为表并且在最后一个主题的 foreachRdd 函数中进行 sql 的执行和结果的输出。
注意第三个主题对应流里面的处理流程
接收本批次数据先从 hdfs 对应路径获取上批次结果注册名为 last_result 的表。执行真正的 sql 计算逻辑。将结果写出为了代码演示只是简单的使用 show() 函数进行输出。将本批次的计算结果保存到 hdfs然后手动提交 offset。
由于我的逻辑中每次处理都需要将本批次的计算结果和 0 点到上一批次的计算进行合并处理所以每次都会将本批次的计算结果写出到 hdfs此时就出现了问题最后算出来的每批次结果值都比正确结果多一些。
然后我们就把每批次的结果值不但输出到 hdfs 进行保存而且还把他们输出到 mysql查看其详细的计算结果看到底是哪一步出了问题。
通过观察 mysql 中每批次的详细计算结果我们发现同一个商品在一个批次计算中居然出现了相同时间的两条计算结果数据但理论上应该是只有一条才对。此时我们才发现了问题所在由于 spark 框架计算由于某一批次的计算结果中对 group by 中出现的字段并没有做到真正的唯一聚合而是出现了多条。而且我是把每批次的计算结果都写入到 hdfs也没有对结果数据进行去重所以下批次数据计算时通过上批次写入到 hdfs 的结果进行 last_result 表的初始化后last_result 表中对于同一个维度组合就会出现多条数据本批次聚合计算完之后最终的结果值就多了。而且这种情况只出现在设置 spark 任务为多并发时才会出现如果提交时只给一个 executor并且只给 1 核 CPU就不会出现问题。
最后手动部署了 apache spark-2.3.2替换掉之前使用的 CDH-6.3.2 内置的 spark-2.4.0重新运行任务就没问题了。
5. 总结
CDH-6.3.2 中内置的 spark-2.4.0 有 bug在实时数据处理上如果是多并发处理遇到 group by 时对于同一个维度组合可能会出现多条数据。
至于 hive on spark 和 spark on hive 的方式使用 CDH 内置的这个版本的 spark 会不会出现问题目前还没去做验证不过我们还是决定重新部署线上使用的 spark替换为 apache spark 的稳定版本。