Documentation

to() 函数

to() 将数据写入 InfluxDB Cloud 或 2.x bucket,并返回写入的数据。

输出数据要求

to() 写入使用标准 InfluxDB Cloud 和 v2.x 数据结构组织的数据,该结构至少包含以下列

  • _time
  • _measurement
  • _field
  • _value

所有其他列都作为标签写入 InfluxDB。

注意: to() 会删除 _time 值为 null 的行,并且不会将其写入 InfluxDB。

to() 不需要导入包

to()influxdata/influxdb 包的一部分,但它是 Flux prelude 的一部分,不需要 import 语句或包命名空间。

函数类型签名
(
    <-tables: stream[A],
    ?bucket: string,
    ?bucketID: string,
    ?fieldFn: (r: A) => B,
    ?host: string,
    ?measurementColumn: string,
    ?org: string,
    ?orgID: string,
    ?tagColumns: [string],
    ?timeColumn: string,
    ?token: string,
) => stream[A] where A: Record, B: Record

有关更多信息,请参阅函数类型签名

参数

bucket

要写入的 bucket 的名称。bucketbucketID 是互斥的

bucketID

要写入的 bucket ID 的字符串编码。bucketbucketID 是互斥的

host

要写入的 InfluxDB 实例的 URL。

请参阅InfluxDB Cloud 区域InfluxDB OSS URL。写入远程 InfluxDB 实例时,host 是必需的。如果指定,token 也是必需的。

org

组织名称。orgorgID 是互斥的

orgID

要查询的组织 ID 的字符串编码。orgorgID 是互斥的

token

InfluxDB API token。

InfluxDB 1.x 或 Enterprise: 如果禁用身份验证,请提供空字符串 ("")。如果启用身份验证,请使用 <username>:<password> 语法提供您的 InfluxDB 用户名和密码。写入另一个组织或指定 host 时,token 是必需的。

timeColumn

输出的时间列。默认为 "_time"

measurementColumn

输出的测量列。默认为 "_measurement"

tagColumns

输出中的标签列。默认为类型为 string 的所有列,排除所有值列和 fieldFn 标识的列。

fieldFn

将字段键映射到字段值并返回记录的函数。默认为 (r) => ({ [r._field]: r._value })

tables

输入数据。默认为管道转发数据 (<-)。

示例

将数据写入 InfluxDB

data =
    array.from(
        rows: [
            {
                _time: 2021-01-01T00:00:00Z,
                _measurement: "m",
                tag1: "a",
                _field: "temp",
                _value: 100.1,
            },
            {
                _time: 2021-01-01T00:01:00Z,
                _measurement: "m",
                tag1: "a",
                _field: "temp",
                _value: 99.8,
            },
            {
                _time: 2021-01-01T00:02:00Z,
                _measurement: "m",
                tag1: "a",
                _field: "temp",
                _value: 99.1,
            },
            {
                _time: 2021-01-01T00:03:00Z,
                _measurement: "m",
                tag1: "a",
                _field: "temp",
                _value: 98.6,
            },
        ],
    )

data
    |> to(
        bucket: "example-bucket",
        org: "example-org",
        token: "mYSuP3rSecR37t0k3N",
        host: "http://localhost:8086",
    )

上面的示例生成以下行协议并将其发送到 InfluxDB /api/v2/write 端点

m,tag1=a temp=100.1 1609459200000000000
m,tag1=a temp=99.8 1609459260000000000
m,tag1=a temp=99.1 1609459320000000000
m,tag1=a temp=98.6 1609459380000000000

自定义 to() 操作中的测量、标签和字段列

data =
    array.from(
        rows: [
            {
                _time: 2021-01-01T00:00:00Z,
                tag1: "a",
                tag2: "b",
                hum: 53.3,
                temp: 100.1,
            },
            {
                _time: 2021-01-01T00:01:00Z,
                tag1: "a",
                tag2: "b",
                hum: 53.4,
                temp: 99.8,
            },
            {
                _time: 2021-01-01T00:02:00Z,
                tag1: "a",
                tag2: "b",
                hum: 53.6,
                temp: 99.1,
            },
            {
                _time: 2021-01-01T00:03:00Z,
                tag1: "a",
                tag2: "b",
                hum: 53.5,
                temp: 98.6,
            },
        ],
    )

data
    |> to(
        bucket: "example-bucket",
        measurementColumn: "tag1",
        tagColumns: ["tag2"],
        fieldFn: (r) => ({"hum": r.hum, "temp": r.temp}),
    )

上面的示例生成以下行协议并将其发送到 InfluxDB /api/v2/write 端点

a,tag2=b hum=53.3,temp=100.1 1609459200000000000
a,tag2=b hum=53.4,temp=99.8 1609459260000000000
a,tag2=b hum=53.6,temp=99.1 1609459320000000000
a,tag2=b hum=53.5,temp=98.6 1609459380000000000

写入多个 InfluxDB bucket

下面的示例执行以下操作

  1. 将数据写入 bucket1 并返回写入的数据。
  2. 应用空分组键将所有行分组到单个表中。
  3. 计算行数。
  4. 映射写入 InfluxDB 所需的列。
  5. 将修改后的数据写入 bucket2
data
    |> to(bucket: "bucket1")
    |> group()
    |> count()
    |> map(
        fn: (r) => ({r with _time: now(), _measurement: "writeStats", _field: "numPointsWritten"}),
    )
    |> to(bucket: "bucket2")

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可在实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建于 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层级可供非商业家用或业余爱好者使用。

有关更多信息,请查看