Skip to main content

Jdbc Connector

Flink 官方 提供了JDBC的连接器,用于从 JDBC 中读取或者向其中写入数据,可提供 AT_LEAST_ONCE (至少一次)的处理语义

StreamX中基于两阶段提交实现了 EXACTLY_ONCE (精确一次)语义的JdbcSink,并且采用HikariCP为连接池,让数据的读取和写入更简单更准确

Jdbc 信息配置

StreamXJdbc Connector的实现用到了HikariCP连接池,相关的配置在jdbc的namespace下,约定的配置如下:

jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
username: root
password: 123456
driverClassName: com.mysql.jdbc.Driver
connectionTimeout: 30000
idleTimeout: 30000
maxLifetime: 30000
maximumPoolSize: 6
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true

semantic 语义配置

semantic这个参数是在JdbcSink写时候的语义,仅对 JdbcSink 有效,JdbcSource会自动屏蔽该参数,有三个可选项

  • EXACTLY_ONCE
  • AT_LEAST_ONCE
  • NONE

EXACTLY_ONCE

如果JdbcSink配置了 EXACTLY_ONCE语义,则底层采用了两阶段提交的实现方式来完成写入,此时要flink配合开启Checkpointing才会生效,如何开启checkpoint请参考第二章关于checkpoint配置部分

AT_LEAST_ONCE && NONE

默认不指定会采用NONE语义,这两种配置效果一样,都是保证 至少一次 语义

提示

开启EXACTLY_ONCE精确一次的好处是显而易见的,保证了数据的准确性,但成本也是高昂的,需要checkpoint的支持,底层模拟了事务的提交读,对实时性有一定的损耗,如果你的业务对数据的准确性要求不是那么高,则建议采用AT_LEAST_ONCE语义

其他配置

除了特殊的semantic 配置项之外,其他的所有的配置都必须遵守 HikariCP 连接池的配置,具体可配置项和各个参数的作用请参考光 HikariCP官网文档.

Jdbc 读取数据

StreamXJdbcSource用来读取数据,并且根据数据的offset做到数据读时可回放,我们看看具体如何用JdbcSource读取数据,假如需求如下

  • t_order表中读取数据,以timestamp字段为参照,起始值为2020-12-16 12:00:00往后抽取数据
  • 将读取到的数据构造成Order对象返回

jdbc配置和读取代码如下

jdbc:
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456

java api为例,这里要传入两个参数

  • SQLQueryFunction<T> queryFunc
  • SQLResultFunction<T> resultFunc

queryFunc获取一条sql

queryFunc是要传入一个SQLQueryFunction类型的function,该function用于获取查询sql的,会将最后一条记录返回给开发者,然后需要开发者根据最后一条记录返回一条新的查询sql,queryFunc定义如下:

/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLQueryFunction<T> extends Serializable {
/**
* 获取要查询的SQL
*
* @return
* @throws Exception
*/
String query(T last) throws Exception;
}

所以上面的代码中,第一次上来lastOne(最后一条记录)为null,会判断一下,为null则取需求里默认的offset,查询的sql里根据timestamp字段正序排,这样在第一次查询之后,会返回最后的那条记录,下次直接可以使用这条记录作为下一次查询的根据

注意事项

JdbcSource实现了CheckpointedFunction,即当程序开启 checkpoint 后,会将这些诸如laseOffset的状态数据保存到state backend,这样程序挂了,再次启动会自动从checkpoint中恢复offset,会接着上次的位置继续读取数据, 一般在生产环境,更灵活的方式是将lastOffset写入如redis等存储中,每次查询完之后再将最后的记录更新到redis,这样即便程序意外挂了,再次启动,也可以从redis中获取到最后的offset进行数据的抽取,也可以很方便的人为的任意调整这个offset进行数据的回放

resultFunc 处理查询到的数据

resultFunc的参数类型是SQLResultFunction<T>,是将一个查询到的结果集放到Iterable<Map<String, ?>>中返回给开发者,可以看到返回了一个迭代器Iterable,迭代器每次迭代返回一个Map,该Map里记录了一行完整的记录,Mapkey为查询字段,value为值,SQLResultFunction<T>定义如下

/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLResultFunction<T> extends Serializable {
/**
* 将查下结果以Iterable<Map>的方式返回,开发者去实现转成对象.
*
* @param map
* @return
*/
Iterable<T> result(Iterable<Map<String, ?>> iterable);
}

Jdbc 读取写入

StreamXJdbcSink是用来写入数据,我们看看具体如何用JdbcSink写入数据,假如需求是需要从kakfa中读取数据,写入到mysql

kafka.source:
bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
pattern: user
group.id: user_02
auto.offset.reset: earliest # (earliest | latest)
...

jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
注意事项

配置里jdbc下的 semantic 是写入的语义,在上面Jdbc信息配置有介绍,该配置只会在JdbcSink下生效,StreamX中基于两阶段提交实现了 EXACTLY_ONCE 语义, 这本身需要被操作的数据库(mysql,oracle,MariaDB,MS SQL Server)等支持事务,理论上所有支持标准Jdbc事务的数据库都可以做到EXACTLY_ONCE(精确一次)的写入

根据数据流生成目标SQL

在写入的时候,需要知道具体写入的sql语句,该sql语句需要开发者通过function的方式提供,在scala api中,直接在sink方法后跟上function即可,java api 则是通过sql()方法传入一个SQLFromFunction类型的function

下面以java api为例说明,我们来看看javaapi 中提供sql的function方法的定义

/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLFromFunction<T> extends Serializable {
/**
* @param bean
* @return
*/
String from(T bean);
}

SQLFromFunction上的泛型<T>即为DataStream里实际的数据类型,该function里有一个方法form(T bean),这个bean即为当前DataStream中的一条具体数据,会将该数据返给开发者,开发者来决定基于这条数据,生成一条具体可以往数据库中插入的sql

设置写入批次大小

在 非 EXACTLY_ONCE(精确一次的语义下)可以适当的设置batch.size来提高Jdbc写入的性能(前提是业务允许的情况下),具体配置如下

jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
batch.size: 1000

这样一来就不是来一条数据就立即写入,而是积攒一个匹配然后执行批量插入

注意事项

这个设置仅在非EXACTLY_ONCE语义下生效,带来的好处是可以提高Jdbc写入的性能,一次大批量的插入数据,缺点是数据写入势必会有延迟,请根据实际使用情况谨慎使用

多实例Jdbc支持

手动指定Jdbc连接信息