企业手机网站建设价位,网页设计模板图片高清,在大网站做网页广告需要多少钱,网络广告有哪些特点Pyspark
注#xff1a;大家觉得博客好的话#xff0c;别忘了点赞收藏呀#xff0c;本人每周都会更新关于人工智能和大数据相关的内容#xff0c;内容多为原创#xff0c;Python Java Scala SQL 代码#xff0c;CV NLP 推荐系统等#xff0c;Spark Flink Kafka Hbase Hi…Pyspark
注大家觉得博客好的话别忘了点赞收藏呀本人每周都会更新关于人工智能和大数据相关的内容内容多为原创Python Java Scala SQL 代码CV NLP 推荐系统等Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货各种顶会的论文解读一起进步。 今天继续和大家分享一下Pyspark_结构化流4 #博学谷IT学习技术支持 文章目录Pyspark前言一、数据模拟器代码二、需求说明和代码实现总结前言
接上次继续Pyspark_结构化流今天主要是一个结构化流结合kafka的一个小案例。 一、数据模拟器代码
1- 创建一个topic, 放置后续物联网数据: search-log-topic ./kafka-topics.sh --create --zookeeper node1:2181 --topic search-log-topic --partitions 3 --replication-factor 2
import json
import random
import time
import os
from kafka import KafkaProducer# 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python# 快捷键: main 回车
if __name__ __main__:print(模拟物联网数据)# 1- 构建一个kafka的生产者:producer KafkaProducer(bootstrap_servers[node1:9092, node2:9092, node3:9092],acksall,value_serializerlambda m: json.dumps(m).encode(utf-8))# 2- 物联网设备类型deviceTypes [洗衣机, 油烟机, 空调, 窗帘, 灯, 窗户, 煤气报警器, 水表, 燃气表]while True:index random.choice(range(0, len(deviceTypes)))deviceID fdevice_{index}_{random.randrange(1, 20)}deviceType deviceTypes[index]deviceSignal random.choice(range(10, 100))# 组装数据集print({deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 发送数据producer.send(topicsearch-log-topic,value{deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 间隔时间 5s内随机time.sleep(random.choice(range(1, 5)))
生成的kafka数据 {‘deviceID’: ‘device_0_14’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 18, ‘time’: ‘1680157073’} {‘deviceID’: ‘device_2_8’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 30, ‘time’: ‘1680157074’} {‘deviceID’: ‘device_0_17’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 84, ‘time’: ‘1680157076’} {‘deviceID’: ‘device_2_15’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 99, ‘time’: ‘1680157078’} {‘deviceID’: ‘device_1_17’, ‘deviceType’: ‘油烟机’, ‘deviceSignal’: 50, ‘time’: ‘1680157081’}
二、需求说明和代码实现
求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os# 锁定远端环境, 确保环境统一
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:print(综合案例: 物联网案例实现)# 1- 创建SparkSession对象spark SparkSession.builder \.appName(file_source) \.master(local[1]) \.config(spark.sql.shuffle.partitions, 4) \.getOrCreate()# 2- 从Kafka中读取消息数据df spark.readStream \.format(kafka) \.option(kafka.bootstrap.servers, node1:9092,node2:9092,node3:9092) \.option(subscribe, search-log-topic) \.option(startingOffsets, earliest) \.load()# 3- 处理数据# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合# 数据: {deviceID: device_4_4, deviceType: 灯, deviceSignal: 20, time: 1677243108}df df.selectExpr(CAST(value AS STRING))# 思考 如何做呢?# 需要将这个Json字符串中各个字段都获取出来, 形成一个多列的数据# 专业名称: JSON拉平# 涉及函数: get_json_object() json_tuple()# df.createTempView(t1)# SQL# df spark.sql(# select# get_json_object(value,$.deviceID) as deviceID,# get_json_object(value,$.deviceType) as deviceType,# get_json_object(value,$.deviceSignal) as deviceSignal,# get_json_object(value,$.time) as time# from t1# )# df spark.sql(# select# json_tuple(value,deviceID,deviceType,deviceSignal,time) as (deviceID,deviceType,deviceSignal,time)# from t1# )# DSL# df df.select(# F.get_json_object(value, $.deviceID).alias(deviceID),# F.get_json_object(value,$.deviceType).alias(deviceType),# F.get_json_object(value,$.deviceSignal).alias(deviceSignal),# F.get_json_object(value,$.time).alias(time)# )df df.select(F.json_tuple(value, deviceID, deviceType, deviceSignal, time).alias(deviceID, deviceType,deviceSignal, time))# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合df df.where(df[deviceSignal] 30).groupBy(deviceType).agg(F.count(deviceID).alias(device_cnt),F.round(F.avg(deviceSignal), 2).alias(deviceSignal_avg))# 4- 打印结果df.writeStream.format(console).outputMode(complete).start().awaitTermination() 总结
今天主要和大家分享了如何用Pyspark_结构化流结合kafka模拟物连网小案例。