新聞中心
DML:窗口聚合
大家好我是老羊,由于窗口涉及到的知識內(nèi)容比較多,所以博主先為大家說明介紹下面內(nèi)容時的思路,大家跟著思路走。思路如下:

創(chuàng)新互聯(lián)公司專注于云州企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,商城建設。云州網(wǎng)站建設公司,為云州等地區(qū)提供建站服務。全流程按需規(guī)劃網(wǎng)站,專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務
- 先介紹 Flink SQL 支持的 4 種時間窗口。
- 分別詳細介紹上述的 4 種時間窗口的功能及 SQL 語法。
- 結合實際案例介紹 4 種時間窗口。
首先來看看 Flink SQL 中支持的 4 種窗口的運算。
- 滾動窗口(TUMBLE)。
- 滑動窗口(HOP)。
- Session 窗口(SESSION)。
- 漸進式窗口(CUMULATE)。
1、滾動窗口(TUMBLE)
滾動窗口定義:滾動窗口將每個元素指定給指定窗口大小的窗口。滾動窗口具有固定大小,且不重疊。例如,指定一個大小為 5 分鐘的滾動窗口。在這種情況下,F(xiàn)link 將每隔 5 分鐘開啟一個新的窗口,其中每一條數(shù)都會劃分到唯一一個 5 分鐘的窗口中,如下圖所示。
tumble window
應用場景:常見的按照一分鐘對數(shù)據(jù)進行聚合,計算一分鐘內(nèi) PV,UV 數(shù)據(jù)。
實際案例:簡單且常見的分維度分鐘級別同時在線用戶數(shù)、總銷售額。
那么上面這個案例的 SQL 要咋寫呢?
關于滾動窗口,在 1.13 版本之前和 1.13 及之后版本有兩種 Flink SQL 實現(xiàn)方式,分別是:
- Group Window Aggregation(1.13 之前只有此類方案,此方案在 1.13 及之后版本已經(jīng)標記為廢棄,不推薦小伙伴萌使用)。
- ? Windowing TVF(1.13 及之后建議使用 Windowing TVF)。
博主這里兩種方法都會介紹:
- Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 數(shù)據(jù)處理邏輯
insert into sink_table
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數(shù)
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute)
可以看到 Group Window Aggregation 滾動窗口的 SQL 語法就是把 tumble window 的聲明寫在了 group by 子句中,即 tumble(row_time, interval '1' minute),第一個參數(shù)為事件時間的時間戳;第二個參數(shù)為滾動窗口大小。
- Window TVF 方案(1.13 只支持 Streaming 任務):
-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 數(shù)據(jù)處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as uv
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滾動窗口的寫法就是把 tumble window 的聲明寫在了數(shù)據(jù)源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分參數(shù)。
第一個參數(shù) TABLE source_table 聲明數(shù)據(jù)源表;第二個參數(shù) DESCRIPTOR(row_time) 聲明數(shù)據(jù)源的時間戳;第三個參數(shù) INTERVAL '60' SECOND 聲明滾動窗口大小為 1 min。
SQL 語義:
由于離線沒有相同的時間窗口聚合概念,這里就直接說實時場景 SQL 語義,假設 Orders 為 kafka,target_table 也為 Kafka,這個 SQL 生成的實時任務,在執(zhí)行時,會生成三個算子:
數(shù)據(jù)源算子(From Order):連接到 Kafka topic,數(shù)據(jù)源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的 窗口聚合算子。
窗口聚合算子(TUMBLE 算子):接收到上游算子發(fā)的一條一條的數(shù)據(jù),然后將每一條數(shù)據(jù)按照時間戳劃分到對應的窗口中(根據(jù)事件時間、處理時間的不同語義進行劃分),上述案例為事件時間,事件時間中,滾動窗口算子接收到上游的 Watermark 大于窗口的結束時間時,則說明當前這一分鐘的滾動窗口已經(jīng)結束了,將窗口計算完的結果發(fā)往下游算子(一條一條發(fā)給下游 數(shù)據(jù)匯算子)。
數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Kafka 中。
這個實時任務也是 24 小時一直在運行的,所有的算子在同一時刻都是處于 running 狀態(tài)的。
注意:
事件時間中滾動窗口的窗口計算觸發(fā)是由 Watermark 推動的。
2、滑動窗口(HOP)
滑動窗口定義:滑動窗口也是將元素指定給固定長度的窗口。與滾動窗口功能一樣,也有窗口大小的概念。不一樣的地方在于,滑動窗口有另一個參數(shù)控制窗口計算的頻率(滑動窗口滑動的步長)。因此,如果滑動的步長小于窗口大小,則滑動窗口之間每個窗口是可以重疊。在這種情況下,一條數(shù)據(jù)就會分配到多個窗口當中。舉例,有 10 分鐘大小的窗口,滑動步長為 5 分鐘。這樣,每 5 分鐘會劃分一次窗口,這個窗口包含的數(shù)據(jù)是過去 10 分鐘內(nèi)的數(shù)據(jù),如下圖所示。
hop window
應用場景:比如計算同時在線的數(shù)據(jù),要求結果的輸出頻率是 1 分鐘一次,每次計算的數(shù)據(jù)是過去 5 分鐘的數(shù)據(jù)(有的場景下用戶可能在線,但是可能會 2 分鐘不活躍,但是這也要算在同時在線數(shù)據(jù)中,所以取最近 5 分鐘的數(shù)據(jù)就能計算進去了)。
實際案例:簡單且常見的分維度分鐘級別同時在線用戶數(shù),1 分鐘輸出一次,計算最近 5 分鐘的數(shù)據(jù)。
依然是 Group Window Aggregation、Windowing TVF 兩種方案:
- Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數(shù)據(jù)處理邏輯
insert into sink_table
SELECT dim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,
count(distinct user_id) as uv
FROM source_table
GROUP BY dim
, hop(row_time, interval '1' minute, interval '5' minute)
可以看到 Group Window Aggregation 滾動窗口的寫法就是把 hop window 的聲明寫在了 group by 子句中,即 hop(row_time, interval '1' minute, interval '5' minute)。其中:
第一個參數(shù)為事件時間的時間戳;第二個參數(shù)為滑動窗口的滑動步長;第三個參數(shù)為滑動窗口大小。
- Windowing TVF 方案(1.13 只支持 Streaming 任務):
-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數(shù)據(jù)處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(distinct user_id) as bucket_uv
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滾動窗口的寫法就是把 hop window 的聲明寫在了數(shù)據(jù)源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES)),包含四部分參數(shù):
第一個參數(shù) TABLE source_table 聲明數(shù)據(jù)源表;第二個參數(shù) DESCRIPTOR(row_time) 聲明數(shù)據(jù)源的時間戳;第三個參數(shù) INTERVAL '1' MINUTES 聲明滾動窗口滑動步長大小為 1 min。第四個參數(shù) INTERVAL '5' MINUTES 聲明滾動窗口大小為 5 min。
SQL 語義:
滑動窗口語義和滾動窗口類似,這里不再贅述。
3、Session 窗口(SESSION)
- Session 窗口定義:Session 時間窗口和滾動、滑動窗口不一樣,其沒有固定的持續(xù)時間,如果在定義的間隔期(Session Gap)內(nèi)沒有新的數(shù)據(jù)出現(xiàn),則 Session 就會窗口關閉。如下圖對比所示:
session window
實際案例:計算每個用戶在活躍期間(一個 Session)總共購買的商品數(shù)量,如果用戶 5 分鐘沒有活動則視為 Session 斷開。
目前 1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以這里就只介紹 Group Window Aggregation 方案:
- Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數(shù)據(jù)源表,用戶購買行為記錄表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT, -- 購買商品數(shù)量
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數(shù)據(jù)處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,
count(1) as pv
FROM source_table
GROUP BY dim
, session(row_time, interval '5' minute)
注意:
上述 SQL 任務是在整個 Session 窗口結束之后才會把數(shù)據(jù)輸出。Session 窗口即支持 處理時間 也支持 事件時間。但是處理時間只支持在Streaming 任務中運行,Batch 任務不支持。
可以看到 Group Window Aggregation 中 Session 窗口的寫法就是把 session window 的聲明寫在了 group by 子句中,即 session(row_time, interval '5' minute)。其中:
第一個參數(shù)為事件時間的時間戳;第二個參數(shù)為 Session gap 間隔。
SQL 語義:
Session 窗口語義和滾動窗口類似,這里不再贅述。
4、漸進式窗口(CUMULATE)
漸進式窗口定義(1.13 只支持 Streaming 任務):漸進式窗口在其實就是 固定窗口間隔內(nèi)提前觸發(fā)的的滾動窗口,其實就是 Tumble Window + early-fire 的一個事件時間的版本。例如,從每日零點到當前這一分鐘繪制累積 UV,其中 10:00 時的 UV 表示從 00:00 到 10:00 的 UV總數(shù)。漸進式窗口可以認為是首先開一個最大窗口大小的滾動窗口,然后根據(jù)用戶設置的觸發(fā)的時間間隔將這個滾動窗口拆分為多個窗口,這些窗口具有相同的窗口起點和不同的窗口終點。如下圖所示:
cumulate window
應用場景:周期內(nèi)累計 PV,UV 指標(如每天累計到當前這一分鐘的 PV,UV)。這類指標是一段周期內(nèi)的累計狀態(tài),對分析師來說更具統(tǒng)計分析價值,而且?guī)缀跛械膹秃现笜硕际腔诖祟愔笜说慕y(tǒng)計(不然離線為啥都要累計一天的數(shù)據(jù),而不要一分鐘累計的數(shù)據(jù)呢)。
實際案例:每天的截止當前分鐘的累計 money(sum(money)),去重 id 數(shù)(count(distinct id))。每天代表漸進式窗口大小為 1 天,分鐘代表漸進式窗口移動步長為分鐘級別。舉例如下:
明細輸入數(shù)據(jù):
預期經(jīng)過漸進式窗口計算的輸出數(shù)據(jù):
轉(zhuǎn)化為折線圖長這樣:
當日累計
可以看到,其特點就在于,每一分鐘的輸出結果都是當天零點累計到當前的結果。
漸進式窗口目前只有 Windowing TVF 方案支持:
- Windowing TVF 方案(1.13 只支持 Streaming 任務):
-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 用戶 id
user_id BIGINT,
-- 用戶
money BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);
-- 數(shù)據(jù)處理邏輯
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end
可以看到 Windowing TVF 滾動窗口的寫法就是把 cumulate window 的聲明寫在了數(shù)據(jù)源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分參數(shù):
第一個參數(shù) TABLE source_table 聲明數(shù)據(jù)源表;第二個參數(shù) DESCRIPTOR(row_time) 聲明數(shù)據(jù)源的時間戳;第三個參數(shù) INTERVAL '60' SECOND 聲明漸進式窗口觸發(fā)的漸進步長為 1 min。第四個參數(shù) INTERVAL '1' DAY 聲明整個漸進式窗口的大小為 1 天,到了第二天新開一個窗口重新累計。
SQL 語義:
漸進式窗口語義和滾動窗口類似,這里不再贅述。
5、Window TVF 支持 Grouping Sets、Rollup、Cube
具體應用場景:實際的案例場景中,經(jīng)常會有多個維度進行組合(cube)計算指標的場景。如果把每個維度組合的代碼寫一遍,然后
union all 起來,這樣寫起來非常麻煩,而且會導致一個數(shù)據(jù)源讀取多遍。
這時,有離線 Hive SQL 使用經(jīng)驗的小伙伴萌就會想到,如果有了 Grouping Sets,我們就可以直接用 Grouping Sets 將維度組合寫在一條 SQL 中,寫起來方便并且執(zhí)行效率也高。當然,F(xiàn)link 支持這個功能。
但是目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。
來一個實際案例感受一下,計算每日零點累計到當前這一分鐘的分匯總、age、sex、age+sex 維度的用戶數(shù)。
-- 用戶訪問明細表
CREATE TABLE source_table (
age STRING,
sex STRING,
user_id BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.age.length' = '1',
'fields.sex.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000'
);
CREATE TABLE sink_table (
age STRING,
sex STRING,
uv BIGINT,
window_end bigint
) WITH (
'connector' = 'print'
);
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end,
-- grouping sets 寫法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
小伙伴萌這里需要注意下!
Flink SQL 中 Grouping Sets 的語法和 Hive SQL 的語法有一些不同,如果我們使用 Hive SQL 實現(xiàn)上述 SQL 的語義,其實現(xiàn)如下:
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM source_table
GROUP BY
age
, sex
-- hive sql grouping sets 寫法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
網(wǎng)站標題:FlinkSQL知其所以然:萬字詳述FlinkSQL四種時間窗口語義!
分享URL:http://m.fisionsoft.com.cn/article/dhhjihe.html


咨詢
建站咨詢
