日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
Flume架構(gòu)與源碼分析-MemoryChannel事務(wù)實(shí)現(xiàn)

Flume提供了可靠地日志采集功能,其高可靠是通過(guò)事務(wù)機(jī)制實(shí)現(xiàn)的。而對(duì)于Channel的事務(wù)我們本部分會(huì)介紹MemoryChannel和FileChannel的實(shí)現(xiàn)。

創(chuàng)新互聯(lián)公司專(zhuān)注為客戶(hù)提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、梧州網(wǎng)絡(luò)推廣、小程序定制開(kāi)發(fā)、梧州網(wǎng)絡(luò)營(yíng)銷(xiāo)、梧州企業(yè)策劃、梧州品牌公關(guān)、搜索引擎seo、人物專(zhuān)訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供梧州建站搭建服務(wù),24小時(shí)服務(wù)熱線:13518219792,官方網(wǎng)址:www.cdcxhl.com

首先我們看下BasicChannelSemantics實(shí)現(xiàn):

Java代碼

 
 
  1. public abstract class BasicChannelSemantics extends AbstractChannel {   
  2.   //1、事務(wù)使用ThreadLocal存儲(chǔ),保證事務(wù)線程安全   
  3.   private ThreadLocal currentTransaction   
  4.       = new ThreadLocal();   
  5.    
  6.   private boolean initialized = false;   
  7.   //2、進(jìn)行一些初始化工作   
  8.   protected void initialize() {}   
  9.   //3、提供給實(shí)現(xiàn)類(lèi)的創(chuàng)建事務(wù)的回調(diào)   
  10.   protected abstract BasicTransactionSemantics createTransaction();   
  11.   //4、往Channel放Event,其直接委托給事務(wù)的put方法實(shí)現(xiàn)   
  12.   @Override   
  13.   public void put(Event event) throws ChannelException {   
  14.     BasicTransactionSemantics transaction = currentTransaction.get();   
  15.     Preconditions.checkState(transaction != null,   
  16.         "No transaction exists for this thread");   
  17.     transaction.put(event);   
  18.   }   
  19.   //5、從Channel獲取Event,也是直接委托給事務(wù)的take方法實(shí)現(xiàn)   
  20.   @Override   
  21.   public Event take() throws ChannelException {   
  22.     BasicTransactionSemantics transaction = currentTransaction.get();   
  23.     Preconditions.checkState(transaction != null,   
  24.         "No transaction exists for this thread");   
  25.     return transaction.take();   
  26.   }   
  27.    
  28.   //6、獲取事務(wù),如果本實(shí)例沒(méi)有初始化則先初始化;否則先從ThreadLocal獲取事務(wù),如果沒(méi)有或者關(guān)閉了則創(chuàng)建一個(gè)并綁定到ThreadLocal。   
  29.   @Override   
  30.   public Transaction getTransaction() {   
  31.    
  32.     if (!initialized) {   
  33.       synchronized (this) {   
  34.         if (!initialized) {   
  35.           initialize();   
  36.           initialized = true;   
  37.         }   
  38.       }   
  39.     }   
  40.    
  41.     BasicTransactionSemantics transaction = currentTransaction.get();   
  42.     if (transaction == null || transaction.getState().equals(   
  43.             BasicTransactionSemantics.State.CLOSED)) {   
  44.       transaction = createTransaction();   
  45.       currentTransaction.set(transaction);   
  46.     }   
  47.     return transaction;   
  48.   }   
  49. }   

MemoryChannel事務(wù)實(shí)現(xiàn)

首先我們來(lái)看下MemoryChannel的實(shí)現(xiàn),其是一個(gè)純內(nèi)存的Channel實(shí)現(xiàn),整個(gè)事務(wù)操作都是在內(nèi)存中完成。首先看下其內(nèi)存結(jié)構(gòu):

1、首先由一個(gè)Channel Queue用于存儲(chǔ)整個(gè)Channel的Event數(shù)據(jù);

