当前位置: 首页 > news >正文

企业手机网站建设价位网页设计模板图片高清

企业手机网站建设价位,网页设计模板图片高清,在大网站做网页广告需要多少钱,网络广告有哪些特点Pyspark 注#xff1a;大家觉得博客好的话#xff0c;别忘了点赞收藏呀#xff0c;本人每周都会更新关于人工智能和大数据相关的内容#xff0c;内容多为原创#xff0c;Python Java Scala SQL 代码#xff0c;CV NLP 推荐系统等#xff0c;Spark Flink Kafka Hbase Hi…Pyspark 注大家觉得博客好的话别忘了点赞收藏呀本人每周都会更新关于人工智能和大数据相关的内容内容多为原创Python Java Scala SQL 代码CV NLP 推荐系统等Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货各种顶会的论文解读一起进步。 今天继续和大家分享一下Pyspark_结构化流4 #博学谷IT学习技术支持 文章目录Pyspark前言一、数据模拟器代码二、需求说明和代码实现总结前言 接上次继续Pyspark_结构化流今天主要是一个结构化流结合kafka的一个小案例。 一、数据模拟器代码 1- 创建一个topic, 放置后续物联网数据: search-log-topic ./kafka-topics.sh --create --zookeeper node1:2181 --topic search-log-topic --partitions 3 --replication-factor 2 import json import random import time import os from kafka import KafkaProducer# 锁定远端操作环境, 避免存在多个版本环境的问题 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python# 快捷键: main 回车 if __name__ __main__:print(模拟物联网数据)# 1- 构建一个kafka的生产者:producer KafkaProducer(bootstrap_servers[node1:9092, node2:9092, node3:9092],acksall,value_serializerlambda m: json.dumps(m).encode(utf-8))# 2- 物联网设备类型deviceTypes [洗衣机, 油烟机, 空调, 窗帘, 灯, 窗户, 煤气报警器, 水表, 燃气表]while True:index random.choice(range(0, len(deviceTypes)))deviceID fdevice_{index}_{random.randrange(1, 20)}deviceType deviceTypes[index]deviceSignal random.choice(range(10, 100))# 组装数据集print({deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 发送数据producer.send(topicsearch-log-topic,value{deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 间隔时间 5s内随机time.sleep(random.choice(range(1, 5))) 生成的kafka数据 {‘deviceID’: ‘device_0_14’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 18, ‘time’: ‘1680157073’} {‘deviceID’: ‘device_2_8’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 30, ‘time’: ‘1680157074’} {‘deviceID’: ‘device_0_17’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 84, ‘time’: ‘1680157076’} {‘deviceID’: ‘device_2_15’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 99, ‘time’: ‘1680157078’} {‘deviceID’: ‘device_1_17’, ‘deviceType’: ‘油烟机’, ‘deviceSignal’: 50, ‘time’: ‘1680157081’} 二、需求说明和代码实现 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合 from pyspark.sql import SparkSession import pyspark.sql.functions as F import os# 锁定远端环境, 确保环境统一 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:print(综合案例: 物联网案例实现)# 1- 创建SparkSession对象spark SparkSession.builder \.appName(file_source) \.master(local[1]) \.config(spark.sql.shuffle.partitions, 4) \.getOrCreate()# 2- 从Kafka中读取消息数据df spark.readStream \.format(kafka) \.option(kafka.bootstrap.servers, node1:9092,node2:9092,node3:9092) \.option(subscribe, search-log-topic) \.option(startingOffsets, earliest) \.load()# 3- 处理数据# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合# 数据: {deviceID: device_4_4, deviceType: 灯, deviceSignal: 20, time: 1677243108}df df.selectExpr(CAST(value AS STRING))# 思考 如何做呢?# 需要将这个Json字符串中各个字段都获取出来, 形成一个多列的数据# 专业名称: JSON拉平# 涉及函数: get_json_object() json_tuple()# df.createTempView(t1)# SQL# df spark.sql(# select# get_json_object(value,$.deviceID) as deviceID,# get_json_object(value,$.deviceType) as deviceType,# get_json_object(value,$.deviceSignal) as deviceSignal,# get_json_object(value,$.time) as time# from t1# )# df spark.sql(# select# json_tuple(value,deviceID,deviceType,deviceSignal,time) as (deviceID,deviceType,deviceSignal,time)# from t1# )# DSL# df df.select(# F.get_json_object(value, $.deviceID).alias(deviceID),# F.get_json_object(value,$.deviceType).alias(deviceType),# F.get_json_object(value,$.deviceSignal).alias(deviceSignal),# F.get_json_object(value,$.time).alias(time)# )df df.select(F.json_tuple(value, deviceID, deviceType, deviceSignal, time).alias(deviceID, deviceType,deviceSignal, time))# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合df df.where(df[deviceSignal] 30).groupBy(deviceType).agg(F.count(deviceID).alias(device_cnt),F.round(F.avg(deviceSignal), 2).alias(deviceSignal_avg))# 4- 打印结果df.writeStream.format(console).outputMode(complete).start().awaitTermination() 总结 今天主要和大家分享了如何用Pyspark_结构化流结合kafka模拟物连网小案例。
http://www.w-s-a.com/news/585667/

相关文章:

  • 温州做网站哪儿新云网站模版
  • 网站开发 视频存在哪检察院前期网站建设
  • 备案中的网站信息怎么填如何做分享赚钱的网站
  • 网站行程表怎么做注册公司费用要多少
  • 常见电子商务网站基本模式南山网站公司定
  • 网站搭建代码网站里面送礼物要钱怎么做代码
  • 大学英文网站建设举措wordpress 学院 模板
  • 爱 做 网站吗php网站作业模版
  • wordpress代码乱吗台州做网站seo的
  • 做ptt网站wordpress中文企业网站
  • 深圳雨棚制作深圳 网站优化公司排名
  • 深圳优秀网站建设价格wordpress没人用
  • 国企网站建设需要注意沈阳招标信息网
  • 东莞360推广的网站是谁做的上海网络推广产品
  • 网站可以换主机吗中国十大网站建设企业
  • 怎么做盗版小说网站官网做有下拉列表的网站的图片
  • 邢台网站建设电话网站界面类型
  • 网站制作合同模板做一个网站能挣多少钱
  • 汶上1500元网站建设互联网高端官网
  • 广州做公司网站网站开发培训机构
  • 网站建设与维护 课件网页版qq安全中心登录入口
  • 做三个月网站广告收入dw如何制作网页
  • ...课程网站建设简介工信部 网站备案查询
  • 网站代码建设 实例企业网站建设大概的费用
  • 制作网站软件排行榜过年做啥网站致富
  • 哪里有做网站企业seo关键词优化
  • 上海金山网站建设公司手机淘宝客网站怎么做的
  • 网站开发需要公司做网站费用计入什么科目
  • 网站优化有哪些类型免费制作app的傻瓜软件
  • 如何做网站咨询wordpress get