to() 函数
to()
将数据写入 InfluxDB Cloud 或 2.x bucket,并返回写入的数据。
输出数据要求
to()
写入使用标准 InfluxDB Cloud 和 v2.x 数据结构组织的数据,该结构至少包含以下列
_time
_measurement
_field
_value
所有其他列都作为标签写入 InfluxDB。
注意: to()
会删除 _time
值为 null 的行,并且不会将其写入 InfluxDB。
to() 不需要导入包
to()
是 influxdata/influxdb
包的一部分,但它是 Flux prelude 的一部分,不需要 import 语句或包命名空间。
函数类型签名
(
<-tables: stream[A],
?bucket: string,
?bucketID: string,
?fieldFn: (r: A) => B,
?host: string,
?measurementColumn: string,
?org: string,
?orgID: string,
?tagColumns: [string],
?timeColumn: string,
?token: string,
) => stream[A] where A: Record, B: Record
有关更多信息,请参阅函数类型签名。
参数
bucket
要写入的 bucket 的名称。bucket
和 bucketID
是互斥的。
bucketID
要写入的 bucket ID 的字符串编码。bucket
和 bucketID
是互斥的。
host
要写入的 InfluxDB 实例的 URL。
请参阅InfluxDB Cloud 区域或InfluxDB OSS URL。写入远程 InfluxDB 实例时,host
是必需的。如果指定,token
也是必需的。
org
组织名称。org
和 orgID
是互斥的。
orgID
要查询的组织 ID 的字符串编码。org
和 orgID
是互斥的。
token
InfluxDB API token。
InfluxDB 1.x 或 Enterprise: 如果禁用身份验证,请提供空字符串 (""
)。如果启用身份验证,请使用 <username>:<password>
语法提供您的 InfluxDB 用户名和密码。写入另一个组织或指定 host
时,token
是必需的。
timeColumn
输出的时间列。默认为 "_time"
。
measurementColumn
输出的测量列。默认为 "_measurement"
。
tagColumns
输出中的标签列。默认为类型为 string
的所有列,排除所有值列和 fieldFn
标识的列。
fieldFn
将字段键映射到字段值并返回记录的函数。默认为 (r) => ({ [r._field]: r._value })
。
tables
输入数据。默认为管道转发数据 (<-
)。
示例
将数据写入 InfluxDB
data =
array.from(
rows: [
{
_time: 2021-01-01T00:00:00Z,
_measurement: "m",
tag1: "a",
_field: "temp",
_value: 100.1,
},
{
_time: 2021-01-01T00:01:00Z,
_measurement: "m",
tag1: "a",
_field: "temp",
_value: 99.8,
},
{
_time: 2021-01-01T00:02:00Z,
_measurement: "m",
tag1: "a",
_field: "temp",
_value: 99.1,
},
{
_time: 2021-01-01T00:03:00Z,
_measurement: "m",
tag1: "a",
_field: "temp",
_value: 98.6,
},
],
)
data
|> to(
bucket: "example-bucket",
org: "example-org",
token: "mYSuP3rSecR37t0k3N",
host: "http://localhost:8086",
)
上面的示例生成以下行协议并将其发送到 InfluxDB /api/v2/write
端点
m,tag1=a temp=100.1 1609459200000000000
m,tag1=a temp=99.8 1609459260000000000
m,tag1=a temp=99.1 1609459320000000000
m,tag1=a temp=98.6 1609459380000000000
自定义 to() 操作中的测量、标签和字段列
data =
array.from(
rows: [
{
_time: 2021-01-01T00:00:00Z,
tag1: "a",
tag2: "b",
hum: 53.3,
temp: 100.1,
},
{
_time: 2021-01-01T00:01:00Z,
tag1: "a",
tag2: "b",
hum: 53.4,
temp: 99.8,
},
{
_time: 2021-01-01T00:02:00Z,
tag1: "a",
tag2: "b",
hum: 53.6,
temp: 99.1,
},
{
_time: 2021-01-01T00:03:00Z,
tag1: "a",
tag2: "b",
hum: 53.5,
temp: 98.6,
},
],
)
data
|> to(
bucket: "example-bucket",
measurementColumn: "tag1",
tagColumns: ["tag2"],
fieldFn: (r) => ({"hum": r.hum, "temp": r.temp}),
)
上面的示例生成以下行协议并将其发送到 InfluxDB /api/v2/write
端点
a,tag2=b hum=53.3,temp=100.1 1609459200000000000
a,tag2=b hum=53.4,temp=99.8 1609459260000000000
a,tag2=b hum=53.6,temp=99.1 1609459320000000000
a,tag2=b hum=53.5,temp=98.6 1609459380000000000
写入多个 InfluxDB bucket
下面的示例执行以下操作
- 将数据写入
bucket1
并返回写入的数据。 - 应用空分组键将所有行分组到单个表中。
- 计算行数。
- 映射写入 InfluxDB 所需的列。
- 将修改后的数据写入
bucket2
。
data
|> to(bucket: "bucket1")
|> group()
|> count()
|> map(
fn: (r) => ({r with _time: now(), _measurement: "writeStats", _field: "numPointsWritten"}),
)
|> to(bucket: "bucket2")
此页面是否对您有帮助?
感谢您的反馈!