日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Flink1.12SQL向Redis實(shí)時寫數(shù)據(jù)

[[410005]]

本文轉(zhuǎn)載自微信公眾號「肌肉碼農(nóng)」,作者鄒學(xué)。轉(zhuǎn)載本文請聯(lián)系肌肉碼農(nóng)公眾號。

目前成都創(chuàng)新互聯(lián)已為1000多家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間網(wǎng)站托管運(yùn)營、企業(yè)網(wǎng)站設(shè)計、鐵西網(wǎng)站維護(hù)等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

插件名稱:flink-connector-redis

插件地址:https://github.com/jeff-zou/flink-connector-redis.git

項(xiàng)目介紹

基于bahir-flink二次開發(fā),使它支持SQL直接定義寫入redis,用戶通過DDL指定自己需要保存的字段。

使用方法:

命令行執(zhí)行 mvn package -DskipTests=true打包后,將生成的包flink-connector-redis_2.12-1.11.1.jar引入flink lib中即可,無需其它設(shè)置。

重構(gòu)介紹:

相對上一個版本簡化了參數(shù)設(shè)置,思路更清晰,上一版本字段的值會根據(jù)主鍵等條件來自動生成,這要求使用者需要了解相關(guān)規(guī)則,有一定的學(xué)習(xí)成本并且容易埋坑,重構(gòu)后字段的值由用戶在DDL中顯示地指定,如下:

 
 
 
  1. 'key-column'='username','value-column'='passport',' //直接指定字段名 

取消了必須有主鍵的限制,使用更簡單,如果有多個字段組合成key或者value,需要用戶在DML中使用concat_ws等方式組裝,不再是插件在后臺用不可見字符拼裝。

使用示例:

  • 1.SQL方式

示例代碼路徑: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLInsertTest.java

set示例,相當(dāng)于redis命令: set test test11

 
 
 
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  2.         EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); 
  3.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); 
  4.  
  5.         String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " + 
  6.                 "'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','key-column'='username','value-column'='passport','command'='set')" ; 
  7.  
  8.         tEnv.executeSql(ddl); 
  9.         String sql = " insert into sink_redis select * from (values ('test', 'test11'))"; 
  10.         TableResult tableResult = tEnv.executeSql(sql); 
  11.         tableResult.getJobClient().get() 
  12.                 .getJobExecutionResult() 
  13.                 .get(); 
  • 2.DataStream方式

示例代碼路徑:

src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamInsertTest.java

hset示例,相當(dāng)于redis命令:hset tom math 150

 
 
 
  1. Configuration configuration = new Configuration(); 
  2.         configuration.setString(RedisOptions.KEY_COLUMN, "name"); 
  3.         configuration.setString(RedisOptions.FIELD_COLUMN, "subject"); //對應(yīng)hash的field、 sorted set的score 
  4.         configuration.setString(RedisOptions.VALUE_COLUMN, "score"); 
  5.         configuration.setString(REDIS_MODE, REDIS_CLUSTER); 
  6.         configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name()); 
  7.  
  8.         RedisMapper redisMapper = RedisHandlerServices 
  9.                 .findRedisHandler(RedisMapperHandler.class, configuration.toMap()) 
  10.                 .createRedisMapper(configuration); 
  11.  
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  13.  
  14.         GenericRowData genericRowData = new GenericRowData(3); 
  15.         genericRowData.setField(0, "tom"); 
  16.         genericRowData.setField(1, "math"); 
  17.         genericRowData.setField(2, "150"); 
  18.         DataStream dataStream = env.fromElements(genericRowData); 
  19.  
  20.         TableSchema tableSchema =  new TableSchema.Builder() .field("name", DataTypes.STRING().notNull()).field("subject", DataTypes.STRING()).field("score", DataTypes.INT()).build(); 
  21.  
  22.         FlinkJedisConfigBase conf = getLocalRedisClusterConfig(); 
  23.         RedisSink redisSink = new RedisSink<>(conf, redisMapper, tableSchema); 
  24.  
  25.         dataStream.addSink(redisSink); 
  26.         env.execute("RedisSinkTest"); 

新聞標(biāo)題:Flink1.12SQL向Redis實(shí)時寫數(shù)據(jù)
本文網(wǎng)址:http://m.5511xx.com/article/dhjhjji.html