查询配置
任务执行配置
以下选项可用于调优查询执行的性能。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
table.exec.async-lookup.buffer-capacity Batch Streaming | 100 | Integer | 异步查找join触发的最大异步i/o操作的数量。 |
table.exec.async-lookup.timeout Batch Streaming | 3 min | Duration | 异步操作完成的异步超时时间。 |
1.15.x开始支持 table.exec.deduplicate.insert-update-after-sensitive-enabled Streaming | true | Boolean | 设置任务(尤其是 sink )是否对 INSERT 消息和 UPDATE_AFTER 消息敏感。如果为 false,有时(比如删除最后一行)flink 会将第一行消息设置为 UPDATE_AFTER 而不是 INSERT 。 |
1.15.x开始支持 table.exec.deduplicate.mini-batch.compact-changes-enabled Streaming | false | Boolean | 设置在开启 mini-batch 时,是否压缩发送到下游的数据变更。 如果设置为 true,flink 将会压缩更改,并且只发送最新的变更到下游。注意,如果下游需要每个版本的数据,该优化将会失效。 如果设置为 false,在没有开启 mini-batch 时,flink 将会发送所有的变更到下游。 |
table.exec.disabled-operators Batch | (none) | String | 主要为了测试。用逗号分隔的算子名称列表,买个名称代表一类禁止操作的算子。可以被禁止的算子包括:"NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg"。默认不禁止任何算子。 |
1.15.x开始支持 table.exec.legacy-cast-behaviour Batch Streaming | DISABLED | 枚举 可用值: ENABLED DISABLED | 确定 CAST 操作是按照之前的方式执行,还是按照新的修复了很多问题,并且改进过的新方式进行。 可选值有: ENABLED:CAST 将会按照之前的方式执行。 DISABLED:CAST 将会按照新的正确的方式执行。 |
table.exec.mini-batch.allow-latency Streaming | 0 ms | Duration | MiniBatch 缓存输入数据的最大延迟时间。MiniBatch 可以优化数据缓存,以减少state状态访问。MiniBatch 在允许的时间间隔内收到最大的缓存数据量时触发。注意:如果table.exec.mini-batch.enabled设置为true,该值必须大于0。 |
table.exec.mini-batch.enabled Streaming | false | Boolean | 是否开启MiniBatch 优化。MiniBatch 可以优化数据缓存,以减少state状态访问。默认false禁用。可以设置为true来开启。注意:如果开启mini-batch,则必须设置'table.exec.mini-batch.allow-latency'和'table.exec.mini-batch.size'。 |
table.exec.mini-batch.size Streaming | -1 | Long | MiniBatch 可以缓存的最大输入数据数量。MiniBatch 可以优化数据缓存,以减少state状态访问。MiniBatch 在允许的时间间隔内收到最大的缓存数据量时触发。注意:如果table.exec.mini-batch.enabled设置为true,该值必须为正值。 |
1.15.x开始支持 table.exec.rank.topn-cache-size Streaming | 10000 | Long | Rank 操作会缓存部分状态数据以减少对状态数据的访问。设置的缓存大小为每个 rank 任务中数据的数量。 |
table.exec.resource.default-parallelism Batch Streaming | -1 | Integer | 设置所有算子的默认并行度(比如aggregate,join,filter)。该配置的优先级高于StreamExecutionEnvironment (实际上,该配置会覆盖StreamExecutionEnvironment 设置的并行度)。-1表示不设置默认并行度,然后使用StreamExecutionEnvironment设置的并行度。 |
只支持1.13.x版本,在1.14.x中被删除 table.exec.shuffle-mode Batch | ALL_EDGES_BLOCKING | String | 设置执行的shuffle模式。 可用值有: ALL_EDGES_BLOCKING: 所有edges使用阻塞shuffle. FORWARD_EDGES_PIPELINED: Forward edges will use pipelined shuffle, others blocking. POINTWISE_EDGES_PIPELINED: Pointwise edges will use pipelined shuffle, others blocking. Pointwise edges include forward and rescale edges. ALL_EDGES_PIPELINED: All edges will use pipelined shuffle. batch: 和 ALL_EDGES_BLOCKING 一样,过期值。 pipelined:和 ALL_EDGES_PIPELINED 一样,过期值。 注意:阻塞shuffle表好似数据在被发送到消费者之前,将会全部产生,pipelined shuffle表示数据一旦产生,就会立即被发送给消费者。 |
1.15.x开始支持 table.exec.simplify-operator-name-enabled Batch Streaming | true | Boolean | 当设置为 true 时,优化器将使用 id 和 ExecNode 的返回类型来简化展示算子的名称,并且将详细信息放到描述中。默认值为 true。 |
1.15.x开始支持 table.exec.sink.keyed-shuffle Streaming | AUTO | 枚举 可用值: NONE AUTO FORCE | 为了最小化很多用户在向有主键的表中写入数据时遇到的分布式乱序问题,在 sink 的并行和上游算子的并行度不同,并且上游为只追加类型时, flink 会自动增加一个默认的 keyed shuffle。 该优化只会在上游能够确保多个记录主键有序时生效,否则,增加的 shuffle 并不能解决问题(这种情况下,更有效的做法是刚开始就在 source 端对数据执行去重操作,或者是使用定义了主键的 upsert source ,以此来影响数据的评估) 默认情况下, keyed shuffle 将会在 sink 的并行度和上游并行度不同时被添加,也可以设置不使用 shuffle(NONE),或者是强制使用 shuffle(FORCE)。 可用值如下:NONE、AUTO、FORCE。 |
table.exec.sink.not-null-enforcer Batch Streaming | ERROR | 枚举 可用值: ERROR DROP | 决定当 NOT NULL 字段遇到 null 值时,flink 怎么处理。 可用值: ERROR:NOT NULL 字段遇到 null 值时抛出运行时异常。 DROP:NOT NULL 字段遇到 null 值时直接丢弃数据。 |
1.15.x开始支持 table.exec.sink.type-length-enforcer Batch Streaming | IGNORE | 枚举 可用值: IGNORE TRIM_PAD | 是否截取 CHAR(length)/VARCHAR(length)/BINARY(length)/VARBINARY(length) 或填充 CHAR(length)/BINARY(length) 类型字段的值,以此来让他们的长度和 CHAR/VARCHAR/BINARY/VARBINARY 类型字段定义的长度一样。 可选值有: IGNORE:不截取或填充,忽略 CHAR/VARCHAR/BINARY/VARBINARY 长度的定义。 TRIM_PAD:截取并填充字符串和 binary 值,以匹配 CHAR/VARCHAR/BINARY/VARBINARY 类型定义的长度 |
table.exec.sink.upsert-materialize Streaming | AUTO | 枚举 可用值: NONE AUTO FORCE | 由于分布式系统中的 shuffle 会造成 ChalgeLog 数据的乱序,所以 sink 接收到的数据可能在全局的 upsert 中乱序,所以要在 upsert sink 之前添加一个 upsert 物化算子。该算子接收上游 changelog 数据,并且给下游生成一个 upsert 视图。 默认情况下,在唯一 key 遇到分布式乱序时,该物化算子会被添加,也可以选择不物化(NONE),或者是强制物化(FORCE)。 可选值有:NONE、AUTO、FORCE。 |
table.exec.sort.async-merge-enabled Batch | true | Boolean | 是否异步合并排序的溢出文件。 |
table.exec.sort.default-limit Batch | -1 | Integer | 在使用order by语句后,用户没有使用limit语句,则默认使用该设置limit值。-1表示忽略该限制。 |
table.exec.sort.max-num-file-handles Batch | 128 | Integer | 外部归并排序的最大扇入文件数。该配置限制每个算子操作的文件数量。如果该值设置过小,可能会导致中间合并。但是如果设置过大,则会导致被同时打开的文件数太多,占用内存,并导致随机读取。 |
table.exec.source.cdc-events-duplicate Streaming | false | Boolean | 指定任务中的CDC(更改数据获取)source产生重复更改事件时,框架是否需要进行去重,获取一致性结果。CDC source会产生所有的更改事件,包括:INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE。比如:kafka source使用Debezium 格式化。该配置默认值为false。 然而,有重复更改事件是一种常见的情况。因为CDC工具(比如Debezium),在遇到失败时,会使用至少一次语义,因此,在异常情况下,Debezium 会交付重复的更改事件到kafka,然后flink将获取到重复的时间。这可能会导致flink查询产生错误的结果,或者是不期望遇到的异常。 因此,如果CDC工具设置的至少一次语义,则要求更改此配置。开启该配置要求CDC cource定义PRIMARY KEY主键。主键将用于对更改事件去重,并且生成有状态的changelog流。 |
table.exec.source.idle-timeout Streaming | 0 ms | Duration | 当一个source在超时时间内没有接收到任何数据时,它将被标记为临时空闲。这允许下游任务在其空闲时不需要等待来自该source的水印而发送其水印。缺省值为0,表示不开启source空闲检测。 |
table.exec.spill-compression.block-size Batch | 64 kb | MemorySize | 溢出数据时用于压缩的内存大小。内存越大,压缩比越高,但是作业消耗的内存资源也更多。 |
table.exec.spill-compression.enabled Batch | true | Boolean | 是否压缩溢出数据。目前,我们只支持对sort、hash-agg和hash-join算子压缩溢出数据。 |
table.exec.window-agg.buffer-size-limit Batch | 100000 | Integer | 设置group window agg算子中使用的窗口元素缓冲区大小限制。 |
优化配置
以下配置可用于调整查询优化器,以获得更好的执行计划。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
table.optimizer.agg-phase-strategy Batch Streaming | AUTO | String | AUTO:不指定聚合策略。根据情况选择两阶段聚合或者是一阶段聚合。 TWO_PHASE:指定使用两阶段聚合,两阶段包括:localAggregate和globalAggregate。如果聚合不支持两阶段聚合优化,则会采用一阶段聚合。 ONE_PHASE:指定使用一阶段聚合,只包括:CompleteGlobalAggregate。 |
table.optimizer.distinct-agg.split.bucket-num Streaming | 1024 | Integer | 配置切分distinct聚合时的bucket桶的总数。该数字用于第一阶段聚合,其用来通过“hash_code(distinct_key)%BUCKET_NUM”计算出额外的分组key,以将数据打散到不同子任务。 |
table.optimizer.join-reorder-enabled Batch Streaming | false | Boolean | 在优化器中启用join重新排序。默认为禁用。 |
table.optimizer.multiple-input-enabled Batch | true | Boolean | 当设置为true时,优化器将会合并pipelined shuff输入为多个算子,以减少shuff,优化性能。默认值为true。 |
table.optimizer.reuse-source-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将尝试发现重复的表source,然后重用他们。要启用该设置,必须设置table.optimizer.reuse-sub-plan-enabled为true。 |
table.optimizer.reuse-sub-plan-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将尝试发现重复的子任务,然后重用他们。 |
1.15.x开始支持 table.optimizer.source.aggregate-pushdown-enabled Batch | true | Boolean | 当设置为 true 时,如果 TableSource 实现了 SupportsAggregatePushDown ,优化器就会将本地聚合进行下推。 |
table.optimizer.source.predicate-pushdown-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将谓词下推为 FilterableTableSource,默认为true。 |
表配置
以下选项可用于调整表计划器的行为。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
1.15.x开始支持 table.builtin-catalog-name Batch Streaming | default_catalog | String | 指定实例化 TableEnvironment 对象时初始化的 catalog 名称。 |
1.15.x开始支持 table.builtin-database-name Batch Streaming | default_database | String | 指定实例化 TableEnvironment 对象时在初始化 catalog 中的默认数据库名称。 |
table.dml-sync Batch Streaming | false | Boolean | 指定DML任务(比如插入操作)为异步/同步执行。默认为异步执行,因此可以同时提交多个DML任务。如果设置为true,则插入操作会等待任务完成才会结束。 |
table.dynamic-table-options.enabled Batch Streaming | 1.13.x:false 1.14.x:true | Boolean | 是否启用用于动态表的OPTIONS提示,如果禁用,则指定OPTIONS之后会抛出异常。 |
table.generated-code.max-length Batch Streaming | 1.13.x:64000 1.14.x:4000 | Integer | 1.13.x:指定一个阈值,将生成的代码拆分为子函数调用。Java的最大方法长度为64kb。如果有必要,则可以通过该参数设置更细的粒度。 1.14.x:指定一个阈值,将生成的代码拆分为子函数调用。Java的最大方法长度为64kb。如果有必要,则可以通过该参数设置更细的粒度。默认值为 4000 ,代替 64KB,JIT 会拒绝代码超过 8K 字节的方法执行。 |
table.local-time-zone Batch Streaming | default | String | 定义当前会话的本地时间时区id。该值用于转化或转化为TIMESTAMP WITH LOCAL TIME ZONE时间类型。在内部实现中,timestamps with local time zone通常表示UTC时区(0时区)。然而,当将该类型转化为不包含时区的数据类型(比如TIMESTAMP、TIME、简单的STRING)时,将会用到会话时区设置。该值可以使用完全的名称(比如:“America/Los_Angeles”),也可以使用自定义的时区ID(比如:“GMT+08:00”)。 |
只支持1.13.x版本,在1.14.x中被删除 table.planner Batch Streaming | BLINK | 枚举 可用值: BLINK、OLD | 使用“blink”或“old”计划器,默认为blink计划器。对于TableEnvironment来说,该设置用于创建TableEnvironment对象,而且对象创建之后无法修改该设置。注意:old计划器将会在flink 1.14版本中移除,因此该配置将会被废弃。 |
1.15.x开始支持 table.plan.compile.catalog-objects Batch Streaming | ALL | 枚举 可用值: ALL SCHEMA IDENTIFIER | 指定计划器编译时存储 catalog 对象,比如:表、函数、数据类型等的策略,该策略会影响在恢复算子期间需要提供的 catalog 元数据,并且影响计划占用空间的大小。 该配置选项不会影响匿名、内置或临时对象。如果可能的话,匿名或内置对象将会被完全持久化,包括元数据和选项配置,否则会编译失败。临时对象只会持久化他们的标识符,并且在恢复时提供给会话上下文。 可选值: ALL:所有的元数据,包括 catalog 表、函数或数据类型,都会在计划编译期间被持久化到计划中。对于 catalog 中的表来说,包括表的标识符、schema信息和选项配置。对于 catalog 中的函数,包括函数的标识符和 class 类。对于 catalog 中的数据类型,包括标识符和整个数据结构。使用该策略,在恢复算子期间,catalog 的元数据不必再可用。 SCHEMA:除了标识符之外,catalog 表、函数或数据类型的SCHEMA信息将在编译期间持久化到计划中。SCHEMA 允许在计划恢复算子期间检测目录中的不兼容更改。但是,所有其他元数据仍将从 catalog 中检索。 IDENTIFIER:只有 catalog 表、函数或数据类型会在编译期间被持久化到计划中。在算子恢复期间,所有的元数据都需要从 catalog 中检索。使用该策略,计划将会有更少的信息冗余。 |
1.15.x开始支持 table.plan.force-recompile Streaming | false | Boolean | 当该值为 false 时, 如果计划输出文件已经存在,则 COMPILE PLAN 语句会运行失败,除非使用了 IF NOT EXISTS 。当该值为 true 时,COMPILE PLAN 会覆盖已存在的计划输出文件。我们建议只在 debug 时启用该特性。 |
1.15.x开始支持 table.plan.restore.catalog-objects Batch Streaming | All | 枚举 可用值: ALL ALL_ENFORCED IDENTIFIER | 指定通过给定的计划恢复 catalog 对象,比如表、函数或者是数据类型,并在必要时执行 catalog 查找的策略。他会影响提供 catalog 的需要,并且充实部分计划信息。 可用值: ALL:读取所有持久化到计划中的元数据,包括 catalog 表、函数和数据类型。该策略会根据标识符从 catalog 中查找,以填充缺失的信息或丰富可变选项。当原始对象在 catalog 中不可用时,如果计划中存在所有必要的信息, pipeline 仍然可以恢复。 ALL_ENFORCED:要求所有已经持久化到计划中的元数据,包括 catalog 表、函数和数据类型。该策略既不会根据标识符在 catalog 中查找,也不会使用 catalog 信息充实可变选项。如果计划中没有包含所有必要的信息,恢复就会失败。 IDENTIFIER:只使用 catalog 表、函数或数据类型的标识符,并始终在 catalog 中查找。如果原始对象在目录中不再可用,恢复就会失败。计划中可能包含的其他元数据将被忽略。 |
table.sql-dialect Batch Streaming | default | String | 定义转化SQL查询的方言。不同的方言支持不同的SQL语法,目前支持default和hive方言。 |