文档文档 (Documentation)

计算跨连接序列的速率 + 回填

收集一组时间序列数据,其中每个时间序列都在计数特定事件,这是一种常见的场景。使用 Kapacitor,可以将一组中的多个时间序列连接起来,并用于计算组合值,然后可以将该值存储为新的时间序列。

本指南展示了如何使用 Python 中准备好的数据生成器将两个生成的时间序列组合成一个新的计算指标,然后使用 Kapacitor 将该指标存储回 InfluxDB。

它以一个假设的高流量网站为例,该网站采集两个指标

  • errors – 发生错误的页面浏览量。
  • views – 没有错误的页面浏览量。

数据生成器 (The Data generator)

可以使用 Python 3 脚本(包含在 pages.zip (md5, sha256) 中并为此目的创建)来准备和生成此类网站的数据到 InfluxDB。它利用了 InfluxDB-Python 库。有关如何在 Python 中安装该库的说明,请参阅该 Github 项目。

解压缩后,此脚本可用于创建名为 pages 的数据库,该数据库使用默认保留策略 autogen。它可用于创建数据积压,然后设置生成器运行,随机生成 viewerror 计数。

它可以按如下方式启动,积压两天随机数据

$ ./pages_db.py --silent true pnr --start 2d
Created database  pages
priming and running
data primed
generator now running. CTRL+C to stop
..........................................

准备两天的数据大约需要一分钟。

与批处理数据连接 (Joining with batch data)

对于网站管理员来说,拥有简单的计数可能还不够。更重要的是了解导致错误的页面浏览量的百分比。该过程是选择两个现有指标,将它们连接起来并计算错误百分比。然后可以将错误百分比作为新指标存储在 InfluxDB 中。

需要查询两个指标 errorsviews

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

连接过程会跳过来自另一个源中没有匹配时间点的点。因此,在连接批处理数据时,同时 groupByfill 数据非常重要。按时间对数据进行分组可确保每个源在一致的时间段内都有数据点。填充数据可确保每个点都与合理的默认值匹配。

在本示例中,groupBy 方法使用通配符 * 按所有标签对结果进行分组。可以通过声明单个标签使其更具体,并且由于生成的演示数据仅包含一个标签 page,因此 groupBy 语句可以写成如下形式:.groupBy(time(1m), 'page')

对于每个指标的两个批处理源,需要像这样连接它们。

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')

数据按时间连接,这意味着当来自每个源的成对批次到达时,它们会组合成一个批次。因此,需要重命名来自每个源的字段,以便正确命名字段空间。这是通过 .as('errors', 'views') 行完成的。在本示例中,每个指标只有一个名为 sum 的字段。连接后的字段分别称为 errors.sumviews.sum

现在数据已连接,可以计算百分比。使用字段的新名称,可以使用以下表达式来计算所需的百分比。

    //Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum"))
        // Give the resulting field a name
        .as('value')

最后,此数据存储回 InfluxDB 中。

    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

这是批处理任务的完整 TICKscript

dbrp "pages"."autogen"

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    //Calculate percentage
    |eval(lambda: ("errors.sum" / ("views.sum" + "errors.sum")) * 100)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

回填 (Backfill)

现在来一个有趣的小技巧。使用 Kapacitor 的记录/重放操作,可以在历史数据上运行此 TICKscript。首先,将上述脚本另存为 error_percent.tick 并定义它。然后,为我们要填充的过去时间范围创建记录。

$ kapacitor define error_percent -tick error_percent.tick
$ kapacitor record batch -task error_percent -past 1d

获取记录 ID,并针对任务重放历史数据。在此处指定 -rec-time 标志,以指示 Kapacitor 在处理数据时使用记录中存储的实际时间,而不是调整为当前时间。

$ kapacitor replay -task error_percent -recording RECORDING_ID -rec-time

如果数据集太大而无法保存在一个记录中,请定义要记录的特定时间范围,然后单独重放每个范围。

rid=$(kapacitor record batch -task error_percent -start 2015-10-01 -stop 2015-10-02)
echo $rid
kapacitor replay -task error_percent -recording $rid -rec-time
kapacitor delete recordings $rid

只需循环遍历每个时间窗口的上述脚本,并重建所需的所有历史数据。这样,每分钟的 error_percent 将针对历史数据进行回填。

流方法 (Stream method)

对于流式传输情况,可以完成类似的操作。请注意,命令 kapacitor record stream 不包含相同的历史选项 -past,因此无法直接在 Kapacitor 中使用任务进行回填。如果需要回填,也可以使用下面介绍的命令 kapacitor record query

尽管如此,相同的 TICKscript 语义可以与任务一起使用,以实时计算和存储新的计算值,例如 error_percent

以下是这样一个 TICKscript。

dbrp "pages"."autogen"

// Get errors stream data
var errors = stream
    |from()
        .measurement('errors')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Get views stream data
var views = stream
    |from()
        .measurement('views')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    // Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum") * 100.0)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

记录查询和使用流回填 (Record Query and backfill with stream)

要为处理多个指标的流任务提供历史数据,请在记录数据时使用多个语句

首先使用 record query,遵循此通用命令的模式

kapacitor record query -query $'select field1,field2,field3 from "database_name"."autogen"."one" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *; select field1,field2,field3 from "database_name"."autogen"."two" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *' -type stream

例如

$ kapacitor record query -query $'select value from "pages"."autogen"."errors" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-05-31T12:00:00Z\' GROUP BY *; select value from "pages"."autogen"."views" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-12-21T12:00:00Z\' GROUP BY *' -type stream
578bf299-3566-4813-b07b-744da6ab081a

然后,返回的记录 ID 可以在 Kapacitor replay 命令中使用记录的时间。

$ kapacitor replay -task error_percent_s -recording 578bf299-3566-4813-b07b-744da6ab081a -rec-time
c623f73c-cf2a-4fce-be4c-9ab89f0c6045

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来 (The future of Flux)

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多 (Read more)

现已全面上市 (Now Generally Available)

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。 (Start fast. Scale faster.)

获取更新 (Get the Updates)

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看