文档文档

贡献新的 Kapacitor 输出节点

如果您还没有这样做,请查看 Kapacitor 贡献指南,了解如何开始贡献。

目标

向 Kapacitor 添加一个可以将数据输出到自定义终结点的新节点。在本指南中,我们假设要将数据输出到一个虚构的内部数据库,名为 HouseDB。

概述

Kapacitor 通过管道处理数据。管道本质上是一个有向无环图(DAG)。基本思想是,图中的每个节点代表对数据进行某种形式的处理,每个边将数据在节点之间传递。为了添加一种新类型的节点,需要编写两个组件:

  1. 用于创建和配置节点的 API(TICKscript),以及
  2. 数据处理实现的步骤。

在我们的示例中,数据处理步骤是将数据输出到 HouseDB。

代码通过两个 Go 包镜像了这些要求。

  1. pipeline:此包定义了可用的节点类型及其配置方式。
  2. kapacitor:此包提供了 pipeline 包中定义的每个节点的实现。

为了使 API(即 TICKscript)清晰易读,节点定义与其实现是分开的。

更新 TICKscript

首先,我们需要更新 TICKscript,以便用户可以定义我们的新节点。为了将数据发送到 HouseDB,TICKscript 应该是什么样的?要连接到 HouseDB 实例,我们需要 URL 和数据库名称,因此我们需要一种提供这些信息的方式。这样如何?

    node
        |houseDBOut()
            .url('house://housedb.example.com')
            .database('metrics')

为了更新 TICKscript 以支持这些新方法,我们需要编写一个实现 pipeline.Node 接口的 Go 类型。该接口可以在 此处找到,并且通过 pipeline.node 类型提供了完整的实现。由于 Node 的实现已为我们完成,我们只需使用它。首先,我们需要一个名称。 HouseDBOutNode 遵循命名约定。让我们定义一个 Go struct,它将通过组合来实现该接口。在 pipeline 目录中创建一个名为 housedb_out.go 的文件,其中包含以下内容:

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node
}

就这样,我们在 Go 中有了一个实现所需接口的类型。为了允许我们需要的 .url.database 方法,只需在类型上定义同名的字段。首字母必须大写,以便导出。导出字段很重要,因为它们将被 kapacitor 包中的节点使用。名称的其余部分应与方法名称的大小写保持一致。TICKscript 将在运行时处理大小写匹配。更新 housedb_out.go 文件。

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node

    // URL for connecting to HouseDB
    Url string

    // Database name
    Database string
}

接下来,我们需要一种一致的方式来创建我们节点的新实例。但在此之前,我们需要考虑该节点如何与其他节点连接。由于 Kapacitor 将其视为输出节点,因此它是管道的末端。我们不会提供任何出向边,图在此节点结束。我们想象中的 HouseDB 非常灵活,可以按批次或单个数据点存储数据。因此,我们不关心 HouseDBOutNode 节点接收什么类型的数据。考虑到这些事实,我们可以定义一个函数来创建新的 HouseDBOutNode。将此函数添加到 housedb_out.go 文件的末尾:

// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
    return &HouseDBOutNode{
        node: node{
            desc: "housedb",
            wants: wants,
            provides: NoEdge,
        }
    }
}

通过显式声明节点想要提供的边类型,Kapacitor 将进行必要的类型检查,以防止出现无效的管道。

最后,我们需要添加一个新的链式方法,以便用户可以将 HouseDBOutNodes 连接到他们现有的管道。链式方法是一种创建新节点并将其添加为调用节点子节点的方法。实际上,该方法将节点链接在一起。pipeline.chainnode 类型包含可用于链接节点的所有方法的集合。一旦我们将我们的方法添加到该类型,任何其他节点现在都可以与 HouseDBOutNode 链接。将此函数添加到 pipeline/node.go 文件的末尾:

// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
    h := newHouseDBOutNode(c.Provides())
    c.linkChild(h)
    return h
}

现在,我们已经定义了所有必要的部分,以便 TICKscript 可以定义 HouseDBOutNodes。

    node
        |houseDBOut() // added as a method to the 'chainnode' type
            .url('house://housedb.example.com') // added as a field to the HouseDBOutNode
            .database('metrics') // added as a field to the HouseDBOutNode

实现 HouseDB 输出

现在 TICKscript 可以定义我们的新输出节点,我们需要实际提供一个实现,以便 Kapacitor 知道如何处理该节点。pipeline 包中的每个节点在 kapacitor 包中都有一个同名的节点。创建一个名为 housedb_out.go 的文件,并将其放在仓库的根目录下。将以下内容放入文件中。

package kapacitor

import (
    "github.com/influxdb/kapacitor/pipeline"
)

