跨联接序列计算速率 + 填补
收集一组时间序列数据,其中每个时间序列都计算特定事件的数量,这是常见的场景。使用 Kapacitor,可以联接一个集合中的多个时间序列,并用它们来计算一个组合值,然后将其存储为一个新的时间序列。
本指南展示了如何使用 Python 中的预制数据生成器,将两个生成的时间序列联接到一个新的计算指标中,然后使用 Kapacitor 将该指标存储回 InfluxDB。
它以一个假设的高流量网站为例,该网站记录了两个指标:
errors– 发生错误的页面浏览量。views– 没有发生错误的页面浏览量。
数据生成器
可以使用打包在 pages.zip(md5,sha256)中的 Python 3 脚本,将此类网站的数据预填并生成到 InfluxDB。该脚本为此目的而创建,它利用了 InfluxDB-Python 库。有关如何在 Python 中安装该库的说明,请参见该 Github 项目。
解压后,可以使用此脚本创建一个名为 pages 的数据库,该数据库使用默认的保留策略 autogen。它可用于创建历史数据,然后启动生成器,该生成器将随机生成 view 和 error 计数。
可以使用以下方法启动,以创建两天的数据积压:
$ ./pages_db.py --silent true pnr --start 2d
Created database pages
priming and running
data primed
generator now running. CTRL+C to stop
..........................................预填两天的数据可能需要一分钟左右。
与批处理数据联接
仅仅拥有简单的计数可能不足以满足网站管理员的需求。更重要的是知道导致错误的页面浏览量的百分比。过程是选择两个现有指标,联接它们并计算错误百分比。然后可以将错误百分比作为新的指标存储在 InfluxDB 中。
需要查询 errors 和 views 这两个指标。
// Get errors batch data
var errors = batch
|query('SELECT sum(value) FROM "pages"."autogen".errors')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Get views batch data
var views = batch
|query('SELECT sum(value) FROM "pages"."autogen".views')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)联接过程会跳过在另一源中没有匹配时间点的数据点。因此,在联接批处理数据时,同时进行 groupBy 和 fill 非常重要。按时间对数据进行分组可确保每个源在一致的时间段内都有数据点。填充数据可确保每个点都有一个合理的默认值进行匹配。
在此示例中,groupBy 方法使用通配符 * 按所有标签对结果进行分组。通过声明单个标签可以使其更具体,并且由于生成的演示数据只有一个标签 page,因此 groupBy 语句可以写成:.groupBy(time(1m), 'page')。
对于每个指标的两个批处理源,需要像这样联接它们。
// Join errors and views
errors
|join(views)
.as('errors', 'views')数据按时间联接,这意味着当来自每个源的批次成对到达时,它们会被合并到一个批次中。因此,需要重命名每个源的字段以正确地为字段添加命名空间。这通过 .as('errors', 'views') 行完成。在此示例中,每个指标只有一个名为 sum 的字段。联接后的字段分别称为 errors.sum 和 views.sum。
现在数据已联接,可以计算百分比了。使用字段的新名称,可以使用以下表达式计算所需的百分比。
//Calculate percentage
|eval(lambda: "errors.sum" / ("views.sum" + "errors.sum"))
// Give the resulting field a name
.as('value')最后,将此数据存储回 InfluxDB。
|influxDBOut()
.database('pages')
.measurement('error_percent')这是批处理任务的完整 TICKscript。
dbrp "pages"."autogen"
// Get errors batch data
var errors = batch
|query('SELECT sum(value) FROM "pages"."autogen".errors')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Get views batch data
var views = batch
|query('SELECT sum(value) FROM "pages"."autogen".views')
.period(1h)
.every(1h)
.groupBy(time(1m), *)
.fill(0)
// Join errors and views
errors
|join(views)
.as('errors', 'views')
//Calculate percentage
|eval(lambda: ("errors.sum" / ("views.sum" + "errors.sum")) * 100)
// Give the resulting field a name
.as('value')
|influxDBOut()
.database('pages')
.measurement('error_percent')填补
现在来看一个有趣的小技巧。使用 Kapacitor 的录制/重放操作,此 TICKscript 可以在历史数据上运行。首先,将上述脚本另存为 error_percent.tick 并定义它。然后,为我们想要填补的过去时间范围创建一个录制。
$ kapacitor define error_percent -tick error_percent.tick
$ kapacitor record batch -task error_percent -past 1d获取录制 ID,然后用历史数据重放任务。在此处指定 -rec-time 标志,以指示 Kapacitor 在处理数据时使用录制中存储的实际时间,而不是调整为当前时间。
$ kapacitor replay -task error_percent -recording RECORDING_ID -rec-time如果数据集太大而无法保存在一个录制中,请定义一个特定的时间范围进行录制,然后单独重放每个范围。
rid=$(kapacitor record batch -task error_percent -start 2015-10-01 -stop 2015-10-02)
echo $rid
kapacitor replay -task error_percent -recording $rid -rec-time
kapacitor delete recordings $rid只需循环遍历以上脚本,为每个时间窗口重建所有需要 Thus,将为历史数据填补每一分钟的 error_percent。
流处理方法
对于流处理情况,也可以进行类似的操作。请注意,命令 kapacitor record stream 不包含与 -past 选项相同的历史记录选项,因此无法直接在 Kapacitor 中使用流任务进行填补。如果需要填补,可以使用下面介绍的 kapacitor record query 命令。
尽管如此,仍然可以使用流任务中的相同 TICKscript 语义来实时计算和存储新的计算值,例如 error_percent。
以下就是一个这样的 TICKscript。
dbrp "pages"."autogen"
// Get errors stream data
var errors = stream
|from()
.measurement('errors')
.groupBy(*)
|window()
.period(1m)
.every(1m)
|sum('value')
// Get views stream data
var views = stream
|from()
.measurement('views')
.groupBy(*)
|window()
.period(1m)
.every(1m)
|sum('value')
// Join errors and views
errors
|join(views)
.as('errors', 'views')
// Calculate percentage
|eval(lambda: "errors.sum" / ("views.sum" + "errors.sum") * 100.0)
// Give the resulting field a name
.as('value')
|influxDBOut()
.database('pages')
.measurement('error_percent')记录查询和流式填补
为了向处理多个指标的流式任务提供历史数据,请在录制数据时使用多语句。
首先使用 record query,遵循此通用命令的模式:
kapacitor record query -query $'select field1,field2,field3 from "database_name"."autogen"."one" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *; select field1,field2,field3 from "database_name"."autogen"."two" where time > \'YYYY-mm-ddTHH:MM:SSZ\' and time < \'YYYY-mm-ddTHH:MM:SSZ\' GROUP BY *' -type stream例如
$ kapacitor record query -query $'select value from "pages"."autogen"."errors" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-05-31T12:00:00Z\' GROUP BY *; select value from "pages"."autogen"."views" where time > \'2018-05-30T12:00:00Z\' and time < \'2018-12-21T12:00:00Z\' GROUP BY *' -type stream
578bf299-3566-4813-b07b-744da6ab081a然后可以使用录制的时间在 Kapacitor replay 命令中使用返回的录制 ID。
$ kapacitor replay -task error_percent_s -recording 578bf299-3566-4813-b07b-744da6ab081a -rec-time
c623f73c-cf2a-4fce-be4c-9ab89f0c6045此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: