Skip to main content

Confluent Avro

说明

支持:

  • Format: Serialization Schema 序列化格式
  • Format: Deserialization Schema 反序列化格式

Avro Schema Registry(avro-confluent)格式允许你读取被io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录, 并写入可以被io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化读取的记录。

当读取(反序列化)这种格式的数据时,根据数据中的schema版本id从配置的Confluent schema Registry中获取Avro写入schema,同时从表schema推断读取schema。

当用这种格式写入(序列化)一条数据时,Avro schema将从表schema推断出用于检索的schema id:

  • flink-1.13.x:主要通过avro-confluent.schema-registry.subject配置的主题名进行查找。
  • flink-1.14.x:主要通过avro-confluent.subject配置的主题名进行查找。

Avro Schema Registry格式只能与Apache Kafka SQL连接器或Upsert Kafka SQL连接器结合使用。

依赖

为了使用Avro Schema Registry格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.13.0</version>
</dependency>

注意自己使用的 flink 版本。

使用Avro-Confluent格式

使用原始UTF-8字符串作为Kafka键以及在Schema Registry中注册的Avro记录作为Kafka值注册的表:

flink-1.13.x:

CREATE TABLE user_created (
-- -- 一个映射到kafka原生UTF-8字符串key的字段
the_kafka_key STRING,
-- 一些Avro属性字段作为kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8字符串作为kafka key,使用“the_kafka_key”表字段
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)

flink-1.14.x:

CREATE TABLE user_created (
-- -- 一个映射到kafka原生UTF-8字符串key的字段
the_kafka_key STRING,
-- 一些Avro属性字段作为kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8字符串作为kafka key,使用“the_kafka_key”表字段
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)

我们可以如下方式将数据写入kafka表:

INSERT INTO user_created
SELECT
-- 赋值user id字段值作为kafka key
id as the_kafka_key,
-- 所有字段值
id, name, email
FROM some_table

Kafka键和值都在Schema Registry中注册为Avro record:

flink-1.13.x:

CREATE TABLE user_created (
-- 一个映射到“id” avro属性字段作为kafka key
kafka_key_id STRING,
-- 一些映射到avro属性字段作为kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- 注意:由于哈希分区的存在,Kafka key上下文中的schema演化几乎不可能向后或向前兼容。
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' = 'http://localhost:8082',
-- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- 从flink 1.13版本开始,subject有默认值,尽管可以被覆盖
'key.avro-confluent.schema-registry.subject' = 'user_events_example2-key2',
'value.avro-confluent.schema-registry.subject' = 'user_events_example2-value2'
)

flink-1.14.x:

CREATE TABLE user_created (
-- 一个映射到“id” avro属性字段作为kafka key
kafka_key_id STRING,
-- 一些映射到avro属性字段作为kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- 注意:由于哈希分区的存在,Kafka key上下文中的schema演化几乎不可能向后或向前兼容。
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://localhost:8082',
-- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- 从flink 1.13版本开始,subject有默认值,尽管可以被覆盖
'key.avro-confluent.subject' = 'user_events_example2-key2',
'value.avro-confluent.subject' = 'user_events_example2-value2'
)

使用upsert-kafka连接器的表示例,其中Kafka value在Schema Registry中注册为Avro记录:

flink-1.13.x:

CREATE TABLE user_created (
-- 一个映射到kafka原生UTF-8字符串key的字段
kafka_key_id STRING,
-- 一些映射到avro属性的字段作为kafka value
id STRING,
name STRING,
email STRING,
-- upsert-kafka连接器要求有一个主键来定义upsert行为
PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8字符串作为kafka key
-- 在这个案例中不指定'key.fields',因为它由表的主键指定
'key.format' = 'raw',
-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)

flink-1.14.x:

CREATE TABLE user_created (
-- 一个映射到kafka原生UTF-8字符串key的字段
kafka_key_id STRING,
-- 一些映射到avro属性的字段作为kafka value
id STRING,
name STRING,
email STRING,
-- upsert-kafka连接器要求有一个主键来定义upsert行为
PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8字符串作为kafka key
-- 在这个案例中不指定'key.fields',因为它由表的主键指定
'key.format' = 'raw',
-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- 在这个例子中,我们希望Kafka key和value的Avro类型都包含字段'id' => 在与Kafka key字段相关联的表字段名前添加一个前缀,以避免冲突
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)

Format参数

选项要求是否可以被转发
从 flink-1.15.x 开始支持
默认值类型描述
format必选(none)String指定使用哪种模式,这儿应该是 avro-confluent
avro-confluent.basic-auth.credentials-source可选(none)Stringschema注册的基础认证证书资
avro-confluent.basic-auth.user-info可选(none)Stringschema注册的基础认证用户信息
avro-confluent.bearer-auth.credentials-source可选(none)Stringschema注册的持有者认证证书源
avro-confluent.bearer-auth.token可选(none)Stringschema注册的持有者认证令牌 token 源
从 flink-1.14.x 开始支持
avro-confluent.properties
可选(node)Map转发到下面 schema 注册的属性 map 表,这对于没有通过Flink配置选项正式公开的选项很有用,但是 Flink 选项拥有更高的优先级。
avro-confluent.ssl.keystore.location可选(none)StringSSL秘钥库文件存储位置
avro-confluent.ssl.keystore.password可选(none)StringSSL秘钥库密码
avro-confluent.ssl.truststore.location可选(none)StringSSL truststore的文件存储位置
avro-confluent.ssl.truststore.password可选(none)StringSSL truststore的密码
flink-1.13.x:avro-confluent.schema-registry.subject
flink-1.14.x:avro-confluent.subject
可选(none)StringConfluent模式注册中心主题,在该主题下注册此格式在序列化期间使用的schema。默认情况下,kafkaupsert-kafka 连接器使用 <topic_name>-value<topic_name>-key 作为默认主题名。但对于其他连接器(例如: filesystem ),当用作接收器时,subject选项是必需的。
flink-1.13.x:avro-confluent.schema-registry.url
flink-1.14.x:avro-confluent.url
必选(none)String用于获取/注册Confluent Schema Registry schema的URL

数据类型匹配

目前,Apache Flink总是使用表schema在反序列化期间派生Avro读取schema,在序列化期间派生Avro写入schema。

目前还不支持直接显式定义Avro模式。 Avro和Flink数据类型之间的映射请参见Apache Avro Format

除了上面列出的类型外,Flink还支持读写可空类型。Flink将可为空的类型映射到Avro联合(某值,null),其中某值是从Flink类型转换而来的Avro类型。

有关Avro类型的更多信息,可以参考Avro规范