有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

介绍

Elasticsearch Connector 提供了对 Elasticsearch 的写入和读取支持。目前 Oceanus 支持 Elasticsearch 6.x 和 7.x 版本。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持(写入、批数据源)
1.14
支持
1.16
支持

使用范围

Elasticsearch 支持写入,可以作为 Tuple 数据流的目的表(Sink),也可以作为 Upsert 数据流的目的表(Sink,自动以文档 _id 字段生成主键,并更新之前的文档版本)。
如果希望将 JDBC 数据库的变动记录,将其作为流式源表消费,可以使用 DebeziumCanal 等,对 JDBC 数据库的变更进行捕获和订阅,然后 Flink 即可对这些变更事件进行进一步的处理。可参见 Kafka
Oceanus 支持 Elasticsearch 的批模式读,目前只支持 Elasticsearch 7。

DDL 定义

用作 Elasticsearch 6 数据目的(Sink)

CREATE TABLE elasticsearch6_sink_table (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 对应 Elasticsearch 中的 _id
) WITH (
'connector' = 'elasticsearch-6', -- 输出到 Elasticsearch 6
'username' = '$username', -- 选填 用户名
'password' = '$password', -- 选填 密码
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址
'index' = 'my-index', -- Elasticsearch 的 Index 名
'document-type' = '_doc', -- Elasticsearch 的 Document 类型
'format' = 'json' -- 输出数据格式,目前只支持 'json'
);

用作 Elasticsearch 7 数据目的(Sink)

CREATE TABLE elasticsearch7_sink_table (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 对应 Elasticsearch 中的 _id
) WITH (
'connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7
'username' = '$username', -- 选填 用户名
'password' = '$password', -- 选填 密码
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址
'index' = 'my-index', -- Elasticsearch 的 Index 名
'format' = 'json' -- 输出数据格式,目前只支持 'json'
);

作为 Elasticsearch 7 批数据源(Source)

CREATE TABLE elasticsearch7_source_table (
`id` bigint,
`event_date` int,
`app` int,
primary key (`id`) not enforced
) with (
-- 必填参数
'connector' = 'es-source',
'endPoint' = '127.0.0.1', -- Elasticsearch 的连接 ip
'accessId' = 'elastic', -- 用户名
'accessKey' = 'PASSWORD', -- 密码
'indexName' = 'my-index', -- Elasticsearch 的 Index 名
'format' = 'json', -- 数据格式,只支持 'json'
-- 可选参数
'scheme' = 'http', -- 连接协议
'port' = '9200', -- 端口
'batchSize' = '2000', -- 每个 scroll 请求从 Elasticsearch 集群获取的最大文档数
'keepScrollAliveSecs' = '60' -- scroll 上下文保留的最长时间,单位为分钟
);

WITH 参数

作为数据目的

参数值
必填
默认值
描述
connector
当写入 Elasticsearch 6.x 版本时,取值elasticsearch-6。当写入 Elasticsearch 7.x 及以上版本时,取值elasticsearch-7
username
用户名。
password
密码。
hosts
Elasticsearch 的连接地址。
index
数据要写入的 Index。支持固定 Index(例如 'myIndex'),也支持动态 Index(例如'index-{log_ts|yyyy-MM-dd}')。
document-type
6.x 版本:必填
7.x 版本:不需要
Elasticsearch 文档的 Type 信息。当选择 elasticsearch-7 时,不能填写这个字段,否则会报错。
document-id.key-delimiter
_
为复合键生成 _id 时的分隔符 (默认是 "_")。例如有 a、b、c 三个主键,某条数据的 a 字段为 "1",b 字段为 "2",c 字段为 "3",使用默认分隔符,则最终写入 Elasticsearch 的 _id 是 "1_2_3"。
drop-delete
false
是否过滤上游传来的 DELETE(删除)消息。
此外,在多表 LEFT JOIN 且 JOIN Key 非主键的场景下,启用该选项后,可以解决 Elasticsearch 收到较多临时 null 值数据的问题。需要注意的是,JOIN 左右表的字段不能含有 null 值,否则可能会丢失部分数据。
failure-handler
fail
指定请求 Elasticsearch 失败时,错误处理策略。选项为:
fail:抛出一个异常。
ignore:忽略错误,直接继续。
retry-rejected:重试写入该条记录。
另外也支持自定义错误处理器,这里可以填写用户自己编写的 Handler 的类全名(需要上传自定义程序包)。
sink.flush-on-checkpoint
true
Flink 进行快照时,是否等待现有记录完全写入 Elasticsearch 。如果设置为 false,则可能造成恢复时部分数据丢失或者重复等异常情况,但快照速度会提升。
sink.bulk-flush.max-actions
1000
批量写入的最大条数。设置为 0 则禁用批量功能。
sink.bulk-flush.max-size
2mb
批量写入缓存的最大容量,必须以 mb 为单位。设置为 0 则禁用批量功能。
sink.bulk-flush.interval
1s
批量写入的刷新周期。设置为0则禁用批量功能。
sink.bulk-flush.backoff.strategy
DISABLED
批量写入时,失败重试的策略。
DISABLED:不重试。
CONSTANT:等待 sink.bulk-flush.backoff.delay 选项设置的毫秒后重试。
EXPONENTIAL:一开始等待 sink.bulk-flush.backoff.delay 选项设置的毫秒后重试,每次失败后将指数增加下次的等待时间。
sink.bulk-flush.backoff.max-retries
8
批量写入时,最多失败重试的次数。
sink.bulk-flush.backoff.delay
50ms
批量写入失败时,每次重试之间的等待间隔(对于 CONSTANT 策略而言)或间隔的初始基数(对于 EXPONENTIAL 策略而言)。
connection.max-retry-timeout
重试请求的最大超时时间,例如:"20 s"。
connection.path-prefix
指定每个 REST 请求的前缀,例如 '/v1'。通常不需要设置该选项。
format
json
指定输出的格式,默认是内置的 json 格式,可以使用 前文(Kafka)描述过的 JSON 格式选项,例如 json.fail-on-missing-fieldjson.ignore-parse-errorsjson.timestamp-format.standard 等。
retry-on-conflict
更新操作中,允许因版本冲突异常而重试的最大次数。超过该次数后将抛出异常导致作业失败。
说明:暂时只支持 Flink-1.13。
?

