文档文档

将数据从 InfluxDB Cloud 迁移到 InfluxDB Clustered

要将数据从 InfluxDB Cloud (TSM) 迁移到由 v3 存储引擎驱动的 InfluxDB Clustered,请按基于时间的批次从 TSM 驱动的存储桶中查询数据,并将查询到的数据写入 InfluxDB 集群中的数据库。由于完整的数据迁移可能会超出您的 InfluxDB Cloud 组织的限制和可调整的配额,请分批迁移您的数据。

以下指南提供了关于设置 InfluxDB 任务的说明,该任务按基于时间的批次从 InfluxDB Cloud TSM 驱动的存储桶中查询数据,并将每个批次写入另一个组织中的 InfluxDB Clustered (InfluxDB 3) 数据库。

所有查询请求均受您的 InfluxDB Cloud 组织的速率限制和可调整配额的约束。

迁移前须知

在从 InfluxDB Cloud (TSM) 迁移到 InfluxDB Clustered 之前,TSM 存储引擎支持的某些模式设计实践在 InfluxDB 3 存储引擎中不受支持。具体来说,InfluxDB 3 强制执行以下模式限制

  • 您不能对标签和字段使用重复的名称。
  • 默认情况下,measurement 最多可以包含 250 列,其中每列代表时间、字段或标签。

有关更多信息,请参阅模式限制

如果您的模式不符合这些限制,您必须在迁移到 InfluxDB Clustered 之前更新您的模式。

修复重复的标签和字段名称

修复 measurement 中总列数超过 250 列的问题

设置迁移

迁移过程需要在源 InfluxDB 组织中创建两个存储桶:一个存储桶用于存储您要迁移的数据,第二个存储桶用于存储迁移元数据。如果您使用的是InfluxDB Cloud Free Plan,并且有多个存储桶要迁移,则您将超出计划的存储桶限制。要迁移多个存储桶,您需要升级到按使用量付费计划才能完成迁移。

  1. 在您要将数据迁移的 InfluxDB 集群中:

    1. 创建一个数据库 以将数据迁移到该数据库
    2. 创建一个数据库令牌,该令牌对您要迁移到的数据库具有写入权限
  2. 在您要从中迁移数据 InfluxDB Cloud (TSM) 组织中:

    1. 使用键 influxdb3_clustered_TOKENInfluxDB Clustered 令牌(在步骤 1b 中创建)添加为密钥。有关更多信息,请参阅添加密钥

    2. 创建一个存储桶 以存储临时迁移元数据

    3. 创建一个任务,使用提供的迁移任务。更新必要的迁移配置选项

    4. (可选)设置迁移监控

    5. 保存任务。

      新创建的任务默认情况下已启用,因此当您保存任务时,数据迁移将开始。

迁移完成后,后续的每次迁移任务执行都将失败,并显示以下错误

error exhausting result iterator: error calling function "die" @41:9-41:86:
Batch range is beyond the migration range. Migration is complete.

迁移任务

配置迁移

  1. 使用 task.every 选项指定您希望任务运行的频率。请参阅确定您的任务间隔

  2. migration 记录中定义以下属性

    migration
    • start:要包含在迁移中的最早时间。请参阅确定您的迁移开始时间
    • stop:要包含在迁移中的最晚时间。
    • batchInterval:每个基于时间的批次的持续时间。请参阅确定您的批次间隔
    • batchBucket:InfluxDB Cloud (TSM) 存储桶,用于存储迁移批次元数据。
    • sourceBucket:要从中迁移数据的 InfluxDB Cloud (TSM) 存储桶。
    • destinationHost:要将数据迁移到的 InfluxDB 集群 URL。
    • destinationOrg提供一个空字符串(InfluxDB Clustered 忽略)。
    • destinationToken:InfluxDB Clustered 令牌。为了确保 API 令牌的安全,请将其作为密钥存储在 InfluxDB Cloud (TSM) 中。
    • destinationDatabase:要将数据迁移到的 InfluxDB Clustered 数据库。

迁移 Flux 脚本

import "array"
import "experimental"
import "date"
import "influxdata/influxdb/secrets"

// Configure the task
option task = {every: 5m, name: "Migrate data from TSM to v3"}

