【Flume】【源码分析】深入flume-ng的三大组件——source,channel,sink

朴灿烈づ我的快乐病毒、 2022-08-04 09:26 267阅读 0赞

转载:http://blog.csdn.net/simonchi/article/details/43308677

概览

flume-ng中最重要的核心三大组件就是source,channel,sink

source负责从源端收集数据,产出event

channel负责暂存event,以备下游取走消费

sink负责消费通道中的event,写到最终的输出端上

Center

以上是总体的一个简单结构图,下面我们来深入每一个组件的内部看看:

1、Source

source接口的定义如下:

[java] view plain copy print ? 在CODE上查看代码片 派生到我的代码片

  1. @InterfaceAudience.Public
  2. @InterfaceStability.Stable
  3. public interface Source extends LifecycleAware, NamedComponent {
  4. /**
  5. * Specifies which channel processor will handle this source’s events.
  6. *
  7. * @param channelProcessor
  8. */
  9. public void setChannelProcessor(ChannelProcessor channelProcessor);
  10. /**
  11. * Returns the channel processor that will handle this source’s events.
  12. */
  13. public ChannelProcessor getChannelProcessor();
  14. }

source生成event并且调用配置的channelprocessor的相关方法,持续的将events存入配置的channel里

channelProcessor中有通道选择器和拦截器链,该过程处在source端收到数据和放入通道直接

看一个source的具体工作流程:ExecSource

该source继承了两个类

1、NamedComponent

负责给每一个组件取一个唯一标识,就是名字,这个名字来源于我们的配置

2、LifecycleAware

负责组件的启停和状态维护

Source接口的直接实现类是AbstractSource抽象类

该类中就定义了通道处理器

还有一个生命状态周期的枚举类型

[java] view plain copy print ? 在CODE上查看代码片 派生到我的代码片

  1. public enum LifecycleState {
  2. IDLE, START, STOP, ERROR;
  3. public static final LifecycleState[] START_OR_ERROR = new LifecycleState[] {
  4. START, ERROR };
  5. public static final LifecycleState[] STOP_OR_ERROR = new LifecycleState[] {
  6. STOP, ERROR };
  7. }

这里就定义了一个组件会有的4种状态

实现接口的启停组件方法,方法体中只有一个状态的赋值,具体实现,我们来看一个具体的Source——ExecSource

读取配置方面很简单,这里就不说了,看下start方法

[java] view plain copy print ? 在CODE上查看代码片 派生到我的代码片

  1. public void start() {
  2. logger.info(“Exec source starting with command:{}“, command);
  3. executor = Executors.newSingleThreadExecutor();
  4. runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
  5. restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
  6. // FIXME: Use a callback-like executor / future to signal us upon failure.
  7. runnerFuture = executor.submit(runner);
  8. /*
  9. * NB: This comes at the end rather than the beginning of the method because
  10. * it sets our state to running. We want to make sure the executor is alive
  11. * and well first.
  12. */
  13. sourceCounter.start();
  14. super.start();
  15. logger.debug(“Exec source started”);
  16. }

该方法内部就是启动了一个线程去执行我们配置的终端命令

前面一篇文章也说过从入口分析如何调用到该start方法,以及调用频率:http://blog.csdn.net/simonchi/article/details/42970373

2、channel

对于通道来说,最重要的就是event的维护,flume的核心就是要中转这些event,所以event一定不能出事