type HouseDBOutNode struct {
    // Include the generic node implementation
    node
    // Keep a reference to the pipeline node
    h *pipeline.HouseDBOutNode
}

kapacitor 包还定义了一个名为 Node 的接口,并通过 kapacitor.node 类型提供了默认实现。我们再次使用组合来实现接口。请注意,我们还有一个字段,它将包含我们刚刚完成定义的 pipeline.HouseDBOutNode 的实例。这个 pipeline.HouseDBOutNode 充当配置结构,告诉 kapacitor.HouseDBOutNode 需要做什么来完成其工作。

现在我们有了一个结构,让我们定义一个函数来创建我们新结构的一个实例。kapacitor 包中的 new*Node 方法遵循以下约定:

func newNodeName(et *ExecutingTask, n *pipeline.NodeName) (*NodeName, error) {}

在我们的例子中,我们想定义一个名为 newHouseDBOutNode 的函数。将以下方法添加到 housedb_out.go 文件:

func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
    h := &HouseDBOutNode{
        // pass in necessary fields to the 'node' struct
        node: node{Node: n, et: et, diag: d},
        // Keep a reference to the pipeline.HouseDBOutNode
        h: n,
    }
    // Set the function to be called when running the node
    // more on this in a bit.
    h.node.runF = h.runOut
    return h
}

为了创建我们节点的一个实例,我们需要将其与 pipeline 包中的节点关联起来。这可以通过 task.go 文件中的 createNode 方法中的 switch 语句来完成。继续我们的示例:

// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
    switch t := p.(type) {
    ...
	case *pipeline.HouseDBOutNode:
		n, err = newHouseDBOutNode(et, t, d)
    ...
}

现在我们已经关联了我们的两种类型,让我们回到实现输出代码。请注意 newHouseDBOutNode 函数中的行 h.node.runF = h.runOut。当节点开始执行时,此行设置了 kapacitor.HouseDBOutNode 将被调用的方法。现在我们需要定义 runOut 方法。在 housedb_out.go 文件中,添加此方法:

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
    return nil
}

通过此更改,HouseDBOutNode 在语法上是完整的,但尚未执行任何操作。让我们让它做点什么!

正如我们之前所学,节点通过边进行通信。有一个名为 edge.Edge 的 Go 类型处理这种通信。我们所要做的就是从边读取数据并将其发送到 HouseDB。数据以 edge.Message 类型表示。节点使用 edge.Consumer 读取消息,并通过实现 edge.Receiver 接口来处理消息。ConsumerReceiver 接口都可以在 此处找到。

我们在 HouseDBOutNode 中通过组合包含的 node 类型在名为 ins 的字段中提供了边的列表。由于 HouseDBOutNode 只有一个父节点,我们关心的边是第 0 条边。我们可以使用 NewConsumerWithReceiver 函数从边消耗和处理消息。

// NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewConsumerWithReceiver(e Edge, r Receiver) Consumer {
	return &consumer{
		edge: e,
		r:    r,
	}
}

让我们更新 runOut 以使用此函数读取和处理消息:

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
	consumer := edge.NewConsumerWithReceiver(
		n.ins[0],
		h,
	)
	return consumer.Consume()
}

剩下的是让 HouseDBOutNode 实现 Receiver 接口,并编写一个函数来接收一批点并将其写入 HouseDB。为了方便起见,我们可以使用 edge.BatchBuffer 来接收批处理消息。我们还可以将单个点消息转换为只包含一个点的批处理消息。

func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BeginBatch(begin)
}

func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BatchPoint(bp)
}

func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
    msg := h.batchBuffer.BufferedBatchMessage(end)
    return msg, h.write(msg)
}

func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
	batch := edge.NewBufferedBatchMessage(
		edge.NewBeginBatchMessage(
			p.Name(),
			p.Tags(),
			p.Dimensions().ByName,
			p.Time(),
			1,
		),
		[]edge.BatchPointMessage{
			edge.NewBatchPointMessage(
				p.Fields(),
				p.Tags(),
				p.Time(),
			),
		},
		edge.NewEndBatchMessage(),
	)
    return p, h.write(batch)
}

func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}
func (h *HouseDBOutNode) Done() {}

// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
    // Implement writing to HouseDB here...
    return nil
}

一旦我们实现了 write 方法,我们就完成了。当数据到达 HouseDBOutNode 时,它将被写入指定的 HouseDB 实例。

总结

我们首先在 pipeline 包中编写了一个节点(文件路径:pipeline/housedb_out.go),以定义将数据发送到 HouseDB 实例的 TICKscript API。然后,我们在 kapacitor 包中编写了该节点的实现(文件路径:housedb_out.go)。我们还更新了 pipeline/node.go 以添加新的链式方法,并更新了 task.go 以关联这两种类型。

