Open-falcon graph使用RRD保存指标数据

Open-falcon的graph组件使用rrd保存指标数据,rrd由于是本地存储,单机故障容易引起数据丢失,Open-falcon提供了双写的逻辑(transfer双写2个graph节点)以增强可用性。

graph的指标数据的存储,可选择不同的TSDB,比如InfluxDB、OpenTSDB等。

本文主要结合源码,介绍Open-falcon如何使用rrd保存和查询数据。

1. rrd文件的命名

rrd文件的命名方式:

md5=md5sum(metric, endpoint, tags)
md5[0:2]_md5_dsType_step.rrd

看一下代码中的实现:

// RRDTOOL UTILS
// 监控数据对应的rrd文件名称
func RrdFileName(baseDir string, md5 string, dsType string, step int) string {
    return baseDir + "/" + md5[0:2] + "/" +
        md5 + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}

md5的计算:将endpoint/metric/tags拼成string,然后md5sum(string)

//MD5的值
func Checksum(endpoint string, metric string, tags map[string]string) string {
    pk := PK(endpoint, metric, tags)
    return Md5(pk)
}

func PK(endpoint, metric string, tags map[string]string) string {
    ret := bufferPool.Get().(*bytes.Buffer)
    ret.Reset()
    defer bufferPool.Put(ret)

    if tags == nil || len(tags) == 0 {
        ret.WriteString(endpoint)
        ret.WriteString("/")
        ret.WriteString(metric)
        return ret.String()
    }
    ret.WriteString(endpoint)
    ret.WriteString("/")
    ret.WriteString(metric)
    ret.WriteString("/")
    ret.WriteString(SortedTags(tags))
    return ret.String()
}

比如:trovedev这台机器的agent.alive的数据,该指标将被存储在be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd文件中:

# echo -n "trovedev/agent.alive" | md5sum
be3cdc0fce0d498e19e2c6e9f1f338b6  -
# ls -alh be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
-rw-r--r-- 1 root root 70K 6月   2 09:43 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd

2. rrd文件的创建

rrd文件被创建时,除了在本地磁盘上新建了一个rrd文件,还指定了指标的归档策略,也就是指定保存多久、如何聚合:
比如:原始的数据1min1个点,保留12hour;原始的数据每隔5min被聚合(AVERAGE)成1个点,原始的数据被删掉;

// modules/graph/rrdtool/rrdtool.go
const (
    RRA1PointCnt   = 720 
    RRA5PointCnt   = 576 
    RRA20PointCnt  = 504 
    RRA180PointCnt = 766 
    RRA720PointCnt = 730 
)

func create(filename string, item *cmodel.GraphItem) error {
    now := time.Now()
    start := now.Add(time.Duration(-24) * time.Hour)
    step := uint(item.Step)

    c := rrdlite.NewCreator(filename, start, step)
    c.DS("metric", item.DsType, item.Heartbeat, item.Min, item.Max)

    // 设置各种归档策略
    // 1min1个点,存12hour
    c.RRA("AVERAGE", 0, 1, RRA1PointCnt)    //RRA1PointCnt=720: 1min1个点,存720个点,即12hour

    // 5m1个点,存2d
    c.RRA("AVERAGE", 0, 5, RRA5PointCnt)    //RRA5PointCnt=576:5min1个点, 存576个点,即2d
    c.RRA("MAX", 0, 5, RRA5PointCnt)
    c.RRA("MIN", 0, 5, RRA5PointCnt)

    // 20m1个点,存7d
    c.RRA("AVERAGE", 0, 20, RRA20PointCnt)    //RRA20PointCnt=504: 20m1个点, 共504个点,即7d
    c.RRA("MAX", 0, 20, RRA20PointCnt)
    c.RRA("MIN", 0, 20, RRA20PointCnt)

    .......

    return c.Create(true)
}

可以使用rrdtool查询rrd文件的信息:其中保存了配置的归档策略

