新聞中心
Flink與Redis的深度對(duì)接

公司主營(yíng)業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)推出汾陽(yáng)免費(fèi)做網(wǎng)站回饋大家。
Flink是一個(gè)開源的大數(shù)據(jù)流處理框架,它可以高效地處理流式數(shù)據(jù)和批量數(shù)據(jù)處理任務(wù)。為了更好地支持大規(guī)模數(shù)據(jù)的實(shí)時(shí)處理,F(xiàn)link結(jié)合外部系統(tǒng)的存儲(chǔ)技術(shù),可以更好地優(yōu)化數(shù)據(jù)處理流程,提高數(shù)據(jù)處理性能。
Redis是一款性能卓越、易用性強(qiáng)的基于內(nèi)存的開源非關(guān)系型數(shù)據(jù)庫(kù),它可以支持?jǐn)?shù)千萬(wàn)級(jí)別的數(shù)據(jù)存儲(chǔ)應(yīng)用,也可以用作分布式事務(wù)處理,消息中間件等等。Redis的快速存取和安全有效的操作,使其成為Flink的需求更多的可選擇的存儲(chǔ)技術(shù)。
Flink和Redis的深度對(duì)接,旨在使Flink易于訪問(wèn)Redis服務(wù)上的數(shù)據(jù),從而實(shí)現(xiàn)數(shù)據(jù)處理任務(wù)的有效實(shí)現(xiàn)和運(yùn)行。可以兩種方式來(lái)實(shí)現(xiàn)Flink與Redis的深度對(duì)接,第一種是使用Redis內(nèi)置的Java客戶端來(lái)擴(kuò)展Flink;第二種是使用Redis Connector插件來(lái)擴(kuò)展Flink。
1、 利用Redis內(nèi)置的客戶端擴(kuò)展Flink
Flink可以通過(guò)支持內(nèi)置的java客戶端來(lái)和Redis服務(wù)進(jìn)行深度對(duì)接,下面是一個(gè)簡(jiǎn)單的例子:
import redis.clients.jedis.Jedis;
public class RedisSinkExample{
public static void mn(String[] args){
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet> dataSet = env.fromElements(
new Tuple2(“key1”, “Hello, Redis!”),
new Tuple2(“key2”, “Hello, World!”));
dataSet.mapPartition(new RichMapPartitionFunction, Long>(){
@Override
public void mapPartition(Iterable> values, Collector out) throws Exception {
Jedis jedis = new Jedis(“l(fā)ocalhost”);
values.forEach(e -> {
jedis.set(e.f0, e.f1);
});
out.collect(values.spliterator().estimateSize());
}
}).print();
}
}
上面的示例中,F(xiàn)link和Redis服務(wù)的深度對(duì)接是通過(guò)使用Redis內(nèi)置的 java 客戶端來(lái)實(shí)現(xiàn)的。
2、使用Redis Connector擴(kuò)展Flink
雖然Flink可以使用Redis內(nèi)置的Java客戶端來(lái)實(shí)現(xiàn)數(shù)據(jù)的深度對(duì)接,但它的實(shí)現(xiàn)方式非常不方便,在多開發(fā)人員和復(fù)雜項(xiàng)目中,使用 Redis Connector 擴(kuò)展Flink可以出奇的方便。
Flink使用Redis Connector可以提供如下功能:
1. 通過(guò)Redis數(shù)據(jù)管理倉(cāng)庫(kù) ,支持從Redis中獲取和發(fā)布數(shù)據(jù)這樣的分布式交換;
2. 通過(guò)Redis數(shù)據(jù)持久化,將數(shù)據(jù)持久化到Redis集群中;
3. 支持Flink和Redis的流數(shù)據(jù)的雙向交互,更新Redis中的數(shù)據(jù);
實(shí)現(xiàn)Flink和Redis Connector的深度對(duì)接,你可以使用如下代碼:
// Create the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 數(shù)據(jù)表以外部Redis結(jié)合
RedisOptions redisOptions = new RedisOptions();
redisOptions.setHost(“l(fā)ocalhost”);
redisOptions.setPort(6379);
// 設(shè)置Redis連接
RedisTableSource redisTableSource = new RedisTableSource(
“tableName”, // Redis表名
redisOptions, // Redis連接信息
new String[]{“key1”, “key2”} // 要查詢的Redis鍵值
);
env.registerTableSource(“source”, redisTableSource); // 注冊(cè)Redis數(shù)據(jù)表
// 執(zhí)行SQL
Table envTable = env.sqlQuery(“SELECT key1, key2 FROM source”);
// 顯示結(jié)果
envTable.printSchema();
envTable.execute().print();
通過(guò)Redis Connector插件,F(xiàn)link開發(fā)者可以更輕松地進(jìn)行Flink和Redis的深度對(duì)接,從而實(shí)現(xiàn)更有效的數(shù)據(jù)處理和持久化。
從上面的分析可以看出,F(xiàn)link 和 Redis的深度對(duì)接有助于優(yōu)化數(shù)據(jù)處理流程,并且能夠更有效地利用Redis的高速存取和安全有
香港服務(wù)器選創(chuàng)新互聯(lián),香港虛擬主機(jī)被稱為香港虛擬空間/香港網(wǎng)站空間,或者簡(jiǎn)稱香港主機(jī)/香港空間。香港虛擬主機(jī)特點(diǎn)是免備案空間開通就用, 創(chuàng)新互聯(lián)香港主機(jī)精選cn2+bgp線路訪問(wèn)快、穩(wěn)定!
文章標(biāo)題:紅色連接flink與redis的深度對(duì)接(redis連接flink)
文章來(lái)源:http://m.fisionsoft.com.cn/article/djoopeh.html


咨詢
建站咨詢
