日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實(shí)戰(zhàn)

技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實(shí)戰(zhàn)

作者:IT技術(shù)分享 2019-09-16 12:55:27

存儲(chǔ)

大數(shù)據(jù)

Kafka 在實(shí)際的應(yīng)用場(chǎng)景中,數(shù)據(jù)存儲(chǔ)在HBase集群中,但是由于一些特殊的原因,需要將數(shù)據(jù)從HBase遷移到Kafka。正常情況下,一般都是源數(shù)據(jù)到Kafka,再有消費(fèi)者處理數(shù)據(jù),將數(shù)據(jù)寫入HBase。但是,如果逆向處理,如何將HBase的數(shù)據(jù)遷移到Kafka呢?

10年積累的網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站設(shè)計(jì)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)制作后付款的網(wǎng)站建設(shè)流程,更有曹縣免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

1.概述

在實(shí)際的應(yīng)用場(chǎng)景中,數(shù)據(jù)存儲(chǔ)在HBase集群中,但是由于一些特殊的原因,需要將數(shù)據(jù)從HBase遷移到Kafka。正常情況下,一般都是源數(shù)據(jù)到Kafka,再有消費(fèi)者處理數(shù)據(jù),將數(shù)據(jù)寫入HBase。但是,如果逆向處理,如何將HBase的數(shù)據(jù)遷移到Kafka呢?今天筆者就給大家來(lái)分享一下具體的實(shí)現(xiàn)流程。

2.內(nèi)容

一般業(yè)務(wù)場(chǎng)景如下,數(shù)據(jù)源頭產(chǎn)生數(shù)據(jù),進(jìn)入Kafka,然后由消費(fèi)者(如Flink、Spark、Kafka API)處理數(shù)據(jù)后進(jìn)入到HBase。這是一個(gè)很典型的實(shí)時(shí)處理流程。流程圖如下:

上述這類實(shí)時(shí)處理流程,處理數(shù)據(jù)都比較容易,畢竟數(shù)據(jù)流向是順序處理的。但是,如果將這個(gè)流程逆向,那么就會(huì)遇到一些問(wèn)題。

2.1 海量數(shù)據(jù)

HBase的分布式特性,集群的橫向拓展,HBase中的數(shù)據(jù)往往都是百億、千億級(jí)別,或者數(shù)量級(jí)更大。這類級(jí)別的數(shù)據(jù),對(duì)于這類逆向數(shù)據(jù)流的場(chǎng)景,會(huì)有個(gè)很麻煩的問(wèn)題,那就是取數(shù)問(wèn)題。如何將這海量數(shù)據(jù)從HBase中取出來(lái)?

2.2 沒(méi)有數(shù)據(jù)分區(qū)

我們知道HBase做數(shù)據(jù)Get或者List很快,也比較容易。而它又沒(méi)有類似Hive這類數(shù)據(jù)倉(cāng)庫(kù)分區(qū)的概念,不能提供某段時(shí)間內(nèi)的數(shù)據(jù)。如果要提取最近一周的數(shù)據(jù),可能全表掃描,通過(guò)過(guò)濾時(shí)間戳來(lái)獲取一周的數(shù)據(jù)。數(shù)量小的時(shí)候,可能問(wèn)題不大,而數(shù)據(jù)量很大的時(shí)候,全表去掃描HBase很困難。

3.解決思路

對(duì)于這類逆向數(shù)據(jù)流程,如何處理。其實(shí),我們可以利用HBase Get和List的特性來(lái)實(shí)現(xiàn)。因?yàn)镠Base通過(guò)RowKey來(lái)構(gòu)建了一級(jí)索引,對(duì)于RowKey級(jí)別的取數(shù),速度是很快的。實(shí)現(xiàn)流程細(xì)節(jié)如下:

數(shù)據(jù)流程如上圖所示,下面筆者為大家來(lái)剖析每個(gè)流程的實(shí)現(xiàn)細(xì)節(jié),以及注意事項(xiàng)。

3.1 Rowkey抽取

