处理延迟到达的数据
在某些情况下,由于网络延迟或其他问题,您的时间序列数据可能会延迟到达 InfluxDB。为确保计算的聚合结果正确,您必须在聚合和降采样任务中考虑数据延迟。本指南将通过一个使用 InfluxDB 任务和 API 可调用脚本 来检测和处理延迟数据的方法进行介绍。
场景
您正在收集和存储 100 个不同地点的水位数据。每个地点的数据每 10 秒报告一次。每个地点的网络连接状况不同,但可以确保报告的数据至少每小时写入一次 InfluxDB,甚至可能更频繁。
设置
要遵循本指南,您需要创建以下资源
- 一个 所有访问令牌。
- 三个 InfluxDB 存储桶
- water_level_raw:存储原始水位数据。
- water_level_mean:存储一分钟的水位平均值。平均值包含过去一小时内延迟到达的数据。
- water_level_checksum:存储一分钟的水位计数。该计数用作每分钟窗口的校验和。
- 一个 API 可调用脚本
water_level_process.flux:此脚本计算每分钟的水位平均值,并计算用于计算水位平均值的数据点的数量。平均值和计数分别写入 water_level_mean 和 water_level_checksum 存储桶。
- 一个任务
water_level_checksum.flux:此任务触发water_level_process.flux脚本。此任务还会重新计算用于计算最近一个水位平均值的数据点的数量。它将 water_level_checksum 存储桶中的最新计数与此新计数进行比较,并触发对水位平均值的重新计算,以适应因延迟到达数据而导致的计数增加。
在此过程中,您将计算每分钟窗口内每个地点的平均水位。它旨在处理延迟最多一小时到达的数据。来自每个地点的数据每 10 秒写入一次。此外,每 10 秒,每个地点的延迟数据点都会写入过去一小时内的某个位置。
概述
在深入研究代码之前,先从高层次了解 Flux 脚本的逻辑。

water_level_checksum.flux 是一个每分钟运行的任务。它计算 water_level_raw 存储桶中的数据点数量(新计数),并将其与 water_level_checksum 存储桶中的计数(旧计数)进行比较。如果 water_level_raw 存储桶中的新计数不等于 water_level_checksum 存储桶中的计数,则该任务会调用 water_level_process.flux API 可调用脚本,该脚本将重新计算旧计数和聚合。
Flux 脚本详解
water_level_process.flux
water_level_process.flux 是一个可调用脚本,执行以下两项操作
- 计算
start和stop脚本参数定义的时间范围内的值的平均值,并将计算出的平均值写入 water_level_mean 存储桶。 - 计算
start和stop脚本参数定义的时间范围内的点数或总数,并将计数写入 water_level_checksum 存储桶。
// Compute and store the mean for the window
from(bucket: "water_level_raw")
|> range(start: params.start, stop: params.stop)
|> mean()
|> to(bucket: "water_level_mean", timeColumn: "_stop")
|> yield(name: "means")
// Compute and store the new checksum for this window
from(bucket: "water_level_raw")
|> range(start: params.start, stop: params.stop)
|> group(columns: ["_measurement", "_field", "_stop"])
|> count()
|> to(bucket: "water_level_checksum", timeColumn: "_stop")
|> yield(name: "checksums")water_level_checsum.flux
water_level_process.flux 是一个执行以下操作的任务
- 计算过去一小时内 water_level_raw 存储桶中的数据点数量(新计数),按一分钟窗口划分。
- 调用
water_level_process.flux可调用脚本,按一分钟窗口计算新的平均值和新计数。 - 获取 water_level_checksum 存储桶中过去一小时的先前计数(旧计数)。
- 联接旧计数和新计数,并比较旧计数与新计数。
- 过滤不匹配的计数。
- 调用
water_level_process.flux可调用脚本,为计数不匹配的每个一分钟窗口重新计算平均值和计数。
任务详情
task选项提供了任务的配置设置name:为任务提供一个名称。every:定义任务运行的频率(每分钟一次),以及在此情况下用于计算平均值和计数的窗口间隔。offset:定义执行任务之前等待的时间。_offset 不会改变任务查询的时间范围。_
invokeScripts()是一个调用water_level_process.flux可调用脚本的自定义函数。
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
invokeScript = (start, stop, scriptID) =>
requests.post(
url: "https://cloud2.influxdata.com/api/v2/scripts/${scriptID}/invoke",
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
)首先计算新计数并将其存储在变量 newCounts 中。newCounts =
from(bucket: "water_level_raw")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
|> aggregateWindow(every: every, fn: count)其中范围的 start 和 stop 值定义为start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every. late_window 等于您愿意等待延迟数据到达的最长时间(在本例中为 )。date.truncate() 函数用于将开始和停止时间截断到最近的分钟,以确保您成功地重新计算具有相同时间戳的值。其中 every = task.every。由于任务以 1 分钟的间隔运行,因此 every 等于 1m。此外,请记住,aggregateWindow 函数默认使用 _stop 列作为聚合值的新时间值的来源。
接下来,使用以下代码计算当前平均值和计数// 始终计算最近的时间间隔
newCounts
|> filter(fn: (r) => r._time == stop)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "current")filter 以获取最后一个 newCount 值。请记住,由于 aggregateWindow() 函数的默认行为,时间值等于 stop 值。然后遍历这个单行表,调用 invokeScript 函数一次。在这里,您还将值传递给 date.sub(d: every, from: r._time) 和 r._time 的 start 和 stop 参数,分别。请记住,every 变量等于 1m。实际上,这意味着您将计算 1 分钟间隔的平均值和计数(时间戳已正确截断,以确保稍后重新计算的平均值被覆盖)。此代码可确保您至少调用一次 water_level_process.flux 脚本,以将新的平均值和计数分别写入 water_level_mean 和 water_level_checksum 存储桶。
接下来,查询 water_level_checksum 存储桶过去一小时的数据
oldCounts =
from(bucket: "water_level_checksum")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])请记住,这里的 start 和 stop 时间等于 - and now(),截断到分钟。
现在将旧计数和新计数联接在一起。您还过滤了计数不同的情况。如果它们确实不同,则响应中将有记录可以进行映射。遍历这些记录,通过调用 level_water_process.flux 脚本来重新计算平均值和计数
experimental.join(
left: oldCounts,
right: newCounts,
fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
// Recompute any windows where the checksum is different
|> filter(fn: (r) => r.old_count != r.new_count)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "diffs")完整的 water_level_checsum.flux 如下所示
import "influxdata/influxdb/secrets"
import "experimental/http/requests"
import "json"
import "date"
import "experimental"
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
// Size of the window to aggregate
every = task.every
// Longest we are willing to wait for late data
late_window = 1h
token = secrets.get(key: "SELF_TOKEN")
// invokeScript calls a Flux script with the given start stop
// parameters to recompute the window.
invokeScript = (start, stop) =>
requests.post(
// We have hardcoded the script ID here
url: "https://eastus-1.azure.cloud2.influxdata.com/api/v2/scripts/095fabd404108000/invoke",
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
)
// Only query windows that span a full minute
start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every)
newCounts =
from(bucket: "water_level_raw")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
|> aggregateWindow(every: every, fn: count)
// Always compute the most recent interval
newCounts
|> filter(fn: (r) => r._time == stop)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "current")
oldCounts =
from(bucket: "water_level_checksum")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
// Compare old and new checksum
experimental.join(
left: oldCounts,
right: newCounts,
fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
// Recompute any windows where the checksum is different
|> filter(fn: (r) => r.old_count != r.new_count)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "diffs")此页面是否有帮助?
感谢您的反馈!