文档文档

处理延迟到达的数据

在某些情况下,由于网络延迟或其他问题,您的时间序列数据可能会延迟到达 InfluxDB。为确保您计算的聚合是正确的,您必须在聚合和下采样任务中考虑数据延迟。本指南将介绍一种使用 InfluxDB 任务和API 可调用脚本检测和处理延迟到达数据的方法。

场景

您正在收集和存储 100 个不同位置的水位。每个位置的数据每 10 秒报告一次。每个位置的网络连接各不相同,但报告的数据可以放心地每小时至少写入一次 InfluxDB,甚至更频繁。

设置

为了遵循本指南,您需要创建以下资源

  • 一个 All-Access 令牌
  • 三个 InfluxDB bucket
    • water_level_raw:存储原始水位数据。
    • water_level_mean:存储水位的一分钟平均值。平均值包括来自过去一小时的延迟到达数据。
    • water_level_checksum:存储水位的一分钟计数。该计数用作每个一分钟窗口的校验和。
  • 一个 API 可调用脚本
    • water_level_process.flux:此脚本计算分钟水位平均值,并计算水位平均值计算中使用的点数。平均值和计数分别写入 water_level_meanwater_level_checksum bucket。
  • 一个任务
    • water_level_checksum.flux:此任务触发 water_level_process.flux 脚本。此任务还会重新计算用于计算最近水位平均值的点数。它将 water_level_checksum bucket 中的最新计数与此新计数进行比较,并在来自延迟到达数据的计数增加时触发水位平均值的重新计算。

在此过程中,您计算每个位置在一分钟窗口内的平均水位。它旨在处理最多延迟一小时到达的数据。每个位置的数据每 10 秒写入一次。此外,每 10 秒,每个位置的过去一小时内的某个位置都会写入一个延迟数据点。

概述

在深入代码之前,先大致了解一下 Flux 脚本的逻辑。

Late arriving data architecture

water_level_checksum.flux 是一个每分钟运行一次的任务。它计算 water_level_raw bucket 中存在的点数(新计数),并将该计数与 water_level_checksum bucket 中的计数(旧计数)进行比较。如果来自 water_level_raw bucket 的新计数不等于来自 water_level_checksum bucket 的计数,则该任务调用 water_level_process.flux API 可调用脚本,该脚本重新计算旧计数和聚合。

Flux 脚本详细信息

water_level_process.flux

water_level_process.flux 是一个可调用脚本,它执行两项操作

  1. 计算由 startstop 脚本参数定义的时间范围内的值的平均值,并将计算出的平均值写入 water_level_mean bucket。
  2. 计算由 startstop 脚本参数定义的时间范围内的点数或总点数,并将计数写入 water_level_checksum bucket。
// Compute and store the mean for the window
from(bucket: "water_level_raw")
    |> range(start: params.start, stop: params.stop)
    |> mean()
    |> to(bucket: "water_level_mean", timeColumn: "_stop")
    |> yield(name: "means")

// Compute and store the new checksum for this window
from(bucket: "water_level_raw")
    |> range(start: params.start, stop: params.stop)
    |> group(columns: ["_measurement", "_field", "_stop"])
    |> count()
    |> to(bucket: "water_level_checksum", timeColumn: "_stop")
    |> yield(name: "checksums")

使用 APICLI 创建可调用脚本。

water_level_checsum.flux

water_level_process.flux 是一个执行以下操作的任务

  1. 计算过去一小时内 water_level_raw bucket 中的点数(新计数),跨越一分钟的窗口。
  2. 调用 water_level_process.flux 可调用脚本,以计算跨越一分钟窗口的新平均值和新计数。
  3. 收集 water_level_checksum bucket 中过去一小时的先前计数(旧计数)。
  4. 连接旧流和新流,并比较旧计数和新计数。
  5. 筛选出计数不匹配的情况。
  6. 调用 water_level_process.flux 可调用脚本,以重新计算每个一分钟窗口的平均值和计数,其中计数不匹配。

任务详细信息

  • task 选项为任务提供配置设置
    • name:为任务提供名称。
    • every:定义任务运行的频率(每分钟一次),在本例中,定义用于计算平均值和计数的窗口间隔。
    • offset:定义执行任务前等待的时间量。_offset 不会更改任务查询的时间范围。_
  • invokeScripts() 是一个自定义函数,用于调用 water_level_process.flux 可调用脚本。
    • startstop 参数是必需的。
    • scriptID 是必需的。使用 APICLI 查找 scriptID
    • 将您的 InfluxDB API 令牌存储为 InfluxDB secret,并使用 secrets 程序包检索令牌。
