将数据从 InfluxDB Cloud 迁移到 InfluxDB Clustered
要将数据从 InfluxDB Cloud (TSM) 迁移到由 v3 存储引擎驱动的 InfluxDB Clustered,请按基于时间的批次从 TSM 驱动的存储桶中查询数据,并将查询到的数据写入 InfluxDB 集群中的数据库。由于完整的数据迁移可能会超出您的 InfluxDB Cloud 组织的限制和可调整的配额,请分批迁移您的数据。
以下指南提供了关于设置 InfluxDB 任务的说明,该任务按基于时间的批次从 InfluxDB Cloud TSM 驱动的存储桶中查询数据,并将每个批次写入另一个组织中的 InfluxDB Clustered (InfluxDB 3) 数据库。
所有查询请求均受您的 InfluxDB Cloud 组织的速率限制和可调整配额的约束。
迁移前须知
在从 InfluxDB Cloud (TSM) 迁移到 InfluxDB Clustered 之前,TSM 存储引擎支持的某些模式设计实践在 InfluxDB 3 存储引擎中不受支持。具体来说,InfluxDB 3 强制执行以下模式限制
- 您不能对标签和字段使用重复的名称。
- 默认情况下,measurement 最多可以包含 250 列,其中每列代表时间、字段或标签。
有关更多信息,请参阅模式限制。
如果您的模式不符合这些限制,您必须在迁移到 InfluxDB Clustered 之前更新您的模式。
设置迁移
迁移过程需要在源 InfluxDB 组织中创建两个存储桶:一个存储桶用于存储您要迁移的数据,第二个存储桶用于存储迁移元数据。如果您使用的是InfluxDB Cloud Free Plan,并且有多个存储桶要迁移,则您将超出计划的存储桶限制。要迁移多个存储桶,您需要升级到按使用量付费计划才能完成迁移。
在您要将数据迁移到的 InfluxDB 集群中:
在您要从中迁移数据的 InfluxDB Cloud (TSM) 组织中:
迁移完成后,后续的每次迁移任务执行都将失败,并显示以下错误
error exhausting result iterator: error calling function "die" @41:9-41:86:
Batch range is beyond the migration range. Migration is complete.
迁移任务
配置迁移
使用
task.every
选项指定您希望任务运行的频率。请参阅确定您的任务间隔。在
migration
记录中定义以下属性migration
- start:要包含在迁移中的最早时间。请参阅确定您的迁移开始时间。
- stop:要包含在迁移中的最晚时间。
- batchInterval:每个基于时间的批次的持续时间。请参阅确定您的批次间隔。
- batchBucket:InfluxDB Cloud (TSM) 存储桶,用于存储迁移批次元数据。
- sourceBucket:要从中迁移数据的 InfluxDB Cloud (TSM) 存储桶。
- destinationHost:要将数据迁移到的 InfluxDB 集群 URL。
- destinationOrg:提供一个空字符串(InfluxDB Clustered 忽略)。
- destinationToken:InfluxDB Clustered 令牌。为了确保 API 令牌的安全,请将其作为密钥存储在 InfluxDB Cloud (TSM) 中。
- destinationDatabase:要将数据迁移到的 InfluxDB Clustered 数据库。
迁移 Flux 脚本
import "array"
import "experimental"
import "date"
import "influxdata/influxdb/secrets"
// Configure the task
option task = {every: 5m, name: "Migrate data from TSM to v3"}
// Configure the migration
migration = {
start: 2023-01-01T00:00:00Z,
stop: 2023-02-01T00:00:00Z,
batchInterval: 1h,
batchBucket: "migration",
sourceBucket: "example-cloud-bucket",
destinationHost: "https://cluster-host.com",
destinationOrg: "",
destinationToken: secrets.get(key: "influxdb3_clustered_TOKEN"),
destinationDatabase: "example-destination-database",
}
// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
_lastBatchStop =
(from(bucket: migration.batchBucket)
|> range(start: migration.start)
|> filter(fn: (r) => r._field == "batch_stop")
|> filter(fn: (r) => r.dstOrg == migration.destinationOrg)
|> filter(fn: (r) => r.dstBucket == migration.destinationDatabase)
|> last()
|> findRecord(fn: (key) => true, idx: 0))._value
_batchStart =
if exists _lastBatchStop then
time(v: _lastBatchStop)
else
migration.start
return {start: _batchStart, stop: date.add(d: migration.batchInterval, to: _batchStart)}
}
// Define a static record with batch start and stop time properties
batch = batchRange()
// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
if batch.start >= migration.stop then
die(msg: "Batch range is beyond the migration range. Migration is complete.")
else
"Migration in progress"
// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
from(bucket: migration.sourceBucket)
|> range(start: batch.start, stop: batch.stop)
// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
data()
|> count()
|> group(columns: ["_start", "_stop"])
|> sum()
// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])
// metadata returns a stream of tables representing batch metadata.
metadata = () => {
_input =
if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
rowCount
else
emptyRange
return
_input
|> map(
fn: (r) =>
({
_time: now(),
_measurement: "batches",
srcBucket: migration.sourceBucket,
dstOrg: migration.destinationOrg,
dstBucket: migration.destinationDatabase,
batch_start: string(v: batch.start),
batch_stop: string(v: batch.stop),
rows: r._value,
percent_complete:
float(v: int(v: r._stop) - int(v: migration.start)) / float(
v: int(v: migration.stop) - int(v: migration.start),
) * 100.0,
}),
)
|> group(columns: ["_measurement", "dstOrg", "srcBucket", "dstBucket"])
}
// Write the queried data to the specified InfluxDB OSS bucket.
data()
|> to(
host: migration.destinationHost,
org: migration.destinationOrg,
token: migration.destinationToken,
bucket: migration.destinationDatabase
)
// Generate and store batch metadata in the migration.batchBucket.
metadata()
|> experimental.to(bucket: migration.batchBucket)
配置帮助
排查迁移任务失败问题
如果迁移任务失败,请查看您的任务日志以识别具体错误。以下是迁移任务失败的常见原因。
超出速率限制
如果您的数据迁移导致您超出 InfluxDB Cloud 组织的限制和配额,则任务将返回类似于以下的错误
too many requests
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔。然后,每个批次将查询更少的数据。
API 令牌无效
如果您添加为 influxdb3_clustered_TOKEN
的 API 令牌没有对您的 InfluxDB Clustered 数据库的写入权限,则任务将返回类似于以下的错误
unauthorized access
可能的解决方案:
- 确保 API 令牌具有对您的 InfluxDB Clustered 数据库的写入权限。
- 生成一个新的 InfluxDB Clustered 令牌,该令牌对您要迁移到的数据库具有写入权限。然后,使用新令牌更新您的 InfluxDB Cloud (TSM) 实例中的
influxdb3_clustered_TOKEN
密钥。
查询超时
InfluxDB Cloud 查询超时时间为 90 秒。如果返回批次间隔中的数据花费的时间超过此时间,则查询将超时,并且任务将失败。
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔。然后,每个批次将查询更少的数据,并花费更少的时间来返回结果。
批次大小过大
如果您的批次大小过大,则任务将返回类似于以下的错误
internal error: error calling function "metadata" @97:1-97:11: error calling function "findRecord" @67:32-67:69: wrong number of fields
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔,并检索更少的数据/批次。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和此文档的反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。