// Configure the migration
migration = {
    start: 2023-01-01T00:00:00Z,
    stop: 2023-02-01T00:00:00Z,
    batchInterval: 1h,
    batchBucket: "migration",
    sourceBucket: "example-cloud-bucket",
    destinationHost: "https://cluster-host.com",
    destinationOrg: "",
    destinationToken: secrets.get(key: "influxdb3_clustered_TOKEN"),
    destinationDatabase: "example-destination-database",
}

// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
    _lastBatchStop =
        (from(bucket: migration.batchBucket)
            |> range(start: migration.start)
            |> filter(fn: (r) => r._field == "batch_stop")
            |> filter(fn: (r) => r.dstOrg == migration.destinationOrg)
            |> filter(fn: (r) => r.dstBucket == migration.destinationDatabase)
            |> last()
            |> findRecord(fn: (key) => true, idx: 0))._value
    _batchStart =
        if exists _lastBatchStop then
            time(v: _lastBatchStop)
        else
            migration.start

    return {start: _batchStart, stop: date.add(d: migration.batchInterval, to: _batchStart)}
}

// Define a static record with batch start and stop time properties
batch = batchRange()

// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
    if batch.start >= migration.stop then
        die(msg: "Batch range is beyond the migration range. Migration is complete.")
    else
        "Migration in progress"

// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
    from(bucket: migration.sourceBucket)
        |> range(start: batch.start, stop: batch.stop)

// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
    data()
        |> count()
        |> group(columns: ["_start", "_stop"])
        |> sum()

// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])

// metadata returns a stream of tables representing batch metadata.
metadata = () => {
    _input =
        if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
            rowCount
        else
            emptyRange

    return
        _input
            |> map(
                fn: (r) =>
                    ({
                        _time: now(),
                        _measurement: "batches",
                        srcBucket: migration.sourceBucket,
                        dstOrg: migration.destinationOrg,
                        dstBucket: migration.destinationDatabase,
                        batch_start: string(v: batch.start),
                        batch_stop: string(v: batch.stop),
                        rows: r._value,
                        percent_complete:
                            float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                    v: int(v: migration.stop) - int(v: migration.start),
                                ) * 100.0,
                    }),
            )
            |> group(columns: ["_measurement", "dstOrg", "srcBucket", "dstBucket"])
}

// Write the queried data to the specified InfluxDB OSS bucket.
data()
    |> to(
        host: migration.destinationHost,
        org: migration.destinationOrg,
        token: migration.destinationToken,
        bucket: migration.destinationDatabase
    )

// Generate and store batch metadata in the migration.batchBucket.
metadata()
    |> experimental.to(bucket: migration.batchBucket)

配置帮助

确定您的任务间隔

确定您的迁移开始时间

确定您的批次间隔

排查迁移任务失败问题

如果迁移任务失败,请查看您的任务日志以识别具体错误。以下是迁移任务失败的常见原因。

超出速率限制

如果您的数据迁移导致您超出 InfluxDB Cloud 组织的限制和配额,则任务将返回类似于以下的错误

too many requests

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔。然后,每个批次将查询更少的数据。

API 令牌无效

如果您添加为 influxdb3_clustered_TOKEN 的 API 令牌没有对您的 InfluxDB Clustered 数据库的写入权限,则任务将返回类似于以下的错误

unauthorized access

可能的解决方案:

  • 确保 API 令牌具有对您的 InfluxDB Clustered 数据库的写入权限。
  • 生成一个新的 InfluxDB Clustered 令牌,该令牌对您要迁移到的数据库具有写入权限。然后,使用新令牌更新您的 InfluxDB Cloud (TSM) 实例中的 influxdb3_clustered_TOKEN 密钥。

查询超时

InfluxDB Cloud 查询超时时间为 90 秒。如果返回批次间隔中的数据花费的时间超过此时间,则查询将超时,并且任务将失败。

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔。然后,每个批次将查询更少的数据,并花费更少的时间来返回结果。

批次大小过大

如果您的批次大小过大,则任务将返回类似于以下的错误

internal error: error calling function "metadata" @97:1-97:11: error calling function "findRecord" @67:32-67:69: wrong number of fields

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔,并检索更少的数据/批次。

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像当前一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已发布公开 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版的一部分。

InfluxDB 3 Core,是我们的新开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看