文档文档

计算跨连接序列的速率 + 回填

收集一组时间序列数据,其中每个时间序列都在计数特定事件,这是一种常见场景。使用 Kapacitor,可以将集合中的多个时间序列连接起来,并用于计算组合值,然后可以将其存储为新的时间序列。

本指南演示如何使用 Python 中准备好的数据生成器将两个生成的时间序列组合成一个新的计算度量,然后使用 Kapacitor 将该度量存储回 InfluxDB 中。

它以一个假设的高流量网站为例,该网站采取两个度量:

  • errors – 发生错误的页面浏览量。
  • views – 没有错误的页面浏览量。

数据生成器

可以使用 Python 3 脚本(滚动到 pages.zip (md5, sha256) 并为此目的创建)来准备和生成此类网站的数据到 InfluxDB。它利用了 InfluxDB-Python 库。有关如何在 Python 中安装该库的说明,请参阅该 Github 项目。

解压缩后,此脚本可用于创建一个名为 pages 的数据库,该数据库使用默认保留策略 autogen。它可用于创建数据积压,然后设置生成器运行,沿着随机生成的 viewerror 计数走动。

它可以以两天随机数据的积压开始,如下所示:

$ ./pages_db.py --silent true pnr --start 2d
Created database  pages
priming and running
data primed
generator now running. CTRL+C to stop
..........................................

准备两天的数据可能需要大约一分钟。

与批处理数据连接

对于站点管理员来说,只有简单的计数可能是不够的。更重要的是了解导致错误的页面浏览量的百分比。过程是选择两个现有度量,将它们连接起来并计算错误百分比。然后可以将错误百分比作为新度量存储在 InfluxDB 中。

需要查询两个度量 errorsviews

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

连接过程会跳过另一个源中没有匹配时间点的点。因此,在连接批处理数据时,同时 groupByfill 数据非常重要。按时间对数据进行分组可确保每个源在一致的时间段内都有数据点。填充数据可确保每个点都将与合理的默认值匹配。

在本示例中,groupBy 方法使用通配符 * 按所有标签对结果进行分组。可以通过声明单个标签使其更具体,并且由于生成的演示数据仅包含一个标签 page,因此 groupBy 语句可以写成如下形式:.groupBy(time(1m), 'page')

对于每个度量的两个批处理源,它们需要像这样连接:

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')

数据按时间连接,这意味着当来自每个源的成对批处理到达时,它们会被组合成一个批处理。因此,需要重命名来自每个源的字段以正确命名空间字段。这是通过 .as('errors', 'views') 行完成的。在本示例中,每个度量只有一个名为 sum 的字段。连接的字段分别称为 errors.sumviews.sum

现在数据已连接,可以计算百分比。使用字段的新名称,可以使用以下表达式来计算所需的百分比。

    //Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum"))
        // Give the resulting field a name
        .as('value')

最后,此数据存储回 InfluxDB 中。

    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

这是批处理任务的完整 TICKscript:

dbrp "pages"."autogen"

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    //Calculate percentage
    |eval(lambda: ("errors.sum" / ("views.sum" + "errors.sum")) * 100)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

回填

现在来一个有趣的小技巧。使用 Kapacitor 的记录/重放操作,此 TICKscript 可以对历史数据运行。首先,将上述脚本另存为 error_percent.tick 并定义它。然后,为我们要填充的过去时间范围创建一个记录。

$ kapacitor define error_percent -tick error_percent.tick
$ kapacitor record batch -task error_percent -past 1d

获取记录 ID 并针对任务重放历史数据。在这里,指定 -rec-time 标志以指示 Kapacitor 在处理数据时使用记录中存储的实际时间,而不是调整为当前时间。

$ kapacitor replay -task error_percent -recording RECORDING_ID -rec-time

如果数据集太大而无法保存在一个记录中,请定义要记录的特定时间范围,然后单独重放每个范围。

rid=$(kapacitor record batch -task error_percent -start 2015-10-01 -stop 2015-10-02)
echo $rid
kapacitor replay -task error_percent -recording $rid -rec-time
kapacitor delete recordings $rid

只需为每个时间窗口循环执行上述脚本,并重建所有需要的历史数据。这样,每个分钟的 error_percent 将为历史数据回填。

流方法

对于流式处理,可以执行类似的操作。请注意,命令 kapacitor record stream 不包含相同的历史选项 -past,因此无法直接在 Kapacitor 中使用任务进行回填。如果需要回填,也可以使用下面介绍的命令 kapacitor record query

尽管如此,相同的 TICKscript 语义可以与任务一起使用,以实时计算和存储新的计算值,例如 error_percent

以下是这样一个 TICKscript。

dbrp "pages"."autogen"

// Get errors stream data
var errors = stream
    |from()
        .measurement('errors')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Get views stream data
var views = stream
    |from()
        .measurement('views')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    // Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum") * 100.0)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

记录查询和使用流回填

要为处理多个度量的流任务提供历史数据,请在使用 multiple statements 记录数据时使用。

首先,使用 record query,遵循此通用命令的模式:

kapacitor record query -query $'select field1,field2,field3 from "database_name"."autogen"."one" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *; select field1,field2,field3 from "database_name"."autogen"."two" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *' -type stream

例如:

$ kapacitor record query -query $'select value from "pages"."autogen"."errors" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-05-31T12:00:00Z\' GROUP BY *; select value from "pages"."autogen"."views" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-12-21T12:00:00Z\' GROUP BY *' -type stream
578bf299-3566-4813-b07b-744da6ab081a

然后,可以在 Kapacitor replay 命令中使用返回的记录 ID,并使用记录的时间。

$ kapacitor replay -task error_percent_s -recording 578bf299-3566-4813-b07b-744da6ab081a -rec-time
c623f73c-cf2a-4fce-be4c-9ab89f0c6045

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已发布公共 Alpha 版

InfluxDB 3 开源版本现已可用于 alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 alpha 的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看