求个网站你懂我意思是,uc浏览网页版进入,做网站需要用什么系统,校园网络安全设计方案Lookup Join 定义#xff08;支持 Batch\Streaming#xff09;
Lookup Join 其实就是维表 Join#xff0c;比如拿离线数仓来说#xff0c;常常会有用户画像#xff0c;设备画像等数据#xff0c;而对应到实时数仓场景中#xff0c;这种实时获取外部缓存的 Join 就叫做维…Lookup Join 定义支持 Batch\Streaming
Lookup Join 其实就是维表 Join比如拿离线数仓来说常常会有用户画像设备画像等数据而对应到实时数仓场景中这种实时获取外部缓存的 Join 就叫做维表 Join。
应用场景
Lookup Join 是流与 RedisMysqlHBase 这种存储介质的 Join。Lookup 的意思就是实时查找而实时的画像数据一般都是存储在 RedisMysqlHBase 中这就是 Lookup Join 的由来
实际案例
kafka流表和mysql维表的关联 使用曝光用户日志流show_log关联用户画像维表user_profile关联到用户的维度之后提供给下游计算分性别年龄段的曝光用户数使用。
mysql端处理
[rootspop007~]# mysql -uroot -p123456mysql create database test;
mysql CREATE TABLE user_profile (user_id varchar(100) NOT NULL,age varchar(100) DEFAULT NULL,sex varchar(100) DEFAULT NULL,PRIMARY KEY (user_id)
) ENGINEInnoDB DEFAULT CHARSETutf8;
INSERT INTO test.user_profile (user_id,age,sex) VALUES(a,12-18,男),(b,18-24,女),(c,18-24,男);mysqlselect * from test.user_profile;
kafka端处理
# 1.创建Kafka主题 test_k指定分区数量为1副本数量为1
kafka-topics.sh \
--create \
--topic test_k \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1# 2.向 test_k 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_k \
--bootstrap-server localhost:9092!!!!!这个错误是因为你使用的 Kafka 版本较旧不支持 --bootstrap-server 参数。旧版本的 Kafka 使用
参数代替 --broker-list
----------------------------------
./kafka-console-producer.sh \
--topic test_k \
--broker-list localhost:9092
-----------------------------------
#输入完上面脚本直接粘贴复制json
{log_id: 1, timestamp: 1635696063,user_id:a}
{log_id: 2, timestamp: 1635696180,user_id:b}
{log_id: 3, timestamp: 1635696300,user_id:c}
{log_id: 4, timestamp: 1635696360,user_id:b}
{log_id: 5, timestamp: 1635696420,user_id:c}
{log_id: 6, timestamp: 1635696420,user_id:d}# 3.创建一个消费者组 group_k1 来消费 test_k 数据
kafka-console-consumer.sh \
--topic test_k \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning
Flinksql代码
前提jdbc的jar包和mysql的驱动包都需要事先放入$FLINK_HOME/lib目录下。flink-connector-jdbc-1.15.2.jarmysql-connector-java-8.0.29.jarcd $FLINK_HOME/bin
./sql-client.sh CREATE TABLE click_log_table (log_id BIGINT, timestamp bigint,user_id string,proctime AS PROCTIME()
) WITH (connector kafka,topic test_k,properties.bootstrap.servers 192.168.77.88:9092,properties.group.id group_k1,scan.startup.mode earliest-offset,format json
);CREATE TABLE user_profile (user_id string, age string,sex string
)
WITH (connector jdbc,url jdbc:mysql://192.168.77.88:3306/test,table-name user_profile,usernameroot,passwordroot
);SELECT s.log_id as log_id, s.timestamp as timestamp, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM click_log_table AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id u.user_id;查看flinksql输出窗口显示log_id timestamp user_id proctime sex age1 1635696063 a 2024-11-19 00:28:14.404 男 12-182 1635696180 b 2024-11-19 00:28:14.407 女 18-243 1635696300 c 2024-11-19 00:28:14.409 男 18-244 1635696360 b 2024-11-19 00:28:14.412 女 18-245 1635696420 c 2024-11-19 00:28:14.422 男 18-246 1635696420 d 2024-11-19 00:28:14.424 (NULL) (NULL) 修改mysql的数据 查看动态表的变化
UPDATE user_profile
SET age 99-99, sex 0
WHERE user_id a;kafka端输入
{log_id: 11111111111, timestamp: 1635696063,user_id:a}
结果对应下图一kafka端再输入
{log_id: 222222, timestamp: 1635696063,user_id:a}
结果对应下图二删除和新增有空再写总结 Lookup Join 使用left join关联 左表全部输出右表能匹配上的输出匹配不上的用null填充。