当前位置: 首页 > news >正文

成都网站建设公司汇总qq是哪个公司的

成都网站建设公司汇总,qq是哪个公司的,网上书城网站开发的目的与意,wordpress 运行很慢Apache Flink 有两种关系型 API 来做流批统一处理#xff1a;Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API#xff0c;它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入…Apache Flink 有两种关系型 API 来做流批统一处理Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的流式还是有界的批处理在两个接口中指定的查询都具有相同的语义并指定相同的结果。 Table API 和 SQL 两种 API 是紧密集成的以及 DataStream API。你可以在这些 API 之间以及一些基于这些 API 的库之间轻松的切换。例如您可以使用 MATCH_RECOGNIZE 子句从表中检测模式然后使用 DataStream API 基于匹配的模式构建警报。 1. Table API 和 SQL 程序的结构 所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。 import org.apache.flink.table.api.*; import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;// Create a TableEnvironment for batch or streaming execution. // See the Create a TableEnvironment section for details. TableEnvironment tableEnv TableEnvironment.create(/*…*/);// Create a source table tableEnv.createTemporaryTable(SourceTable, TableDescriptor.forConnector(datagen).schema(Schema.newBuilder().column(f0, DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build());// Create a sink table (using SQL DDL) tableEnv.executeSql(CREATE TEMPORARY TABLE SinkTable WITH (connector blackhole) LIKE SourceTable (EXCLUDING OPTIONS) );// Create a Table object from a Table API query Table table1 tableEnv.from(SourceTable);// Create a Table object from a SQL query Table table2 tableEnv.sqlQuery(SELECT * FROM SourceTable);// Emit a Table API result Table to a TableSink, same for SQL result TableResult tableResult table1.insertInto(SinkTable).execute(); 2. 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 的核心概念。它负责: 在内部的 catalog 中注册 Table注册外部的 catalog加载可插拔模块执行 SQL 查询注册自定义函数 scalar、table 或 aggregationDataStream 和 Table 之间的转换(面向 StreamTableEnvironment ) Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表例如对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。 import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings settings EnvironmentSettings.newInstance().inStreamingMode()//.inBatchMode().build();TableEnvironment tEnv TableEnvironment.create(settings); 或者用户可以从现有的 StreamExecutionEnvironment 创建一个  StreamTableEnvironment 与 DataStream API 互操作。 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv StreamTableEnvironment.create(env); 3. 在 Catalog 中创建表 TableEnvironment 维护着一个由标识符identifier创建的表 catalog 的映射。标识符由三个部分组成catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明就会使用当前默认值。 Table 可以是虚拟的视图 VIEWS也可以是常规的表 TABLES。视图 VIEWS可以从已经存在的Table中创建一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据例如文件、数据库表或者消息队列。 3.1 临时表Temporary Table和永久表Permanent Table 表可以是临时的并与单个 Flink 会话session的生命周期相关也可以是永久的并且在多个 Flink 会话和群集cluster中可见。 永久表需要Catalog例如 Hive Metastore以维护表的元数据。一旦永久表被创建它将对任何连接到 catalog 的 Flink 会话可见且持续存在直至被明确删除。 另一方面临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间namespace中创建。即使它们对应的数据库被删除临时表也不会被删除。 3.1.1 屏蔽Shadowing 可以使用与已存在的永久表相同的标识符去注册临时表。临时表会屏蔽永久表并且只要临时表存在永久表就无法访问。所有使用该标识符的查询都将作用于临时表。 3.2 创建表 3.2.1 虚拟表 在 SQL 的术语中Table API 的对象对应于视图虚拟表。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建 // get a TableEnvironment TableEnvironment tableEnv ...; // see Create a TableEnvironment section// table is the result of a simple projection query Table projTable tableEnv.from(X).select(...);// register the Table projTable as table projectedTable tableEnv.createTemporaryView(projectedTable, projTable); 注意 从传统数据库系统的角度来看Table 对象与 VIEW 视图非常像。也就是定义了 Table 的查询是没有被优化的 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table那么它会被内嵌每个查询中并被执行多次 也就是说注册了的Table的结果不会被共享。 3.2.2 Connector Tables 另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。这样的表可以直接使用 Table API 创建也可以通过切换到 SQL DDL 来创建。 // Using table descriptors final TableDescriptor sourceDescriptor TableDescriptor.forConnector(datagen).schema(Schema.newBuilder().column(f0, DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build();tableEnv.createTable(SourceTableA, sourceDescriptor); tableEnv.createTemporaryTable(SourceTableB, sourceDescriptor);// Using SQL DDL tableEnv.executeSql(CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)); 3.3 扩展表标识符 表总是通过三元标识符注册包括 catalog 名、数据库名和表名。用户可以指定一个 catalog 和数据库作为 “当前catalog” 和当前数据库。有了这些那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。标识符遵循 SQL 标准因此使用时需要用反引号进行转义。 TableEnvironment tEnv ...; tEnv.useCatalog(custom_catalog); tEnv.useDatabase(custom_database);Table table ...;// register the view named exampleView in the catalog named custom_catalog // in the database named custom_database tableEnv.createTemporaryView(exampleView, table);// register the view named exampleView in the catalog named custom_catalog // in the database named other_database tableEnv.createTemporaryView(other_database.exampleView, table);// register the view named example.View in the catalog named custom_catalog // in the database named custom_database tableEnv.createTemporaryView(example.View, table);// register the view named exampleView in the catalog named other_catalog // in the database named other_database tableEnv.createTemporaryView(other_catalog.other_database.exampleView, table); 4. 查询表 4.1 Table API Table API 是关于 Scala 和 Java 的集成语言式查询 API。与 SQL 相反Table API 的查询不是由字符串指定而是在宿主语言中逐步构建。Table API 是基于 Table 类的该类表示一个表流或批处理并提供使用关系操作的方法。这些方法返回一个新的 Table 对象该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成例如 table.groupBy(...).select()其中 groupBy(...) 指定 table 的分组而 select(...) 在 table 分组上的投影。 以下示例展示了一个简单的 Table API 聚合查询 // get a TableEnvironment TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table// scan registered Orders table Table orders tableEnv.from(Orders); // compute revenue for all customers from France Table revenue orders.filter($(cCountry).isEqual(FRANCE)).groupBy($(cID), $(cName)).select($(cID), $(cName), $(revenue).sum().as(revSum));// emit or convert Table // execute query 4.2 SQL Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。 下面的示例演示了如何指定查询并将结果作为 Table 对象返回。 // get a TableEnvironment TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table// compute revenue for all customers from France Table revenue tableEnv.sqlQuery(SELECT cID, cName, SUM(revenue) AS revSum FROM Orders WHERE cCountry FRANCE GROUP BY cID, cName);// emit or convert Table // execute query 如下的示例展示了如何指定一个更新查询将查询的结果插入到已注册的表中。 // get a TableEnvironment TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table // register RevenueFrance output table// compute revenue for all customers from France and emit to RevenueFrance tableEnv.executeSql(INSERT INTO RevenueFrance SELECT cID, cName, SUM(revenue) AS revSum FROM Orders WHERE cCountry FRANCE GROUP BY cID, cName); 4.3 混用 Table API 和 SQL Table API 和 SQL 查询的混用非常简单因为它们都返回 Table 对象 可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。在 TableEnvironment 中注册的结果表可以在 SQL 查询的 FROM 子句中引用通过这种方法就可以在 Table API 查询的结果上定义 SQL 查询。 5. 输出表 Table 通过写入 TableSink 输出。TableSink 是一个通用接口用于支持多种文件格式如 CSV、Apache Parquet、Apache Avro、存储系统如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch或消息队列系统如 Apache Kafka、RabbitMQ。 批处理 Table 只能写入 BatchTableSink而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink。 方法 Table.insertInto(String tableName) 定义了一个完整的端到端管道将源表中的数据传输到一个被注册的输出表中。 该方法通过名称在 catalog 中查找输出表并确认 Table schema 和输出表 schema 一致。 可以通过方法 TablePipeline.explain() 和 TablePipeline.execute() 分别来解释和执行一个数据流管道。 下面的示例演示如何输出 Table // get a TableEnvironment TableEnvironment tableEnv ...; // see Create a TableEnvironment section// create an output Table final Schema schema Schema.newBuilder().column(a, DataTypes.INT()).column(b, DataTypes.STRING()).column(c, DataTypes.BIGINT()).build();tableEnv.createTemporaryTable(CsvSinkTable, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /path/to/file).format(FormatDescriptor.forFormat(csv).option(field-delimiter, |).build()).build());// compute a result Table using Table API operators and/or SQL queries Table result ...;// Prepare the insert into pipeline TablePipeline pipeline result.insertInto(CsvSinkTable);// Print explain details pipeline.printExplain();// emit the result Table to the registered TableSink pipeline.execute(); 6. 翻译与执行查询 不论输入数据源是流式的还是批式的Table API 和 SQL 查询都会被转换成 DataStream 程序。 查询在内部表示为逻辑查询计划并被翻译成两个阶段 优化逻辑执行计划翻译成 DataStream 程序 Table API 或者 SQL 查询在下列情况下会被翻译 当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句一旦该方法被调用 SQL 语句立即被翻译。当 TablePipeline.execute() 被调用时。该方法是用来执行一个源表到输出表的数据流一旦该方法被调用 TABLE API 程序立即被翻译。当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地一旦该方法被调用 TABLE API 程序立即被翻译。当 StatementSet.execute() 被调用时。TablePipeline 通过 StatementSet.add() 输出给某个 Sink和 INSERT 语句 通过调用 StatementSet.addInsertSql()会先被缓存到 StatementSet 中StatementSet.execute() 方法被调用时所有的 sink 会被优化成一张有向无环图。当 Table 被转换成 DataStream 时。转换完成后它就成为一个普通的 DataStream 程序并会在调用 StreamExecutionEnvironment.execute() 时被执行。 7. 查询优化 Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化例如 基于 Apache Calcite 的子查询解相关投影剪裁分区剪裁过滤器下推子计划消除重复数据以避免重复计算特殊子查询重写包括两部分 将 IN 和 EXISTS 转换为 left semi-joins将 NOT IN 和 NOT EXISTS 转换为 left anti-join 可选 join 重新排序 通过 table.optimizer.join-reorder-enabled 启用 注意 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。优化器不仅基于计划而且还基于可从数据源获得的丰富统计信息以及每个算子例如 iocpu网络和内存的细粒度成本来做出明智的决策。高级用户可以通过 CalciteConfig 对象提供自定义优化可以通过调用 TableEnvironmentgetConfigsetPlannerConfig 将其提供给 TableEnvironment。 8. 解释表 Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串 关系查询的抽象语法树the Abstract Syntax Tree即未优化的逻辑查询计划优化的逻辑查询计划物理执行计划。 可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划. 以下代码展示了一个示例以及对给定 Table 使用 Table.explain() 方法的相应输出 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv StreamTableEnvironment.create(env);DataStreamTuple2Integer, String stream1 env.fromElements(new Tuple2(1, hello)); DataStreamTuple2Integer, String stream2 env.fromElements(new Tuple2(1, hello));// explain Table API Table table1 tEnv.fromDataStream(stream1, $(count), $(word)); Table table2 tEnv.fromDataStream(stream2, $(count), $(word)); Table table table1.where($(word).like(F%)).unionAll(table2);System.out.println(table.explain()); 上述例子的结果是 Abstract Syntax Tree LogicalUnion(all[true]) :- LogicalFilter(condition[LIKE($1, _UTF-16LEF%)]) : - LogicalTableScan(table[[Unregistered_DataStream_1]]) - LogicalTableScan(table[[Unregistered_DataStream_2]]) Optimized Physical Plan Union(all[true], union[count, word]) :- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)]) : - DataStreamScan(table[[Unregistered_DataStream_1]], fields[count, word]) - DataStreamScan(table[[Unregistered_DataStream_2]], fields[count, word]) Optimized Execution Plan Union(all[true], union[count, word]) :- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)]) : - DataStreamScan(table[[Unregistered_DataStream_1]], fields[count, word]) - DataStreamScan(table[[Unregistered_DataStream_2]], fields[count, word]) 以下代码展示了一个示例以及使用 StatementSet.explain() 的多 sink 计划的相应输出 EnvironmentSettings settings EnvironmentSettings.inStreamingMode(); TableEnvironment tEnv TableEnvironment.create(settings);final Schema schema Schema.newBuilder().column(count, DataTypes.INT()).column(word, DataTypes.STRING()).build();tEnv.createTemporaryTable(MySource1, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /source/path1).format(csv).build()); tEnv.createTemporaryTable(MySource2, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /source/path2).format(csv).build()); tEnv.createTemporaryTable(MySink1, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /sink/path1).format(csv).build()); tEnv.createTemporaryTable(MySink2, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /sink/path2).format(csv).build());StatementSet stmtSet tEnv.createStatementSet();Table table1 tEnv.from(MySource1).where($(word).like(F%)); stmtSet.add(table1.insertInto(MySink1));Table table2 table1.unionAll(tEnv.from(MySource2)); stmtSet.add(table2.insertInto(MySink2));String explanation stmtSet.explain(); System.out.println(explanation); 多 sink 计划的结果是 Abstract Syntax Tree LogicalLegacySink(name[default_catalog.default_database.MySink1], fields[count, word]) - LogicalFilter(condition[LIKE($1, _UTF-16LEF%)])- LogicalTableScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])LogicalLegacySink(name[default_catalog.default_database.MySink2], fields[count, word]) - LogicalUnion(all[true]) :- LogicalFilter(condition[LIKE($1, _UTF-16LEF%)]) : - LogicalTableScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) - LogicalTableScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]]) Optimized Physical Plan LegacySink(name[default_catalog.default_database.MySink1], fields[count, word]) - Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)]) - LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])LegacySink(name[default_catalog.default_database.MySink2], fields[count, word]) - Union(all[true], union[count, word]) :- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)]) : - LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word]) - LegacyTableSourceScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields[count, word]) Optimized Execution Plan Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)])(reuse_id[1]) - LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])LegacySink(name[default_catalog.default_database.MySink1], fields[count, word]) - Reused(reference_id[1])LegacySink(name[default_catalog.default_database.MySink2], fields[count, word]) - Union(all[true], union[count, word]) :- Reused(reference_id[1]) - LegacyTableSourceScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])
http://www.w-s-a.com/news/138020/

