Open-falcon hbs source code walkthrough

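hbs periodically reads data from the db, keeps it in a local cache, and exposes RPC interfaces that two other components, agent and judge, call to query it. The RPC server registers two receivers, Agent and Hbs, and serves each accepted TCP connection with a JSON-RPC codec in its own goroutine: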

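// modules/hbs/rpc/rpc.go
func Start() {
    // addr: the hbs listen address, defined outside this excerpt
    server := rpc.NewServer()
    server.Register(new(Agent)) // methods called by agent, e.g. Agent.MinePlugins
    server.Register(new(Hbs))   // methods called by judge, e.g. Hbs.GetStrategies
    l, e := net.Listen("tcp", addr)
    if e != nil {
        log.Fatalln("listen error:", e)
    }
    for {
        conn, err := l.Accept()
        ......
        // one goroutine per connection, speaking JSON-RPC
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}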
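To see the RPC plumbing in isolation, here is a minimal, self-contained sketch using only Go's standard net/rpc and net/rpc/jsonrpc packages. The request/response types and the fixed plugin list are hypothetical simplifications of the model types, and an in-memory net.Pipe replaces the TCP listener so the example needs no network.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// HeartbeatRequest and PluginsResponse are simplified, hypothetical stand-ins
// for model.AgentHeartbeatRequest and model.AgentPluginsResponse.
type HeartbeatRequest struct {
	Hostname string
}

type PluginsResponse struct {
	Plugins []string
}

// Agent mirrors the shape of hbs's RPC receiver: exported methods of the form
// func (t *T) Method(args A, reply *R) error are exposed as "Agent.Method".
type Agent struct{}

func (t *Agent) MinePlugins(args HeartbeatRequest, reply *PluginsResponse) error {
	if args.Hostname == "" {
		return nil // same early return as the real handler
	}
	reply.Plugins = []string{"sys/ntp"} // hypothetical fixed answer
	return nil
}

// callMinePlugins serves JSON-RPC on one end of an in-memory pipe and
// calls Agent.MinePlugins from the other end.
func callMinePlugins(hostname string) ([]string, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Agent)); err != nil {
		return nil, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeCodec(jsonrpc.NewServerCodec(serverConn))

	client := jsonrpc.NewClient(clientConn)
	defer client.Close() // closing the client also ends ServeCodec

	var reply PluginsResponse
	err := client.Call("Agent.MinePlugins", HeartbeatRequest{Hostname: hostname}, &reply)
	return reply.Plugins, err
}

func main() {
	plugins, err := callMinePlugins("host-01")
	fmt.Println(plugins, err)
}
```

net/rpc addresses methods as "Receiver.Method", which is why registering new(Agent) makes Agent.MinePlugins callable by name.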

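hbs and the agent:

  • handles agent heartbeat requests;
  • answers agent queries about which plugins to run;
  • answers agent queries about which processes and ports to monitor;

hbs and judge:

  • returns all currently configured alert strategies;
  • returns all currently configured alert expressions;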

Overall flow diagram:
[Figure 1: overall flow of Open-falcon hbs]

1. hbs and the agent

Which plugins the agent should run:

// modules/hbs/rpc/agent.go
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {
    if args.Hostname == "" {
        return nil
    }

    reply.Plugins = cache.GetPlugins(args.Hostname)
    reply.Timestamp = time.Now().Unix()

    return nil
}
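cache.GetPlugins itself is not shown above. A plausible minimal sketch, assuming plugins are attached to host groups (as the map[int][]string in GroupPlugins suggests) and that a host's plugin list is the deduplicated union over all of its groups. The lookup tables are hypothetical in-memory stand-ins for HostMap, HostGroupsMap and GroupPlugins, and getPlugins is not the hbs function.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical snapshots standing in for hbs's caches:
// hostIDs: hostname -> host id, hostGroups: host id -> group ids,
// groupPlugins: group id -> plugin directories (like GroupPlugins.M).
var (
	hostIDs      = map[string]int{"host-01": 1}
	hostGroups   = map[int][]int{1: {10, 20}}
	groupPlugins = map[int][]string{
		10: {"sys/ntp", "net/port"},
		20: {"net/port", "app/nginx"},
	}
)

// getPlugins returns the deduplicated, sorted union of the plugin dirs of
// every group the host belongs to; unknown hosts get an empty list.
func getPlugins(hostname string) []string {
	hid, ok := hostIDs[hostname]
	if !ok {
		return []string{}
	}
	seen := make(map[string]struct{})
	for _, gid := range hostGroups[hid] {
		for _, p := range groupPlugins[gid] {
			seen[p] = struct{}{} // a dir shared by two groups is kept once
		}
	}
	out := make([]string, 0, len(seen))
	for p := range seen {
		out = append(out, p)
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	fmt.Println(getPlugins("host-01")) // [app/nginx net/port sys/ntp]
}
```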

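Which processes and ports the agent should monitor. The agent sends the checksum of the list it already has; if it still matches, hbs replies with an empty list instead of resending the same metrics: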

// modules/hbs/rpc/agent.go
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
    if args.Hostname == "" {
        return nil
    }
    metrics, err := cache.GetBuiltinMetrics(args.Hostname)
    if err != nil {
        return nil
    }
    checksum := ""
    if len(metrics) > 0 {
        checksum = DigestBuiltinMetrics(metrics)
    }
    if args.Checksum == checksum {
        reply.Metrics = []*model.BuiltinMetric{}
    } else {
        reply.Metrics = metrics
    }
    reply.Checksum = checksum
    reply.Timestamp = time.Now().Unix()
    return nil
}
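DigestBuiltinMetrics is not shown in the source, so the digest below is a hypothetical version (sorted string forms hashed with MD5), chosen only because it does not depend on the order of the metrics. The reply function mirrors the checksum branch of BuiltinMetrics; BuiltinMetric is a simplified stand-in for the model type.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
	"strings"
)

// BuiltinMetric is a simplified, hypothetical stand-in for model.BuiltinMetric.
type BuiltinMetric struct {
	Metric string
	Tags   string
}

// digest is a hypothetical DigestBuiltinMetrics: it sorts the metrics'
// string forms so the result is order-independent, then hashes with MD5.
func digest(ms []BuiltinMetric) string {
	if len(ms) == 0 {
		return "" // same convention as the handler: no metrics, empty checksum
	}
	parts := make([]string, 0, len(ms))
	for _, m := range ms {
		parts = append(parts, m.Metric+"/"+m.Tags)
	}
	sort.Strings(parts)
	return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(parts, "\n"))))
}

// reply mirrors BuiltinMetrics: if the agent's checksum matches, send no
// metrics (nothing changed); otherwise send the full list.
func reply(agentChecksum string, ms []BuiltinMetric) ([]BuiltinMetric, string) {
	sum := digest(ms)
	if agentChecksum == sum {
		return []BuiltinMetric{}, sum
	}
	return ms, sum
}

func main() {
	ms := []BuiltinMetric{{"net.port.listen", "port=80"}}
	first, sum := reply("", ms)  // agent has nothing yet
	second, _ := reply(sum, ms) // agent reports the checksum it received
	fmt.Println(len(first), len(second)) // 1 0
}
```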

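As you can see, both RPC handlers above only read from the cache. hbs periodically queries the db and stores the results in its local cache, which the agent then queries: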

// modules/hbs/cache/cache.go
func LoopInit() {
    for {
        // refresh every minute
        time.Sleep(time.Minute)
        GroupPlugins.Init()
        ....
    }
}

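Each Init reads from the db and stores the result locally. The assignment to M is done under the lock so that it does not race with other goroutines (such as the RPC handlers) accessing the same data concurrently:

// modules/hbs/cache/plugins.go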
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}

func (this *SafeGroupPlugins) Init() {
    m, err := db.QueryPlugins()
    if err != nil {
        return
    }

    this.Lock()
    defer this.Unlock()
    this.M = m
}

2. hbs and judge

judge queries the alert expressions:

// modules/hbs/rpc/hbs.go
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {
    reply.Expressions = cache.ExpressionCache.Get()
    return nil
}

judge queries the alert strategies. Because this data spans several db tables, the response has to be assembled in a fairly involved way:

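// modules/hbs/rpc/hbs.go
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {
    reply.HostStrategies = []*model.HostStrategy{}
    // host id -> ids of the templates bound to that host
    hidTids := cache.HostTemplateIds.GetMap()
    sz := len(hidTids)
    if sz == 0 {
        return nil
    }
    // monitored hosts keyed by host id; judge needs the hostname
    hosts := cache.MonitoredHosts.Get()
    tpls := cache.TemplateCache.GetMap()
    strategies := cache.Strategies.GetMap()
    // index: template id -> strategies defined on that template
    tpl2Strategies := Tpl2Strategies(strategies)
    hostStrategies := make([]*model.HostStrategy, 0, sz)
    for hostId, tplIds := range hidTids {
        h, exists := hosts[hostId]
        if !exists {
            continue // host is not in the monitored set; skip it
        }
        // strategies from the host's templates, including inherited ones
        ss := CalcInheritStrategies(tpls, tplIds, tpl2Strategies)
        hs := model.HostStrategy{
            Hostname:   h.Name,
            Strategies: ss,
        }
        hostStrategies = append(hostStrategies, &hs)
    }
    reply.HostStrategies = hostStrategies
    return nil
}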
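Tpl2Strategies and CalcInheritStrategies are not shown. The sketch below illustrates the idea under two assumptions: templates form a parent chain, and a child template's strategy overrides a parent strategy with the same metric and tags. All types and functions here are hypothetical simplifications; the sketch handles a single template chain only, and merging the results for every template bound to a host is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified, hypothetical stand-ins for model.Template / model.Strategy.
type Template struct {
	Id       int
	ParentId int // 0 means "no parent"
}

type Strategy struct {
	Id     int
	TplId  int
	Metric string
	Tags   string // assumed to be a canonical, pre-sorted "k=v,k=v" string
	Right  string // threshold value
}

// tpl2Strategies indexes strategies by template id, in the spirit of Tpl2Strategies.
func tpl2Strategies(all []Strategy) map[int][]Strategy {
	idx := make(map[int][]Strategy)
	for _, s := range all {
		idx[s.TplId] = append(idx[s.TplId], s)
	}
	return idx
}

// chain returns tid's ancestors from the root down to tid itself; unknown
// ids and cycles stop the walk.
func chain(tpls map[int]Template, tid int) []int {
	var ids []int
	seen := map[int]bool{}
	for id := tid; id != 0; {
		t, ok := tpls[id]
		if !ok || seen[id] {
			break
		}
		seen[id] = true
		ids = append([]int{id}, ids...) // prepend so the root comes first
		id = t.ParentId
	}
	return ids
}

// inherit walks one chain root-first; for each metric+tags key, the
// youngest template that defines it replaces everything older.
func inherit(tpls map[int]Template, tid int, idx map[int][]Strategy) []Strategy {
	byKey := make(map[string][]Strategy)
	for _, id := range chain(tpls, tid) {
		own := make(map[string][]Strategy)
		for _, s := range idx[id] {
			k := s.Metric + "/" + s.Tags
			own[k] = append(own[k], s)
		}
		for k, ss := range own {
			byKey[k] = ss // child overrides parent for this key
		}
	}
	var out []Strategy
	for _, ss := range byKey {
		out = append(out, ss...)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Id < out[j].Id })
	return out
}

func main() {
	tpls := map[int]Template{1: {Id: 1}, 2: {Id: 2, ParentId: 1}}
	idx := tpl2Strategies([]Strategy{
		{Id: 10, TplId: 1, Metric: "cpu.idle", Right: "10"},
		{Id: 11, TplId: 1, Metric: "df.bytes.free.percent", Right: "5"},
		{Id: 20, TplId: 2, Metric: "cpu.idle", Right: "20"}, // overrides id 10
	})
	for _, s := range inherit(tpls, 2, idx) {
		fmt.Println(s.Id, s.Metric, s.Right)
	}
}
```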

Likewise, these RPC handlers only read from the cache. hbs periodically queries the db and refreshes the local cache that judge queries:

// modules/hbs/cache/cache.go
func LoopInit() {
    for {
        time.Sleep(time.Minute)
        ......
        GroupTemplates.Init()
        HostGroupsMap.Init()
        HostMap.Init()
        TemplateCache.Init()
        Strategies.Init(TemplateCache.GetMap())
        HostTemplateIds.Init()
        ExpressionCache.Init()
    }
}

Each cache keeps its data in a map; on every periodic refresh, the data read from the db replaces the whole map:

// modules/hbs/cache/templates.go
type SafeTemplateCache struct {
    sync.RWMutex
    M map[int]*model.Template
}

var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}

func (this *SafeTemplateCache) Init() {
    ts, err := db.QueryTemplates()
    if err != nil {
        return
    }
    this.Lock()
    defer this.Unlock()
    this.M = ts
}
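Because Init assigns a new map rather than editing the old one, a reader that already obtained the map (for example an in-flight GetStrategies call) keeps a consistent snapshot. A small sketch of that property: the simplified Template type and the replace helper are hypothetical, and GetMap is assumed to return M under the read lock. This only holds if callers treat the returned map as read-only.

```go
package main

import (
	"fmt"
	"sync"
)

// Template is a simplified, hypothetical stand-in for model.Template.
type Template struct {
	Id   int
	Name string
}

type SafeTemplateCache struct {
	sync.RWMutex
	M map[int]*Template
}

// GetMap hands out the current map under the read lock (assumed accessor).
func (c *SafeTemplateCache) GetMap() map[int]*Template {
	c.RLock()
	defer c.RUnlock()
	return c.M
}

// replace mimics Init: it installs a brand-new map instead of editing M.
func (c *SafeTemplateCache) replace(ts map[int]*Template) {
	c.Lock()
	defer c.Unlock()
	c.M = ts
}

func main() {
	c := &SafeTemplateCache{M: map[int]*Template{1: {Id: 1, Name: "base"}}}
	snapshot := c.GetMap() // e.g. held by a request that is still running
	c.replace(map[int]*Template{2: {Id: 2, Name: "web"}})
	fmt.Println(len(snapshot), snapshot[1].Name)     // 1 base
	fmt.Println(len(c.GetMap()), c.GetMap()[2].Name) // 1 web
}
```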
