文档文档

Python 客户端库入门程序

按照此分步教程,使用 InfluxData 客户端库和您喜欢的框架或语言构建物联网 (IoT) 应用程序。

在本教程中,您将使用 InfluxDB API 和客户端库构建现代应用程序,同时学习以下内容:

  • InfluxDB 核心概念。

  • 应用程序如何与设备和 InfluxDB 交互。

  • 如何对应用程序和设备进行 API 身份验证。

  • 如何安装客户端库。

  • 如何在 InfluxDB 中写入和查询数据。

  • 如何使用 InfluxData UI 库格式化数据并创建可视化效果。

目录

设置 InfluxDB

如果您尚未这样做,请创建 InfluxDB Cloud 帐户安装 InfluxDB OSS

IoT Starter 示例应用程序假定具有以下先决条件:

  • InfluxDB org ID
  • API 令牌(例如,具有存储桶的读取和写入权限的所有访问令牌
  • 名为 iot_center存储桶,用于存储来自设备的时间序列数据
  • 名为 iot_center_devices存储桶,用于存储设备元数据和 API 令牌 ID

IoT Starter 简介

应用程序架构有四个层:

  • InfluxDB API:InfluxDB v2 API。
  • IoT 设备:虚拟或物理设备将 IoT 数据写入 InfluxDB API。
  • UI:向服务器发送请求并在浏览器中呈现视图。
  • API:接收来自 UI 的请求,向 InfluxDB 发送请求,并处理来自 InfluxDB 的响应。

有关本教程中引用的完整代码,请参阅 influxdata/iot-api-python 存储库

创建应用程序

创建一个将包含您的 iot-api 项目的目录。以下示例代码在您的主目录中创建一个 iot-api 目录,并更改为新目录:

mkdir ~/iot-api-apps
cd ~/iot-api-apps

使用轻量级 Python Web 框架 Flask 创建您的应用程序。

  1. 在您的 ~/iot-api-apps 目录中,打开终端并输入以下命令以创建并导航到新的项目目录:

    mkdir iot-api-python && cd $_
    
  2. 在您的终端中输入以下命令,为项目创建并激活 Python 虚拟环境:

    # Create a new virtual environment named "virtualenv"
    # Python 3.8+
    python -m venv virtualenv
    
    # Activate the virtualenv (OS X & Linux)
    source virtualenv/bin/activate
    
  3. 激活完成后,在您的终端中输入以下命令,使用 pip 包安装程序(Python 随附)安装 Flask:

    pip install Flask
    
  4. 在您的项目中,创建一个 app.py 文件,该文件:

    1. 导入 Flask 包。
    2. 实例化 Flask 应用程序。
    3. 提供执行应用程序的路由。
    from flask import Flask
    app = Flask(__name__)
    
    @app.route("/")
    def hello():
      return "Hello World!"
    

    启动您的应用程序。以下示例代码在 http://localhost:3001 上启动应用程序,并启用调试和热重载:

    export FLASK_ENV=development
    flask run -h localhost -p 3001
    

    在您的浏览器中,访问 http://localhost:3001 以查看“Hello World!”响应。

安装 InfluxDB 客户端库

InfluxDB 客户端库提供以下 InfluxDB API 交互:

  • 使用 Flux 语言查询数据。
  • 将数据写入 InfluxDB。
  • 在后台批量处理数据。
  • 在失败时自动重试请求。

在您的终端中输入以下命令以安装客户端库:

pip install influxdb-client

有关客户端库的更多信息,请参阅 influxdata/influxdb-client-python 存储库

配置客户端库

InfluxDB 客户端库需要来自您的 InfluxDB 环境的配置属性。通常,您将以下属性作为应用程序的环境变量提供:

  • INFLUX_URL
  • INFLUX_TOKEN
  • INFLUX_ORG
  • INFLUX_BUCKET
  • INFLUX_BUCKET_AUTH

要设置客户端配置,请在您的项目顶级目录中创建一个 config.ini,并粘贴以下内容以提供必要的 InfluxDB 凭据:

[APP]
INFLUX_URL = <INFLUX_URL>
INFLUX_TOKEN = <INFLUX_TOKEN>
INFLUX_ORG = <INFLUX_ORG_ID>
INFLUX_BUCKET = iot_center
INFLUX_BUCKET_AUTH = iot_center_devices

替换以下内容:

  • <INFLUX_URL>:您的 InfluxDB 实例 URL。
  • <INFLUX_TOKEN>:您的 InfluxDB API 令牌,具有查询(读取)存储桶和为设备创建(写入)授权的权限。
  • <INFLUX_ORG_ID>:您的 InfluxDB 组织 ID。

构建 API

您的应用程序 API 提供服务器端 HTTP 端点,用于处理来自 UI 的请求。每个 API 端点负责以下操作:

  1. 侦听 HTTP 请求(来自 UI)。
  2. 将请求转换为 InfluxDB API 请求。
  3. 处理 InfluxDB API 响应并处理错误。
  4. 使用状态和数据进行响应(对于 UI)。

创建 API 以注册设备

在此应用程序中,注册设备是一个点,其中包含您的设备 ID、授权 ID 和 API 令牌。API 令牌和授权权限允许设备查询和写入 INFLUX_BUCKET。在本节中,您将添加 API 端点,该端点处理来自 UI 的请求,在 InfluxDB 中创建授权,并将注册设备写入 INFLUX_BUCKET_AUTH 存储桶。要了解有关 API 令牌和授权的更多信息,请参阅 管理 API 令牌

应用程序 API 使用以下 /api/v2 InfluxDB API 端点:

  • POST /api/v2/query:查询 INFLUX_BUCKET_AUTH 以查找注册设备。
  • GET /api/v2/buckets:获取 INFLUX_BUCKET 的存储桶 ID。
  • POST /api/v2/authorizations:为设备创建授权。
  • POST /api/v2/write:将设备授权写入 INFLUX_BUCKET_AUTH

为设备创建授权

在本节中,您将创建一个具有对 INFLUX_BUCKET读取-写入权限的授权,并接收设备的 API 令牌。以下示例使用以下步骤创建授权:

  1. 使用配置实例化 AuthorizationsAPI 客户端和 BucketsAPI 客户端。
  2. 检索存储桶 ID。
  3. 使用客户端库向 /api/v2/authorizations InfluxDB API 端点发送 POST 请求。

创建一个 ./api/devices.py 文件,其中包含以下内容:

# Import the dependencies.
import configparser
from datetime import datetime
from uuid import uuid4

# Import client library classes.
from influxdb_client import Authorization, InfluxDBClient, Permission, PermissionResource, Point, WriteOptions
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.write_api import SYNCHRONOUS

from api.sensor import Sensor

# Get the configuration key-value pairs.

config = configparser.ConfigParser()
config.read('config.ini')

def create_authorization(device_id) -> Authorization:
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=os.environ.get('INFLUX_TOKEN'),
                                     org=os.environ.get('INFLUX_ORG'))

    authorization_api = AuthorizationsApi(influxdb_client)
    # get bucket_id from bucket
    buckets_api = BucketsApi(influxdb_client)
    buckets = buckets_api.find_bucket_by_name(config.get('APP', 'INFLUX_BUCKET'))  # function returns only 1 bucket
    bucket_id = buckets.id
    org_id = buckets.org_id
    desc_prefix = f'IoTCenterDevice: {device_id}'
    org_resource = PermissionResource(org_id=org_id, id=bucket_id, type="buckets")
    read = Permission(action="read", resource=org_resource)
    write = Permission(action="write", resource=org_resource)
    permissions = [read, write]
    authorization = Authorization(org_id=org_id, permissions=permissions, description=desc_prefix)
    request = authorization_api.create_authorization(authorization=authorization)
    return request

