高端网站设计制作的,专业的论坛网站建设,app怎么创建,WordPress主题vieu破解版Catalog Catalog概述Catalog分类 GenericInMemoryCatalogJdbcCatalog下载JAR包及使用重启操作创建Catalog查看与使用Catalog自动初始化catalog HiveCatalog下载JAR包及使用重启操作hive metastore服务创建Catalog查看与使用CatalogFlink与Hive中操作自动初始化catalog 用户自定… Catalog Catalog概述Catalog分类 GenericInMemoryCatalogJdbcCatalog下载JAR包及使用重启操作创建Catalog查看与使用Catalog自动初始化catalog HiveCatalog下载JAR包及使用重启操作hive metastore服务创建Catalog查看与使用CatalogFlink与Hive中操作自动初始化catalog 用户自定义Catalog实现Catalog使用Catalog Catalog API数据库操作表操作视图操作分区操作函数操作 Catalog
概述 Catalog提供了元数据信息例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。 数据处理最关键的方面之一是管理元数据。 元数据可以是临时的例如临时表、或者通过TableEnvironment注册的 UDF。 元数据也可以是持久化的例如Hive Metastore中的元数据。 Catalog提供了一个统一的API用于管理元数据并使其可以从Table API和SQL查询语句中来访问。 Catalog分类 在Flink中Catalog可以分为4类GenericInMemoryCatalog、JdbcCatalog、HiveCatalog、用户自定义Catalog 1.GenericInMemoryCatalog GenericInMemoryCatalog是基于内存实现的 Catalog所有元数据只在 session 的生命周期内可用。 2.JdbcCatalog JdbcCatalog使得用户可以将Flink通过JDBC协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前 JDBC Catalog仅有的两种实现。 3.HiveCatalog HiveCatalog有两个用途作为原Flink元数据的持久化存储以及作为读写现有Hive元数据的接口。 Hive Metastore以小写形式存储所有元数据对象名称。而GenericInMemoryCatalog区分大小写。 4.用户自定义Catalog Catalog是可扩展的用户可以通过实现Catalog接口来开发自定义Catalog。 想要在SQL CLI中使用自定义 Catalog用户除了需要实现自定义的Catalog 之外还需要为这个Catalog实现对应的CatalogFactory接口。 CatalogFactory定义了一组属性用于SQL CLI启动时配置Catalog。 这组属性集将传递给发现服务在该服务中服务会尝试将属性关联到CatalogFactory并初始化相应的Catalog 实例。 GenericInMemoryCatalog 基于内存实现的Catalog所有元数据只在session的生命周期一个Flink任务运行生命周期内内可用。默认自动创建名为default_catalog的内存Catalog这个Catalog默认只有一个名为default_database的数据库。 JdbcCatalog JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现将元数据存储在数据库中。 这里以JdbcCatalog-MySQL使用为例。 注意JdbcCatalog不支持建表只是打通flink与mysql的连接可以去读写mysql现有的库表。 下载JAR包及使用
下载flink-connector-jdbc
下载mysql-connector-j
上传JAR包到flink/lib下
cp ./flink-connector-jdbc-3.1.0-1.17.jar /usr/local/program/flink/libcp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib重启操作
重启flink集群和sql-client
bin/start-cluster.shbin/sql-client.sh创建Catalog
JdbcCatalog支持以下选项:
name:必需Catalog名称default-database:连接到的默认数据库username: Postgres/MySQL帐户的用户名password:帐号密码base-url:数据库的jdbc url(不含数据库名)Postgres Catalog是jdbc:postgresql://ip:端口MySQL Catalog是jdbc: mysql://ip:端口CREATE CATALOG jdbc_catalog WITH(type jdbc,default-database demo,username root,password 123456,base-url jdbc:mysql://node01:3306
);查看与使用Catalog
查看Catalog
Flink SQL show catalogs;
-----------------
| catalog name |
-----------------
| default_catalog |
| jdbc_catalog |
-----------------
2 rows in set使用指定Catalog
Flink SQL use catalog jdbc_catalog;
[INFO] Execute statement succeed.查看当前的CATALOG
Flink SQL SHOW CURRENT CATALOG;
----------------------
| current catalog name |
----------------------
| jdbc_catalog |
----------------------
1 row in set操作数据库表
Flink SQL show current database;
-----------------------
| current database name |
-----------------------
| demo |
-----------------------
1 row in setFlink SQL show tables;
------------
| table name |
------------
| tb_user |
------------
1 row in setFlink SQL select * from tb_user;
[INFO] Result retrieval cancelled.Flink SQL insert into tb_user values(0,java,20);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9d78ec378ad635d291bd730ba86245d8自动初始化catalog
进入SQL客户端自动初始化catalo创建vim sql-client-init.sql初始化脚本
SET sql-client.execution.result-mode tableau;CREATE CATALOG jdbc_catalog WITH(type jdbc,default-database demo,username root,password 123456,base-url jdbc:mysql://node01:3306
);use catalog jdbc_catalog;进入客户端时指定初始化文件
bin/sql-client.sh -i ./sql-client-init.sql再查看catalog
Flink SQL show catalogs;
-----------------
| catalog name |
-----------------
| default_catalog |
| jdbc_catalog |
-----------------
2 rows in setHiveCatalog
HiveCatalog有两个用途
单纯作为 Flink元数据的持久化存储作为读写现有Hive元数据的接口注意Hive MetaStore以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称而 GenericInMemoryCatalog会区分大小写。 下载JAR包及使用
下载flink-sql-connector-hive
下载mysql-connector-j
上传jar包到flink的lib
cp ./flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar /usr/local/program/flink/lib/cp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib重启操作
重启flink集群和sql-client
bin/start-cluster.shbin/sql-client.shhive metastore服务
启动外置的hive metastore服务 Hive metastore必须作为独立服务运行因此在Hive的hive-site.xml中添加配置 propertynamehive.metastore.uris/namevaluethrift://node01:9083/value/property# 前台运行
hive --service metastore# 后台运行
hive --service metastore 创建Catalog 创建Catalog参数说明 配置项必需默认值类型说明typeYes(none)StringCatalog类型创建HiveCatalog时必须设置为’hive’nameYes(none)StringCatalog的唯一名称hive-conf-dirNo(none)String包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议则认为是本地文件系统。如果不指定该选项则在类路径中搜索hive-site.xmldefault-databaseNodefaultStringHive Catalog使用的默认数据库hive-versionNo(none)StringHiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本除非自动检测失败hadoop-conf-dirNo(none)StringHadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项例如如果你想分别配置每个HiveCatalog
CREATE CATALOG myhive WITH (type hive,default-database default,hive-conf-dir /usr/local/program/hive/conf
);查看与使用Catalog
查看Catalog
Flink SQL SHOW CATALOGS;
-----------------
| catalog name |
-----------------
| default_catalog |
| myhive |
-----------------
2 rows in set--查看当前的CATALOG
SHOW CURRENT CATALOG;使用指定Catalog
Flink SQL use catalog myhive;
[INFO] Execute statement succeed.Flink与Hive中操作
Flink中查看
Flink SQL SHOW DATABASES;
---------------
| database name |
---------------
| default |
---------------
1 row in set操作Hive
# 创建数据库demo
hive (default) create database demo;# 切换数据库
hive (default) use demo;# 创建表tb_user
hive (demo) create table tb_user(id int,name string, age int);# 插入数据
hive (demo) insert into tb_user values(1,test,22);Flink中再次查看
Flink SQL SHOW DATABASES;
---------------
| database name |
---------------
| default |
| demo |
---------------
2 rows in setFlink SQL use demo;
[INFO] Execute statement succeed.Flink SQL show tables;
------------
| table name |
------------
| tb_user |
------------Flink SQL SET sql-client.execution.result-mode tableau;
[INFO] Execute statement succeed.Flink SQL select * from tb_user;2023-07-09 21:58:25,620 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 1--------------------------------------------------------------
| op | id | name | age |
--------------------------------------------------------------
| I | 1 | test | 22 |
--------------------------------------------------------------
Received a total of 1 row在Flink中插入
Flink SQL insert into tb_user values(2,flink,22);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9fe32af97cfb9e507ce84263cae65d23Flink SQL select * from tb_user;2023-07-09 22:05:47,521 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 2--------------------------------------------------------------
| op | id | name | age |
--------------------------------------------------------------
| I | 1 | test | 22 |
| I | 2 | flink | 22 |
--------------------------------------------------------------
Received a total of 2 rowsHive中查询
hive (demo) select * from tb_user;自动初始化catalog
进入SQL客户端自动初始化catalog创建vim sql-client-init.sql初始化脚本
SET sql-client.execution.result-mode tableau;CREATE CATALOG myhive WITH (type hive,default-database default,hive-conf-dir /usr/local/program/hive/conf
);use catalog myhive ;进入客户端时指定初始化文件
bin/sql-client.sh -i ./sql-client-init.sql可以发现数据信息任然存在
Flink SQL use catalog myhive;
[INFO] Execute statement succeed.Flink SQL show databases;
---------------
| database name |
---------------
| default |
| demo |
---------------
2 rows in set用户自定义Catalog
实现Catalog 用户可以通过实现Catalog接口来开发自定义 Catalog public class CustomCatalog implements Catalog {public CustomCatalog(String catalogName, String defaultDatabase) {}Overridepublic void open() {// 实现 Catalog 打开的逻辑}Overridepublic void close() {// 实现 Catalog 关闭的逻辑}Overridepublic ListString listDatabases() {// 实现获取数据库列表的逻辑return null;}Overridepublic CatalogDatabase getDatabase(String databaseName) {// 实现获取指定数据库的逻辑return null;}Overridepublic boolean databaseExists(String databaseName) {// 实现检查数据库是否存在的逻辑return false;}Overridepublic void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) {// 实现创建数据库的逻辑}Overridepublic void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) {// 实现删除数据库的逻辑}Overridepublic ListString listTables(String databaseName) {// 实现获取数据库中表的列表的逻辑return null;}Overridepublic CatalogBaseTable getTable(ObjectPath tablePath) {// 实现获取指定表的逻辑return null;}Overridepublic boolean tableExists(ObjectPath tablePath) {// 实现检查表是否存在的逻辑return false;}Overridepublic void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {// 实现创建表的逻辑}Overridepublic void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) {// 实现删除表的逻辑}Overridepublic ListString listFunctions(String dbName) {// 实现获取数据库中函数的逻辑return null;}// 其他方法的实现
}使用Catalog public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 注册自定义 CatalogtableEnv.registerCatalog(my_catalog, new CustomCatalog(my_catalog, default));// 使用自定义 CatalogtableEnv.useCatalog(my_catalog);// 执行 SQL 查询或 Table API 操作tableEnv.sqlQuery(SELECT * FROM my_table).execute().print();}Catalog API
数据库操作 public static void main(String[] args) throws Exception {// 创建一个基于内存的Catalog实例GenericInMemoryCatalog catalog new GenericInMemoryCatalog(myCatalog);catalog.open();// 创建数据库MapString, String properties new HashMap();properties.put(key, value);CatalogDatabase database new CatalogDatabaseImpl(properties, create comment);catalog.createDatabase(mydb, database, false);// 列出Catalog中的所有数据库System.out.println(列出Catalog中的所有数据库 catalog.listDatabases());// 获取数据库CatalogDatabase createDb catalog.getDatabase(mydb);System.out.println(获取数据库,comment createDb.getComment() ,properties createDb.getProperties());// 修改数据库MapString, String properties2 new HashMap();properties2.put(key, value1);catalog.alterDatabase(mydb, new CatalogDatabaseImpl(properties2, alter comment), false);// 获取数据库CatalogDatabase alterDb catalog.getDatabase(mydb);System.out.println(获取数据库,comment alterDb.getComment() ,properties alterDb.getProperties());// 检查数据库是否存在System.out.println(检查数据库是否存在 catalog.databaseExists(mydb));// 删除数据库catalog.dropDatabase(mydb, false);// 关闭 Catalogcatalog.close();}列出Catalog中的所有数据库 [default, mydb]
获取数据库,comment create comment ,properties {keyvalue}
获取数据库,comment alter comment ,properties {keyvalue1}
检查数据库是否存在 true表操作
// 创建表
catalog.createTable(new ObjectPath(mydb, mytable), new CatalogTableImpl(...), false);// 删除表
catalog.dropTable(new ObjectPath(mydb, mytable), false);// 修改表
catalog.alterTable(new ObjectPath(mydb, mytable), new CatalogTableImpl(...), false);// 重命名表
catalog.renameTable(new ObjectPath(mydb, mytable), my_new_table);// 获取表
catalog.getTable(mytable);// 检查表是否存在
catalog.tableExists(mytable);// 列出数据库中的所有表
catalog.listTables(mydb);
视图操作
// 创建视图
catalog.createTable(new ObjectPath(mydb, myview), new CatalogViewImpl(...), false);// 删除视图
catalog.dropTable(new ObjectPath(mydb, myview), false);// 修改视图
catalog.alterTable(new ObjectPath(mydb, mytable), new CatalogViewImpl(...), false);// 重命名视图
catalog.renameTable(new ObjectPath(mydb, myview), my_new_view, false);// 获取视图
catalog.getTable(myview);// 检查视图是否存在
catalog.tableExists(mytable);// 列出数据库中的所有视图
catalog.listViews(mydb);
分区操作
// 创建分区
catalog.createPartition(new ObjectPath(mydb, mytable),new CatalogPartitionSpec(...),new CatalogPartitionImpl(...),false);// 删除分区
catalog.dropPartition(new ObjectPath(mydb, mytable), new CatalogPartitionSpec(...), false);// 修改分区
catalog.alterPartition(new ObjectPath(mydb, mytable),new CatalogPartitionSpec(...),new CatalogPartitionImpl(...),false);// 获取分区
catalog.getPartition(new ObjectPath(mydb, mytable), new CatalogPartitionSpec(...));// 检查分区是否存在
catalog.partitionExists(new ObjectPath(mydb, mytable), new CatalogPartitionSpec(...));// 列出表的所有分区
catalog.listPartitions(new ObjectPath(mydb, mytable));// 根据给定的分区规范列出表的分区
catalog.listPartitions(new ObjectPath(mydb, mytable), new CatalogPartitionSpec(...));// 根据表达式过滤器列出表的分区
catalog.listPartitions(new ObjectPath(mydb, mytable), Arrays.asList(epr1, ...));
函数操作
// 创建函数
catalog.createFunction(new ObjectPath(mydb, myfunc), new CatalogFunctionImpl(...), false);// 删除函数
catalog.dropFunction(new ObjectPath(mydb, myfunc), false);// 修改函数
catalog.alterFunction(new ObjectPath(mydb, myfunc), new CatalogFunctionImpl(...), false);// 获取函数
catalog.getFunction(myfunc);// 检查函数是否存在
catalog.functionExists(myfunc);// 列出数据库中的所有函数
catalog.listFunctions(mydb);