Skip to main content

Ogg

说明

从 flink-1.15.x 开始支持。

支持:

  • Changelog-Data-Capture Format CDC
  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Oracle GoldenGate(简称 ogg) 是一个提供实时数据转化平台的管理服务,使用复制的方式保证数据高可用以及实时分析。 消费者可以设计、执行功能、并且监控他们的数据副本和流式数据处理方案,而无需收集或管理计算环境。 Ogg 对 changelog 数据提供了一个 format schema ,并且提供了 JSON 格式的序列化数据。

Flink 支持在 Flink SQL 系统中解析 Ogg JSON 数据为 INSERT/UPDATE/DELETE 数据,在很多情况下,这个特性是非常有用的,比如:

  • 从数据库同步增量数据到其他系统
  • 日志审计
  • 在数据库中实时物化视图
  • 时态连接数据库表的变更历史等等

Flink 也支持在 Flink SQL 中编码 INSERT/UPDATE/DELETE 消息为 Ogg JSON 消息,并且发射到其他系统,比如 kafka 。 然而,Flink 目前还不能合并 UPDATE_BEFOREUPDATE_AFTER 为单个 UPDATE 消息。因此,Flink 会将 UPDATE_BEFOREUPDATE_AFTER 编码为 DELETEINSERT Ogg 消息。

依赖

Ogg Json

为了使用Ogg 格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.15.0</version>
</dependency>

注意自己的 flink 版本。

注:请参考 Ogg Kafka 处理 文档 来了解怎么设置 Ogg Kafka 处理器来同步 changelog 数据到 kafka 主题。

使用 Ogg 格式

Ogg 对 changelog 提供了统一的 format,下面是一个简单的案例,展示了从 Oracle PRODUCTS 表中捕捉更新操作数据为 JSON 格式:

{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "U",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}

注:请参考 Debezium 文档来了解每个属性的含义。

Oracle PRODUCTS 表有4个字段 (id, name, description, weight),上面的 JSON 数据是 PRODUCTS 表的一个更新变更事件, id 为 111 的 weight 值从 5.18 变成了 5.15。假设这个数据同步到了 kafka 的 products_ogg 主题,然后我们就可以使用下面的 DDL 语句消费这个主题,并解析这个变更事件。

CREATE TABLE topic_products (
-- schema和 oracle 的 "products" 表是完全相同的
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_ogg',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'ogg-json'
)

将主题注册为 flink 表之后,就可以将 Ogg 消息作为 changelog source 来消费了。

-- Oracle "PRODUCTS" 表的实时物化视图,该视图计算了镶贴工产品最新的平均重量
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- 同步 Oracle "PRODUCTS" 表的所有数据和增量变更数据到 Elasticsearch 的 "products" 索引中,以便将来进行搜索
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

可用元数据

下面的元数据可以暴露到表定义的只读虚拟(VIRTUAL)字段中。

注意:只有在对应的连接器可以传递 format 元数据时,format 元数据属性才可用。目前,只有 kafka 连接器可以暴露元数据属性到他的 value format。

Key数据类型描述
tableSTRING NULL表的全限定名称。表的权限名称格式为:CATALOG NAME.SCHEMA NAME.TABLE NAME
primary-keysARRAY<STRING> NULL源表主键字段名称数组,如果 includePrimaryKeys 配置设置为 true ,则主键属性值只包含在 JSON 格式的输出数据中。
ingestion-timestampTIMESTAMP_LTZ(6) NULL连接器处理事件的时间戳,对应 Ogg 数据中的 current_ts 属性。
event-timestampTIMESTAMP_LTZ(6) NULLsource 系统创建事件的时间戳,对应于 Ogg 数据中的 op_ts 属性。

下面的案例展示如何访问 kafka 中 Ogg 元数据属性:

CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'ogg-json'
);

Format 参数

参数要求默认值类型描述
format必选(none)String指定使用的 format ,这儿应该为:ogg-json
ogg-json.ignore-parse-errors可选falseBoolean跳过转化失败的属性和行而不是失败,如果遇到错误,属性值将被设置为 null
ogg-json.timestamp-format.standard可选'SQL'String指定输入和输出时间戳的格式,目前支持的值为:SQLISO-8601
SQL:转化时间戳为 yyyy-MM-dd HH:mm:ss.s{precision} 格式,比如: '2020-12-30 12:13:14.123'。
ISO-8601:转化时间戳为 yyyy-MM-ddTHH:mm:ss.s{precision} 格式,比如:'2020-12-30T12:13:14.123'。
ogg-json.map-null-key.mode可选'FAIL'String指定 map 类型数据遇到 key 值为 null 时的处理方式。目前支持的值为:FAILDROPLITERAL
FAIL:遇到 map 数据中的 key 为 null 时抛出异常。
DROP:删除 map 数据中的 key 为 null 的 entry。
LITERAL:替换 key 为 null 值的字符串字面量。字符串字面量值通过 ogg-json.map-null-key.literal 选项定义。
ogg-json.map-null-key.literal可选'null'String指定当 ogg-json.map-null-key.mode 选项值为 LITERAL 时,要替换为的字符串字面量值。

数据类型映射

目前,Ogg 格式使用 JSON 格式来序列化和反序列化。 请参考 JSON 格式文档来获取更多有关数据类型映射的细节。