2、每個(gè)事務(wù)都有一個(gè)Take Queue和Put Queue分別用于存儲(chǔ)事務(wù)相關(guān)的取數(shù)據(jù)和放數(shù)據(jù),等事務(wù)提交時(shí)才完全同步到Channel Queue,或者失敗把取數(shù)據(jù)回滾到Channel Queue。

MemoryChannel時(shí)設(shè)計(jì)時(shí)考慮了兩個(gè)容量:Channel Queue容量和事務(wù)容量,而這兩個(gè)容量涉及到了數(shù)量容量和字節(jié)數(shù)容量。

另外因?yàn)槎鄠€(gè)事務(wù)要操作Channel Queue,還要考慮Channel Queue的動(dòng)態(tài)擴(kuò)容問(wèn)題,因此MemoryChannel使用了鎖來(lái)實(shí)現(xiàn);而容量問(wèn)題則使用了信號(hào)量來(lái)實(shí)現(xiàn)。

在configure方法中進(jìn)行了一些參數(shù)的初始化,如容量、Channel Queue等。首先看下Channel Queue的容量是如何計(jì)算的:

Java代碼

 
 
  1. try {   
  2.   capacity = context.getInteger("capacity", defaultCapacity);   
  3. } catch(NumberFormatException e) {   
  4.   capacity = defaultCapacity;   
  5. }   
  6.    
  7. if (capacity <= 0) {   
  8.   capacity = defaultCapacity;   
  9. }    

即首先從配置文件讀取數(shù)量容量,如果沒(méi)有配置則是默認(rèn)容量(默認(rèn)100),而配置的容量小于等于0,則也是默認(rèn)容量。

接下來(lái)是初始化事務(wù)數(shù)量容量:

Java代碼

 
 
  1. try {   
  2.   transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);   
  3. } catch(NumberFormatException e) {   
  4.   transCapacity = defaultTransCapacity;   
  5. }   
  6. if (transCapacity <= 0) {   
  7.   transCapacity = defaultTransCapacity;   
  8. }   
  9. Preconditions.checkState(transCapacity <= capacity,   
  10. "Transaction Capacity of Memory Channel cannot be higher than " +   
  11.         "the capacity.");   

整個(gè)過(guò)程和Channel Queue數(shù)量容量初始化類(lèi)似,但是***做了前置條件判斷,事務(wù)容量必須小于等于Channel Queue容量。

接下來(lái)是字節(jié)容量限制:

Java代碼

 
 
  1. try {   
  2.   byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);   
  3. } catch(NumberFormatException e) {   
  4.   byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;   
  5. }   
  6. try {   
  7.   byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);   
  8.   if (byteCapacity < 1) {   
  9.     byteCapacity = Integer.MAX_VALUE;   
  10.   }   
  11. } catch(NumberFormatException e) {   
  12.   byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);   
  13. }    

byteCapacityBufferPercentage:用來(lái)確定byteCapacity的一個(gè)百分比參數(shù),即我們定義的字節(jié)容量和實(shí)際事件容量的百分比,因?yàn)槲覀兌x的字節(jié)容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的內(nèi)存占用,可以認(rèn)為該參數(shù)定義了Event header占了實(shí)際字節(jié)容量的百分比,默認(rèn)20%;

byteCapacity:首先讀取配置文件定義的byteCapacity,如果沒(méi)有定義,則使用默認(rèn)defaultByteCapacity,而defaultByteCapacity默認(rèn)是JVM物理內(nèi)存的80%(Runtime.getRuntime().maxMemory() * .80);那么實(shí)際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認(rèn)100,即計(jì)算百分比的一個(gè)系數(shù)。

接下來(lái)定義keepAlive參數(shù):

Java代碼

 
 
  1. try {   
  2.   keepAlive = context.getInteger("keep-alive", defaultKeepAlive);   
  3. } catch(NumberFormatException e) {   
  4.   keepAlive = defaultKeepAlive;   
  5. }    

