Prometheus源码笔记(5)-retrieval模块

12/22/2017

retrieval模块主要用于管理监测目标和提供对于监测目标的指标数据的抓取,是Prometheus中核心的数据捕获组件。本节内容将针对该模块中重要的组成代码和核心逻辑进行简要的分析。

1. target定义与管理

Prometheus中target指的是一个用于监测的url终端,比如我们使用snmp抓取信息的时候,snmp-export暴露的url地址就是一个独立的target。结构体定义如下,其中params保存了所有url的相关信息。

type Target struct {
    // Labels before any processing.
    discoveredLabels labels.Labels
    // Any labels that are added to this target and its metrics.
    labels labels.Labels
    // Additional URL parmeters that are part of the target URL.
    params url.Values

    mtx        sync.RWMutex
    lastError  error
    lastScrape time.Time
    health     TargetHealth
}

另外在页面上展示的多个target保存在一个Targets对象中,并依据url进行排序:

// Targets is a sortable list of targets.
type Targets []*Target

func (ts Targets) Len() int           { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int)      { ts[i], ts[j] = ts[j], ts[i] }

2. TargetManager定义及流程

用来管理Target并提供抓取数据的管理,结构体定义如下:

type TargetManager struct {
    append        Appendable
    scrapeConfigs []*config.ScrapeConfig

    mtx    sync.RWMutex
    ctx    context.Context
    cancel func()
    wg     sync.WaitGroup

    // Set of unqiue targets by scrape configuration.
    targetSets map[string]*targetSet
    logger     log.Logger
    starting   chan struct{}
}


func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
    tm.mtx.Lock()
    defer tm.mtx.Unlock()

    tm.scrapeConfigs = cfg.ScrapeConfigs

    if tm.ctx != nil {
        tm.reload()
    }
    return nil
}

通过ApplyConfig来加载config配置文件并执行TargetManger的Run()方法启动Manager管理流程,Promethueus的核心App main.go中启动TargetManager的方式如下:

        // TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel.
        cancel := make(chan struct{})
        g.Add(
            func() error {
                targetManager.Run()
                <-cancel
                return nil
            },
            func(err error) {
                targetManager.Stop()
                close(cancel)
            },
        )

其中targetManager的Run函数运行,将创建一个管理的上下文,其中核心部分位于reload函数,根据抓取数据的配置内容进行实际的抓取数据操作。其中根据每一个抓取的配置进行实际的操作,比如下面的的实例配置和reload函数如下:

  - job_name: 'snmp'
    static_configs:
      - targets:
        - 1.1.1.1  # SNMP device.
    metrics_path: /snmp
    params:
      module: [if_mib]
    relabel_configs:
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: 1.1.1.1:9116  # SNMP exporter.
func (tm *TargetManager) reload() {
    jobs := map[string]struct{}{}

    // Start new target sets and update existing ones.
    for _, scfg := range tm.scrapeConfigs {
        jobs[scfg.JobName] = struct{}{}

        ts, ok := tm.targetSets[scfg.JobName]
        if !ok {
            ctx, cancel := context.WithCancel(tm.ctx)
            ts = &targetSet{
                ctx:    ctx,
                cancel: cancel,
                sp:     newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)),
            }
            ts.ts = discovery.NewTargetSet(ts.sp)

            tm.targetSets[scfg.JobName] = ts

            tm.wg.Add(1)

            go func(ts *targetSet) {
                // Run target set, which blocks until its context is canceled.
                // Gracefully shut down pending scrapes in the scrape pool afterwards.
                ts.ts.Run(ctx)
                ts.sp.stop()
                tm.wg.Done()
            }(ts)
        } else {
            ts.sp.reload(scfg)
        }
        ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
    }

使用sync.WaitGroup来防止程序的退出,执行的过程中将,根据配置获得所有的target对象,存储在一个targetSet结构体中保存。当没有target对象的时候调用其中的Run方法来执行同步的操作,如果有对象则执行reload操作。Run操作如下定义,

// Run starts the processing of target providers and their updates.
// It blocks until the context gets canceled.
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
    for {
        // Throttle syncing to once per five seconds.
        select {
        case <-ctx.Done():
            break Loop
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        case <-time.After(5 * time.Second):
        }

        select {
        case <-ctx.Done():
            break Loop
        case <-ts.syncCh:
            ts.sync()
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        }
    }
}

