etrace-watchdog 落日映苍穹つ 2023-03-02 10:47 10阅读 0赞 基于storm,spout运行所有router policy,bolt运行所有worker policy 提交时根据zk cluster和kafka topic 指定多个spout和一个bolt 同一jvm运行的spout/bolt共享一个routerEngine/workerEngine,内部不断拉取router/worker policy(根据update time 增量拉取,http),可能涉及删除(状态置无效),新增,更新(我们处理为删了再加) 对policy内每条语句的每个注解,注册listener到语句上 epEngine会创建固定context,如create context pre\_agg start @now end after 10 sec, 增加事件类型(metric-Metric,event-SimpleMetric) 内部的epConfig:annotationImport,静态函数所需import,convertor下各种plugin的view,AggregatorFunction portal提供http服务,从数据库获取PolicyV2,转换成Context(含多Component,如collector,converter,trigger, helper,每个Component含多plugin,各Component共享内容在sharedContent),获取语句对象, 对象内属性替换,过滤并转换为字符串,返回RunningPolicy 获取语句对象: router: 存在指标缺失plugin,无router policy,然后helper根据是否聚合,决定是否在pre\_agg context下面,含@Router worker:委托给collector,converter,trigger, collector看是否聚合,不聚合单select,聚合两constant,两create context,两select(还得考虑指标缺失) convertor委托给插件,一插件一select,每select内部velocity替换index,viewPart,viewPart内容来自插件上的@ViewPlugin和其它内容混合,只有阀值插件无viewPart trigger:一select写lindb @Lindb,一select 判断并触发报警 @Alert 语句对象属性替换:四大组件类型和组件的plugin类型field类型及field的子类型的方法中找到@TemplateMethod, 构建类似`alias-$collector.methodName`和`alias-$collector.fieldName.methodName`, 考虑方法参数会有点不一样,通过velocity对语句内属性值替换,构建类似collector-具体collector对象,二次velocity替换(方法调用) @Router(policyId,fields,tags):生成SimpleMetric(保留需要的fields和tags,tags内增加policyId),根据tags放在特定bolt实例的缓存内,nextTuple统一发出, tags通过GroupByTagsPlugin获取,fields由每个converter plugin的每个fieldExpr通过antlr提取 @Alert AlertEventV2发往kafka 每个policy都对应一个timewindow,分为aggregateWindow,nonAggregateWindow,metric进入,会检查window是否有效,metric时间是否在startTime,endTime内,通过则发往epEngine(若需要缺失补0,也会记录) aggregateWindow:startTime每个聚合周期tick更新,endTime=startTime+2\*period nonAggregateWindow: endTime不限,两时间不更新 每分钟tick一次,所有window检查 1当前window是否依然生效(如过了policy的生效时间),不生效需移除policy 2聚合窗口检查是否需要移动窗口,移动则发FlushEvent(带policyId和窗口的startTime) 并更新startTime和endTime 3若需要缺失补0:则聚合窗口每个周期,非聚合窗口每分钟进行检查 portal接kakfa的AlertEventV2,创建AlertMessage-suppress handlers-render message-subscribe handlers(每个都可以获取hooks,hooks进行or merge)-HooksHandler(hooks 进行and merge) -AlertMessage入库-构建多条MessageRecord-异步发送(针对每条MessageRecord进行发送并入库)-后处理handlers 梯度静默时间:5 10 20 通过suppress handler实现,数据库找last AlertMessage(同policyId,同groupByTags通道),AlertMessage含start,update,level,如果触发时间ts-start在梯度范围内,修改last Message的update,当前Message suppress,如果超出梯度范围,当前Message生成start,update,level(ts-update大于梯度范围,level上升,否则level=0)且不suppress Label处理:policy-Label,Label—User 都是多对多关系,且Label—User关联表含多channel配置 Hooks(多user,每user多channel)
还没有评论,来说两句吧...