Flume架构与源码分析-核心组件分析-2

r囧r小猫 2023-10-16 23:15 78阅读 0赞

上一篇链接:http://blog.csdn.net/tiantang_1986/article/details/50904939

4、整体流程
从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。

Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。

首先进入org.apache.flume.node.Application的main方法启动:
Java代码

  1. //1、设置默认值启动参数、参数是否必须的
  2. Options options = new Options();
  3. Option option = new Option("n", "name", true, "the name of this agent");
  4. option.setRequired(true);
  5. options.addOption(option);
  6. option = new Option("f", "conf-file", true,
  7. "specify a config file (required if -z missing)");
  8. option.setRequired(false);
  9. options.addOption(option);
  10. //2、接着解析命令行参数
  11. CommandLineParser parser = new GnuParser();
  12. CommandLine commandLine = parser.parse(options, args);
  13. String agentName = commandLine.getOptionValue('n');
  14. boolean reload = !commandLine.hasOption("no-reload-conf");
  15. if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  16. isZkConfigured = true;
  17. }
  18. if (isZkConfigured) {
  19. //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解
  20. } else {
  21. //4、打开配置文件,如果不存在则快速失败
  22. File configurationFile = new File(commandLine.getOptionValue('f'));
  23. if (!configurationFile.exists()) {
  24. throw new ParseException(
  25. "The specified configuration file does not exist: " + path);
  26. }
  27. List<LifecycleAware> components = Lists.newArrayList();
  28. if (reload) { //5、如果需要定期reload配置文件,则走如下方式
  29. //5.1、此处使用Guava提供的事件总线
  30. EventBus eventBus = new EventBus(agentName + "-event-bus");
  31. //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
  32. PollingPropertiesFileConfigurationProvider configurationProvider =
  33. new PollingPropertiesFileConfigurationProvider(
  34. agentName, configurationFile, eventBus, 30);
  35. components.add(configurationProvider);
  36. application = new Application(components); //5.3、向Application注册组件
  37. //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法
  38. eventBus.register(application);
  39. } else { //5、配置文件不支持定期reload
  40. PropertiesFileConfigurationProvider configurationProvider =
  41. new PropertiesFileConfigurationProvider(
  42. agentName, configurationFile);
  43. application = new Application();
  44. //6.2、直接使用配置文件初始化Flume组件
  45. application.handleConfigurationEvent(configurationProvider
  46. .getConfiguration());
  47. }
  48. }
  49. //7、启动Flume应用
  50. application.start();
  51. //8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止
  52. final Application appReference = application;
  53. Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  54. @Override
  55. public void run() {
  56. appReference.stop();
  57. }
  58. });

以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:
1、读取命令行参数;
2、读取配置文件;
3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;
4、启动Application,并注册虚拟机关闭钩子。

handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件:
Java代码

  1. @Subscribe
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  3. stopAllComponents();
  4. startAllComponents(conf);
  5. }

MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。

