文档文档

将数据从 InfluxDB OSS 写入 InfluxDB Cloud

要将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请使用 Flux to()experimental.to() 函数。使用单个查询执行写入数据一次,或使用 InfluxDB 任务定期将数据写入 InfluxDB Cloud

将写入复制到 InfluxDB OSS 到 InfluxDB Cloud

要将所有写入 InfluxDB OSS 实例的操作复制到 InfluxDB Cloud 实例,请使用 InfluxDB 复制流

InfluxDB Cloud 速率限制

对 InfluxDB Cloud 的写入请求受限于与您的 InfluxDB Cloud 定价方案 相关的速率限制。

  1. 从 InfluxDB OSS 查询数据。

  2. (可选) 过滤或处理要写入 InfluxDB Cloud 的数据。

  3. 使用 toexperimental.to 将数据写入 InfluxDB Cloud。对于大多数用例,to() 是要使用的正确函数,但根据您要写入的数据结构,可能需要 experimental.to

    使用以下指南:

    • to(): 用于将字段键中的数据写入 _field 列,并将字段值写入 _value 列。

    • experimental.to(): 用于将列名中的数据写入相应的字段键,并将列值写入字段值。

    请参阅 to() 函数的输入和输出示例

  4. 为任一函数提供以下参数

    • bucket: 要写入的 InfluxDB Cloud 存储桶
    • host: InfluxDB Cloud 区域 URL
    • org: InfluxDB Cloud 组织
    • token: InfluxDB Cloud API 令牌
  5. (推荐) 为了使您的原始 API 令牌不出现在查询中,请将您的 InfluxDB Cloud API 令牌存储为 InfluxDB OSS 实例中的 InfluxDB 密钥,并使用 secrets.get() 检索密钥值,如以下示例所示(选择您正在使用的函数以查看正确的格式)

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )
import "experimental"
import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> experimental.to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

to() 函数的输入和输出数据

  • to() 需要 _time_measurement_field_value 列。
  • to() 将所有其他列作为标签写入,其中列名称是标签键,列值是标签值。

输入数据

_time_measurementexampleTag_field_value
2021-01-01T00:00:00Zexample-mAtemp80.0
2021-01-01T00:01:00Zexample-mAtemp80.3
2021-01-01T00:02:00Zexample-mAtemp81.1
_time_measurementexampleTag_field_value
2021-01-01T00:00:00Zexample-mArpm4023
2021-01-01T00:01:00Zexample-mArpm4542
2021-01-01T00:02:00Zexample-mArpm4901

输出行协议

example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
  • experimental.to() 需要 _time_measurement 列。
  • 组键列(_measurement 除外)被解析为标签,其中列名称是标签键,列值是标签值。
  • 不在组键中(_time_ 除外)的列被解析为字段,其中列名称是字段键,列值是字段值。

输入数据

组键 = [_measurement, exampleTag]

_time_measurementexampleTagtemprpm
2021-01-01T00:00:00Zexample-mA80.04023
2021-01-01T00:01:00Zexample-mA80.34542
2021-01-01T00:02:00Zexample-mA81.14901

输出行协议

example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000

示例

降采样并将数据写入 InfluxDB Cloud

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> aggregateWindow(every: 1m, fn: last)
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

将最小值、最大值和平均值写入 InfluxDB Cloud

import "influxdata/influxdb/secrets"

cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")

data = from(bucket: "example-oss-bucket")
    |> range(start: -30m)
    |> filter(fn: (r) => r._measurement == "example-measurement")

min = data |> aggregateWindow(every: 10m, fn: min) |> map(fn: (r) => ({ r with _field: "{$r._field}_min" }))
max = data |> aggregateWindow(every: 10m, fn: max) |> map(fn: (r) => ({ r with _field: "{$r._field}_max" }))
mean = data |> aggregateWindow(every: 10m, fn: mean) |> map(fn: (r) => ({ r with _field: "{$r._field}_mean" }))

union(tables: [min, max, mean])
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

自动将数据从 InfluxDB OSS 写入 InfluxDB Cloud

要自动定期地将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请在您的 InfluxDB OSS 实例中创建一个任务,该任务定期查询、处理数据并将数据写入 InfluxDB Cloud。

import "influxdata/influxdb/tasks"

option task = {name: "Downsample to InfluxDB Cloud", every: 1h}

from(bucket: "example-oss-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> aggregateWindow(every: 1m, fn: last)
    |> to(
        bucket: "example-cloud-bucket",
        host: "https://cloud2.influxdata.com",
        org: "example-org",
        token: cloudToken,
    )

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可以实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可用于非商业家庭或业余爱好者使用。

有关更多信息,请查看