网站实现,哪个公司的app开发公司,WordPress汉化卡片式主题,做qq空间的网站一、什么样的数据可以用于流式传输
Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于
基本类型#xff1a;即字符串、长、整数、布尔值、数组复合类型#xff1a;元组、POJO和Scala样例类
基本类型我们已经很熟悉了#xff0c;下…一、什么样的数据可以用于流式传输
Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于
基本类型即字符串、长、整数、布尔值、数组复合类型元组、POJO和Scala样例类
基本类型我们已经很熟悉了下面我们看下复合类型。
1、元组
对于javaFlink定义了Tuple0到Tuple25类型例如
Tuple2String, Integer person Tuple2.apply(Fred,35);String name person._1;
Integer age person._2;
2、POJO
如果满足以下条件Flink将数据类型识别为POJO类型并允许“按名称”字段引用
该类是公共且独立的没有非静态内部类该类有一个公共的无参数构造函数类以及所有超类中的所有非静态、非瞬态字段要么是公共的和非最终的要么具有遵循getter和setterJavabean命名约定的公共getter和setter方法。
示例
public class Person {public String name; public Integer age; public Person() {}public Person(String name, Integer age) { . . .}
} Person person new Person(Fred Flintstone, 35);
3、样例类
样例类Case classes和普通类差不多只有几点关键差别。样例类非常适合用于不可变的数据多用于模式匹配。
case class Book(isbn: String)val frankenstein Book(978-0486282114)
注意在实例化样例类Book时并没有使用关键字new这是因为样例类有一个默认的apply方法来负责对象的创建。
二、完整示例
该示例来自官方网站是将有关人员的记录流作为输入并对其进行过滤以仅包含成年人
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamPerson flintstones env.fromElements(new Person(Fred, 35),new Person(Wilma, 35),new Person(Pebbles, 2));DataStreamPerson adults flintstones.filter(new FilterFunctionPerson() {Overridepublic boolean filter(Person person) throws Exception {return person.age 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name name;this.age age;}public String toString() {return this.name.toString() : age this.age.toString();}}
}
1、执行环境
每个Flink应用程序都需要一个执行环境在该例中为env。流应用程序需要使用StreamExecutionEnvironment。
在应用程序中进行的DataStream API调用会构建一个附加到StreamExecutionEnvironment的作业图。当调用env.execute()时此图会打包并发送到JobManagerJobManager会并行化作业并将其切片分发给TaskManager执行。作业的每个并行切片都将在一个任务槽中执行。
如果不调用execute(), 应用程序则不会执行。 此分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。
2、source
上面的示例使用env.fromElements(...)构造DataStreamPerson。这是一种将简单流组合在一起以用于原型或测试的便捷方法。StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。因此也可以这样做
ListPerson people new ArrayListPerson();people.add(new Person(Fred, 35));
people.add(new Person(Wilma, 35));
people.add(new Person(Pebbles, 2));DataStreamPerson flintstones env.fromCollection(people);
在原型设计时将一些数据导入流的另一种方便方法是使用socket或文件
DataStreamString lines env.socketTextStream(localhost, 9999);
DataStreamString lines env.readTextFile(file:///path);
在实际应用中最常用的数据源是那些支持低延迟、高吞吐量并行读取以及倒带和重放的数据源——这是高性能和容错的先决条件——例如Apache Kafka、Kinesis和各种文件系统。REST API和数据库也经常使用。
3、sink
上面的示例使用adults.print()将其结果打印到任务管理器日志在IDE中运行时将显示在IDE的控制台中。这将在流的每个元素上调用toString()。
例如输出如下 1 Fred: age 35 2 Wilma: age 35 其中1和2表示哪个子任务即线程产生了输出。
在生产中常用的接收器包括FileSink、各种数据库和几个发布子系统。
--------------------------------------------------------------------------------------------------------------------------------- 大多数高校硕博生毕业要求需要参加学术会议发表EI或者SCI检索的学术论文会议论文 可访问艾思科蓝官网浏览即将召开的学术会议列表。会议如下 第八届大数据与应用统计国际学术研讨会ISBDAS 2025
https://ais.cn/u/fEzmy2
第二届生成式人工智能与信息安全国际学术会议GAIIS 2025
https://ais.cn/u/uAbENn
第四届电子技术与人工智能国际学术会议ETAI 2025
https://ais.cn/u/vqM7Nj
第四届网络安全、人工智能与数字经济国际学术会议CSAIDE 2025
https://ais.cn/u/ZrERn2