当前位置：首页 > news > 正文

建湖网站定制玉林市城市建设投资有限公司网站

建湖网站定制,玉林市城市建设投资有限公司网站,手机网站程序源码,大连百度推广seo视频地址：尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第9章 数仓开发之DWD层 P041 P042 P043 P044 P045 P046 P047 P048 P049 P050 P051 P052 第9章 数仓开发之DWD层 P041 9.3 流量域用户跳出事务事实表 P042 DwdTrafficUserJum… 视频地址：尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第9章 数仓开发之DWD层 P041 P042 P043 P044 P045 P046 P047 P048 P049 P050 P051 P052 第9章 数仓开发之DWD层 P041 9.3 流量域用户跳出事务事实表 P042 DwdTrafficUserJumpDetail // TODO 1 创建环境设置状态后端 // TODO 2 从kafka的page主题读取数据 // TODO 3 过滤加转换数据 // TODO 4 添加水位线 // TODO 5 按照mid分组 P043 package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternFlatSelectFunction; import org.apache.flink.cep.PatternFlatTimeoutFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;import java.util.List; import java.util.Map;/*** author yhm* create 2023-04-21 17:54*/ public class DwdTrafficUserJumpDetail {public static void main(String[] args) throws Exception {// TODO 1 创建环境设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(4);// TODO 2 从kafka的page主题读取数据String topicName dwd_traffic_page_log;DataStreamSourceString logDS env.fromSource(KafkaUtil.getKafkaConsumer(topicName, dwd_traffic_user_jump_detail), 
WatermarkStrategy.noWatermarks(), user_jump_source);// 测试数据DataStreamString kafkaDS env.fromElements({\common\:{\mid\:\101\},\page\:{\page_id\:\home\},\ts\:10000} ,{\common\:{\mid\:\102\},\page\:{\page_id\:\home\},\ts\:12000},{\common\:{\mid\:\102\},\page\:{\page_id\:\good_list\},\ts\:15000} ,{\common\:{\mid\:\102\},\page\:{\page_id\:\good_list\,\last_page_id\: \detail\},\ts\:30000} );// TODO 3 过滤加转换数据SingleOutputStreamOperatorJSONObject jsonObjStream kafkaDS.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);out.collect(jsonObject);} catch (Exception e) {e.printStackTrace();}}});// TODO 4 添加水位线SingleOutputStreamOperatorJSONObject withWatermarkStream jsonObjStream.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return element.getLong(ts);}}));// TODO 5 按照mid分组KeyedStreamJSONObject, String keyedStream withWatermarkStream.keyBy(new KeySelectorJSONObject, String() {Overridepublic String getKey(JSONObject jsonObject) throws Exception {return jsonObject.getJSONObject(common).getString(mid);}});// TODO 6 定义cep匹配规则PatternJSONObject, JSONObject pattern Pattern.JSONObjectbegin(first).where(new IterativeConditionJSONObject() {Overridepublic boolean filter(JSONObject jsonObject, ContextJSONObject ctx) throws Exception {// 一个会话的开头 - last_page_id 为空String lastPageId jsonObject.getJSONObject(page).getString(last_page_id);return lastPageId null;}}).next(second).where(new IterativeConditionJSONObject() {Overridepublic boolean filter(JSONObject jsonObject, ContextJSONObject ctx) throws Exception {// 满足匹配的条件// 紧密相连又一个会话的开头String lastPageId jsonObject.getJSONObject(page).getString(last_page_id);return lastPageId null;}}).within(Time.seconds(10L));// TODO 7 
将CEP作用到流上PatternStreamJSONObject patternStream CEP.pattern(keyedStream, pattern);// TODO 8 提取匹配数据和超时数据OutputTagString timeoutTag new OutputTagString(timeoutTag) {};SingleOutputStreamOperatorString flatSelectStream patternStream.flatSelect(timeoutTag, new PatternFlatTimeoutFunctionJSONObject, String() {Overridepublic void timeout(MapString, ListJSONObject pattern, long timeoutTimestamp, CollectorString out) throws Exception {JSONObject first pattern.get(first).get(0);out.collect(first.toJSONString());}}, new PatternFlatSelectFunctionJSONObject, String() {Overridepublic void flatSelect(MapString, ListJSONObject pattern, CollectorString out) throws Exception {JSONObject first pattern.get(first).get(0);out.collect(first.toJSONString());}});SideOutputDataStreamString timeoutStream flatSelectStream.getSideOutput(timeoutTag);// TODO 9 合并数据写出到kafkaDataStreamString unionStream flatSelectStream.union(timeoutStream);String targetTopic dwd_traffic_user_jump_detail;unionStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, user_jump_trans));// TODO 10 执行任务env.execute();} } P044 超时数据 P045 9.4 学习域播放事务事实表 P046 DwdLearnPlay、DwdLearnPlayBean //TODO 1 创建环境设置状态后端 //TODO 2 读取kafka播放日志数据 //TODO 3 清洗转换 //TODO 4 添加水位线 P047 package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DwdLearnPlayBean; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.time.Duration;/*** author yhm* create 2023-04-23 14:21*/ public class DwdLearnPlay {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);//TODO 2 读取kafka播放日志数据String topicName dwd_traffic_play_pre_process;String groupId dwd_learn_play;DataStreamSourceString playSource env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), learn_play);//TODO 3 清洗转换SingleOutputStreamOperatorDwdLearnPlayBean learnBeanStream playSource.flatMap(new FlatMapFunctionString, DwdLearnPlayBean() {Overridepublic void flatMap(String value, CollectorDwdLearnPlayBean out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);JSONObject common jsonObject.getJSONObject(common);JSONObject appVideo jsonObject.getJSONObject(appVideo);Long ts jsonObject.getLong(ts);DwdLearnPlayBean learnPlayBean 
DwdLearnPlayBean.builder().provinceId(common.getString(ar)).brand(common.getString(ba)).channel(common.getString(ch)).isNew(common.getString(is_new)).model(common.getString(md)).machineId(common.getString(mid)).operatingSystem(common.getString(os)).sourceId(common.getString(sc)).sessionId(common.getString(sid)).userId(common.getString(uid)).versionCode(common.getString(vc)).playSec(appVideo.getInteger(play_sec)).videoId(appVideo.getString(video_id)).positionSec(appVideo.getInteger(position_sec)).ts(ts).build();out.collect(learnPlayBean);} catch (Exception e) {e.printStackTrace();}}});//TODO 4 添加水位线SingleOutputStreamOperatorDwdLearnPlayBean withWatermarkStream learnBeanStream.assignTimestampsAndWatermarks(WatermarkStrategy.DwdLearnPlayBeanforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssignerDwdLearnPlayBean() {Overridepublic long extractTimestamp(DwdLearnPlayBean element, long recordTimestamp) {return element.getTs();}}));//TODO 5 按照会话id分组KeyedStreamDwdLearnPlayBean, String keyedStream withWatermarkStream.keyBy(new KeySelectorDwdLearnPlayBean, String() {Overridepublic String getKey(DwdLearnPlayBean value) throws Exception {return value.getSessionId();}});//TODO 6 聚合统计WindowedStreamDwdLearnPlayBean, String, TimeWindow windowStream keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(3L)));SingleOutputStreamOperatorDwdLearnPlayBean reducedStream windowStream.reduce(new ReduceFunctionDwdLearnPlayBean() {Overridepublic DwdLearnPlayBean reduce(DwdLearnPlayBean value1, DwdLearnPlayBean value2) throws Exception {value1.setPlaySec(value1.getPlaySec() value2.getPlaySec());if (value2.getTs() value1.getTs()) {value1.setPositionSec(value2.getPositionSec());}return value1;}}, new ProcessWindowFunctionDwdLearnPlayBean, DwdLearnPlayBean, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableDwdLearnPlayBean elements, CollectorDwdLearnPlayBean out) throws Exception {for (DwdLearnPlayBean 
element : elements) {out.collect(element);}}});//TODO 7 转换结构SingleOutputStreamOperatorString jsonStrStream reducedStream.map(JSON::toJSONString);//TODO 8 输出到kafka主题Kafka dwd_learn_playString targetTopic dwd_learn_play;jsonStrStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic,learn_pay_trans));//TODO 9 执行任务env.execute();} } P048 先启动消费者DwdLearnPlay再mock数据。 kafka没有消费到数据DwdLearnPlay将并发改为1(TODO 1)、改时间(TODO 6时间改为3s)窗口和并发调小一些。 同一个人看的同一个视频时间不一样看的位置也不一样。 [atguigunode001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_learn_play [atguigunode001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/ [atguigunode001 01-onlineEducation]$ ll 总用量 30460 -rw-rw-r-- 1 atguigu atguigu 2223 9月 19 10:43 application.yml -rw-rw-r-- 1 atguigu atguigu 4057995 7月 25 10:28 edu0222.sql -rw-rw-r-- 1 atguigu atguigu 27112074 7月 25 10:28 edu2021-mock-2022-06-18.jar drwxrwxr-x 2 atguigu atguigu 4096 11月 2 11:13 log -rw-rw-r-- 1 atguigu atguigu 1156 7月 25 10:44 logback.xml -rw-rw-r-- 1 atguigu atguigu 633 7月 25 10:45 path.json [atguigunode001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] {brand:Xiaomi,channel:xiaomi,isNew:0,machineId:mid_293,model:Xiaomi Mix2 ,operatingSystem:Android 10.0,playSec:30,positionSec:690,provinceId:18,sessionId:a1fb6d22-f8ef-40e6-89c2-262cd5a351be,sourceId:1,ts:1645460612085,userId:46,versionCode:v2.1.134,videoId:108} {brand:Xiaomi,channel:xiaomi,isNew:0,machineId:mid_293,model:Xiaomi Mix2 ,operatingSystem:Android 10.0,playSec:30,positionSec:720,provinceId:18,sessionId:a1fb6d22-f8ef-40e6-89c2-262cd5a351be,sourceId:1,ts:1645460642085,userId:46,versionCode:v2.1.134,videoId:108} {brand:Xiaomi,channel:xiaomi,isNew:0,machineId:mid_293,model:Xiaomi Mix2 ,operatingSystem:Android 10.0,playSec:30,positionSec:690,provinceId:18,sessionId:a1fb6d22-f8ef-40e6-89c2-262cd5a351be,sourceId:1,ts:1645460612085,userId:46,versionCode:v2.1.134,videoId:108 }P049 9.5 用户域用户登录事务事实表 9.5.1 主要任务 读取页面日志数据筛选用户登录记录写入 Kafka 用户登录主题。 9.5.2 思路分析 9.5.3 图解 P050 DwdUserUserLogin //TODO 1 创建环境设置状态后端 //TODO 2 读取kafka的dwd_traffic_page_log主题数据 //TODO 3 过滤及转换 //TODO 4 添加水位线 //TODO 5 按照会话id分组 P051 DwdUserUserLogin、DwdUserUserLoginBean package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DwdUserUserLoginBean; import com.atguigu.edu.realtime.util.DateFormatUtil; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import 
org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector;import java.time.Duration;/*** author yhm* create 2023-04-23 16:02*/ public class DwdUserUserLogin {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);//TODO 2 读取kafka的dwd_traffic_page_log主题数据String topicName dwd_traffic_page_log;String groupId dwd_user_user_login;DataStreamSourceString pageStream env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), user_login);//TODO 3 过滤及转换SingleOutputStreamOperatorJSONObject jsonObjStream pageStream.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);if (jsonObject.getJSONObject(common).getString(uid) ! 
null) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});//TODO 4 添加水位线SingleOutputStreamOperatorJSONObject withWaterMarkStream jsonObjStream.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(5L)).withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return element.getLong(ts);}}));//TODO 5 按照会话id分组KeyedStreamJSONObject, String keyedStream withWaterMarkStream.keyBy(new KeySelectorJSONObject, String() {Overridepublic String getKey(JSONObject value) throws Exception {return value.getJSONObject(common).getString(mid);}});//TODO 6 使用状态找出每个会话第一条数据SingleOutputStreamOperatorJSONObject firstStream keyedStream.process(new KeyedProcessFunctionString, JSONObject, JSONObject() {ValueStateJSONObject firstLoginDtState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptorJSONObject valueStateDescriptor new ValueStateDescriptor(first_login_dt, JSONObject.class);// 添加状态存活时间valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());firstLoginDtState getRuntimeContext().getState(valueStateDescriptor);}Overridepublic void processElement(JSONObject jsonObject, Context ctx, CollectorJSONObject out) throws Exception {// 处理数据// 获取状态JSONObject firstLoginDt firstLoginDtState.value();Long ts jsonObject.getLong(ts);if (firstLoginDt null) {firstLoginDtState.update(jsonObject);// 第一条数据到的时候开启定时器ctx.timerService().registerEventTimeTimer(ts 10 * 1000L);} else {Long lastTs firstLoginDt.getLong(ts);if (ts lastTs) {firstLoginDtState.update(jsonObject);}}}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorJSONObject out) throws Exception {super.onTimer(timestamp, ctx, out);out.collect(firstLoginDtState.value());}});//TODO 7 转换结构SingleOutputStreamOperatorString mapStream 
firstStream.map(new MapFunctionJSONObject, String() {Overridepublic String map(JSONObject jsonObj) throws Exception {JSONObject common jsonObj.getJSONObject(common);Long ts jsonObj.getLong(ts);String loginTime DateFormatUtil.toYmdHms(ts);String dateId loginTime.substring(0, 10);DwdUserUserLoginBean dwdUserUserLoginBean DwdUserUserLoginBean.builder().userId(common.getString(uid)).dateId(dateId).loginTime(loginTime).channel(common.getString(ch)).provinceId(common.getString(ar)).versionCode(common.getString(vc)).midId(common.getString(mid)).brand(common.getString(ba)).model(common.getString(md)).sourceId(common.getString(sc)).operatingSystem(common.getString(os)).ts(ts).build();return JSON.toJSONString(dwdUserUserLoginBean);}});//TODO 8 输出数据String sinkTopic dwd_user_user_login;mapStream.sinkTo(KafkaUtil.getKafkaProducer(sinkTopic, user_login_trans));//TODO 9 执行任务env.execute();} } P052 [atguigunode001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_login [atguigunode001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/ [atguigunode001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
http://www.w-s-a.com/news/337723/