Channel接口定义如下:












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. @InterfaceAudience.Public  

  2. @InterfaceStability.Stable  

  3. public interface Channel extends LifecycleAware, NamedComponent {  

  4.   / 

  5.     <p>Puts the given event into the channel.</p> 

  6.     <p><strong>Note</strong>: This method must be invoked within an active 

  7.     {@link Transaction} boundary. Failure to do so can lead to unpredictable 

  8.     results.</p> 

  9.     @param event the event to transport. 

  10.     @throws ChannelException in case this operation fails. 

  11.     @see org.apache.flume.Transaction#begin() 

  12.    /  

  13.   public void put(Event event) throws ChannelException;  

  14.   / 

  15.     <p>Returns the next event from the channel if available. If the channel 

  16.     does not have any events available, this method must return {@code null}. 

  17.     </p> 

  18.     <p><strong>Note</strong>: This method must be invoked within an active 

  19.     {@link Transaction} boundary. Failure to do so can lead to unpredictable 

  20.     results.</p> 

  21.     @return the next available event or {@code null} if no events are 

  22.     available. 

  23.     @throws ChannelException in case this operation fails. 

  24.     @see org.apache.flume.Transaction#begin() 

  25.    /  

  26.   public Event take() throws ChannelException;  

  27.   /** 

  28.     @return the transaction instance associated with this channel. 

  29.    */  

  30.   public Transaction getTransaction();  

  31. }                                                                                                                                               



通道中的event全部都在事务的管理之中