keepAlive定義了操作Channel Queue的等待超時(shí)事件,默認(rèn)3s。

接著初始化Channel Queue:

Java代碼

 
 
  1. if(queue != null) {   
  2.   try {   
  3.     resizeQueue(capacity);   
  4.   } catch (InterruptedException e) {   
  5.     Thread.currentThread().interrupt();   
  6.   }   
  7. } else {   
  8.   synchronized(queueLock) {   
  9.     queue = new LinkedBlockingDeque(capacity);   
  10.     queueRemaining = new Semaphore(capacity);   
  11.     queueStored = new Semaphore(0);   
  12.   }   
  13. }    

首先如果Channel Queue不為null,表示動(dòng)態(tài)擴(kuò)容;否則進(jìn)行Channel Queue的創(chuàng)建。

首先看下***創(chuàng)建Channel Queue,首先使用queueLock鎖定,即在操作Channel Queue時(shí)都需要鎖定,因?yàn)橹罢f(shuō)過(guò)Channel Queue可能動(dòng)態(tài)擴(kuò)容,然后初始化信號(hào)量:Channel Queue剩余容量和向Channel Queue申請(qǐng)存儲(chǔ)的容量,用于事務(wù)操作中預(yù)占Channel Queue容量。

接著是調(diào)用resizeQueue動(dòng)態(tài)擴(kuò)容:

Java代碼

 
 
  1. private void resizeQueue(int capacity) throws InterruptedException {   
  2.   int oldCapacity;   
  3.   synchronized(queueLock) { //首先計(jì)算擴(kuò)容前的Channel Queue的容量   
  4.     oldCapacity = queue.size() + queue.remainingCapacity();   
  5.   }   
  6.    
  7.   if(oldCapacity == capacity) {//如果新容量和老容量相等,不需要擴(kuò)容   
  8.     return;   
  9.   } else if (oldCapacity > capacity) {//如果老容量大于新容量,縮容   
  10.     //首先要預(yù)占老容量-新容量的大小,以便縮容容量   
  11. if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {   
  12.    //如果獲取失敗,默認(rèn)是記錄日志然后忽略   
  13. } else {   
  14.   //否則,直接縮容,然后復(fù)制老Queue的數(shù)據(jù),縮容時(shí)需要鎖定queueLock,因?yàn)檫@一系列操作要線程安全   
  15.       synchronized(queueLock) {   
  16.         LinkedBlockingDeque newQueue = new LinkedBlockingDeque(capacity);   
  17.         newQueue.addAll(queue);   
  18.         queue = newQueue;   
  19.       }   
  20.     }   
  21.   } else {   
  22.     //如果不是縮容,則直接擴(kuò)容即可   
  23.     synchronized(queueLock) {   
  24.       LinkedBlockingDeque newQueue = new LinkedBlockingDeque(capacity);   
  25.       newQueue.addAll(queue);   
  26.       queue = newQueue;   
  27. }   
  28. //增加/減少Channel Queue的新的容量   
  29.     queueRemaining.release(capacity - oldCapacity);   
  30.   }   
  31. }   
  32.    
  33. 到此,整個(gè)Channel Queue相關(guān)的數(shù)據(jù)初始化完畢,接著會(huì)調(diào)用start方法進(jìn)行初始化:   
  34. public synchronized void start() {   
  35.   channelCounter.start();   
  36.   channelCounter.setChannelSize(queue.size());   
  37.   channelCounter.setChannelCapacity(Long.valueOf(   
  38.           queue.size() + queue.remainingCapacity()));   
  39.   super.start();   
  40. }    

此處初始化了一個(gè)ChannelCounter,是一個(gè)計(jì)數(shù)器,記錄如當(dāng)前隊(duì)列放入Event數(shù)、取出Event數(shù)、成功數(shù)等。