reload函数定义如下,获得启动时间和查询使用的client,循环获得loop对象进行操作。newLoop.run(interval, timeout, nil)用于启动循环抓取数据的操作。

func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
    start := time.Now()

    sp.mtx.Lock()
    defer sp.mtx.Unlock()

    client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
    if err != nil {
        // Any errors that could occur here should be caught during config validation.
        level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
    }
    sp.config = cfg
    sp.client = client

    var (
        wg       sync.WaitGroup
        interval = time.Duration(sp.config.ScrapeInterval)
        timeout  = time.Duration(sp.config.ScrapeTimeout)
    )

    for fp, oldLoop := range sp.loops {
        var (
            t       = sp.targets[fp]
            s       = &targetScraper{Target: t, client: sp.client, timeout: timeout}
            newLoop = sp.newLoop(t, s)
        )
        wg.Add(1)

        go func(oldLoop, newLoop loop) {
            oldLoop.stop()
            wg.Done()

            go newLoop.run(interval, timeout, nil)
        }(oldLoop, newLoop)

        sp.loops[fp] = newLoop
    }

    wg.Wait()
    targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
        time.Since(start).Seconds(),
    )
}

3. scrape定义和流程管理

scrape执行数据的抓取操作,在模块引入之前,加入了很多自监控的数据指标定义如下:

    targetReloadIntervalLength = prometheus.NewSummaryVec(
        prometheus.SummaryOpts{
            Name:       "prometheus_target_reload_length_seconds",
            Help:       "Actual interval to reload the scrape pool with a given configuration.",
            Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
        },
        []string{"interval"},
    )

    targetScrapeSampleOutOfBounds = prometheus.NewCounter(
        prometheus.CounterOpts{
            Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
            Help: "Total number of samples rejected due to timestamp falling outside of the time bounds",
        },
    )

func init() {
    prometheus.MustRegister(targetIntervalLength)
    ...
    prometheus.MustRegister(targetScrapeSampleOutOfBounds)
}

对于任何一个终端的数据抓取通过实现scraper接口来定义,接口定义如下:

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
    scrape(ctx context.Context, w io.Writer) error
    report(start time.Time, dur time.Duration, err error)
    offset(interval time.Duration) time.Duration
}

其中的scrape是实际执行爬取数据的操作,report代表了采集的状态报告,offset代表了下次采集的时间(后面的两个方法在target.go中定义)。targetScraper实现了该接口的方法定义如下:

// targetScraper implements the scraper interface for a target.
type targetScraper struct {
    *Target

    client  *http.Client
    req     *http.Request
    timeout time.Duration

    gzipr *gzip.Reader
    buf   *bufio.Reader
}

补充. 理解Metric和labels

每一个时间序列采样定义都像如下的例子一样,其中包含一个名称和括号内的多个键值属性。

go_memstats_heap_idle_bytes{instance="1.2.3.4:9090",job="prometheus"}

这条记录中的go_memstats_heap_idle_bytes称为metric,里面的键值称为labels,利用label我们可以实现一个多维度的数据模型,增加修改任何的label都会产生一个新的时间序列。如果label名称开头为_则代表一个内部使用的属性.

promethues中的指标metric类型包含四种,定义如下:

  1. Counter: 一个只会不断增加的数值,用于计数或者统计总量。
  2. Guage: 一个普通参数,会随着时间起伏变化,比如说温度和内存使用等等。
  3. histogram: 一个配置范围内的所有采样数据集合,可自定义大小或者比例等【le=''】
  4. Summary:与histogram相似,代表了一段时间内的数据统计

以下是使用histogram采集的实例, 假如一个服务(job)的SLA需要满足300ms以内的响应请求需大于95%,如何统计:

  sum(rate(http_request_duration_seconds_bucket{le="0.3"}[5m])) by (job)
/
  sum(rate(http_request_duration_seconds_count[5m])) by (job)

由于此部分内容较多,后期会持续补充


系统运维 golang 源码阅读 页面已被访问95次

发表评论