对于startAllComponents实现大体如下:
Java代码

  1. //1、首先启动Channel
  2. supervisor.supervise(Channels,
  3. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  4. //2、确保所有Channel是否都已启动
  5. for(Channel ch: materializedConfiguration.getChannels().values()){
  6. while(ch.getLifecycleState() != LifecycleState.START
  7. && !supervisor.isComponentInErrorState(ch)){
  8. try {
  9. Thread.sleep(500);
  10. } catch (InterruptedException e) {
  11. Throwables.propagate(e);
  12. }
  13. }
  14. }
  15. //3、启动SinkRunner
  16. supervisor.supervise(SinkRunners,
  17. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  18. //4、启动SourceRunner
  19. supervisor.supervise(SourceRunner,
  20. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  21. //5、初始化监控服务
  22. this.loadMonitoring();

从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。

对于stopAllComponents实现大体如下:
Java代码

  1. //1、首先停止SourceRunner
  2. supervisor.unsupervise(SourceRunners);
  3. //2、接着停止SinkRunner
  4. supervisor.unsupervise(SinkRunners);
  5. //3、然后停止Channel
  6. supervisor.unsupervise(Channels);
  7. //4、最后停止MonitorService
  8. monitorServer.stop();

此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。

Application中的start方法代码实现如下:
Java代码

  1. public synchronized void start() {
  2. for(LifecycleAware component : components) {
  3. supervisor.supervise(component,
  4. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  5. }
  6. }

其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。

而Application关闭执行了如下动作:
Java代码

  1. public synchronized void stop() {
  2. supervisor.stop();
  3. if(monitorServer != null) {
  4. monitorServer.stop();
  5. }
  6. }

即关闭守护哨兵和监控服务。

到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。

整体流程可以总结为:
1、首先初始化命令行配置;
2、接着读取配置文件;
3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;
4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;
5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;
6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。

首先创建LifecycleSupervisor:
Java代码

  1. //1、用于存放被守护的组件
  2. supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
  3. //2、用于存放正在被监控的组件
  4. monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  5. //3、创建监控服务线程池
  6. monitorService = new ScheduledThreadPoolExecutor(10,
  7. new ThreadFactoryBuilder().setNameFormat(
  8. "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
  9. .build());
  10. monitorService.setMaximumPoolSize(20);
  11. monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  12. //4、定期清理被取消的组件
  13. purger = new Purger();
  14. //4.1、默认不进行清理
  15. needToPurge = false;

LifecycleSupervisor启动时会进行如下操作:
Java代码

  1. public synchronized void start() {
  2. monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
  3. lifecycleState = LifecycleState.START;
  4. }

首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:
Java代码

  1. //1、首先停止守护监控服务
  2. if (monitorService != null) {
  3. monitorService.shutdown();
  4. try {
  5. monitorService.awaitTermination(10, TimeUnit.SECONDS);
  6. } catch (InterruptedException e) {
  7. logger.error("Interrupted while waiting for monitor service to stop");
  8. }
  9. }
  10. //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止
  11. for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
  12. if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
  13. entry.getValue().status.desiredState = LifecycleState.STOP;
  14. entry.getKey().stop();
  15. }
  16. }
  17. //3、更新本组件状态
  18. if (lifecycleState.equals(LifecycleState.START)) {
  19. lifecycleState = LifecycleState.STOP;
  20. }
  21. //4、最后的清理
  22. supervisedProcesses.clear();
  23. monitorFutures.clear();

接下来就是调用supervise进行组件守护了:
Java代码

  1. if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
  2. || this.monitorService.isTerminating()){
  3. //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护
  4. }
  5. //2、初始化守护组件
  6. Supervisoree process = new Supervisoree();
  7. process.status = new Status();
  8. //2.1、默认策略是失败重启
  9. process.policy = policy;
  10. //2.2、初始化组件默认状态,大多数组件默认为START
  11. process.status.desiredState = desiredState;
  12. process.status.error = false;
  13. //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件
  14. MonitorRunnable monitorRunnable = new MonitorRunnable();
  15. monitorRunnable.lifecycleAware = lifecycleAware;
  16. monitorRunnable.supervisoree = process;
  17. monitorRunnable.monitorService = monitorService;
  18. supervisedProcesses.put(lifecycleAware, process);
  19. //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件
  20. ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
  21. monitorRunnable, 0, 3, TimeUnit.SECONDS);
  22. monitorFutures.put(lifecycleAware, future);
  23. }

如果不需要守护了,则需要调用unsupervise:
Java代码

  1. public synchronized void unsupervise(LifecycleAware lifecycleAware) {
  2. synchronized (lifecycleAware) {
  3. Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
  4. //1.1、设置守护组件的状态为被丢弃
  5. supervisoree.status.discard = true;
  6. //1.2、设置组件盼望的最新生命周期状态为STOP
  7. this.setDesiredState(lifecycleAware, LifecycleState.STOP);
  8. //1.3、停止组件
  9. lifecycleAware.stop();
  10. }
  11. //2、从守护组件中移除
  12. supervisedProcesses.remove(lifecycleAware);
  13. //3、取消定时监控组件服务
  14. monitorFutures.get(lifecycleAware).cancel(false);
  15. //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件
  16. needToPurge = true;
  17. monitorFutures.remove(lifecycleAware);
  18. }

接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:
Java代码

  1. public void run() {
  2. long now = System.currentTimeMillis();
  3. try {
  4. if (supervisoree.status.firstSeen == null) {
  5. supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间
  6. }
  7. supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间
  8. synchronized (lifecycleAware) {
  9. //3、如果守护组件被丢弃或出错了,则直接返回
  10. if (supervisoree.status.discard || supervisoree.status.error) {
  11. return;
  12. }
  13. //4、更新最后一次查看到的状态
  14. supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
  15. //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
  16. if (!lifecycleAware.getLifecycleState().equals(
  17. supervisoree.status.desiredState)) {
  18. switch (supervisoree.status.desiredState) {
  19. case START: //6、如果是启动状态,则启动组件
  20. try {
  21. lifecycleAware.start();
  22. } catch (Throwable e) {
  23. if (e instanceof Error) {
  24. supervisoree.status.desiredState = LifecycleState.STOP;
  25. try {
  26. lifecycleAware.stop();
  27. } catch (Throwable e1) {
  28. supervisoree.status.error = true;
  29. if (e1 instanceof Error) {
  30. throw (Error) e1;
  31. }
  32. }
  33. }
  34. supervisoree.status.failures++;
  35. }
  36. break;
  37. case STOP: //7、如果是停止状态,则停止组件
  38. try {
  39. lifecycleAware.stop();
  40. } catch (Throwable e) {
  41. if (e instanceof Error) {
  42. throw (Error) e;
  43. }
  44. supervisoree.status.failures++;
  45. }
  46. break;
  47. default:
  48. }
  49. } catch(Throwable t) {
  50. }
  51. }
  52. }

如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。

原文链接;http://jinnianshilongnian.iteye.com/blog/2263778

发表评论

表情:
评论列表 (有 0 条评论,78人围观)

还没有评论,来说两句吧...

相关阅读