相关文章:

  • 一个一起做网站东莞设计兼职网站建设
  • 杭州网站程序开发公司在哪个公司建设网站好
  • 网店的网站设计方案济南手机建站价格
  • 网站做了301重定向域名会自动跳转吗唐山地方志网站建设
  • 学校网站建设说明书海南省建设执业资格注册管理中心网站
  • 东莞哪家网站建设好网站风格设定
  • 自驾游网站模板搭建wordpress步骤
  • wordpress视频网站上传视频提升学历是什么意思
  • 江西省城乡建设厅建设网站浙江建设
  • 网站联系我们页面临平做网站
  • 如何用网站做cpa交互比较好的网站
  • 一家只做特卖的网站wordpress修改模板教程
  • 与恶魔做交易的网站成都到西安高铁票价
  • 太原网站制作哪家便宜长春昆仑建设股份有限公司网站
  • 优质做网站价格设计手机商城网站建设
  • 高校网站建设制度无锡网站建设排名
  • 做网站的软件wd的叫啥无锡公司网站建设服务
  • 网站建设一般需要多久网站服务器基本要素有哪些
  • 大连开发区网站开发公司免费网站建设哪个好?
  • 关于建设门户网站的通知海曙区建设局网站
  • 韩国建设部网站温州企业网站制作
  • 苏州网站建设优化贵州网站建设lonwone
  • 网站建设与推广方案模板网站建设教程搭建浊贝湖南岚鸿给力
  • 网站建设内部下单流程图昆明网站制作公司
  • 手机网站焦点图在线外链推广
  • 做静态页面的网站中国建设银行河南省分行网站
  • 镇平县两学一做专题网站佛山家居网站全网营销
  • 做网站的需求wordpress图片怎么居中
  • 网站开发的技术流程图抖音seo排名优化软件
  • dedecms做电商网站得物app官方下载安装