要创建具有对 INFLUX_BUCKET读取-写入权限的授权,您需要存储桶 ID。要检索存储桶 ID,create_authorization(deviceId) 调用 BucketsAPI find_bucket_by_name 函数,该函数向 /api/v2/buckets InfluxDB API 端点发送 GET 请求。然后,create_authorization(deviceId) 在请求正文中传递新授权,其中包含以下内容:

  • 存储桶 ID。
  • 组织 ID。
  • 描述:IoTCenterDevice: DEVICE_ID
  • 存储桶的权限列表。

要了解有关 API 令牌和授权的更多信息,请参阅 管理 API 令牌

接下来,将设备授权写入存储桶

将设备授权写入存储桶

在 InfluxDB 中进行设备授权后,为设备和授权详细信息写入一个点到 INFLUX_BUCKET_AUTH。将设备授权存储在存储桶中允许您执行以下操作:

  • 报告设备授权历史记录。
  • 管理带有和不带有令牌的设备。
  • 将同一令牌分配给多个设备。
  • 刷新令牌。

要将点写入 InfluxDB,请使用 InfluxDB 客户端库向 /api/v2/write InfluxDB API 端点发送 POST 请求。在 ./api/devices.py 中,添加以下 create_device(device_id) 函数:

def create_device(device_id=None):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=config.get('APP', 'INFLUX_TOKEN'),
                                     org=config.get('APP', 'INFLUX_ORG'))
    if device_id is None:
        device_id = str(uuid4())
    write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
    point = Point('deviceauth') \
        .tag("deviceId", device_id) \
        .field('key', f'fake_auth_id_{device_id}') \
        .field('token', f'fake_auth_token_{device_id}')
    client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET_AUTH'), record=point)
    # write() returns None on success
    if client_response is None:
        return device_id
    # Return None on failure
    return None

