Python 客户端库
使用 InfluxDB Python 客户端库 将 InfluxDB 集成到 Python 脚本和应用程序中。
本指南假设您已熟悉 Python 和 InfluxDB。如果刚刚开始,请参阅InfluxDB 入门。
开始之前
安装 InfluxDB Python 库
pip install influxdb-client
确保 InfluxDB 正在运行。如果本地运行 InfluxDB,请访问 http://localhost:8086。(如果使用 InfluxDB Cloud,请访问您的 InfluxDB Cloud UI 的 URL。例如:https://us-west-2-1.aws.cloud2.influxdata.com。)
使用 Python 写入数据到 InfluxDB
我们将使用 Python 库以 Line Protocol 格式写入一些数据。
在您的 Python 程序中,导入 InfluxDB 客户端库并使用它将数据写入 InfluxDB。
import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "<my-bucket>" org = "<my-org>" token = "<my-token>" # Store the URL of your InfluxDB instance url="http://localhost:8086"
实例化客户端。
InfluxDBClient
对象接受三个命名参数:url
、org
和token
。传入命名参数。client = influxdb_client.InfluxDBClient( url=url, token=token, org=org )
InfluxDBClient
对象有一个用于配置的write_api
方法。使用
client
对象和write_api
方法实例化一个写入客户端。使用write_api
方法配置写入器对象。write_api = client.write_api(write_options=SYNCHRONOUS)
创建一个 point 对象,并使用 API 写入器对象的
write
方法将其写入 InfluxDB。write 方法需要三个参数:bucket
、org
和record
。p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) write_api.write(bucket=bucket, org=org, record=p)
完整的写入脚本示例
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "<my-bucket>"
org = "<my-org>"
token = "<my-token>"
# Store the URL of your InfluxDB instance
url="http://localhost:8086"
client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
# Write script
write_api = client.write_api(write_options=SYNCHRONOUS)
p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, org=org, record=p)
使用 Python 从 InfluxDB 查询数据
实例化查询客户端。
query_api = client.query_api()
创建一个 Flux 查询,然后将其格式化为 Python 字符串。
query = 'from(bucket:"my-bucket")\ |> range(start: -10m)\ |> filter(fn:(r) => r._measurement == "my_measurement")\ |> filter(fn:(r) => r.location == "Prague")\ |> filter(fn:(r) => r._field == "temperature")'
查询客户端将 Flux 查询发送到 InfluxDB,并返回一个具有表格结构的 Flux 对象。
将
query()
方法传递两个命名参数:org
和query
。result = query_api.query(org=org, query=query)
遍历 Flux 对象中的表格和记录。
- 使用
get_value()
方法返回值。 - 使用
get_field()
方法返回字段。
results = [] for table in result: for record in table.records: results.append((record.get_field(), record.get_value())) print(results) [(temperature, 25.3)]
- 使用
Flux 对象提供以下方法来访问您的数据
get_measurement()
: 返回记录的 measurement 名称。get_field()
: 返回字段名称。get_value()
: 返回实际字段值。values
: 返回列值的映射。values.get("<your tag>")
: 返回给定列的记录值。get_time()
: 返回记录的时间。get_start()
: 返回当前表格中所有记录的包含下限时间。get_stop()
: 返回当前表格中所有记录的排除上限时间。
完整的查询脚本示例
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "<my-bucket>"
org = "<my-org>"
token = "<my-token>"
# Store the URL of your InfluxDB instance
url="http://localhost:8086"
client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
# Query script
query_api = client.query_api()
query = 'from(bucket:"my-bucket")\
|> range(start: -10m)\
|> filter(fn:(r) => r._measurement == "my_measurement")\
|> filter(fn:(r) => r.location == "Prague")\
|> filter(fn:(r) => r._field == "temperature")'
result = query_api.query(org=org, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_field(), record.get_value()))
print(results)
[(temperature, 25.3)]
更多信息,请参阅 GitHub 上的 Python 客户端 README。
此页是否对您有帮助?
感谢您的反馈!