之前已經(jīng)分析了大部分Channel會(huì)把put和take直接委托給事務(wù)去完成,因此接下來(lái)看下MemoryTransaction的實(shí)現(xiàn)。

首先看下MemoryTransaction的初始化:

Java代碼

 
 
  1. private class MemoryTransaction extends BasicTransactionSemantics {   
  2.   private LinkedBlockingDeque takeList;   
  3.   private LinkedBlockingDeque putList;   
  4.   private final ChannelCounter channelCounter;   
  5.   private int putByteCounter = 0;   
  6.   private int takeByteCounter = 0;   
  7.   public MemoryTransaction(int transCapacity, ChannelCounter counter) {   
  8.     putList = new LinkedBlockingDeque(transCapacity);   
  9.     takeList = new LinkedBlockingDeque(transCapacity);   
  10.     channelCounter = counter;   
  11.   }    

可以看出MemoryTransaction涉及到兩個(gè)事務(wù)容量大小定義的隊(duì)列(鏈表阻塞隊(duì)列)、隊(duì)列字節(jié)計(jì)數(shù)器、另外一個(gè)是Channel操作的計(jì)數(shù)器。

事務(wù)中的放入操作如下:

Java代碼

 
 
  1. protected void doPut(Event event) throws InterruptedException {   
  2.   //1、增加放入事件計(jì)數(shù)器   
  3.   channelCounter.incrementEventPutAttemptCount();   
  4.   //2、estimateEventSize計(jì)算當(dāng)前Event body大小   
  5.   int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);   
  6.   //3、往事務(wù)隊(duì)列的putList中放入Event,如果滿(mǎn)了,則拋異?;貪L事務(wù)   
  7.   if (!putList.offer(event)) {   
  8.       throw new ChannelException(   
  9.       "Put queue for MemoryTransaction of capacity " +   
  10.         putList.size() + " full, consider committing more frequently, " +   
  11.         "increasing capacity or increasing thread count");   
  12.   }   
  13.   //4、增加放入隊(duì)列字節(jié)數(shù)計(jì)數(shù)器   
  14.   putByteCounter += eventByteSize;   
  15. }    

整個(gè)doPut操作相對(duì)來(lái)說(shuō)比較簡(jiǎn)單,就是往事務(wù)putList隊(duì)列放入Event,如果滿(mǎn)了則直接拋異?;貪L事務(wù);否則放入putList暫存,等事務(wù)提交時(shí)轉(zhuǎn)移到Channel Queue。另外需要增加放入隊(duì)列的字節(jié)數(shù)計(jì)數(shù)器,以便之后做字節(jié)容量限制。

接下來(lái)是事務(wù)中的取出操作:

Java代碼

 
 
  1. protected Event doTake() throws InterruptedException {   
  2.   //1、增加取出事件計(jì)數(shù)器   
  3.   channelCounter.incrementEventTakeAttemptCount();   
  4.   //2、如果takeList隊(duì)列沒(méi)有剩余容量,即當(dāng)前事務(wù)已經(jīng)消費(fèi)了***容量的Event   
  5.   if(takeList.remainingCapacity() == 0) {   
  6.     throw new ChannelException("Take list for MemoryTransaction, capacity " +   
  7.         takeList.size() + " full, consider committing more frequently, " +   
  8.         "increasing capacity, or increasing thread count");   
  9.   }   
  10.   //3、queueStored試圖獲取一個(gè)信號(hào)量,超時(shí)直接返回null   
  11.   if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {   
  12.     return null;   
  13.   }   
  14.   //4、從Channel Queue獲取一個(gè)Event   
  15.   Event event;   
  16.   synchronized(queueLock) {//對(duì)Channel Queue的操作必須加queueLock,因?yàn)橹罢f(shuō)的動(dòng)態(tài)擴(kuò)容問(wèn)題   
  17.     event = queue.poll();   
  18.   }   
  19.   //5、因?yàn)樾盘?hào)量的保證,Channel Queue不應(yīng)該返回null,出現(xiàn)了就不正常了   
  20.   Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +   
  21.       "signalling existence of entry");   
  22.   //6、暫存到事務(wù)的takeList隊(duì)列   
  23.   takeList.put(event);   
  24.   //7、計(jì)算當(dāng)前Event body大小并增加取出隊(duì)列字節(jié)數(shù)計(jì)數(shù)器   
  25.   int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);   
  26.   takeByteCounter += eventByteSize;   
  27.   return event;   
  28. }   