create_device(device_id) 接受 device_id,并在以下步骤中将数据写入 INFLUX_BUCKET_AUTH

  1. 使用来自配置的 urltokenorg 值初始化 InfluxDBClient()
  2. 初始化 WriteAPI 客户端以将数据写入 InfluxDB 存储桶。
  3. 创建 Point
  4. 使用 write_api.write()Point 写入存储桶。
  5. 检查失败 - 如果写入成功,write_api 将返回 None
  6. 如果成功,则返回 device_id;否则返回 None

该函数写入一个包含以下元素的点:

元素名称
测量deviceauth
标签deviceId设备 ID
字段key授权 ID
字段token授权 (API) 令牌

接下来,创建 API 以列出设备

创建 API 以列出设备

添加 /api/devices API 端点,该端点检索、处理和列出已注册设备。

  1. 创建一个 Flux 查询,该查询获取每个序列的最后一行,其中包含 deviceauth 测量。以下示例查询返回包含 key 字段(授权 ID)的行,并排除包含 token 字段的行(以避免向 UI 公开令牌)。

    // Flux query finds devices
     from(bucket:`${INFLUX_BUCKET_AUTH}`)
          |> range(start: 0)
          |> filter(fn: (r) => r._measurement == "deviceauth" and r._field != "token")
          |> last()
    
  2. 使用 QueryApi 客户端将 Flux 查询发送到 POST /api/v2/query InfluxDB API 端点。

    ./api/devices.py 中,添加以下内容:

    def get_device(device_id=None) -> {}:
        influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                         token=os.environ.get('INFLUX_TOKEN'),
                                         org=os.environ.get('INFLUX_ORG'))
        # Queries must be formatted with single and double quotes correctly
        query_api = QueryApi(influxdb_client)
        device_filter = ''
        if device_id:
            device_id = str(device_id)
            device_filter = f'r.deviceId == "{device_id}" and r._field != "token"'
        else:
            device_filter = f'r._field != "token"'
    
        flux_query = f'from(bucket: "{config.get("APP", "INFLUX_BUCKET_AUTH")}") ' \
                     f'|> range(start: 0) ' \
                     f'|> filter(fn: (r) => r._measurement == "deviceauth" and {device_filter}) ' \
                     f'|> last()'
    
        response = query_api.query(flux_query)
        result = []
        for table in response:
            for record in table.records:
                try:
                    'updatedAt' in record
                except KeyError:
                    record['updatedAt'] = record.get_time()
                    record[record.get_field()] = record.get_value()
                result.append(record.values)
        return result
    

    get_device(device_id) 函数执行以下操作:

    1. 实例化 QueryApi 客户端并将 Flux 查询发送到 InfluxDB。
    2. 迭代响应中的 FluxTable,并返回元组列表。

创建 IoT 虚拟设备

创建一个 ./api/sensor.py 文件,该文件生成模拟天气遥测数据。按照 示例代码 创建 IoT 虚拟设备。

接下来,为虚拟设备生成数据,并将数据写入 InfluxDB

写入遥测数据

在本节中,您将遥测数据写入 InfluxDB 存储桶。要写入数据,请使用 InfluxDB 客户端库向 /api/v2/write InfluxDB API 端点发送 POST 请求。

