Dubbo 3.2 监控体系核心源码解读

傷城~ 2024-03-17 17:13 72阅读 0赞

一、核心类以及含义














































对象 作用
MetricsConfig 监控指标的配置类,支持具体指标类型的开关,例如可将注册中心的指标关闭
MetricSample 指标收集样本,定义指标名称、tag、描述、类别等,直接与micrometer交互打点
MetricsLevel 监控指标级别:app、service、method、config
MetricsCat 一个组合对象,参数有:指标、对应的更新指标值的函数
MetricsCategory 指标类别(注册、元数据、请求、线程池等)
MetricsExport export方法获取所有的 MetricSample 数据
AbstractMetricsExport 初始化的MetricsKey
CombMetricsCollector 指标收集器的抽象类,核心方法有记录 rt、自增指标、获取所有的指标样本 MetricSample 以编译通过 micromiter 打点
DefaultMetricsCollector 默认的指标收集器,负责收集请求类型的指标、线程池指标

二、注册指标的收集

1. 定义指标类型

构造函数定义收集的指标类型:应用与服务级别的注册、订阅、收到推送后的 notify 的成功、失败数量

  1. public RegistryMetricsCollector(ApplicationModel applicationModel) {
  2. super(new BaseStatComposite(applicationModel) {
  3. @Override
  4. protected void init(ApplicationStatComposite applicationStatComposite) {
  5. super.init(applicationStatComposite);
  6. applicationStatComposite.init(RegistryMetricsConstants.APP_LEVEL_KEYS);
  7. }
  8. }
  9. @Override
  10. protected void init(ServiceStatComposite serviceStatComposite) {
  11. super.init(serviceStatComposite);
  12. serviceStatComposite.initWrapper(RegistryMetricsConstants.SERVICE_LEVEL_KEYS);
  13. }
  14. @Override
  15. protected void init(RtStatComposite rtStatComposite) {
  16. super.init(rtStatComposite);
  17. rtStatComposite.init(OP_TYPE_REGISTER, OP_TYPE_SUBSCRIBE, OP_TYPE_NOTIFY, OP_TYPE_REGISTER_SERVICE, OP_TYPE_SUBSCRIBE_SERVICE);
  18. }
  19. //...省略无关代码

2. 订阅指标变更

初始化注册指标收集器的时候创建指标定义对象 RegistrySubDispatcher

  1. public RegistryMetricsCollector(ApplicationModel applicationModel) {
  2. //...省略无关代码
  3. super.setEventMulticaster(new RegistrySubDispatcher(this));
  4. this.applicationModel = applicationModel;
  5. }

在 RegistrySubDispatcher 中遍历了指标类型,为每个指标类型添加监听器,当收到指标变更事件时自动更新指标

  1. public RegistrySubDispatcher(RegistryMetricsCollector collector) {
  2. CategorySet.ALL.forEach(categorySet ->
  3. {
  4. super.addListener(categorySet.getPost().getEventFunc().apply(collector));
  5. if (categorySet.getFinish() != null) {
  6. super.addListener(categorySet.getFinish().getEventFunc().apply(collector));
  7. }
  8. if (categorySet.getError() != null) {
  9. super.addListener(categorySet.getError().getEventFunc().apply(collector));
  10. }
  11. });
  12. }

3. 收集指标集合

从应用、rt、服务、方法四个维度收集指标集合

  1. @Override
  2. public List<MetricSample> collect() {
  3. List<MetricSample> list = new ArrayList<>();
  4. if (!isCollectEnabled()) {
  5. return list;
  6. }
  7. list.addAll(super.export(MetricsCategory.REGISTRY));
  8. return list;
  9. }

4. 注册行为监控打点

启动时自动注册,使用静态类 MetricsEventBus 的 post 方法进行打点,该 post 方法会触发 MetricsDispatcher 发布注册监控事件,也会执行 MetricsDispatcher 对应的 listener ,最终执行 RegistryMetricsCollector 对应的 onEvent 方法,而该方法又会触发 MetricsEventMulticaster 对象的 publishEvent 方法,从而执行注册指标的对应的监听器最终将数据收集到各个 StatComposite 中

  1. private void registerServiceInstance() {
  2. try {
  3. registered = true;
  4. MetricsEventBus.post(RegistryEvent.toRegisterEvent(applicationModel),
  5. () -> {
  6. ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
  7. return null;
  8. }
  9. );
  10. } catch (Exception e) {
  11. logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e);
  12. }
  13. //省略无关代码

5. 指标收集到 micrometer

定时执行收集任务

  1. private void scheduleMetricsCollectorSyncJob() {
  2. NamedThreadFactory threadFactory = new NamedThreadFactory("metrics-collector-sync-job", true);
  3. collectorSyncJobExecutor = Executors.newScheduledThreadPool(1, threadFactory);
  4. collectorSyncJobExecutor.scheduleWithFixedDelay(this::refreshData, DEFAULT_SCHEDULE_INITIAL_DELAY, DEFAULT_SCHEDULE_PERIOD, TimeUnit.SECONDS);
  5. }

遍历每个指标收集器将收集到的指标样本通过 micrometer 记录起来

  1. public void refreshData() {
  2. collectors.forEach(collector -> {
  3. List<MetricSample> samples = collector.collect();
  4. for (MetricSample sample : samples) {
  5. try {
  6. switch (sample.getType()) {
  7. case GAUGE:
  8. GaugeMetricSample gaugeSample = (GaugeMetricSample) sample;
  9. List<Tag> tags = getTags(gaugeSample);
  10. Gauge.builder(gaugeSample.getName(), gaugeSample.getValue(), gaugeSample.getApply())
  11. .description(gaugeSample.getDescription()).tags(tags).register(compositeRegistry);
  12. break;
  13. case COUNTER:
  14. CounterMetricSample counterMetricSample = (CounterMetricSample) sample;
  15. FunctionCounter.builder(counterMetricSample.getName(), counterMetricSample.getValue(),
  16. Number::doubleValue).description(counterMetricSample.getDescription())
  17. .tags(getTags(counterMetricSample))
  18. .register(compositeRegistry);
  19. case TIMER:
  20. case LONG_TASK_TIMER:
  21. case DISTRIBUTION_SUMMARY:
  22. // TODO
  23. break;
  24. default:
  25. break;
  26. }
  27. } catch (Exception e) {
  28. logger.error(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "", "error occurred when synchronize metrics collector.", e);
  29. }
  30. }
  31. });
  32. }

发表评论

表情:
评论列表 (有 0 条评论,72人围观)

还没有评论,来说两句吧...

相关阅读