做电子商务网站公司,拼多多一件代发免费货源,建设学风建设专题网站,百度竞价排名机制Flink系列之#xff1a;Table API Connectors之JSON Format 一、JSON Format二、依赖三、创建一张基于 JSON Format 的表四、Format 参数五、数据类型映射关系 一、JSON Format
JSON Format 能读写 JSON 格式的数据。当前#xff0c;JSON schema 是从 table schema 中自动推… Flink系列之Table API Connectors之JSON Format 一、JSON Format二、依赖三、创建一张基于 JSON Format 的表四、Format 参数五、数据类型映射关系 一、JSON Format
JSON Format 能读写 JSON 格式的数据。当前JSON schema 是从 table schema 中自动推导而得的。
二、依赖
为了使用 Json 格式使用构建自动化工具例如 Maven 或 SBT的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion1.18.0/version
/dependency三、创建一张基于 JSON Format 的表
以下是一个利用 Kafka 以及 JSON Format 构建表的例子。
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3)
) WITH (connector kafka,topic user_behavior,properties.bootstrap.servers localhost:9092,properties.group.id testGroup,format json,json.fail-on-missing-field false,json.ignore-parse-errors true
)四、Format 参数
参数是否必须默认值类型描述format必选(none)String声明使用的格式这里应为’json’。json.fail-on-missing-field可选falseBoolean当解析字段缺失时是跳过当前字段或行还是抛出错误失败默认为 false即抛出错误失败。json.ignore-parse-errors可选falseBoolean当解析异常时是跳过当前字段或行还是抛出错误失败默认为 false即抛出错误失败。如果忽略字段的解析异常则会将该字段值设置为null。json.timestamp-format.standard可选‘SQL’String声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为’SQL’ 以及 ‘ISO-8601’可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析 TIMESTAMP, 例如 “2020-12-30 12:13:14.123” 以 “yyyy-MM-dd HH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30 12:13:14.123Z” 且会以相同的格式输出。可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入 TIMESTAMP, 例如 “2020-12-30T12:13:14.123” 以 “yyyy-MM-ddTHH:mm:ss.s{precision}‘Z’” 的格式解析 TIMESTAMP_LTZ, 例如 “2020-12-30T12:13:14.123Z” 且会以相同的格式输出。json.map-null-key.mode选填‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:Option ‘FAIL’ 将抛出异常如果遇到 Map 中 key 值为空的数据。Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘json.map-null-key.literal’ 定义。json.map-null-key.literal选填‘null’String当 ‘json.map-null-key.mode’ 是 LITERAL 的时候指定字符串常量替换 Map 中的空 key 值。json.encode.decimal-as-plain-number选填falseBoolean将所有 DECIMAL 类型的数据保持原状不使用科学计数法表示。例0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时则会表示为 0.000000027。decode.json-parser.enabled选填trueBooleanJsonParser 是 Jackson 提供的流式读取 JSON 数据的 API。与 JsonNode 方式相比这种方式读取速度更快内存消耗更少。同时JsonParser 在读取数据时还支持嵌套字段的投影下推。该参数默认启用。如果遇到任何不兼容性问题可以禁用并回退到 JsonNode 方式。
五、数据类型映射关系
当前JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL类型JSON类型CHAR/VARCHAR/STRINGstringBOOLEANbooleanBINARY/VARBINARYstring with encoding: base64DECIMALnumberTINYINTnumberSMALLINTnumberINTnumberBIGINTnumberFLOATnumberDOUBLEnumberDATEstring with format: dateTIMEstring with format: timeTIMESTAMPstring with format: date-timeTIMESTAMP_WITH_LOCAL_TIME_ZONEstring with format: date-time (with UTC time zone)INTERVALnumberARRAYarrayMAP / MULTISETobjectROWobject