使用 InfluxDB 客户端库写入数据
使用与您的代码集成的 InfluxDB 3 客户端库,将数据构造为时间序列点,然后以行协议的形式写入 InfluxDB 3 Core 数据库。
设置您的项目
设置您的 InfluxDB 3 Core 项目和凭据,以便使用您选择的编程语言的 InfluxDB 3 客户端库来写入数据。
- 安装 InfluxDB 3 Core
- 设置 InfluxDB 3 Core
- 创建一个项目目录,并将您的 InfluxDB 3 Core 凭据存储为环境变量或在项目配置文件中,例如
.env(“dotenv”) 文件。
设置 InfluxDB 3 Core 和您的项目后,您应该拥有以下内容
InfluxDB 3 Core 凭据
用于您的项目的目录。
凭据存储为环境变量或在项目配置文件中 - 例如,一个
.env(“dotenv”) 文件。
初始化项目目录
创建一个项目目录并为其初始化您的编程语言。
安装 Go 1.13 或更高版本。
为您的 Go 模块创建一个目录并切换到该目录 - 例如
mkdir iot-starter-go && cd $_初始化一个 Go 模块 - 例如
go mod init iot-starter
安装 Node.js。
为您的 JavaScript 项目创建一个目录并切换到该目录 - 例如
mkdir -p iot-starter-js && cd $_初始化一个项目 - 例如,使用
npmnpm init
安装客户端库
安装您选择的编程语言的 InfluxDB 3 客户端库。
使用 dotnet CLI 将 InfluxDB 3 C# 客户端库添加到您的项目,或通过将包添加到您的项目文件 - 例如
dotnet add package InfluxDB3.Client使用 go get 命令将 InfluxDB 3 Go 客户端库添加到您的项目 - 例如
go mod init path/to/project/dir && cd $_
go get github.com/InfluxCommunity/influxdb3-go/v2/influxdb3使用 Maven Gradle 构建工具将 InfluxDB 3 Java 客户端库添加到您的项目依赖项中。
例如,要将库添加到 Maven 项目,请将以下依赖项添加到您的 pom.xml 文件中
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb3-java</artifactId>
<version>1.1.0</version>
</dependency>要将库添加到 Gradle 项目,请将以下依赖项添加到您的 build.gradle 文件中
dependencies {
implementation 'com.influxdb:influxdb3-java:1.1.0'
}对于 Node.js 项目,请使用 @influxdata/influxdb3-client,它提供主(CommonJS)、模块(ESM)和浏览器(UMD)导出。使用您喜欢的包管理器将 InfluxDB 3 JavaScript 客户端库添加到您的项目中 - 例如,使用 npm
npm install --save @influxdata/influxdb3-client使用 pip 安装 InfluxDB 3 Python 客户端库。要使用 Python 客户端库提供的 Pandas 功能(例如 to_pandas()),您还必须安装 pandas 包。
pip install influxdb3-python pandas构建行协议
通过对 行协议有基本了解,您可以构建行协议数据并将其写入 InfluxDB 3 Core。
使用客户端库写入方法,将数据提供为原始行协议或 Point 对象,客户端库会将 Point 对象转换为行协议。如果您的程序创建您写入 InfluxDB 的数据,请使用 Point 接口来利用程序中的类型安全。
客户端库提供一个或多个 Point 构造方法。某些库支持语言原生数据结构,例如 Go 的 struct,用于创建点。
本指南中的示例展示了如何构建符合 示例 home schema 的 Point 对象,然后将这些点作为行协议数据写入 InfluxDB 3 Core 数据库。
示例 home schema
考虑一个收集您家中传感器数据的用例。每个传感器收集温度、湿度和一氧化碳读数。
要收集此数据,请使用以下 schema
- 表 (table):
home- 标签 (tags)
room:Living Room 或 Kitchen
- 字段 (fields)
temp:温度(摄氏度,浮点数)hum:湿度百分比(浮点数)co:一氧化碳(百万分之几,整数)
- 时间戳 (timestamp):以秒精度为单位的 Unix 时间戳
- 标签 (tags)
为您的模块创建一个文件 - 例如:
main.go。在
main.go中,输入以下示例代码package main import ( "context" "os" "fmt" "time" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" "github.com/influxdata/line-protocol/v2/lineprotocol" ) func Write() error { url := os.Getenv("INFLUX_HOST") token := os.Getenv("INFLUX_TOKEN") database := os.Getenv("INFLUX_DATABASE") // To instantiate a client, call New() with InfluxDB credentials. client, err := influxdb3.New(influxdb3.ClientConfig{ Host: url, Token: token, Database: database, }) /** Use a deferred function to ensure the client is closed when the * function returns. **/ defer func (client *influxdb3.Client) { err = client.Close() if err != nil { panic(err) } }(client) /** Use the NewPoint method to construct a point. * NewPoint(measurement, tags map, fields map, time) **/ point := influxdb3.NewPoint("home", map[string]string{ "room": "Living Room", }, map[string]any{ "temp": 24.5, "hum": 40.5, "co": 15i}, time.Now(), ) /** Use the NewPointWithMeasurement method to construct a point with * method chaining. **/ point2 := influxdb3.NewPointWithMeasurement("home"). SetTag("room", "Living Room"). SetField("temp", 23.5). SetField("hum", 38.0). SetField("co", 16i). SetTimestamp(time.Now()) fmt.Println("Writing points") points := []*influxdb3.Point{point, point2} /** Write points to InfluxDB. * You can specify WriteOptions, such as Gzip threshold, * default tags, and timestamp precision. Default precision is lineprotocol.Nanosecond **/ err = client.WritePoints(context.Background(), points, influxdb3.WithPrecision(lineprotocol.Second)) return nil } func main() { Write() }要运行该模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令
go run main.go
为您的模块创建一个文件 - 例如:
write-points.js。在
write-points.js中,输入以下示例代码// write-points.js import { InfluxDBClient, Point } from '@influxdata/influxdb3-client'; /** * Set InfluxDB credentials. */ const host = process.env.INFLUX_HOST ?? ''; const database = process.env.INFLUX_DATABASE; const token = process.env.INFLUX_TOKEN; /** * Write line protocol to InfluxDB using the JavaScript client library. */ export async function writePoints() { /** * Instantiate an InfluxDBClient. * Provide the host URL and the database token. */ const client = new InfluxDBClient({ host, token }); /** Use the fluent interface with chained methods to construct Points. */ const point = Point.measurement('home') .setTag('room', 'Living Room') .setFloatField('temp', 22.2) .setFloatField('hum', 35.5) .setIntegerField('co', 7) .setTimestamp(new Date().getTime() / 1000); const point2 = Point.measurement('home') .setTag('room', 'Kitchen') .setFloatField('temp', 21.0) .setFloatField('hum', 35.9) .setIntegerField('co', 0) .setTimestamp(new Date().getTime() / 1000); /** Write points to InfluxDB. * The write method accepts an array of points, the target database, and * an optional configuration object. * You can specify WriteOptions, such as Gzip threshold, default tags, * and timestamp precision. Default precision is lineprotocol.Nanosecond **/ try { await client.write([point, point2], database, '', { precision: 's' }); console.log('Data has been written successfully!'); } catch (error) { console.error(`Error writing data to InfluxDB: ${error.body}`); } client.close(); } writePoints();要运行该模块并将数据写入您的 {{< product-name >}} 数据库,请在终端中输入以下命令
node writePoints.js
为您的模块创建一个文件 - 例如:
write-points.py。在
write-points.py中,输入以下示例代码以批处理模式写入数据import os from influxdb_client_3 import ( InfluxDBClient3, InfluxDBError, Point, WritePrecision, WriteOptions, write_client_options) host = os.getenv('INFLUX_HOST') token = os.getenv('INFLUX_TOKEN') database = os.getenv('INFLUX_DATABASE') # Create an array of points with tags and fields. points = [Point("home") .tag("room", "Kitchen") .field("temp", 25.3) .field('hum', 20.2) .field('co', 9)] # With batching mode, define callbacks to execute after a successful or # failed write request. # Callback methods receive the configuration and data sent in the request. def success(self, data: str): print(f"Successfully wrote batch: data: {data}") def error(self, data: str, exception: InfluxDBError): print(f"Failed writing batch: config: {self}, data: {data} due: {exception}") def retry(self, data: str, exception: InfluxDBError): print(f"Failed retry writing batch: config: {self}, data: {data} retry: {exception}") # Configure options for batch writing. write_options = WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000, max_retries=5, max_retry_delay=30_000, exponential_base=2) # Create an options dict that sets callbacks and WriteOptions. wco = write_client_options(success_callback=success, error_callback=error, retry_callback=retry, write_options=write_options) # Instantiate a synchronous instance of the client with your # InfluxDB credentials and write options, such as Gzip threshold, default tags, # and timestamp precision. Default precision is nanosecond ('ns'). with InfluxDBClient3(host=host, token=token, database=database, write_client_options=wco) as client: client.write(points, write_precision='s')要运行该模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令
python write-points.py
示例代码执行以下操作
- 实例化一个配置了 InfluxDB URL 和 API 令牌的客户端。
- 构造
home表Point对象。 - 将数据作为行协议格式发送到 InfluxDB 并等待响应。
- 如果写入成功,将成功消息记录到 stdout;否则,将失败消息和错误详细信息记录到 stdout。
- 关闭客户端以释放资源。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。