PostgreSQL 输出插件
此插件将指标写入PostgreSQL(或兼容)服务器,并管理模式和自动更新丢失的列。
首次引入: Telegraf v1.24.0 标签: datastore 操作系统支持: all
全局配置选项
插件支持其他全局和插件配置设置,用于修改指标、标签和字段,创建别名以及配置插件顺序等任务。更多详情请参阅 CONFIGURATION.md。
启动错误行为选项
除了插件特定的和全局的配置设置外,该插件还支持使用 startup_error_behavior 设置来指定出现启动错误时的行为。可用值如下:
error:如果出现启动错误,Telegraf 将停止并退出。这是默认行为。ignore:Telegraf 将忽略此插件的启动错误,并禁用它,但会继续处理所有其他插件。retry: Telegraf 会在每次收集或写入周期内尝试启动插件,以防出现启动错误。在启动成功之前,插件将被禁用。probe: Telegraf 将(如果可能)探测插件的功能,并在探测失败时禁用该插件。如果插件不支持探测,Telegraf 将表现得如同设置了ignore一样。
Secret-store 支持
此插件支持通过 connection 选项使用 secret-stores 中的密钥。有关如何使用它们的更多详细信息,请参阅 secret-store 文档。
配置
# Publishes metrics to a postgresql database
[[outputs.postgresql]]
## Specify connection address via the standard libpq connection string:
## host=... user=... password=... sslmode=... dbname=...
## Or a URL:
## postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
## See https://postgresql.ac.cn/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
##
## All connection parameters are optional. Environment vars are also supported.
## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
## All supported vars can be found here:
## https://postgresql.ac.cn/docs/current/libpq-envars.html
##
## Non-standard parameters:
## pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
## pool_min_conns (default: 0) - Minimum size of connection pool.
## pool_max_conn_lifetime (default: 0s) - Maximum connection age before closing.
## pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
## pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
# connection = ""
## Postgres schema to use.
# schema = "public"
## Store tags as foreign keys in the metrics table. Default is false.
# tags_as_foreign_keys = false
## Suffix to append to table name (measurement name) for the foreign tag table.
# tag_table_suffix = "_tag"
## Deny inserting metrics if the foreign tag can't be inserted.
# foreign_tag_constraint = false
## Store all tags as a JSONB object in a single 'tags' column.
# tags_as_jsonb = false
## Store all fields as a JSONB object in a single 'fields' column.
# fields_as_jsonb = false
## Name of the timestamp column
## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
# timestamp_column_name = "time"
## Type of the timestamp column
## Currently, "timestamp without time zone" and "timestamp with time zone"
## are supported
# timestamp_column_type = "timestamp without time zone"
## Templated statements to execute when creating a new table.
# create_templates = [
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
# ]
## Templated statements to execute when adding columns to a table.
## Set to an empty list to disable. Points containing tags for which there is
## no column will be skipped. Points containing fields for which there is no
## column will have the field omitted.
# add_column_templates = [
# '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
# ]
## Templated statements to execute when creating a new tag table.
# tag_table_create_templates = [
# '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
# ]
## Templated statements to execute when adding columns to a tag table.
## Set to an empty list to disable. Points containing tags for which there is
## no column will be skipped.
# tag_table_add_column_templates = [
# '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
# ]
## The postgres data type to use for storing unsigned 64-bit integer values
## (Postgres does not have a native unsigned 64-bit integer type).
## The value can be one of:
## numeric - Uses the PostgreSQL "numeric" data type.
## uint8 - Requires pguint extension (https://github.com/petere/pguint)
# uint64_type = "numeric"
## When using pool_max_conns > 1, and a temporary error occurs, the query is
## retried with an incremental backoff. This controls the maximum duration.
# retry_max_backoff = "15s"
## Approximate number of tag IDs to store in in-memory cache (when using
## tags_as_foreign_keys). This is an optimization to skip inserting known
## tag IDs. Each entry consumes approximately 34 bytes of memory.
# tag_cache_size = 100000
## Cut column names at the given length to not exceed PostgreSQL's
## 'identifier length' limit (default: no limit)
## (see https://postgresql.ac.cn/docs/current/limits.html)
## Be careful to not create duplicate column names!
# column_name_length_limit = 0
## Enable & set the log level for the Postgres driver.
# log_level = "warn" # trace, debug, info, warn, error, none并发
默认情况下,postgresql 插件不使用任何并发。但是,它可以为了提高吞吐量而使用。当并发关闭时,telegraf 核心会处理诸如失败重试、缓冲等问题。当使用并发时,这些方面必须由插件来处理。
要启用并发写入数据库,请将 pool_max_conns 连接参数设置为大于 1 的值。启用后,传入的批次将按测量/表名进行拆分。此外,如果一个批次传入,而前一个批次尚未完成,那么新批次也将使用并发。
如果所有连接都已使用并且连接池已耗尽,则后续传入的批次将在 telegraf 核心内部进行缓冲。
外键标签
当使用 tags_as_foreign_keys 时,标签将写入一个单独的表,其中包含一个用于连接的 tag_id 列。每个序列(标签值的唯一组合)在标签表中都有自己的条目和一个唯一的 tag_id。
数据类型
默认情况下,postgresql 插件将 Influx 数据类型映射到以下 PostgreSQL 类型
| Influx | PostgreSQL |
|---|---|
| float | double precision |
| integer | bigint |
| uinteger | numeric* |
| string | text |
| boolean | boolean |
| unix timestamp | timestamp |
需要注意的是,uinteger(无符号 64 位整数)被映射到 numeric PostgreSQL 数据类型。numeric 数据类型是一个任意精度的十进制数据类型,其效率低于 bigint。这是必要的,因为 Influx uinteger 数据类型的值范围可能会超过 bigint,从而在插入数据时导致错误。
pguint
为了解决 uinteger/numeric 数据类型问题,有一个 PostgreSQL 扩展提供了无符号 64 位整数支持:https://github.com/petere/pguint。
如果安装了此扩展,您可以启用 unsigned_integers 配置参数,这将导致插件使用 uint8 数据类型而不是 numeric。
模板
postgresql 插件使用模板来修改模式的 SQL 语句。这使用户能够完全控制模式。
有关如何编写模板的文档可以在 sqltemplate 文档中找到
长列名
Postgres 对列标识符的长度有限制,可以在 官方文档中找到。默认情况下,Telegraf 不强制执行此限制,因为此限制可以在服务器端修改。此外,截断列名可能导致冲突,如果列仅在截断之后不同。
请确保您不会在设置 column_name_length_limit 时导致列名冲突!如有疑问,请使用例如 regexp processor 显式缩短字段和标签名称。
示例
TimescaleDB
tags_as_foreign_keys = true
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .columns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
]多节点
tags_as_foreign_keys = true
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .columns }})''',
'''SELECT create_distributed_hypertable({{ .table|quoteLiteral }}, 'time', partitioning_column => 'tag_id', number_partitions => (SELECT count(*) FROM timescaledb_information.data_nodes)::integer, replication_factor => 2, chunk_time_interval => INTERVAL '7d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
]带视图的标签表
此示例启用了 tags_as_foreign_keys,但创建了一个 postgres 视图来自动连接度量和标签表。度量和标签表存储在“telegraf”模式中,而视图在“public”模式中。
tags_as_foreign_keys = true
schema = "telegraf"
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .columns }})''',
'''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
'''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
'''DROP VIEW IF EXISTS {{ .table.WithSchema "public" }}''',
'''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
tag_table_add_column_templates = [
'''ALTER TABLE {{.table}} ADD COLUMN IF NOT EXISTS {{.columns|join ", ADD COLUMN IF NOT EXISTS "}}''',
'''DROP VIEW IF EXISTS {{ .metricTable.WithSchema "public" }}''',
'''CREATE VIEW {{ .metricTable.WithSchema "public" }} AS SELECT time, {{ (.allColumns.Tags.Concat .metricTable.Columns.Fields).Identifiers | join "," }} FROM {{ .metricTable }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]不可变数据表
某些兼容 PostgreSQL 的数据库不允许在初始创建后修改表结构。此示例通过创建一个新表,然后使用视图将它们连接起来来解决此限制。
tags_as_foreign_keys = true
schema = 'telegraf'
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
'''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '14d')''',
'''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }}''',
'''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
'''ALTER TABLE {{ .table }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash).WithSchema "" }}''',
'''ALTER VIEW {{ .table.WithSuffix "_data" }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash "_data").WithSchema "" }}''',
'''DROP VIEW {{ .table.WithSchema "public" }}''',
'''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
'''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '14d')''',
'''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }} UNION ALL SELECT {{ (.allColumns.Union .table.Columns).Selectors | join "," }} FROM {{ .table.WithSuffix "_" .table.Columns.Hash "_data" }}''',
'''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
tag_table_add_column_templates = [
'''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
'''DROP VIEW {{ .metricTable.WithSchema "public" }}''',
'''CREATE VIEW {{ .metricTable.WithSchema "public" }} AS SELECT time, {{ (.allColumns.Tags.Concat .metricTable.Columns.Fields).Identifiers | join "," }} FROM {{ .metricTable.WithSuffix "_data" }} t, {{ .table }} tt WHERE t.tag_id = tt.tag_id''',
]索引
在时间戳和标签列上创建索引,以加快数据查询速度。
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .columns }})''',
'''CREATE INDEX ON {{ .table }} USING btree({{ .columns.Keys.Identifiers | join "," }})'''
]错误处理
当插件遇到写入数据库的错误时,它会尝试确定错误是暂时的还是永久的。如果重试写入可能成功,则认为错误是暂时的。一些临时错误的例子包括连接中断、死锁等。永久性错误包括数据类型无效、权限不足等。
当确定错误是暂时的时,插件将以递增的退避策略重试写入。
当确定错误是永久的时,插件将丢弃子批次。“子批次”是正在写入同一表的输入批次的组成部分。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Telegraf 和本文档提出反馈和 bug 报告。要获取支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。