# rrdtool info be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
filename = "be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd"
rrd_version = "0003"
step = 60
last_update = 1591062120
header_size = 3080
ds[metric].index = 0
ds[metric].type = "GAUGE"
ds[metric].minimal_heartbeat = 120
ds[metric].min = NaN
ds[metric].max = NaN
ds[metric].last_ds = "1"
ds[metric].value = 0.0000000000e+00
ds[metric].unknown_sec = 0
rra[0].cf = "AVERAGE"
rra[0].rows = 720
rra[0].cur_row = 385
rra[0].pdp_per_row = 1
rra[0].xff = 0.0000000000e+00
rra[0].cdp_prep[0].value = NaN
rra[0].cdp_prep[0].unknown_datapoints = 0
rra[1].cf = "AVERAGE"
rra[1].rows = 576
rra[1].cur_row = 543
rra[1].pdp_per_row = 5
rra[1].xff = 0.0000000000e+00
rra[1].cdp_prep[0].value = 2.0000000000e+00
rra[1].cdp_prep[0].unknown_datapoints = 0
rra[2].cf = "MAX"
rra[2].rows = 576
rra[2].cur_row = 24
rra[2].pdp_per_row = 5
rra[2].xff = 0.0000000000e+00
rra[2].cdp_prep[0].value = 1.0000000000e+00
rra[2].cdp_prep[0].unknown_datapoints = 0
rra[3].cf = "MIN"
.....

3. 使用rrd文件查询数据

先看下如何直接查询rrd文件中的指标数据,比如查询2020-06-02 09:30:46 ~ 2020-06-02 09:39:46这段时间, 每60s1个点的数据:

# rrdtool fetch be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd AVERAGE -r 60 -s 1591061446 -e 1591061986
                         metric
1591061460: 1.0000000000e+00
1591061520: 1.0000000000e+00
1591061580: 1.0000000000e+00
1591061640: 1.0000000000e+00
1591061700: 1.0000000000e+00
1591061760: 1.0000000000e+00
1591061820: 1.0000000000e+00
1591061880: 1.0000000000e+00
1591061940: 1.0000000000e+00
1591062000: 1.0000000000e+00

再看一下代码中如何查询的:

  • filename: 指标所在的文件;
  • cf: 指标聚合方法,比如AVERAGE;
  • start,end: 查询的起始/终止时间;
  • step: 指标的间隔时间;
// modules/graph/rrdtool/rrdtool.go
func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {
    start_t := time.Unix(start, 0)
    end_t := time.Unix(end, 0)
    step_t := time.Duration(step) * time.Second

    fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t)
    if err != nil {
        return []*cmodel.RRDData{}, err
    }

    defer fetchRes.FreeValues()

    values := fetchRes.Values()
    size := len(values)
    ret := make([]*cmodel.RRDData, size)

    start_ts := fetchRes.Start.Unix()
    step_s := fetchRes.Step.Seconds()

    for i, val := range values {
        ts := start_ts + int64(i+1)*int64(step_s)
        d := &cmodel.RRDData{
            Timestamp: ts,
            Value:     cmodel.JsonFloat(val),
        }
        ret[i] = d
    }
    return ret, nil
}

4. 保存数据到rrd文件

通过rrdlite.Updater来实现,先将指标数据写入cache,然后一起update到rrd文件:

// modules/graph/rrdtool/rrdtool.go
func update(filename string, items []*cmodel.GraphItem) error {
    u := rrdlite.NewUpdater(filename)

    for _, item := range items {
        v := math.Abs(item.Value)
        if v > 1e+300 || (v < 1e-300 && v > 0) {
            continue
        }
        if item.DsType == "DERIVE" || item.DsType == "COUNTER" {
            u.Cache(item.Timestamp, int(item.Value))
        } else {
            u.Cache(item.Timestamp, item.Value)
        }
    }

    return u.Update()
}

你可能感兴趣的