对于流查询,Regular Join 的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。
网站建设哪家好,找成都创新互联公司!专注于网页设计、网站建设、微信开发、成都小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了武胜免费建站欢迎大家使用!
Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):
Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
INNER JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析:
-- L 流数据达到,由于没有 Join 到 R 流数据而且是 inner join 便不输出结果
+I[111, book1] -- R 流数据达到, Join 到 L 流数据,便输出 +I[111, book1]
-- R 流数据达到,由于没有 Join 到 L 流数据而且是 inner join 便不输出结果
+I[222, book2] -- L 流数据达到, Join 到 R 流数据便输出结果
Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
LEFT JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析:
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +[L, null]
-D[111, null] -- R 流的数据到达,发现 L 流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]
+I[111, book1] -- 再输出 +[L, R]
-- 这里模拟一条 R 流 guid = 222 的数据到达,由于是 left join 且没有 join 到 L 流,因此不做输出
+I[222, book2] -- 当 L 流 guid = 222 的数据达到 join R 流 后输出结果 +[L, R]
Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
FULL JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析:
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +I[L, null]
+I[null, book2] -- R 流数据达到,没有 Join 到 R 流数据,便输出 +I[null, R]
-D[null, book2] -- L 流新数据到达,发现之前 R 流之前输出过没有 Join 到的数据,则发起回撤流,先输出 -D[null, R]
+I[222, book2] -- 再输出 +I[L, R]
-D[111, null] -- 反之同理
+I[111, book1]
在 Regular Join 时 Flink 会将两条没有时间窗口限制的流的所有数据存储在 State 中,由于流是无穷无尽持续流入的,随着时间的不断推进,内存中积累的状态会越来越多。
针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理,可以通过 table.exec.state.ttl 参数进行控制(注意:这同时也会对结果的准确性有所影响,因此需要合理的权衡)。
分享文章:关于FlinkRegularJoin与TTL的理解
网页地址:http://www.shufengxianlan.com/qtweb/news18/97018.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联