苏州网站搜索排名,网站的动画效果代码,淄博网站成功案例,旺道seo优化文章目录 概述定义核心功能适用场景架构原理总体架构统一存储基本概念文件布局 部署环境准备环境部署 实战Catalog文件系统Hive Catalog 创建表创建Catalog管理表查询创建表#xff08;CTAS#xff09;创建外部表创建临时表 修改表修改表修改列修改水印 概述
定义 Apache Pa… 文章目录 概述定义核心功能适用场景架构原理总体架构统一存储基本概念文件布局 部署环境准备环境部署 实战Catalog文件系统Hive Catalog 创建表创建Catalog管理表查询创建表CTAS创建外部表创建临时表 修改表修改表修改列修改水印 概述
定义 Apache Paimon 官网 https://paimon.apache.org/ 最新稳定版本为0.4.0-incubating0.5-SNAPSHOT正在开发 Apache Paimon 文档地址 https://paimon.apache.org/docs/master/ Apache Paimon 源码地址 https://github.com/apache/incubator-paimon/ Apache Paimon (incubating) 目前属于Apache 软件基金会 (ASF) 的孵化项目其原项目为由Flink官方维护的Flink Table Store其设计为一个开源流数据湖平台包揽Streaming实时计算能力和LakeHouse架构优势统一了存储具有高速数据摄取变更日志跟踪和高效的实时分析强大能力。 Apache Paimon 采用开放的数据格式和技术理念不仅支持Flink SQL编写和本地查询还可以与其他诸多业界主流计算引擎进行对接。
核心功能
统一批处理和流处理Paimon支持批写和批读以及流式写更改和流式读表更改日志。数据湖Paimon具有成本低、可靠性高、元数据可扩展等优点具有数据湖存储的所有优势。合并引擎Paimon支持丰富的合并引擎。缺省情况下保留主键的最后一项记录可以“部分更新”或“聚合”。变更日志生成Paimon支持丰富的Changelog producer例如“lookup”和“full-compaction”可以从任何数据源生成正确且完整的变更日志从而简化流管道的构建。丰富的表类型 除了主键表Paimon还支持append-only只追加表自动压缩小文件并提供有序的流读取来替换消息队列。模式演化支持完整的模式演化例如可以重新命名列和重新排序。
适用场景
Apache Paimon 适用于需要在流数据上进行实时查询和分析的场景。它可以帮助用户更容易地构建流式数据湖实现高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。例如在金融、电子商务、物联网等行业中可以使用 Apache Paimon 来实现实时推荐、欺诈检测、异常检测等应用。
架构原理
总体架构
Paimon 创新的结合了湖存储 LSM 列式格式 (ORC, Parquet)为湖存储带来大规模实时更新能力 读/写Paimon支持多种方式来读/写数据和执行OLAP查询。 对于读取它支持从历史快照(批处理模式)、从最新偏移量(流模式)中读取数据或者以混合方式读取增量快照。对于写操作它支持从数据库变更日志(CDC)进行流同步或者从离线数据进行批量插入/覆盖。 生态系统除了Apache Flink, Paimon还支持其他计算引擎的读取如Apache Hive、Apache Spark、Presto和Trino。 内部在底层Paimon将列文件存储在文件系统/对象存储中并使用LSM树结构来支持大量数据更新和高性能查询。
统一存储
对于像Apache Flink这样的流媒体引擎通常有三种类型的连接器:
消息队列比如Apache Kafka它在这个管道的源和中间阶段都被使用以保证延迟保持在秒内。OLAP系统如ClickHouse它以流方式接收处理过的数据并为用户的特别查询提供服务。批处理存储如Apache Hive它支持传统批处理的各种操作包括INSERT OVERWRITE。
Paimon提供表抽象。它的使用方式与传统数据库没有什么不同:
在批处理执行模式下它就像一个Hive表支持批处理SQL的各种操作。查询最新快照。在流式执行模式下它的作用类似于消息队列。查询它的行为类似于从历史数据永远不会过期的消息队列查询流更改日志。
基本概念
快照快照捕获表在某个时间点的状态。用户可以通过最新的快照访问表的最新数据。通过时间旅行用户还可以通过较早的快照访问表的先前状态。分区 Paimon采用与Apache Hive相同的分区概念来分离数据。分区是一种可选的方法可以根据特定列(如日期、城市和部门)的值将表划分为相关部分。每个表可以有一个或多个分区键来标识一个特定的分区。通过分区用户可以有效地操作表中的记录切片。如果定义了主键分区键必须是主键的子集。 桶 未分区的表或分区表中的分区被细分为桶为数据提供额外的结构可用于更有效的查询。bucket的范围由记录中一个或多个列的哈希值决定。用户可以通过提供bucket-key选项来指定bucket列。如果没有指定桶键选项则使用主键(如果定义了)或完整记录作为桶键。bucket是用于读写的最小存储单元因此bucket的数量限制了最大的处理并行性。但是这个数字不应该太大因为它将导致大量小文件和低读取性能。一般情况下建议每个bucket中的数据大小为1GB左右。 一致性保证 Paimon编写器使用两阶段提交协议自动将一批记录提交到表中。每次提交在提交时最多产生两个快照。对于任何两个同时修改一个表的写入器只要他们没有修改同一个桶他们的提交就是可序列化的。如果修改的是同一个桶则只保证快照隔离。也就是说最终的表状态可能是两次提交的混合状态但不会丢失任何更改。
文件布局
表的所有文件都存储在一个基本目录下。Paimon文件以分层的方式组织。下图说明了文件布局。从快照文件开始Paimon读取器可以递归地访问表中的所有记录。 快照文件所有快照文件都保存在快照目录下。快照文件是一个JSON文件其中包含有关该快照的信息包括架构文件使用包含此快照的所有更改的清单列表。Manifest文件所有清单列表和清单文件都存储在manifest目录中。清单列表是清单文件名的列表清单文件是包含有关LSM数据文件和更改日志文件的更改的文件。例如在相应的快照中创建了哪个LSM数据文件删除了哪个文件。数据文件数据文件按分区和桶分组。每个桶目录包含一个LSM树及其变更日志文件。目前Paimon支持使用orc(默认)、parquet和avro作为数据文件格式。LSM树Paimon采用LSM树(日志结构的合并树)作为文件存储的数据结构。数据文件中的记录按其主键排序在Sorted Runs中数据文件的主键范围从不重叠。不同Sorted Runs可能有重叠的主键范围甚至可能包含相同的主键。在查询LSM树时必须将所有Sorted Runs组合起来并且必须根据用户指定的合并引擎和每条记录的时间戳合并具有相同主键的所有记录。写入LSM树的新记录将首先在内存中进行缓冲。当内存缓冲区已满时将对内存中的所有记录进行排序并刷新到磁盘。得益于 LSM 数据结构的追加写能力Paimon 在大规模的更新数据输入的场景中提供了出色的性能。 合并当越来越多的记录写入LSM树时Sorted Runs次数将会增加。因为查询LSM树需要将所有Sorted Runs组合在一起太多的Sorted Runs将导致查询性能差甚至导致内存不足。为了限制排序运行的次数必须偶尔将几个Sorted Runs合并为一个大的Sorted Runs这个过程称为合并或压缩合并是一个资源密集型过程它会消耗一定的CPU时间和磁盘IO因此过于频繁的压缩可能会导致写速度变慢。这是查询性能和写性能之间的权衡。目前Paimon采用了一种与Rocksdb的通用压实类似的压实策略。默认情况下当Paimon将记录追加到LSM树时它还将根据需要执行压缩还可以选择在专用压缩作业中执行所有压缩。
部署
环境准备
官方提供对应引擎的版本支持如下 环境部署
# 这里选择下载最新版本Flink1.17.1
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
# 解压文件
tar -xvf flink-1.17.1-bin-scala_2.12.tgz
# 进入flink目录
cd flink-1.17.1配置环境变量修改flink-conf.yaml配置文件
# 如果是Flink17版本以下env.java.opts.all则需改为env.java.opts
env.java.opts.all: -Dfile.encodingUTF-8
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
execution.checkpointing.interval: 10s
# 用于存储和检查点状态
state.backend: rocksdb
# 存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs://myns/flink/myns
# savepoints 的默认目标目录(可选)
state.savepoints.dir: hdfs://myns/flink/savepoints
# 用于启用/禁用增量 checkpoints 的标志
state.backend.incremental: true使用paimon非常简单和其他数据湖产品一样都是将jar包放在引擎的目录下
# 解决依赖问题将hadoop的hadoop-mapreduce-client-core-3.3.4.jar拷贝到flink
cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar ./lib/
# 下载最新版的paimon目前0.5属于快照版本,可以先进入lib目录然后下载到当前lib也可以通过其他地方下载然后上传拷贝到flink的lib目录下
cd lib/
wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/paimon-flink-1.17-0.5-20230802.034234-105.jar
# 由于后续会使用到其他连接器这里先下载安装好后面直接使用即可先下载flink-sql-connector-hive连接器
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
# 下载flink-sql-connector-mysql-cdc连接器
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# 下载flink-sql-connector-kafka连接器
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
# 重新回到flink的根目录
cd ../
# 先保证hadoop环境通过yarn启动flink集群
./bin/yarn-session.sh -d
# 以提交yarn-session方式启动sql客户端
./bin/sql-client.sh -s yarn-session测试环境是否可用
set sql-client.execution.result-mode tableau;
show databases;
show tables;
select 1;通过yarn管理页面可以看到有 Flink session cluster运行job点击该记录的ApplicationMaster跳转到flink管理页面也可以看到刚才job已经完成环境准备完毕。
实战
Catalog
Paimon Catalog可以持久化元数据当前支持两种类型的metastore
文件系统默认将元数据和表文件存储在文件系统中。hive在hive metastore存储元数据用户可以直接从hive访问表。
文件系统
下面的Flink SQL注册并使用一个名为fs_catalog的Paimon编目。元数据和表文件存放在hdfs://myns/paimon/fs下。
CREATE CATALOG fs_catalog WITH (
type paimon,
warehouse hdfs://myns/paimon/fs
);show catalogs;Hive Catalog
使用Hive Catalog前需要先启动hive元数据服务
nohup hive --service metastore 通过使用Paimon Hive catalog对catalog的更改将直接影响到相应的Hive metastore。使用Hive catalog数据库名、表名和字段名应该是小写的。
CREATE CATALOG hive_catalog WITH (type paimon,metastore hive,uri thrift://hadoop2:9083,hive-conf-dir /home/commons/apache-hive-3.1.3-bin/conf/, warehouse hdfs://myns/paimon/hive
);
show catalogs;USE CATALOG hive_catalog;
CREATE TABLE test1 (id BIGINT,a INT,b STRING,dt STRING COMMENT timestamp string in format yyyyMMdd,PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);在指定的Catalog中创建表 关闭重新进入sql-client后只剩下默认的default_catalog因此可以在启动客户端时执行指定创建catalog语句vim conf/sql-client-init.sql
set sql-client.execution.result-mode tableau; CREATE CATALOG fs_catalog WITH (
type paimon,
warehouse hdfs://myns/paimon/fs
);CREATE CATALOG hive_catalog WITH (type paimon,metastore hive,uri thrift://hadoop2:9083,hive-conf-dir /home/commons/apache-hive-3.1.3-bin/conf/, warehouse hdfs://myns/paimon/hive
);USE CATALOG hive_catalog;通过-i参数启动执行sql文件启动后就可以看到hive_catalog之前已创建的表了
./bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql创建表
创建Catalog管理表
在Paimon Catalog中创建的表由Catalog管理也就是管理表。当表从目录中删除时它的表文件也将被删除与hive内部表相似。
带主键表
CREATE TABLE user_behavior1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);在删除表之前应该停止在表上插入作业否则不能完全删除表文件。
分区表
CREATE TABLE user_behavior2 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);指定统计模式Paimon将自动收集数据文件的统计信息以加快查询过程。支持四种模式: Full收集完整指标:null_count, min, Max。Truncate (length)长度可以是任何正数默认模式是Truncate(16)这意味着收集null计数min/max值截断长度为16。这主要是为了避免太大的列将扩大清单文件。计数只收集空计数。None关闭元数据采集。 字段默认值Paimon表目前支持为表属性中的字段设置默认值注意不能指定分区字段和主键字段。
CREATE TABLE user_behavior2 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with(fields.item_id.deafult-value0
);查询创建表CTAS
表可以通过查询结果创建或填充简单创建表。
CREATE TABLE items1 (user_id BIGINT,item_id BIGINT
);
CREATE TABLE items2 AS SELECT * FROM items1;/* 分区表 */
CREATE TABLE user_behavior_p1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE user_behavior_p2 WITH (partition dt) AS SELECT * FROM user_behavior_p1;/* change options */
CREATE TABLE user_behavior_3 (user_id BIGINT,item_id BIGINT
) WITH (file.format orc);
CREATE TABLE user_behavior_4 WITH (file.format parquet) AS SELECT * FROM user_behavior_3;/* 主键 */
CREATE TABLE user_behavior_5 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;
CREATE TABLE user_behavior_6 WITH (primary-key dt,hh) AS SELECT * FROM user_behavior_5;/* 主键 分区 */
CREATE TABLE user_behavior_all (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
CREATE TABLE user_behavior_all_as WITH (primary-key dt,hh, partition dt) AS SELECT * FROM user_behavior_all;CREATE TABLE LIKE
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;CREATE TABLE user_behavior_like LIKE user_behavior;表属性
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (bucket 2,bucket-key user_id
);创建外部表
外部表由Catalog记录但不由Catalog管理。如果删除外部表则不会删除其表文件。可以在任何目录中使用Paimon外部表。如果不想创建Paimon目录而只想读/写表那么可以考虑使用外部表。Flink SQL支持读写外部表外部Paimon表是通过指定连接器和路径表属性创建的。
use catalog default_catalog;
CREATE TABLE user_behavior_external (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (connector paimon,path hdfs://myns/paimon/external,auto-create true -- 如果表路径不存在此table属性将为空表创建表文件目前仅支持Flink
);创建临时表
临时表仅由Flink支持。与外部表一样临时表只是记录而不是由当前Flink SQL会话管理。如果临时表被删除它的资源不会被删除。当Flink SQL会话关闭时也会删除临时表。如果希望将Paimon Catalog与其他表一起使用但又不希望将它们存储在其他Catalog中则可以创建一个临时表。
USE CATALOG hive_catalog;
CREATE TEMPORARY TABLE temp_table (k INT,v STRING
) WITH (connector filesystem,path hdfs://myns/paimon/temp/temp_table.csv,format csv
);# 可以使用临时表和其他表进行关联查询
SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k temp_table.k;修改表
修改表
CREATE TABLE my_table (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
# 修改表属性
ALTER TABLE my_table SET (write-buffer-size 256 MB
);
# 修改表名
ALTER TABLE my_table RENAME TO my_table_new;
# 删除表的属性
ALTER TABLE my_table RESET (write-buffer-size);修改列
# 填写列
ALTER TABLE my_table ADD (c1 INT, c2 STRING);
# 重命名列
ALTER TABLE my_table RENAME c0 TO c1;
# 删除列
ALTER TABLE my_table DROP (c1, c2);
# 更改列的空属性
CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
-- 将列 coupon_info 从NOT NULL更改为可空
ALTER TABLE my_table MODIFY coupon_info FLOAT;
-- 将列 coupon_info 从可空改为NOT NULL如果已经有NULL值设置如下表选项在修改表之前静默删除这些记录。
SET table.exec.sink.not-null-enforcer DROP;
ALTER TABLE my_table MODIFY coupon_info FLOAT NOT NULL;
# 更改列注释
ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT buy count
# 添加列位置
ALTER TABLE my_table ADD c INT FIRST;
ALTER TABLE my_table ADD c INT AFTER b;
# 改变列位置
ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;
# 修改列类型
ALTER TABLE my_table MODIFY col_a DOUBLE;修改水印
# 添加WATERMARK下面的SQL从现有的列log_ts中添加一个计算列ts并在列ts上添加一个策略为ts - INTERVAL 1 HOUR的水印该水印被标记为表my_table的事件时间属性。
CREATE TABLE my_test_wm (id BIGINT,name STRING,log_ts BIGINT
);
ALTER TABLE my_test_wm ADD (ts AS TO_TIMESTAMP_LTZ(log_ts,3),WATERMARK FOR ts AS ts - INTERVAL 1 HOUR
);
# 修改WATERMARK
ALTER TABLE my_test_wm MODIFY WATERMARK FOR ts AS ts - INTERVAL 2 HOUR
# 删除WATERMARK
ALTER TABLE my_test_wm DROP WATERMARK本人博客网站IT小神 www.itxiaoshen.com