将数据从 InfluxDB OSS 写入 InfluxDB Cloud
要将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请使用 Flux to()
或 experimental.to()
函数。使用单个查询执行写入数据一次,或使用 InfluxDB 任务 以 定期将数据写入 InfluxDB Cloud。
将写入复制到 InfluxDB OSS 到 InfluxDB Cloud
要将所有写入 InfluxDB OSS 实例的操作复制到 InfluxDB Cloud 实例,请使用 InfluxDB 复制流。
InfluxDB Cloud 速率限制
对 InfluxDB Cloud 的写入请求受限于与您的 InfluxDB Cloud 定价方案 相关的速率限制。
从 InfluxDB OSS 查询数据。
(可选) 过滤或处理要写入 InfluxDB Cloud 的数据。
使用
to
或experimental.to
将数据写入 InfluxDB Cloud。对于大多数用例,to()
是要使用的正确函数,但根据您要写入的数据结构,可能需要experimental.to
。使用以下指南:
to(): 用于将字段键中的数据写入
_field
列,并将字段值写入_value
列。experimental.to(): 用于将列名中的数据写入相应的字段键,并将列值写入字段值。
请参阅
to()
函数的输入和输出示例。为任一函数提供以下参数
- bucket: 要写入的 InfluxDB Cloud 存储桶
- host: InfluxDB Cloud 区域 URL
- org: InfluxDB Cloud 组织
- token: InfluxDB Cloud API 令牌
(推荐) 为了使您的原始 API 令牌不出现在查询中,请将您的 InfluxDB Cloud API 令牌存储为 InfluxDB OSS 实例中的 InfluxDB 密钥,并使用
secrets.get()
检索密钥值,如以下示例所示(选择您正在使用的函数以查看正确的格式)
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
import "experimental"
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> experimental.to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
to()
函数的输入和输出数据
to()
需要_time
、_measurement
、_field
和_value
列。to()
将所有其他列作为标签写入,其中列名称是标签键,列值是标签值。
输入数据
_time | _measurement | exampleTag | _field | _value |
---|---|---|---|---|
2021-01-01T00:00:00Z | example-m | A | temp | 80.0 |
2021-01-01T00:01:00Z | example-m | A | temp | 80.3 |
2021-01-01T00:02:00Z | example-m | A | temp | 81.1 |
_time | _measurement | exampleTag | _field | _value |
---|---|---|---|---|
2021-01-01T00:00:00Z | example-m | A | rpm | 4023 |
2021-01-01T00:01:00Z | example-m | A | rpm | 4542 |
2021-01-01T00:02:00Z | example-m | A | rpm | 4901 |
输出行协议
example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
experimental.to()
需要_time
和_measurement
列。- 组键 中的列(
_measurement
除外)被解析为标签,其中列名称是标签键,列值是标签值。 - 不在组键中(
_time_
除外)的列被解析为字段,其中列名称是字段键,列值是字段值。
输入数据
组键 = [_measurement, exampleTag]
_time | _measurement | exampleTag | temp | rpm |
---|---|---|---|---|
2021-01-01T00:00:00Z | example-m | A | 80.0 | 4023 |
2021-01-01T00:01:00Z | example-m | A | 80.3 | 4542 |
2021-01-01T00:02:00Z | example-m | A | 81.1 | 4901 |
输出行协议
example-m,exampleTag=A temp=80.0,rpm=4023i 1609459200000000000
example-m,exampleTag=A temp=80.3,rpm=4542i 1609459260000000000
example-m,exampleTag=A temp=81.1,rpm=4901i 1609459320000000000
示例
降采样并将数据写入 InfluxDB Cloud
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> aggregateWindow(every: 1m, fn: last)
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
将最小值、最大值和平均值写入 InfluxDB Cloud
import "influxdata/influxdb/secrets"
cloudToken = secrets.get(key: "INFLUX_CLOUD_API_TOKEN")
data = from(bucket: "example-oss-bucket")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "example-measurement")
min = data |> aggregateWindow(every: 10m, fn: min) |> map(fn: (r) => ({ r with _field: "{$r._field}_min" }))
max = data |> aggregateWindow(every: 10m, fn: max) |> map(fn: (r) => ({ r with _field: "{$r._field}_max" }))
mean = data |> aggregateWindow(every: 10m, fn: mean) |> map(fn: (r) => ({ r with _field: "{$r._field}_mean" }))
union(tables: [min, max, mean])
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
自动将数据从 InfluxDB OSS 写入 InfluxDB Cloud
要自动定期地将数据从 InfluxDB OSS 写入 InfluxDB Cloud,请在您的 InfluxDB OSS 实例中创建一个任务,该任务定期查询、处理数据并将数据写入 InfluxDB Cloud。
import "influxdata/influxdb/tasks"
option task = {name: "Downsample to InfluxDB Cloud", every: 1h}
from(bucket: "example-oss-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> aggregateWindow(every: 1m, fn: last)
|> to(
bucket: "example-cloud-bucket",
host: "https://cloud2.influxdata.com",
org: "example-org",
token: cloudToken,
)
此页是否对您有帮助?
感谢您的反馈!