当前位置: 首页 > news >正文

郑州网站开发建设微博营销网站源码

郑州网站开发建设,微博营销网站源码,汉中网站建设,深圳网页设计培训中心Joins Inner Join 官网说明#xff1a;和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名#xff0c;并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。先创建2个表#xff0c;两个表的字段是相同的#xff0c;我想验证…Joins Inner Join 官网说明和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。先创建2个表两个表的字段是相同的我想验证下是不是必须两个表列名得不同orders1 table_env.from_elements([(1,Jack, FRANCE, 10, datetime.now()timedelta(hours1)),(2,Bob, USA, 20, datetime.now()timedelta(hours2))],DataTypes.ROW([DataTypes.FIELD(id, DataTypes.INT()), DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(country, DataTypes.STRING()),DataTypes.FIELD(revenue, DataTypes.INT()),DataTypes.FIELD(r_time, DataTypes.TIMESTAMP(3))]))orders2 table_env.from_elements([(1,Jack12, FRANCE12, 30, datetime.now()timedelta(hours1)),(2,Bob12, USA12, 30, datetime.now()timedelta(hours2))],DataTypes.ROW([DataTypes.FIELD(id, DataTypes.INT()), DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(country, DataTypes.STRING()),DataTypes.FIELD(revenue, DataTypes.INT()),DataTypes.FIELD(r_time, DataTypes.TIMESTAMP(3))]))试着运行left orders1.select(col(id), col(name), col(country)) right orders2.select(col(id), col(name), col(country))result left.join(right).where(col(id) col(id)).select(col(id), col(name), col(country)) result .execute().print()报错说是无法区分开country, name, id 这三个字段所以看样子真不行 org.apache.flink.table.api.ValidationException: join relations with ambiguous names: [country, name, id]所以在生成left/right集合的时候alias下字段名left orders1.select(col(id), col(name), col(country)) right orders2.select(col(id).alias(id1), col(name).alias(name1), col(country).alias(country1)) result left.join(right).where(col(id) col(id1)).select(col(id), col(name1), col(country)) result .execute().print() 这样就能将两个列相同的表进行Inner Join 但是这种方式不太靠谱等以后有别的方式在补充。--------------------------------------------------------------------------------- | op | id | name1 | country | --------------------------------------------------------------------------------- | I | 1 | Jack12 | FRANCE | | I | 2 | Bob12 | USA | ---------------------------------------------------------------------------------Outer Join和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名并且必须定义至少一个等式连接谓词。与innter join 差不多left orders1.select(col(id), col(name), col(country)) right orders2.select(col(id).alias(id1), col(name).alias(name1), col(country).alias(country1)) #左 left_outer_result left.left_outer_join(right, col(id) col(id1)).select(col(id), col(name1), col(country)) #右 right_outer_result left.right_outer_join(right, col(id) col(id1)).select(col(id), col(name1), col(country)) #全 full_outer_result left.full_outer_join(right, col(id) col(id1)).select(col(id), col(name1), col(country)) result.execute().print()Interval JoinInterval join 是可以通过流模式处理的常规 join 的子集。Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。这种条件可以由两个合适的范围谓词、、、或一个比较两个输入表相同时间属性即处理时间或事件时间的等值谓词来定义。from pyflink.table.expressions import colleft orders1.select(col(id), col(name), col(country),col(r_time)) right orders2.select(col(id).alias(id1), col(name).alias(name1), col(country).alias(country1),col(r_time).alias(r_time1))joined_table left.join(right).where((col(id) col(id1)) (col(r_time) col(r_time1) - lit(1).hours) (col(r_time) col(r_time1) lit(2).seconds)) result joined_table.select(col(id), col(name1), col(country), col(r_time1)) result.execute().print()---------------------------------------------------------------------------------------------------------- | op | id | name1 | country | r_time1 | ---------------------------------------------------------------------------------------------------------- | I | 1 | Jack12 | FRANCE | 2023-02-23 15:51:17.793 | | I | 2 | Bob12 | USA | 2023-02-23 16:51:17.793 | ----------------------------------------------------------------------------------------------------------Inner Join with Table Function (UDTF)join 表和表函数的结果。左外部表的每一行都会 join 表函数相应调用产生的所有行。 如果表函数调用返回空结果则删除左侧外部表的一行。通过调用UDTF函数来实现一些数据处理。from pyflink.table.udf import udtf from pyflink.common import Row split_res table_env.from_elements([(1,2,),(3,4,) ],[a]) # 注册 User-Defined Table Function # result_type 参数声明 split function 的结果类型 udtf(result_types[DataTypes.STRING(), DataTypes.STRING()]) def split(s):splits s.split(,)yield splits[0], splits[1]# join joined_table split_res.join_lateral(split(col(a)).alias(s, t)) result joined_table.select(col(a), col(s), col(t)) result.execute().print()---------------------------------------------------------------------------------------------------- | op | a | s | t | ---------------------------------------------------------------------------------------------------- | I | 1,2 | 1 | 2 | | I | 3,4 | 3 | 4 | 这样运行结果是出来了也没问题但是会报错了暂时没找到解决办法后期有机会查查可能大概是有界流数据运行超时的问题Py4JJavaError: An error occurred while calling o2665.print. : java.lang.RuntimeException: Failed to fetch next resultat org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:147)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at sun.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:748)Union 和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。 并集操作。#生成2张表table_env一定是有界的无界表不支持Union table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.union(table2) result.execute().print()------------------------------------------------------ | a | b | ------------------------------------------------------ | abc|def | 4 | | abc|def | 2 | | hello|word | 3 | | hello|word | 1 | ------------------------------------------------------UnionAll 和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。UNION ALL 包含重复数据#生成2张表 支持无界 table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.union_all(table2) result.execute().print()Intersect 和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次则只返回一条记录也就是说结果表中不存在重复的记录。两张表必须具有相同的字段类型。交集操作#生成2张表 只支持batch table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.intersect(table2) result.execute().print()IntersectAll 和 SQL INTERSECT ALL 子句类似。IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次那么该记录返回的次数同该记录在两个表中都出现的次数一致也就是说结果表可能存在重复记录。两张表必须具有相同的字段类型。#生成2张表 只支持batch table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.intersect_all(table2) result.execute().print()Minus 和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次换句话说结果表中没有重复记录。两张表必须具有相同的字段类型。#生成2张表 只支持batch table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.minus(table2) result.execute().print()MinusAll 和 SQL EXCEPT ALL 子句类似。MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录在结果表中出现 (n - m) 次例如也就是说结果中删掉了在右表中存在重复记录的条数的记录。两张表必须具有相同的字段类型。#生成2张表 只支持batch table1 table_env.from_elements([(hello|word, 1), (abc|def, 2)], [a, b]) table2 table_env.from_elements([(hello|word, 3), (abc|def, 4)], [a, b]) result table1.minus_all(table2) result.execute().print()In和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。#生成2张表 都支持 left orders1.select(col(id), col(name), col(country)) right orders2.select(col(id)) result left.select(col(id), col(name), col(country)).where(col(id).in_(right)) result.execute().print()--------------------------------------------------------------------------------- | op | id | name | country | --------------------------------------------------------------------------------- | I | 1 | Jack | FRANCE | | I | 2 | Bob | USA | ---------------------------------------------------------------------------------OrderBy 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表该操作需要对时间属性进行排序或进行后续的 fetch 操作。如果是无界表只能直接对时间属性排序如果其他属性需要后续的fetch操作orders table_env.from_elements([(Jack, FRANCE, 10, datetime.now()timedelta(hours2)),(Rose, ENGLAND, 30, datetime.now()timedelta(hours12)),(Jack, FRANCE, 20, datetime.now()timedelta(hours22)),(Bob, CH, 40, datetime.now()timedelta(hours32)),(Bob, CH, 50, datetime.now()timedelta(hours32)),(YU, CH, 100, datetime.now()timedelta(hours5))],DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(country, DataTypes.STRING()),DataTypes.FIELD(revenue, DataTypes.INT()),DataTypes.FIELD(r_time, DataTypes.TIMESTAMP(3))])) #时间排序 resultorders.order_by(col(r_time).asc) result.execute().print()------------------------------------------------------------------------------------------------------ | name | country | revenue | r_time | ------------------------------------------------------------------------------------------------------ | Jack | FRANCE | 10 | 2023-02-23 19:42:48.538 | | YU | CH | 100 | 2023-02-23 22:42:48.538 | | Rose | ENGLAND | 30 | 2023-02-24 05:42:48.538 | | Jack | FRANCE | 20 | 2023-02-24 15:42:48.538 | | Bob | CH | 40 | 2023-02-25 01:42:48.538 | | Bob | CH | 50 | 2023-02-25 01:42:48.538 | ------------------------------------------------------------------------------------------------------Offset Fetch 和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定可能是已排序的结果集。Fetch 操作将可能已排序的结果集限制为前 n 行。通常这两个操作前面都有一个排序操作。对于无界表offset 操作需要 fetch 操作。# 从已排序的结果集中返回前2条记录 result1 orders.order_by(col(r_time).asc).fetch(2)# 从已排序的结果集中返回跳过1条记录之后的所有记录 result2 orders.order_by(col(r_time).asc).offset(1)# 从已排序的结果集中返回跳过2条记录之后的前5条记录 result3 orders.order_by(col(r_time).asc).offset(2).fetch(5)Insert 和 SQL 查询中的 INSERT INTO 子句类似该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释用 TablePipeline.execute() 来执行。输出表必须已注册在 TableEnvironment详见表连接器中。此外已注册表的 schema 必须与查询中的 schema 相匹配。#myskintable 必须是已存在的结果表 #简单的例子仅供参考就是orders这个表经过一系列的操作后将结果写入另外一张已存在并且scheam对应的skin_table表中 revenue orders \.select(col(name), col(country), col(revenue)) \.where(col(country) FRANCE) \.group_by(col(name)) \.select(col(name), orders.revenue.sum.alias(rev_sum)).execute_insert(myskintable)
http://www.w-s-a.com/news/396502/