以下示例使用以下步骤生成数据,然后将其写入 InfluxDB:

  1. 初始化 WriteAPI 实例。
  2. 创建一个 Point,其中包含 environment 测量和温度、湿度、压力、纬度和经度的数据字段。
  3. 使用 WriteAPI write 方法将点发送到 InfluxDB。

./api/devices.py 中,添加以下 write_measurements(device_id) 函数:

def write_measurements(device_id):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=config.get('APP', 'INFLUX_TOKEN'),
                                     org=config.get('APP', 'INFLUX_ORG'))
    write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
    virtual_device = Sensor()
    coord = virtual_device.geo()
    point = Point("environment") \
        .tag("device", device_id) \
        .tag("TemperatureSensor", "virtual_bme280") \
        .tag("HumiditySensor", "virtual_bme280") \
        .tag("PressureSensor", "virtual_bme280") \
        .field("Temperature", virtual_device.generate_measurement()) \
        .field("Humidity", virtual_device.generate_measurement()) \
        .field("Pressure", virtual_device.generate_measurement()) \
        .field("Lat", coord['latitude']) \
        .field("Lon", coord['latitude']) \
        .time(datetime.utcnow())
    print(f"Writing: {point.to_line_protocol()}")
    client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET'), record=point)
    # write() returns None on success
    if client_response is None:
        # TODO Maybe also return the data that was written
        return device_id
    # Return None on failure
    return None

查询遥测数据

在本节中,您将从 InfluxDB 存储桶检索遥测数据。要检索数据,请使用 InfluxDB 客户端库向 /api/v2/query InfluxDB API 端点发送 POST 请求。以下示例使用以下步骤检索和处理遥测数据:

  1. INFLUX_BUCKET 中查询 environment 测量。
  2. device_id 过滤结果。
  3. 返回 influxdata/giraffe UI 库 可以处理的 CSV 数据。

./api/devices.py 中,添加以下 get_measurements(device_id) 函数:

def get_measurements(query):
    influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
                                     token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG'))
    query_api = QueryApi(influxdb_client)
    result = query_api.query_csv(query,
                                 dialect=Dialect(
                                       header=True,
                                       delimiter=",",
                                       comment_prefix="#",
                                       annotations=['group', 'datatype', 'default'],
                                       date_time_format="RFC3339"))
    response = ''
    for row in result:
        response += (',').join(row) + ('\n')
    return response

定义 API 响应

app.py 中,添加与传入请求匹配的 API 端点,并使用模块的结果进行响应。在以下 /api/devices/<device_id> 路由示例中,app.pyGETPOST 请求中检索 device_id,将其传递给 get_device(device_id) 方法,并将结果作为带有 CORS allow- 标头的 JSON 数据返回。

@app.route('/api/devices/<string:device_id>', methods=['GET', 'POST'])
def api_get_device(device_id):
    if request.method == "OPTIONS": # CORS preflight
        return _build_cors_preflight_response()
    return _corsify_actual_response(jsonify(devices.get_device(device_id)))

在您的终端中输入以下命令以重新启动应用程序:

  1. CONTROL+C 停止应用程序。
  2. flask run -h localhost -p 3001 启动应用程序。

要从您的 API 检索设备数据,请在您的浏览器中访问 http://localhost:3001/api/devices

安装并运行 UI

influxdata/iot-api-ui 是一个独立的 Next.js React UI,它使用您的应用程序 API 在 InfluxDB 中写入和查询数据。iot-api-ui 使用 Next.js 重写/api/ 路径中的所有请求路由到您的 API。

要安装和运行 UI,请执行以下操作:

  1. 在您的 ~/iot-api-apps 目录中,克隆 influxdata/iot-api-ui 存储库 并进入 iot-api-ui 目录 - 例如:

    cd ~/iot-api-apps
    git clone git@github.com:influxdata/iot-api-ui.git
    cd ./iot-app-ui
    
  2. ./.env.development 文件包含您可以编辑或覆盖的默认配置设置(使用 ./.env.local 文件)。

  3. 要启动 UI,请在您的终端中输入以下命令:

    yarn dev
    

    要查看列表和注册设备,请在您的浏览器中访问 http://localhost:3000/devices

要了解有关 UI 组件的更多信息,请参阅 influxdata/iot-api-ui


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需更改任何代码。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建于 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层级可供非商业家庭或业余爱好者使用。

有关更多信息,请查看