接下來(lái)是提交事務(wù):

Java代碼

 
 
  1. protected void doCommit() throws InterruptedException {   
  2.   //1、計(jì)算改變的Event數(shù)量,即取出數(shù)量-放入數(shù)量;如果放入的多,那么改變的Event數(shù)量將是負(fù)數(shù)   
  3.   int remainingChange = takeList.size() - putList.size();   
  4.   //2、  如果remainingChange小于0,則需要獲取Channel Queue剩余容量的信號(hào)量   
  5.   if(remainingChange < 0) {   
  6.     //2.1、首先獲取putByteCounter個(gè)字節(jié)容量信號(hào)量,如果失敗說(shuō)明超過(guò)字節(jié)容量限制了,回滾事務(wù)   
  7.     if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {   
  8.       throw new ChannelException("Cannot commit transaction. Byte capacity " +   
  9.         "allocated to store event body " + byteCapacity * byteCapacitySlotSize +   
  10.         "reached. Please increase heap space/byte capacity allocated to " +   
  11.         "the channel as the sinks may not be keeping up with the sources");   
  12.     }   
  13.     //2.2、獲取Channel Queue的-remainingChange個(gè)信號(hào)量用于放入-remainingChange個(gè)Event,如果獲取不到,則釋放putByteCounter個(gè)字節(jié)容量信號(hào)量,并拋出異?;貪L事務(wù)   
  14.     if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {   
  15.       bytesRemaining.release(putByteCounter);   
  16.       throw new ChannelFullException("Space for commit to queue couldn't be acquired." +   
  17.           " Sinks are likely not keeping up with sources, or the buffer size is too tight");   
  18.     }   
  19.   }   
  20.   int puts = putList.size();   
  21.   int takes = takeList.size();   
  22.   synchronized(queueLock) {//操作Channel Queue時(shí)一定要鎖定queueLock   
  23.     if(puts > 0 ) {   
  24.       while(!putList.isEmpty()) { //3.1、如果有Event,則循環(huán)放入Channel Queue   
  25.         if(!queue.offer(putList.removeFirst())) {    
  26.           //3.2、如果放入Channel Queue失敗了,說(shuō)明信號(hào)量控制出問(wèn)題了,這種情況不應(yīng)該發(fā)生   
  27.           throw new RuntimeException("Queue add failed, this shouldn't be able to happen");   
  28.         }   
  29.       }   
  30.     }   
  31.     //4、操作成功后,清空putList和takeList隊(duì)列   
  32.     putList.clear();   
  33.     takeList.clear();   
  34.   }   
  35.   //5.1、釋放takeByteCounter個(gè)字節(jié)容量信號(hào)量   
  36.   bytesRemaining.release(takeByteCounter);   
  37.   //5.2、重置字節(jié)計(jì)數(shù)器   
  38.   takeByteCounter = 0;   
  39.   putByteCounter = 0;   
  40.   //5.3、釋放puts個(gè)queueStored信號(hào)量,這樣doTake方法就可以獲取數(shù)據(jù)了   
  41.   queueStored.release(puts);   
  42.   //5.4、釋放remainingChange個(gè)queueRemaining信號(hào)量   
  43.   if(remainingChange > 0) {   
  44.     queueRemaining.release(remainingChange);   
  45.   }   
  46.   //6、ChannelCounter一些數(shù)據(jù)計(jì)數(shù)   
  47.   if (puts > 0) {   
  48.     channelCounter.addToEventPutSuccessCount(puts);   
  49.   }   
  50.   if (takes > 0) {   
  51.     channelCounter.addToEventTakeSuccessCount(takes);   
  52.   }   
  53.    
  54.   channelCounter.setChannelSize(queue.size());   
  55. }    

