日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
干貨分享:利用Java多線程技術(shù)導(dǎo)入數(shù)據(jù)到Elasticsearch

 前言

近期接到一個(gè)任務(wù),需要改造現(xiàn)有從mysql往Elasticsearch導(dǎo)入數(shù)據(jù)MTE(mysqlToEs)小工具,由于之前采用單線程導(dǎo)入,千億數(shù)據(jù)需要兩周左右的時(shí)間才能導(dǎo)入完成,導(dǎo)入效率非常低。所以樓主花了3天的時(shí)間,利用java線程池框架Executors中的FixedThreadPool線程池重寫了MTE導(dǎo)入工具,單臺(tái)服務(wù)器導(dǎo)入效率提高十幾倍(合理調(diào)整線程數(shù)據(jù),效率更高)。

為魯?shù)榈鹊貐^(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及魯?shù)榫W(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站制作、魯?shù)榫W(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

關(guān)鍵技術(shù)棧

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具說明

maven依賴

 
 
 
  1.  
  2.  mysql 
  3.  mysql-connector-java 
  4.  ${mysql.version} 
  5.  
  6.  
  7.  org.elasticsearch 
  8.  elasticsearch 
  9.  ${elasticsearch.version} 
  10.  
  11.  
  12.  org.elasticsearch.client 
  13.  transport 
  14.  ${elasticsearch.version} 
  15.  
  16.  
  17.  org.projectlombok 
  18.  lombok 
  19.  ${lombok.version} 
  20.  
  21.  
  22.  com.alibaba 
  23.  fastjson 
  24.  ${fastjson.version} 
  25.  

java線程池設(shè)置

默認(rèn)線程池大小為21個(gè),可調(diào)整。其中POR為處理流程已辦數(shù)據(jù)線程池,ROR為處理流程已閱數(shù)據(jù)線程池。

 
 
 
  1. private static int THREADS = 21; 
  2. public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
  3. public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 

定義已辦生產(chǎn)者線程/已閱生產(chǎn)者線程:ZlPendProducer/ZlReadProducer

 
 
 
  1. public class ZlPendProducer implements Runnable { 
  2.  ... 
  3.  @Override 
  4.  public void run() { 
  5.  System.out.println(threadName + "::啟動(dòng)..."); 
  6.  for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
  7.  try { 
  8.  .... 
  9.  int size = 1000; 
  10.  for (int i = 0; i < count; i += size) { 
  11.  if (i + size > count) { 
  12.  //作用為size***沒有100條數(shù)據(jù)則剩余幾條newList中就裝幾條 
  13.  size = count - i; 
  14.  } 
  15.  String sql = "select * from " + tableName + " limit " + i + ", " + size; 
  16.  System.out.println(tableName + "::sql::" + sql); 
  17.  rs = statement.executeQuery(sql); 
  18.  List lst = new ArrayList<>(); 
  19.  while (rs.next()) { 
  20.  HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
  21.  lst.add(p); 
  22.  } 
  23.  MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
  24.  Thread.sleep(2000); 
  25.  } 
  26.  .... 
  27.  } catch (Exception e) { 
  28.  e.printStackTrace(); 
  29.  } 
  30.  } 
  31. public class ZlReadProducer implements Runnable { 
  32.  ...已閱生產(chǎn)者處理邏輯同已辦生產(chǎn)者 

定義已辦消費(fèi)者線程/已閱生產(chǎn)者線程:ZlPendConsumer/ZlReadConsumer

 
 
 
  1. public class ZlPendConsumer implements Runnable { 
  2.  private String threadName; 
  3.  private List lst; 
  4.  public ZlPendConsumer(List lst) { 
  5.  this.lst = lst; 
  6.  } 
  7.  @Override 
  8.  public void run() { 
  9.  ... 
  10.  lst.forEach(v -> { 
  11.  try { 
  12.  String json = new Gson().toJson(v); 
  13.  EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
  14.  Const.COUNTER.LD_P.incrementAndGet(); 
  15.  } catch (Exception e) { 
  16.  e.printStackTrace(); 
  17.  System.out.println("err::PendingId::" + v.getPendingId()); 
  18.  } 
  19.  }); 
  20.  ... 
  21.  } 
  22. public class ZlReadConsumer implements Runnable { 
  23.  //已閱消費(fèi)者處理邏輯同已辦消費(fèi)者 

定義導(dǎo)入Elasticsearch數(shù)據(jù)監(jiān)控線程:Monitor

監(jiān)控線程-Monitor為了計(jì)算每分鐘導(dǎo)入Elasticsearch的數(shù)據(jù)總條數(shù),利用監(jiān)控線程,可以調(diào)整線程池的線程數(shù)的大小,以便利用多線程更快速的導(dǎo)入數(shù)據(jù)。

 
 
 
  1. public void monitorToES() { 
  2.  new Thread(() -> { 
  3.  while (true) { 
  4.  StringBuilder sb = new StringBuilder(); 
  5.  sb.append("已辦表數(shù)::").append(Const.TBL.TBL_PEND_COUNT) 
  6.  .append("::已辦總數(shù)::").append(Const.COUNTER.LD_P_TOTAL) 
  7.  .append("::已辦入庫總數(shù)::").append(Const.COUNTER.LD_P); 
  8.  sb.append("~~~~已閱表數(shù)::").append(Const.TBL.TBL_READ_COUNT); 
  9.  sb.append("::已閱總數(shù)::").append(Const.COUNTER.LD_R_TOTAL) 
  10.  .append("::已閱入庫總數(shù)::").append(Const.COUNTER.LD_R); 
  11.  if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
  12.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  13.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  14.  start = System.currentTimeMillis(); 
  15.  } else { 
  16.  long end = System.currentTimeMillis(); 
  17.  if ((end - start) / 1000 >= 60) { 
  18.  start = end; 
  19.  sb.append("\n#########################################\n"); 
  20.  sb.append("已辦每分鐘TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "條"); 
  21.  sb.append("::已閱每分鐘TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "條"); 
  22.  ldPrevPendCount = Const.COUNTER.LD_P.get(); 
  23.  ldPrevReadCount = Const.COUNTER.LD_R.get(); 
  24.  } 
  25.  } 
  26.  System.out.println(sb.toString()); 
  27.  try { 
  28.  Thread.sleep(3000); 
  29.  } catch (InterruptedException e) { 
  30.  e.printStackTrace(); 
  31.  } 
  32.  } 
  33.  }).start(); 

初始化Elasticsearch:EsClient

 
 
 
  1. String cName = meta.get("cName");//es集群名字 
  2. String esNodes = meta.get("esNodes");//es集群ip節(jié)點(diǎn) 
  3. Settings esSetting = Settings.builder() 
  4.  .put("cluster.name", cName) 
  5.  .put("client.transport.sniff", true)//增加嗅探機(jī)制,找到ES集群 
  6.  .put("thread_pool.search.size", 5)//增加線程池個(gè)數(shù),暫時(shí)設(shè)為5 
  7.  .build(); 
  8. String[] nodes = esNodes.split(","); 
  9. client = new PreBuiltTransportClient(esSetting); 
  10. for (String node : nodes) { 
  11.  if (node.length() > 0) { 
  12.  String[] hostPort = node.split(":"); 
  13.  client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 
  14.  } 

初始化數(shù)據(jù)庫連接

 
 
 
  1. conn = DriverManager.getConnection(url, user, password); 

啟動(dòng)參數(shù)

 
 
 
  1. nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

參數(shù)說明

ES-Cluster2019 為Elasticsearch集群名字

node1:9300,node2:9300,node3:9300為es的節(jié)點(diǎn)IP

130 130為已辦已閱分表的數(shù)據(jù)

程序入口:MteMain

   

 
 
 
  1. // 監(jiān)控線程 
  2. Monitor monitorService = new Monitor(); 
  3. monitorService.monitorToES(); 
  4. // 已辦生產(chǎn)者線程 
  5. Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); 
  6. pendProducerThread.start(); 
  7. // 已閱生產(chǎn)者線程 
  8. Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); 
  9. readProducerThread.start(); 

分享名稱:干貨分享:利用Java多線程技術(shù)導(dǎo)入數(shù)據(jù)到Elasticsearch
當(dāng)前URL:http://m.5511xx.com/article/dposshi.html