我們知道HBase針對(duì)Rowkey取數(shù)做了一級(jí)索引,所以我們可以利用這個(gè)特性來(lái)展開。我們可以將海量數(shù)據(jù)中的Rowkey從HBase表中抽取,然后按照我們制定的抽取規(guī)則和存儲(chǔ)規(guī)則將抽取的Rowkey存儲(chǔ)到HDFS上。

這里需要注意一個(gè)問(wèn)題,那就是關(guān)于HBase Rowkey的抽取,海量數(shù)據(jù)級(jí)別的Rowkey抽取,建議采用MapReduce來(lái)實(shí)現(xiàn)。這個(gè)得益于HBase提供了TableMapReduceUtil類來(lái)實(shí)現(xiàn),通過(guò)MapReduce任務(wù),將HBase中的Rowkey在map階段按照指定的時(shí)間范圍進(jìn)行過(guò)濾,在reduce階段將rowkey拆分為多個(gè)文件,最后存儲(chǔ)到HDFS上。

這里可能會(huì)有同學(xué)有疑問(wèn),都用MapReduce抽取Rowkey了,為啥不直接在掃描處理列簇下的列數(shù)據(jù)呢?這里,我們?cè)趩?dòng)MapReduce任務(wù)的時(shí)候,Scan HBase的數(shù)據(jù)時(shí)只過(guò)濾Rowkey(利用FirstKeyOnlyFilter來(lái)實(shí)現(xiàn)),不對(duì)列簇?cái)?shù)據(jù)做處理,這樣會(huì)快很多。對(duì)HBase RegionServer的壓力也會(huì)小很多。

  • RowColumnrow001info:namerow001info:agerow001info:sexrow001info:sn

這里舉個(gè)例子,比如上表中的數(shù)據(jù),其實(shí)我們只需要取出Rowkey(row001)。但是,實(shí)際業(yè)務(wù)數(shù)據(jù)中,HBase表描述一條數(shù)據(jù)可能有很多特征屬性(例如姓名、性別、年齡、身份證等等),可能有些業(yè)務(wù)數(shù)據(jù)一個(gè)列簇下有十幾個(gè)特征,但是他們卻只有一個(gè)Rowkey,我們也只需要這一個(gè)Rowkey。那么,我們使用FirstKeyOnlyFilter來(lái)實(shí)現(xiàn)就很合適了。

  
 
 
 
  1. /**
  2.  * A filter that will only return the first KV from each row.
  3.  * 

  4.  * This filter can be used to more efficiently perform row count operations.
  5.  */

這個(gè)是FirstKeyOnlyFilter的一段功能描述,它用于返回第一條KV數(shù)據(jù),官方其實(shí)用它來(lái)做計(jì)數(shù)使用,這里我們稍加改進(jìn),把FirstKeyOnlyFilter用來(lái)做抽取Rowkey。

3.2 Rowkey生成

抽取的Rowkey如何生成,這里可能根據(jù)實(shí)際的數(shù)量級(jí)來(lái)確認(rèn)Reduce個(gè)數(shù)。建議生成Rowkey文件時(shí),切合實(shí)際的數(shù)據(jù)量來(lái)算Reduce的個(gè)數(shù)。盡量不用為了使用方便就一個(gè)HDFS文件,這樣后面不好維護(hù)。舉個(gè)例子,比如HBase表有100GB,我們可以拆分為100個(gè)文件。

3.3 數(shù)據(jù)處理

在步驟1中,按照抽取規(guī)則和存儲(chǔ)規(guī)則,將數(shù)據(jù)從HBase中通過(guò)MapReduce抽取Rowkey并存儲(chǔ)到HDFS上。然后,我們?cè)谕ㄟ^(guò)MapReduce任務(wù)讀取HDFS上的Rowkey文件,通過(guò)List的方式去HBase中獲取數(shù)據(jù)。拆解細(xì)節(jié)如下:

Map階段,我們從HDFS讀取Rowkey的數(shù)據(jù)文件,然后通過(guò)批量Get的方式從HBase取數(shù),然后組裝數(shù)據(jù)發(fā)送到Reduce階段。在Reduce階段,獲取來(lái)自Map階段的數(shù)據(jù),寫數(shù)據(jù)到Kafka,通過(guò)Kafka生產(chǎn)者回調(diào)函數(shù),獲取寫入Kafka狀態(tài)信息,根據(jù)狀態(tài)信息判斷數(shù)據(jù)是否寫入成功。如果成功,記錄成功的Rowkey到HDFS,便于統(tǒng)計(jì)成功的進(jìn)度;如果失敗,記錄失敗的Rowkey到HDFS,便于統(tǒng)計(jì)失敗的進(jìn)度。

3.4 失敗重跑

通過(guò)MapReduce任務(wù)寫數(shù)據(jù)到Kafka中,可能會(huì)有失敗的情況,對(duì)于失敗的情況,我們只需要記錄Rowkey到HDFS上,當(dāng)任務(wù)執(zhí)行完成后,再去程序檢查HDFS上是否存在失敗的Rowkey文件,如果存在,那么再次啟動(dòng)步驟3,即讀取HDFS上失敗的Rowkey文件,然后再List HBase中的數(shù)據(jù),進(jìn)行數(shù)據(jù)處理后,最后再寫Kafka,以此類推,直到HDFS上失敗的Rowkey處理完成為止。

4.實(shí)現(xiàn)代碼

這里實(shí)現(xiàn)的代碼量也并不復(fù)雜,下面提供一個(gè)偽代碼,可以在此基礎(chǔ)上進(jìn)行改造(例如Rowkey的抽取、MapReduce讀取Rowkey并批量Get HBase表,然后在寫入Kafka等)。示例代碼如下:

  
 
 
 
  1. public class MRROW2HDFS {
  2.  public static void main(String[] args) throws Exception {
  3.  Configuration config = HBaseConfiguration.create(); // HBase Config info
  4.  Job job = Job.getInstance(config, "MRROW2HDFS");
  5.  job.setJarByClass(MRROW2HDFS.class);
  6.  job.setReducerClass(ROWReducer.class);
  7.  String hbaseTableName = "hbase_tbl_name";
  8.  Scan scan = new Scan();
  9.  scan.setCaching(1000);
  10.  scan.setCacheBlocks(false);
  11.  scan.setFilter(new FirstKeyOnlyFilter());
  12.  TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job);
  13.  FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path
  14.  System.exit(job.waitForCompletion(true) ? 0 : 1);
  15.  }
  16.  public static class ROWMapper extends TableMapper {
  17.  @Override
  18.  protected void map(ImmutableBytesWritable key, Result value,
  19.  Mapper.Context context)
  20.  throws IOException, InterruptedException {
  21.  for (Cell cell : value.rawCells()) {
  22.  // Filter date range
  23.  // context.write(...);
  24.  }
  25.  }
  26.  }
  27.  
  28.  public static class ROWReducer extends Reducer{
  29.  private Text result = new Text();
  30.  
  31.  @Override
  32.  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {
  33.  for(Text val:values){
  34.  result.set(val);
  35.  context.write(key, result);
  36.  }
  37.  }
  38.  }
  39. }

5.總結(jié)

整個(gè)逆向數(shù)據(jù)處理流程,并不算復(fù)雜,實(shí)現(xiàn)也是很基本的MapReduce邏輯,沒(méi)有太復(fù)雜的邏輯處理。在處理的過(guò)程中,需要幾個(gè)細(xì)節(jié)問(wèn)題,Rowkey生成到HDFS上時(shí),可能存在行位空格的情況,在讀取HDFS上Rowkey文件去List時(shí),最好對(duì)每條數(shù)據(jù)做個(gè)過(guò)濾空格處理。另外,就是對(duì)于成功處理Rowkey和失敗處理Rowkey的記錄,這樣便于任務(wù)失敗重跑和數(shù)據(jù)對(duì)賬。可以知曉數(shù)據(jù)遷移進(jìn)度和完成情況。同時(shí),我們可以使用 Kafka Eagle 監(jiān)控工具來(lái)查看Kafka寫入進(jìn)度。


文章標(biāo)題:技術(shù)干貨分享:HBase數(shù)據(jù)遷移到Kafka實(shí)戰(zhàn)
URL標(biāo)題:http://m.5511xx.com/article/dpgodid.html