先来看看这个事务的定义












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. <pre code_snippet_id=“593798” snippet_file_name=“blog_20150130_5_6226220” name=“code” class=“java”>public interface Transaction {  

  2. public enum TransactionState {Started, Committed, RolledBack, Closed };  

  3.   / 

  4.     <p>Starts a transaction boundary for the current channel operation. If a 

  5.     transaction is already in progress, this method will join that transaction 

  6.     using reference counting.</p> 

  7.     <p><strong>Note</strong>: For every invocation of this method there must 

  8.     be a corresponding invocation of {@linkplain #close()} method. Failure 

  9.     to ensure this can lead to dangling transactions and unpredictable results. 

  10.     </p> 

  11.    /  

  12.   public void begin();  

  13.   / 

  14.     Indicates that the transaction can be successfully committed. It is 

  15.     required that a transaction be in progress when this method is invoked. 

  16.    /  

  17.   public void commit();  

  18.   /** 

  19.     Indicates that the transaction can must be aborted. It is 

  20.     required that a transaction be in progress when this method is invoked. 

  21.    /  

  22.   public void rollback();  

  23.   /* 

  24.     <p>Ends a transaction boundary for the current channel operation. If a 

  25.     transaction is already in progress, this method will join that transaction 

  26.     using reference counting. The transaction is completed only if there 

  27.     are no more references left for this transaction.</p> 

  28.     <p><strong>Note</strong>: For every invocation of this method there must 

  29.     be a corresponding invocation of {@linkplain #begin()} method. Failure 

  30.     to ensure this can lead to dangling transactions and unpredictable results. 

  31.     </p> 

  32.    /  

  33.   public void close();  

  34. }    



  1.  

和我们想想中的一样,也就是一些标准的事务方法的定义,和一个事务状态的枚举类型的定义

基本事务语义抽象类是对它的实现BasicTransactionSemantics

该类定义了两个属性

state状态和initialThreadId,id是唯一的,用来标识事务

构造方法中会赋值为NEW状态,并获取当前事务的一个ID值

重点来看下如下几个方法的具体实现:












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. protected void doBegin() throws InterruptedException {}  

  2.   protected abstract void doPut(Event event) throws InterruptedException;  

  3.   protected abstract Event doTake() throws InterruptedException;  

  4.   protected abstract void doCommit() throws InterruptedException;  

  5.   protected abstract void doRollback() throws InterruptedException;  

  6.   protected void doClose() {}  



1、doBegin

没什么好说的,就是检查状态是否NEW,ID是否匹配,没有问题后,将状态修改为OPEN,表示事务打开

2、doPut

takeList和putList维护的是希望取出成功和放入成功的event队列

检查ID是否匹配,状态是否打开,event是否为空,为空当然这个put就没意义了

关键看具体是怎么put的?

在FileChannel中有个内部静态类

static class FileBackedTransaction extends BasicTransactionSemantics









 



[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. private final LinkedBlockingDeque<FlumeEventPointer> takeList;  

  2.     private final LinkedBlockingDeque<FlumeEventPointer> putList;  





这分别定义了两个双向队列,用于拿和放












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. protected void doPut(Event event) throws InterruptedException {  

  2.       channelCounter.incrementEventPutAttemptCount();  

  3.       if(putList.remainingCapacity() == 0) {  

  4.         throw new ChannelException(“Put queue for FileBackedTransaction ” +  

  5.             “of capacity ” + putList.size() + “ full, consider ” +  

  6.             “committing more frequently, increasing capacity or ” +  

  7.             “increasing thread count. ” + channelNameDescriptor);  

  8.       }  

  9.       // this does not need to be in the critical section as it does not  

  10.       // modify the structure of the log or queue.  

  11.       if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {  

  12.         throw new ChannelFullException(“The channel has reached it’s capacity. ”  

  13.             + “This might be the result of a sink on the channel having too ”  

  14.             + “low of batch size, a downstream system running slower than ”  

  15.             + “normal, or that the channel capacity is just too low. ”  

  16.             + channelNameDescriptor);  

  17.       }  

  18.       boolean success = false;  

  19.       log.lockShared();  

  20.       try {  

  21.         FlumeEventPointer ptr = log.put(transactionID, event);  

  22.         Preconditions.checkState(putList.offer(ptr), “putList offer failed ”  

  23.           + channelNameDescriptor);  

  24.         queue.addWithoutCommit(ptr, transactionID);  

  25.         success = true;  

  26.       } catch (IOException e) {  

  27.         throw new ChannelException(“Put failed due to IO error ”  

  28.                 + channelNameDescriptor, e);  

  29.       } finally {  

  30.         log.unlockShared();  

  31.         if(!success) {  

  32.           // release slot obtained in the case  

  33.           // the put fails for any reason  

  34.           queueRemaining.release();  

  35.         }  

  36.       }  

  37.     }  



第一行,跟监控的度量信息有关,表示即将放入通道的event数量+1,监控度量请参考:http://blog.csdn.net/simonchi/article/details/43270461

1、检查队列的剩余空间

2、keepAlive秒时间内获取一个共享信号量,说明put的过程是互斥的

  1. 如果该时间内没有成功获取该信号量,那么event放入失败

3、FlumeEventPointer是用来做检查点机制的,因为这是文件通道,会用日志记录的

  1. 1、将event和事务id绑定到Pointer
  2. 2、将pointer放入队列尾部
  3. 3、通道中的事件队列FlumeEventQueue添加一个未提交的事件,绑定了事务ID

4、释放共享信号量

3、doTake









 



[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. protected Event doTake() throws InterruptedException {  

  2.       channelCounter.incrementEventTakeAttemptCount();  

  3.       if(takeList.remainingCapacity() == 0) {  

  4.         throw new ChannelException(“Take list for FileBackedTransaction, capacity ” +  

  5.             takeList.size() + “ full, consider committing more frequently, ” +  

  6.             “increasing capacity, or increasing thread count. ”  

  7.                + channelNameDescriptor);  

  8.       }  

  9.       log.lockShared();  

  10.       / 

  11.         1. Take an event which is in the queue. 

  12.         2. If getting that event does not throw NoopRecordException, 

  13.         then return it. 

  14.         3. Else try to retrieve the next event from the queue 

  15.         4. Repeat 2 and 3 until queue is empty or an event is returned. 

  16.        */  

  17.       try {  

  18.         while (true) {  

  19.           FlumeEventPointer ptr = queue.removeHead(transactionID);  

  20.           if (ptr == null) {  

  21.             return null;  

  22.           } else {  

  23.             try {  

  24.               // first add to takeList so that if write to disk  

  25.               // fails rollback actually does it’s work  

  26.               Preconditions.checkState(takeList.offer(ptr),  

  27.                 “takeList offer failed ”  

  28.                   + channelNameDescriptor);  

  29.               log.take(transactionID, ptr); // write take to disk  

  30.               Event event = log.get(ptr);  

  31.               return event;  

  32.             } catch (IOException e) {  

  33.               throw new ChannelException(“Take failed due to IO error ”  

  34.                 + channelNameDescriptor, e);  

  35.             } catch (NoopRecordException e) {  

  36.               LOG.warn(“Corrupt record replaced by File Channel Integrity ” +  

  37.                 “tool found. Will retrieve next event”, e);  

  38.               takeList.remove(ptr);  

  39.             } catch (CorruptEventException ex) {  

  40.               if (fsyncPerTransaction) {  

  41.                 throw new ChannelException(ex);  

  42.               }  

  43.               LOG.warn(“Corrupt record found. Event will be ” +  

  44.                 “skipped, and next event will be read.”, ex);  

  45.               takeList.remove(ptr);  

  46.             }  

  47.           }  

  48.         }  

  49.       } finally {  

  50.         log.unlockShared();  

  51.       }  

  52.     }  







1、剩余容量检查

2、检查点机制,日志记录操作,从头部取event

3、从takeList中删除该event

4、doCommit












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. protected void doCommit() throws InterruptedException {  

  2.       int puts = putList.size();  

  3.       int takes = takeList.size();  

  4.       if(puts > 0) {  

  5.         Preconditions.checkState(takes == 0“nonzero puts and takes ”  

  6.                 + channelNameDescriptor);  

  7.         log.lockShared();  

  8.         try {  

  9.           log.commitPut(transactionID);  

  10.           channelCounter.addToEventPutSuccessCount(puts);  

  11.           synchronized (queue) {  

  12.             while(!putList.isEmpty()) {  

  13.               if(!queue.addTail(putList.removeFirst())) {  

  14.                 StringBuilder msg = new StringBuilder();  

  15.                 msg.append(“Queue add failed, this shouldn’t be able to ”);  

  16.                 msg.append(“happen. A portion of the transaction has been ”);  

  17.                 msg.append(“added to the queue but the remaining portion ”);  

  18.                 msg.append(“cannot be added. Those messages will be consumed ”);  

  19.                 msg.append(“despite this transaction failing. Please report.”);  

  20.                 msg.append(channelNameDescriptor);  

  21.                 LOG.error(msg.toString());  

  22.                 Preconditions.checkState(false, msg.toString());  

  23.               }  

  24.             }  

  25.             queue.completeTransaction(transactionID);  

  26.           }  

  27.         } catch (IOException e) {  

  28.           throw new ChannelException(“Commit failed due to IO error ”  

  29.                   + channelNameDescriptor, e);  

  30.         } finally {  

  31.           log.unlockShared();  

  32.         }  

  33.       } else if (takes > 0) {  

  34.         log.lockShared();  

  35.         try {  

  36.           log.commitTake(transactionID);  

  37.           queue.completeTransaction(transactionID);  

  38.           channelCounter.addToEventTakeSuccessCount(takes);  

  39.         } catch (IOException e) {  

  40.           throw new ChannelException(“Commit failed due to IO error ”  

  41.               + channelNameDescriptor, e);  

  42.         } finally {  

  43.           log.unlockShared();  

  44.         }  

  45.         queueRemaining.release(takes);  

  46.       }  

  47.       putList.clear();  

  48.       takeList.clear();  

  49.       channelCounter.setChannelSize(queue.getSize());  

  50.     }  







1、如果putList不为空,提交的是放入通道的事件数量

2、如果takeList不为空,提交的是从通道拿走的事件数量

5、doRollback









  



[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. protected void doRollback() throws InterruptedException {  

  2.       int puts = putList.size();  

  3.       int takes = takeList.size();  

  4.       log.lockShared();  

  5.       try {  

  6.         if(takes > 0) {  

  7.           Preconditions.checkState(puts == 0“nonzero puts and takes ”  

  8.               + channelNameDescriptor);  

  9.           synchronized (queue) {  

  10.             while (!takeList.isEmpty()) {  

  11.               Preconditions.checkState(queue.addHead(takeList.removeLast()),  

  12.                   “Queue add failed, this shouldn’t be able to happen ”  

  13.                       + channelNameDescriptor);  

  14.             }  

  15.           }  

  16.         }  

  17.         putList.clear();  

  18.         takeList.clear();  

  19.         queue.completeTransaction(transactionID);  

  20.         channelCounter.setChannelSize(queue.getSize());  

  21.         log.rollback(transactionID);  

  22.       } catch (IOException e) {  

  23.         throw new ChannelException(“Commit failed due to IO error ”  

  24.             + channelNameDescriptor, e);  

  25.       } finally {  

  26.         log.unlockShared();  

  27.         // since rollback is being called, puts will never make it on  

  28.         // to the queue and we need to be sure to release the resources  

  29.         queueRemaining.release(puts);  

  30.       }  

  31.     }  







在此之前,putList,takeList都没有clear,所以这里可以对着两个双向队列回滚操作

以上是文件通道的实现,如果是内存通道,就没有log的检查点记录了,简单多了,不需要维护状态了。

3、sink

sink的接口定义如下:












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. @InterfaceAudience.Public  

  2. @InterfaceStability.Stable  

  3. public interface Sink extends LifecycleAware, NamedComponent {  

  4.   / 

  5.     <p>Sets the channel the sink will consume from</p> 

  6.     @param channel The channel to be polled 

  7.    */  

  8.   public void setChannel(Channel channel);  

  9.   / 

  10.     @return the channel associated with this sink 

  11.    /  

  12.   public Channel getChannel();  

  13.   /* 

  14.     <p>Requests the sink to attempt to consume data from attached channel</p> 

  15.     <p><strong>Note</strong>: This method should be consuming from the channel 

  16.     within the bounds of a Transaction. On successful delivery, the transaction 

  17.     should be committed, and on failure it should be rolled back. 

  18.     @return READY if 1 or more Events were successfully delivered, BACKOFF if 

  19.     no data could be retrieved from the channel feeding this sink 

  20.     @throws EventDeliveryException In case of any kind of failure to 

  21.     deliver data to the next hop destination. 

  22.    /  

  23.   public Status process() throws EventDeliveryException;  

  24.   public static enum Status {  

  25.     READY, BACKOFF  

  26.   }  

  27. }  







sink与一个通道连接,并消费通道中的events,把它们发送到一个配置的目的地

其实和source的原理大部分相同,同样有一个AbstractSink

我们同样看一个具体实现吧,HDFSEventSink

看它的process方法












[java] 
view plain
copy
print
?
在CODE上查看代码片
派生到我的代码片





  1. public Status process() throws EventDeliveryException {  

  2.     Channel channel = getChannel();  

  3.     Transaction transaction = channel.getTransaction();  

  4.     List<BucketWriter> writers = Lists.newArrayList();  

  5.     transaction.begin();  

  6.   

  7.      …………………………  

  8.   

  9.      transaction.commit();  

  10.       if (txnEventCount < 1) {  

  11.         return Status.BACKOFF;  

  12.       } else {  

  13.         sinkCounter.addToEventDrainSuccessCount(txnEventCount);  

  14.         return Status.READY;  

  15.       }  

  16.     } catch (IOException eIO) {  

  17.       transaction.rollback();  

  18.       LOG.warn(“HDFS IO error”, eIO);  

  19.       return Status.BACKOFF;  

  20.     } catch (Throwable th) {  

  21.       transaction.rollback();  

  22.       LOG.error(“process failed”, th);  

  23.       if (th instanceof Error) {  

  24.         throw (Error) th;  

  25.       } else {  

  26.         throw new EventDeliveryException(th);  

  27.       }  

  28.     } finally {  

  29.       transaction.close();  

  30.     }  

  31.   }  







这里可以看到,flume-ng在sink端是有事务控制的

事务从 从通道中取event开始,到sink到下一个目的地结束

在这个过程中,任意的失败都会导致事务的回滚,这就保证数据了一致性。

发表评论

表情:
评论列表 (有 0 条评论,267人围观)

还没有评论,来说两句吧...

相关阅读