相关文章:

  • 邯郸市城乡建设管理局网站vimwiki wordpress
  • 如何修改wordpress站名如何制作公司网站
  • 宁波网站建设与推广方案网站有了备案号之后能做什么
  • 汕头手机端建站模板pinterest app下载
  • 网站主机免费宁波网站建设优化诊断
  • 吧网站做软件的软件下载简单的ui界面制作
  • 陕西网站制作公司网页制作与设计代码
  • 做网站行情郑州微信网站开发
  • 河间网站建设制作null wordpress theme
  • h5网站制作网站开发网站建设文翻译工作
  • 网站建设 税种秦皇岛哪有网站优化公司
  • 专业开发网站设计找人做网页需要多少钱
  • 手机购物网站 建站网站建设网站制作网站设计
  • 基于iview的网站开发模板小程序制作需要什么语言
  • 精美网站设计保定建行网站首页登录
  • 网站建设常见问题做网站保存什么格式最好
  • 营销型网站建设与网页设计网站建设 amp 找VX cp5173
  • 新网站该如何做网站优化呢儿童手工
  • 湖北现代城市建设集团网站搜索引擎优化的作用
  • 上海做网站吧开一家软件开发公司需要什么
  • 阿里巴巴网站建设改图片建设厅官方网站河南
  • 邓砚谷电子商务网站建设镇江网
  • 网站空间支持什么程序工作服款式
  • 网站单页品牌网站建设 蝌蚪5小
  • 怎么做外贸网站需注意哪些做电脑系统的网站
  • 网站建设介绍推广用语河南网站优化外包服务
  • 课程网站模板贵州省城乡与建设厅网站
  • 网站模板及源码谁家网站用户体验做的好
  • 做网站的技术要求搜索栏在wordpress菜单上位置
  • 如何给网站弄ftpwordpress怎么添加关键词描述