此處涉及到兩個(gè)信號(hào)量:

queueStored表示Channel Queue已存儲(chǔ)事件容量(已存儲(chǔ)的事件數(shù)量),隊(duì)列取出事件時(shí)-1,放入事件成功時(shí)+N,取出失敗時(shí)-N,即Channel Queue存儲(chǔ)了多少事件。queueStored信號(hào)量默認(rèn)為0。當(dāng)doTake取出Event時(shí)減少一個(gè)queueStored信號(hào)量,當(dāng)doCommit提交事務(wù)時(shí)需要增加putList 隊(duì)列大小的queueStored信號(hào)量,當(dāng)doRollback回滾事務(wù)時(shí)需要減少takeList隊(duì)列大小的queueStored信號(hào)量。

queueRemaining表示Channel Queue可存儲(chǔ)事件容量(可存儲(chǔ)的事件數(shù)量),取出事件成功時(shí)+N,放入事件成功時(shí)-N。queueRemaining信號(hào)量默認(rèn)為Channel Queue容量。其在提交事務(wù)時(shí)首先通過(guò)remainingChange = takeList.size() - putList.size()計(jì)算獲得需要增加多少變更事件;如果小于0表示放入的事件比取出的多,表示有- remainingChange個(gè)事件放入,此時(shí)應(yīng)該減少-queueRemaining信號(hào)量;而如果大于0,則表示取出的事件比放入的多,表示有queueRemaining個(gè)事件取出,此時(shí)應(yīng)該增加queueRemaining信號(hào)量;即消費(fèi)事件時(shí)減少信號(hào)量,生產(chǎn)事件時(shí)增加信號(hào)量。

而bytesRemaining是字節(jié)容量信號(hào)量,超出容量則回滾事務(wù)。

***看下回滾事務(wù):

Java代碼

 
 
  1. protected void doRollback() {   
  2.     int takes = takeList.size();   
  3.     synchronized(queueLock) { //操作Channel Queue時(shí)一定鎖住queueLock   
  4.       //1、前置條件判斷,檢查是否有足夠容量回滾事務(wù)   
  5.       Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +   
  6.           "queue to rollback takes. This should never happen, please report");   
  7.       //2、回滾事務(wù)的takeList隊(duì)列到Channel Queue   
  8.       while(!takeList.isEmpty()) {   
  9.         queue.addFirst(takeList.removeLast());   
  10.       }   
  11.       putList.clear();   
  12.     }   
  13.     //3、釋放putByteCounter個(gè)bytesRemaining信號(hào)量   
  14.     bytesRemaining.release(putByteCounter);   
  15.    
  16.     //4、計(jì)數(shù)器重置   
  17.     putByteCounter = 0;   
  18.     takeByteCounter = 0;   
  19.     //5、釋放takeList隊(duì)列大小個(gè)已存儲(chǔ)事件容量   
  20.     queueStored.release(takes);   
  21.     channelCounter.setChannelSize(queue.size());   
  22.   }   
  23. }    

也就是說(shuō)在回滾時(shí),需要把takeList中暫存的事件回滾到Channel Queue,并回滾queueStored信號(hào)量。

【本文是專(zhuān)欄作者張開(kāi)濤的原創(chuàng)文章,作者微信公眾號(hào):開(kāi)濤的博客,id:kaitao-1234567】


文章標(biāo)題:Flume架構(gòu)與源碼分析-MemoryChannel事務(wù)實(shí)現(xiàn)
文章位置:http://m.5511xx.com/article/dpodhpo.html