以下是完整的文件内容:

pipeline/housedb_out.go

package pipeline

// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
    // Include the generic node implementation.
    node

    // URL for connecting to HouseDB
    Url string

    // Database name
    Database string
}

// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
    return &HouseDBOutNode{
        node: node{
            desc: "housedb",
            wants: wants,
            provides: NoEdge,
        }
    }
}

housedb_out.go

package kapacitor

import (
    "github.com/influxdb/kapacitor/pipeline"
)

type HouseDBOutNode struct {
    // Include the generic node implementation
    node
    // Keep a reference to the pipeline node
    h *pipeline.HouseDBOutNode
    // Buffer for a batch of points
    batchBuffer *edge.BatchBuffer
}

func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
    h := &HouseDBOutNode{
        // pass in necessary fields to the 'node' struct
        node: node{Node: n, et: et, diag: d},
        // Keep a reference to the pipeline.HouseDBOutNode
        h: n,
        // Buffer for a batch of points
        batchBuffer: new(edge.BatchBuffer),
    }
    // Set the function to be called when running the node
    h.node.runF = h.runOut
    return h
}

func (h *HouseDBOutNode) runOut(snapshot []byte) error {
	consumer := edge.NewConsumerWithReceiver(
		n.ins[0],
		h,
	)
	return consumer.Consume()
}

func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BeginBatch(begin)
}

func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	return nil, h.batchBuffer.BatchPoint(bp)
}

func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
    msg := h.batchBuffer.BufferedBatchMessage(end)
    return msg, h.write(msg)
}

func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
	batch := edge.NewBufferedBatchMessage(
		edge.NewBeginBatchMessage(
			p.Name(),
			p.Tags(),
			p.Dimensions().ByName,
			p.Time(),
			1,
		),
		[]edge.BatchPointMessage{
			edge.NewBatchPointMessage(
				p.Fields(),
				p.Tags(),
				p.Time(),
			),
		},
		edge.NewEndBatchMessage(),
	)
    return p, h.write(batch)
}

func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}
func (h *HouseDBOutNode) Done() {}

// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
    // Implement writing to HouseDB here...
    return nil
}

pipeline/node.go(仅显示新的链式方法)

...
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
    h := newHouseDBOutNode(c.Provides())
    c.linkChild(h)
    return h
}
...

task.go(仅显示新的 case)

...
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
    switch t := p.(type) {
    ...
	case *pipeline.HouseDBOutNode:
		n, err = newHouseDBOutNode(et, t, d)
    ...
}
...

记录您的新节点

由于 TICKscript 是一种独立的语言,我们构建了一个类似于 godoc 的小工具,名为 tickdoctickdoc 从代码中的注释生成文档。tickdoc 工具理解两个特殊注释,以帮助它生成清晰的文档。

  1. tick:ignore:可以添加到任何字段、方法、函数或结构。tickdoc 将会跳过它,并且不会生成任何文档。这对于忽略通过属性方法设置的字段最有价值。
  2. tick:property:仅添加到方法。通知 tickdoc 该方法是属性方法而不是链式方法

将其中一个注释放在单独的一行上,tickdoc 将找到它并相应地进行处理。否则,正常记录您的代码,tickdoc 将完成其余工作。

贡献非输出节点。

编写任何节点(不仅仅是输出节点)都是一个非常类似的过程,留给读者作为练习。有几个地方可能不同:

第一个区别是,如果新节点可以将其数据发送到子节点,那么它将在 pipeline 包中使用 pipeline.chainnode 实现 pipeline.Node 接口。例如:

package pipeline

type MyCustomNode struct {
    // Include pipeline.chainnode so we have all the chaining methods available
    // to our new node
    chainnode

}

func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
    m := &MyCustomNode{
        chainnode: newBasicChainNode("mycustom", e, e),
    }
    n.linkChild(m)
    return m
}

第二个区别是,可以定义一个设置管道节点字段并返回同一实例的方法,以创建属性方法。例如:

package pipeline

type MyCustomNode struct {
    // Include pipeline.chainnode so we have all the chaining methods available
    // to our new node
    chainnode

    // Mark this field as ignored for docs
    // Since it is set via the Names method below
    // tick:ignore
    NameList []string `tick:"Names"`

}

func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
    m := &MyCustomNode{
        chainnode: newBasicChainNode("mycustom", e, e),
    }
    n.linkChild(m)
    return m
}

// Set the NameList field on the node via this method.
//
// Example:
//    node.names('name0', 'name1')
//
// Use the tickdoc comment 'tick:property' to mark this method
// as a 'property method'
// tick:property
func (m *MyCustomNode) Names(name ...string) *MyCustomNode {
    m.NameList = name
    return m
}

此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2