option task = {name: "water_level_checksum", every: 1m, offset: 10s}

invokeScript = (start, stop, scriptID) =>
    requests.post(
        url: "https://cloud2.influxdata.com/api/v2/scripts/${scriptID}/invoke",

        headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
        body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
    )

首先,计算新计数并将其存储在变量 newCounts 中。
newCounts =

    from(bucket: "water_level_raw")
        |> range(start: start, stop: stop)
        |> group(columns: ["_measurement", "_field"])
        |> aggregateWindow(every: every, fn: count)

其中范围的开始和停止值定义为
start = date.truncate(t: -late_window, unit: every)

stop = date.truncate(t: now(), unit: every. late_window 等于您愿意等待延迟到达数据的最长时间(在本例中等于)。 date.truncate() 函数用于将开始和停止时间截断到最新的分钟,以确保您成功地在相同的时间戳上重新计算值。其中 every = task.every。由于任务以 1 分钟间隔运行,因此 every 等于 1m。此外,请记住,aggregateWindow 函数默认使用 _stop 列作为聚合值的新时间值的来源。

接下来,使用以下代码计算当前平均值和计数
// 始终计算最近的时间间隔

newCounts
    |> filter(fn: (r) => r._time == stop)
    |> map(
        fn: (r) => {
            response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)

            return {r with code: response.statusCode}
        },
    )
    |> yield(name: "current")

筛选出最后一个 newCount 值。请记住,由于 aggregateWindow() 函数的默认行为,时间值等于停止值。然后映射该单行表以调用 invokeScript 函数一次。在这里,您还将值传递给 date.sub(d: every, from: r._time)r._time 的 start 和 stop 参数。请记住,every 变量等于 1m。实际上,这意味着您将计算 1 分钟间隔内的平均值和计数(时间戳已适当截断,以确保稍后覆盖重新计算的平均值)。此代码确保您至少调用一次 water_level_process.flux 脚本,以将新的平均值和计数分别写入 water_level_mean 和 water_level_checksum bucket。

接下来,查询 water_level_checksum bucket 过去一小时的数据

oldCounts =
    from(bucket: "water_level_checksum")
        |> range(start: start, stop: stop)
        |> group(columns: ["_measurement", "_field"])

请记住,此处的开始和停止时间等于 - 和 now() 截断到分钟。

现在将旧计数和新计数连接在一起。您还可以筛选出计数不同的情况。如果它们确实不同,则响应中将有可以映射的记录。映射这些记录以通过调用 level_water_process.flux 脚本来重新计算平均值和计数

experimental.join(
    left: oldCounts,
    right: newCounts,
    fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
    // Recompute any windows where the checksum is different
    |> filter(fn: (r) => r.old_count != r.new_count)
    |> map(
        fn: (r) => {
            response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)

            return {r with code: response.statusCode}
        },
    )
    |> yield(name: "diffs")

完整的 water_level_checsum.flux 如下所示

import "influxdata/influxdb/secrets"
import "experimental/http/requests"
import "json"
import "date"
import "experimental"

option task = {name: "water_level_checksum", every: 1m, offset: 10s}

// Size of the window to aggregate
every = task.every

// Longest we are willing to wait for late data
late_window = 1h

token = secrets.get(key: "SELF_TOKEN")

// invokeScript calls a Flux script with the given start stop
// parameters to recompute the window.
invokeScript = (start, stop) =>
    requests.post(
        // We have hardcoded the script ID here
        url: "https://eastus-1.azure.cloud2.influxdata.com/api/v2/scripts/095fabd404108000/invoke",
        headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
        body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
    )

// Only query windows that span a full minute
start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every)

newCounts =
    from(bucket: "water_level_raw")
        |> range(start: start, stop: stop)
        |> group(columns: ["_measurement", "_field"])
        |> aggregateWindow(every: every, fn: count)

// Always compute the most recent interval
newCounts
    |> filter(fn: (r) => r._time == stop)
    |> map(
        fn: (r) => {
            response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)

            return {r with code: response.statusCode}
        },
    )
    |> yield(name: "current")

oldCounts =
    from(bucket: "water_level_checksum")
        |> range(start: start, stop: stop)
        |> group(columns: ["_measurement", "_field"])

// Compare old and new checksum
experimental.join(
    left: oldCounts,
    right: newCounts,
    fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
    // Recompute any windows where the checksum is different
    |> filter(fn: (r) => r.old_count != r.new_count)
    |> map(
        fn: (r) => {
            response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)

            return {r with code: response.statusCode}
        },
    )
    |> yield(name: "diffs")

此页内容对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久保存到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为基础,增加了高可用性、读取副本、增强的安全性和数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看