文档文档

跨联接序列计算速率 + 填补

收集一组时间序列数据,其中每个时间序列都计算特定事件的数量,这是常见的场景。使用 Kapacitor,可以联接一个集合中的多个时间序列,并用它们来计算一个组合值,然后将其存储为一个新的时间序列。

本指南展示了如何使用 Python 中的预制数据生成器,将两个生成的时间序列联接到一个新的计算指标中,然后使用 Kapacitor 将该指标存储回 InfluxDB。

它以一个假设的高流量网站为例,该网站记录了两个指标:

  • errors – 发生错误的页面浏览量。
  • views – 没有发生错误的页面浏览量。

数据生成器

可以使用打包在 pages.zipmd5sha256)中的 Python 3 脚本,将此类网站的数据预填并生成到 InfluxDB。该脚本为此目的而创建,它利用了 InfluxDB-Python 库。有关如何在 Python 中安装该库的说明,请参见该 Github 项目。

解压后,可以使用此脚本创建一个名为 pages 的数据库,该数据库使用默认的保留策略 autogen。它可用于创建历史数据,然后启动生成器,该生成器将随机生成 viewerror 计数。

可以使用以下方法启动,以创建两天的数据积压:

$ ./pages_db.py --silent true pnr --start 2d
Created database  pages
priming and running
data primed
generator now running. CTRL+C to stop
..........................................

预填两天的数据可能需要一分钟左右。

与批处理数据联接

仅仅拥有简单的计数可能不足以满足网站管理员的需求。更重要的是知道导致错误的页面浏览量的百分比。过程是选择两个现有指标,联接它们并计算错误百分比。然后可以将错误百分比作为新的指标存储在 InfluxDB 中。

需要查询 errorsviews 这两个指标。

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

联接过程会跳过在另一源中没有匹配时间点的数据点。因此,在联接批处理数据时,同时进行 groupByfill 非常重要。按时间对数据进行分组可确保每个源在一致的时间段内都有数据点。填充数据可确保每个点都有一个合理的默认值进行匹配。

在此示例中,groupBy 方法使用通配符 * 按所有标签对结果进行分组。通过声明单个标签可以使其更具体,并且由于生成的演示数据只有一个标签 page,因此 groupBy 语句可以写成:.groupBy(time(1m), 'page')

对于每个指标的两个批处理源,需要像这样联接它们。

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')

数据按时间联接,这意味着当来自每个源的批次成对到达时,它们会被合并到一个批次中。因此,需要重命名每个源的字段以正确地为字段添加命名空间。这通过 .as('errors', 'views') 行完成。在此示例中,每个指标只有一个名为 sum 的字段。联接后的字段分别称为 errors.sumviews.sum

现在数据已联接,可以计算百分比了。使用字段的新名称,可以使用以下表达式计算所需的百分比。

    //Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum"))
        // Give the resulting field a name
        .as('value')

最后,将此数据存储回 InfluxDB。

    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

这是批处理任务的完整 TICKscript。

dbrp "pages"."autogen"

// Get errors batch data
var errors = batch
    |query('SELECT sum(value) FROM "pages"."autogen".errors')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Get views batch data
var views = batch
    |query('SELECT sum(value) FROM "pages"."autogen".views')
        .period(1h)
        .every(1h)
        .groupBy(time(1m), *)
        .fill(0)

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    //Calculate percentage
    |eval(lambda: ("errors.sum" / ("views.sum" + "errors.sum")) * 100)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

填补

现在来看一个有趣的小技巧。使用 Kapacitor 的录制/重放操作,此 TICKscript 可以在历史数据上运行。首先,将上述脚本另存为 error_percent.tick 并定义它。然后,为我们想要填补的过去时间范围创建一个录制。

$ kapacitor define error_percent -tick error_percent.tick
$ kapacitor record batch -task error_percent -past 1d

获取录制 ID,然后用历史数据重放任务。在此处指定 -rec-time 标志,以指示 Kapacitor 在处理数据时使用录制中存储的实际时间,而不是调整为当前时间。

$ kapacitor replay -task error_percent -recording RECORDING_ID -rec-time

如果数据集太大而无法保存在一个录制中,请定义一个特定的时间范围进行录制,然后单独重放每个范围。

rid=$(kapacitor record batch -task error_percent -start 2015-10-01 -stop 2015-10-02)
echo $rid
kapacitor replay -task error_percent -recording $rid -rec-time
kapacitor delete recordings $rid

只需循环遍历以上脚本,为每个时间窗口重建所有需要 Thus,将为历史数据填补每一分钟的 error_percent

流处理方法

对于流处理情况,也可以进行类似的操作。请注意,命令 kapacitor record stream 不包含与 -past 选项相同的历史记录选项,因此无法直接在 Kapacitor 中使用任务进行填补。如果需要填补,可以使用下面介绍的 kapacitor record query 命令。

尽管如此,仍然可以使用任务中的相同 TICKscript 语义来实时计算和存储新的计算值,例如 error_percent

以下就是一个这样的 TICKscript。

dbrp "pages"."autogen"

// Get errors stream data
var errors = stream
    |from()
        .measurement('errors')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Get views stream data
var views = stream
    |from()
        .measurement('views')
        .groupBy(*)
    |window()
        .period(1m)
        .every(1m)
    |sum('value')

// Join errors and views
errors
    |join(views)
        .as('errors', 'views')
    // Calculate percentage
    |eval(lambda: "errors.sum" / ("views.sum" + "errors.sum") * 100.0)
        // Give the resulting field a name
        .as('value')
    |influxDBOut()
        .database('pages')
        .measurement('error_percent')

记录查询和流式填补

为了向处理多个指标的流式任务提供历史数据,请在录制数据时使用多语句

首先使用 record query,遵循此通用命令的模式:

kapacitor record query -query $'select field1,field2,field3 from "database_name"."autogen"."one" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *; select field1,field2,field3 from "database_name"."autogen"."two" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *' -type stream

例如

$ kapacitor record query -query $'select value from "pages"."autogen"."errors" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-05-31T12:00:00Z\' GROUP BY *; select value from "pages"."autogen"."views" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-12-21T12:00:00Z\' GROUP BY *' -type stream
578bf299-3566-4813-b07b-744da6ab081a

然后可以使用录制的时间在 Kapacitor replay 命令中使用返回的录制 ID。

$ kapacitor replay -task error_percent_s -recording 578bf299-3566-4813-b07b-744da6ab081a -rec-time
c623f73c-cf2a-4fce-be4c-9ab89f0c6045

此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2