免费A级毛片无码专区网站-成人国产精品视频一区二区-啊 日出水了 用力乖乖在线-国产黑色丝袜在线观看下-天天操美女夜夜操美女-日韩网站在线观看中文字幕-AV高清hd片XXX国产-亚洲av中文字字幕乱码综合-搬开女人下面使劲插视频

flinksql讀寫redis

0、前言最近有個(gè)需求,需要使用flinksql讀寫redis,由于官網(wǎng)上并沒有redis的connector,在網(wǎng)上找了很久,開源的幾個(gè)connector又沒法滿足要求,所有這里就自己動(dòng)手實(shí)現(xiàn)了一個(gè) 。已經(jīng)適配了各個(gè)版本的flink,從flink1.12到flink1.15 。
簡(jiǎn)單介紹一下功能吧:

  • 將redis作為流表時(shí)支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用lua腳本封裝的批量彈出提高消費(fèi)性能
  • 將redis作為維表時(shí)支持GET、HGET等命令;支持lookup緩存
  • 將redis作為sink表時(shí)支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持指定key的ttl時(shí)間
  • 支持flink常見的序列化反序列化方式,如json、csv等,具體參見flink官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/
1、redis作為流表1.1、數(shù)據(jù)準(zhǔn)備
@Beforepublic void init() {/**設(shè)置當(dāng)前屬于測(cè)試模式,在這個(gè)測(cè)試模式下,當(dāng)流表數(shù)據(jù)消費(fèi)完成后程序會(huì)停止,方便測(cè)試,這個(gè)模式默認(rèn)false*/RedisOptions.IS_TEST = true;RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);List<String> lists = new ArrayList<>();for (int i = 0; i < 1000; i++) {lists.add("{\n" +"\"number\": " + i + ",\n" +"\"name\": \"學(xué)生" + i + "\",\n" +"\"school\": \"學(xué)校" + ((i % 3) + 1) +"\",\n" +"\"class_id\": " + ((i % 10) + 1) +"\n" +"}");}/*** 初始化學(xué)生數(shù)據(jù)*/for (int i = 0; i < 1; i++) {redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));}/*** 初始化班級(jí)數(shù)據(jù)*/for(int i = 0;i < 10;i++) {redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");}/*** 初始化學(xué)校班級(jí)數(shù)據(jù)*/for(int j = 1;j < 4;j++) {for (int i = 1; i < 11; i++) {redisOperator.hset("學(xué)校" + j, String.valueOf(i), "銀河" + i + "班");}}}1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消費(fèi)指定的key的list或者set的數(shù)據(jù)@Testpublic void testBlpopSQL() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String sink ="CREATE TABLE sink_students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT \n" +") \n" +"WITH (\n" +"'connector'='print'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(sink);String sql =" insert into sink_students select * from students";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();}2、redis作為維表(不帶format)2.1、數(shù)據(jù)準(zhǔn)備@Beforepublic void init() {/**設(shè)置當(dāng)前屬于測(cè)試模式,在這個(gè)測(cè)試模式下,當(dāng)流表數(shù)據(jù)消費(fèi)完成后程序會(huì)停止,方便測(cè)試,這個(gè)模式默認(rèn)false*/RedisOptions.IS_TEST = true;RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);List<String> lists = new ArrayList<>();for (int i = 0; i < 1000; i++) {lists.add("{\n" +"\"number\": " + i + ",\n" +"\"name\": \"學(xué)生" + i + "\",\n" +"\"school\": \"學(xué)校" + ((i % 3) + 1) +"\",\n" +"\"class_id\": " + ((i % 10) + 1) +"\n" +"}");}/*** 初始化學(xué)生數(shù)據(jù)*/for (int i = 0; i < 1; i++) {redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));}/*** 初始化班級(jí)數(shù)據(jù)*/for(int i = 0;i < 10;i++) {redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");}/*** 初始化學(xué)校班級(jí)數(shù)據(jù)*/for(int j = 1;j < 4;j++) {for (int i = 1; i < 11; i++) {redisOperator.hset("學(xué)校" + j, String.valueOf(i), "銀河" + i + "班");}}}

經(jīng)驗(yàn)總結(jié)擴(kuò)展閱讀