Dubbo 3.2 监控体系核心源码解读
一、核心类以及含义
对象 | 作用 |
---|---|
MetricsConfig | 监控指标的配置类,支持具体指标类型的开关,例如可将注册中心的指标关闭 |
MetricSample | 指标收集样本,定义指标名称、tag、描述、类别等,直接与micrometer交互打点 |
MetricsLevel | 监控指标级别:app、service、method、config |
MetricsCat | 一个组合对象,参数有:指标、对应的更新指标值的函数 |
MetricsCategory | 指标类别(注册、元数据、请求、线程池等) |
MetricsExport | export方法获取所有的 MetricSample 数据 |
AbstractMetricsExport | 初始化的MetricsKey |
CombMetricsCollector | 指标收集器的抽象类,核心方法有记录 rt、自增指标、获取所有的指标样本 MetricSample 以编译通过 micromiter 打点 |
DefaultMetricsCollector | 默认的指标收集器,负责收集请求类型的指标、线程池指标 |
二、注册指标的收集
1. 定义指标类型
构造函数定义收集的指标类型:应用与服务级别的注册、订阅、收到推送后的 notify 的成功、失败数量
public RegistryMetricsCollector(ApplicationModel applicationModel) {
super(new BaseStatComposite(applicationModel) {
@Override
protected void init(ApplicationStatComposite applicationStatComposite) {
super.init(applicationStatComposite);
applicationStatComposite.init(RegistryMetricsConstants.APP_LEVEL_KEYS);
}
}
@Override
protected void init(ServiceStatComposite serviceStatComposite) {
super.init(serviceStatComposite);
serviceStatComposite.initWrapper(RegistryMetricsConstants.SERVICE_LEVEL_KEYS);
}
@Override
protected void init(RtStatComposite rtStatComposite) {
super.init(rtStatComposite);
rtStatComposite.init(OP_TYPE_REGISTER, OP_TYPE_SUBSCRIBE, OP_TYPE_NOTIFY, OP_TYPE_REGISTER_SERVICE, OP_TYPE_SUBSCRIBE_SERVICE);
}
//...省略无关代码
2. 订阅指标变更
初始化注册指标收集器的时候创建指标定义对象 RegistrySubDispatcher
public RegistryMetricsCollector(ApplicationModel applicationModel) {
//...省略无关代码
super.setEventMulticaster(new RegistrySubDispatcher(this));
this.applicationModel = applicationModel;
}
在 RegistrySubDispatcher 中遍历了指标类型,为每个指标类型添加监听器,当收到指标变更事件时自动更新指标
public RegistrySubDispatcher(RegistryMetricsCollector collector) {
CategorySet.ALL.forEach(categorySet ->
{
super.addListener(categorySet.getPost().getEventFunc().apply(collector));
if (categorySet.getFinish() != null) {
super.addListener(categorySet.getFinish().getEventFunc().apply(collector));
}
if (categorySet.getError() != null) {
super.addListener(categorySet.getError().getEventFunc().apply(collector));
}
});
}
3. 收集指标集合
从应用、rt、服务、方法四个维度收集指标集合
@Override
public List<MetricSample> collect() {
List<MetricSample> list = new ArrayList<>();
if (!isCollectEnabled()) {
return list;
}
list.addAll(super.export(MetricsCategory.REGISTRY));
return list;
}
4. 注册行为监控打点
启动时自动注册,使用静态类 MetricsEventBus 的 post 方法进行打点,该 post 方法会触发 MetricsDispatcher 发布注册监控事件,也会执行 MetricsDispatcher 对应的 listener ,最终执行 RegistryMetricsCollector 对应的 onEvent 方法,而该方法又会触发 MetricsEventMulticaster 对象的 publishEvent 方法,从而执行注册指标的对应的监听器最终将数据收集到各个 StatComposite 中
private void registerServiceInstance() {
try {
registered = true;
MetricsEventBus.post(RegistryEvent.toRegisterEvent(applicationModel),
() -> {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
return null;
}
);
} catch (Exception e) {
logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e);
}
//省略无关代码
5. 指标收集到 micrometer
定时执行收集任务
private void scheduleMetricsCollectorSyncJob() {
NamedThreadFactory threadFactory = new NamedThreadFactory("metrics-collector-sync-job", true);
collectorSyncJobExecutor = Executors.newScheduledThreadPool(1, threadFactory);
collectorSyncJobExecutor.scheduleWithFixedDelay(this::refreshData, DEFAULT_SCHEDULE_INITIAL_DELAY, DEFAULT_SCHEDULE_PERIOD, TimeUnit.SECONDS);
}
遍历每个指标收集器将收集到的指标样本通过 micrometer 记录起来
public void refreshData() {
collectors.forEach(collector -> {
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
try {
switch (sample.getType()) {
case GAUGE:
GaugeMetricSample gaugeSample = (GaugeMetricSample) sample;
List<Tag> tags = getTags(gaugeSample);
Gauge.builder(gaugeSample.getName(), gaugeSample.getValue(), gaugeSample.getApply())
.description(gaugeSample.getDescription()).tags(tags).register(compositeRegistry);
break;
case COUNTER:
CounterMetricSample counterMetricSample = (CounterMetricSample) sample;
FunctionCounter.builder(counterMetricSample.getName(), counterMetricSample.getValue(),
Number::doubleValue).description(counterMetricSample.getDescription())
.tags(getTags(counterMetricSample))
.register(compositeRegistry);
case TIMER:
case LONG_TASK_TIMER:
case DISTRIBUTION_SUMMARY:
// TODO
break;
default:
break;
}
} catch (Exception e) {
logger.error(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "", "error occurred when synchronize metrics collector.", e);
}
}
});
}
还没有评论,来说两句吧...