作为数据源

参数值
必填
默认值
描述
connector
固定值 es-source
endPoint
Elasticsearch 的连接 IP,示例 127.0.0.1
accessId
用户名
accessKey
密码
indexName
要读取的 Index
format
指定读取的格式,只支持内置的 json 格式,可以使用 前文(Kafka)描述过的 JSON 格式选项,例如 json.fail-on-missing-fieldjson.ignore-parse-errorsjson.timestamp-format.standard 等。
scheme
http
Elasticsearch 连接模式,例如 httphttps
?
port
9200
Elasticsearch 连接端口
batchSize
2000
每个 scroll 请求从 Elasticsearch 集群获取的最大文档数
keepScrollAliveSecs
60
scroll上下文保留的最长时间,单位为分钟

代码示例

作为数据目的

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);
?
CREATE TABLE elasticsearch7_sink_table (
`id` INT,
`name` STRING
) WITH (
'connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7
'username' = '$username', -- 选填 用户名
'password' = '$password', -- 选填 密码
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址
'index' = 'my-index', -- Elasticsearch 的 Index 名
'sink.bulk-flush.max-actions' = '1000', -- 数据刷新频率
'sink.bulk-flush.interval' = '1s' -- 数据刷新周期
'format' = 'json' -- 输出数据格式,目前只支持 'json'
);
?
INSERT INTO elasticsearch7_sink_table select * from datagen_source_table;

作为批数据源

CREATE TABLE elasticsearch7_source_table (
`id` bigint,
`event_date` int,
`app` int,
primary key (`id`) not enforced
) with (
-- 必填参数
'connector' = 'es-source',
'endPoint' = '127.0.0.1', -- Elasticsearch 的连接 ip
'accessId' = 'elastic', -- 用户名
'accessKey' = 'PASSWORD', -- 密码
'indexName' = 'my-index', -- Elasticsearch 的 Index 名
'format' = 'json', -- 数据格式,只支持 'json'
-- 可选参数
'scheme' = 'http', -- 连接协议
'port' = '9200', -- 端口
'batchSize' = '2000', -- 每个 scroll 请求从 Elasticsearch 集群获取的最大文档数
'keepScrollAliveSecs' = '60' -- scroll 上下文保留的最长时间,单位为分钟
);
?
CREATE TABLE logger_sink (
`id` bigint,
`event_date` int,
`app` int,
primary key (`id`) not enforced
) with (
'connector' = 'logger'
);
?
INSERT INTO logger_sink SELECT * from elasticsearch7_source_table;

注意事项

如果您希望连接其他版本的 Elasticsearch,请通过附加自定义程序包的方式,上传相应的 Elasticsearch Sink 的 JAR 包。

监控指标说明

Oceanus 为 ES Connector 增加了很多实用的统计指标。单击 Flink UI 的运行图中的 ES Sink 算子,即可搜索并查看指标:
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。


http://www.vxiaotou.com