相关文章:

  • 设计师常备设计网站大全中山精品网站建设信息
  • 杭州建设工程网seo服务是什么
  • 兼职做问卷调查的网站wordpress mysql设置
  • 怎么在百度上能搜到自己的网站山西seo谷歌关键词优化工具
  • 网站搭建免费模板飞鱼crm下载
  • 网站开发竞品分析app制作公司深圳
  • 网站建设ssc源码修复设计班级网站建设
  • 网站重定向凡科做网站不要钱
  • 佛山html5网站建设微信营销软件破解版
  • 网站单页做301南京百度推广
  • 私人做网站要多少钱展芒设计网页
  • 怎样网站制作设计如何在网上推广农产品
  • 做关键词排名卖网站聚名网
  • 吉林省住房城乡建设厅网站首页体育器材网站建设方案
  • 网站建设及维护专业手机金融界网站
  • 常州网站建设工作室建立网站有怎么用途
  • 如何盗取网站推广策划书模板
  • 游戏网站建设计划书网络开发需要学什么
  • 手机网站维护费网站开发包括网站过程
  • 懂做游戏钓鱼网站的网站建设技术的发展
  • 网站被百度收录百度一下你就知道 官网
  • 雅客网站建设做网站用什么做
  • 做宣传海报网站专业网站设计速寻亿企邦
  • 秦皇岛市住房和城乡建设局网站有关网站开发的参考文献
  • 晋城城乡建设局网站深圳外贸业务员工资
  • 招聘网站开发的公司销售运营主要做什么
  • 徐州网站无障碍建设wordpress证书
  • c语言可以做网站吗请人做网站收费多少
  • 中英双语网站怎么做网站为什么做静态
  • 毕业设计做音乐网站可以吗网站运营方案