贡献新的 Kapacitor 输出节点
如果您还没有这样做,请查看 Kapacitor 贡献指南,了解如何开始贡献的信息。
目标
向 Kapacitor 添加一个新节点,该节点可以将数据输出到自定义端点。在本指南中,假设我们想要将数据输出到一个名为 HouseDB 的虚构内部数据库。
概述
Kapacitor 通过管道处理数据。管道在形式上是有向无环图 (DAG)。基本思想是,图中的每个节点代表对数据进行某种形式的处理,每条边在节点之间传递数据。为了添加一种新的节点类型,需要编写两个组件
- 用于创建和配置节点的 API (TICKscript),以及
- 数据处理步骤的实现。
在我们的示例中,数据处理步骤是将数据输出到 HouseDB。
代码通过两个 Go 包反映了这些要求。
pipeline
:此包定义了可用的节点类型以及它们的配置方式。kapacitor
:此包提供了pipeline
包中定义的每个节点的实现。
为了使 API(即 TICKscript)清晰易读,节点的定义与节点的实现分离开来。
更新 TICKscript
首先,我们需要更新 TICKscript,以便用户可以定义我们的新节点。将数据发送到 HouseDB 的 TICKscript 应该是什么样的?要连接到 HouseDB 实例,我们需要 URL 和数据库名称,因此我们需要一种提供该信息的方法。像这样怎么样?
node
|houseDBOut()
.url('house://housedb.example.com')
.database('metrics')
为了更新 TICKscript 以支持这些新方法,我们需要编写一个实现 pipeline.Node
接口的 Go 类型。该接口可以在 这里 找到,以及通过 pipeline.node
类型的完整实现。由于 Node
的实现已经为我们完成,我们只需要使用它。首先我们需要一个名称。HouseDBOutNode
遵循命名约定。让我们定义一个 Go struct
,它将通过组合来实现该接口。在 pipeline
目录中创建一个名为 housedb_out.go
的文件,内容如下
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
}
就像这样,我们在 Go 中有了一个实现了所需接口的类型。为了允许我们需要的 .url
和 .database
方法,只需在类型上定义具有相同名称的字段即可。第一个字母需要大写,以便导出。重要的是,字段需要导出,因为它们将被 kapacitor
包中的节点使用。名称的其余部分应与方法名称具有相同的大小写。TICKscript 将在运行时处理大小写匹配。更新 housedb_out.go
文件。
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}
接下来,我们需要一种一致的方式来创建我们节点的新实例。但是要做到这一点,我们需要考虑此节点如何连接到其他节点。由于就 Kapacitor 而言,我们是一个输出节点,因此这是管道的末端。我们不会提供任何出站边,图形在此节点上结束。我们虚构的 HouseDB 非常灵活,可以批量或作为单个数据点存储数据。因此,我们不关心 HouseDBOutNode
节点接收的数据类型。考虑到这些事实,我们可以定义一个函数来创建新的 HouseDBOutNode。将此函数添加到 housedb_out.go
文件的末尾
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}
通过显式声明节点 wants
和 provides
的边的类型,Kapacitor 将执行必要的类型检查以防止无效的管道。
最后,我们需要添加一个新的 chaining method
,以便用户可以将 HouseDBOutNode 连接到他们现有的管道。chaining method
是一个创建新节点并将其添加为调用节点的子节点的方法。实际上,该方法将节点链接在一起。pipeline.chainnode
类型包含可用于链接节点的所有方法的集合。一旦我们将我们的方法添加到该类型,任何其他节点现在都可以与 HouseDBOutNode 链接。将此函数添加到 pipeline/node.go
文件的末尾
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}
我们现在已经定义了所有必要的组件,以便 TICKscripts 可以定义 HouseDBOutNode
node
|houseDBOut() // added as a method to the 'chainnode' type
.url('house://housedb.example.com') // added as a field to the HouseDBOutNode
.database('metrics') // added as a field to the HouseDBOutNode
实现 HouseDB 输出
现在 TICKscript 可以定义我们的新输出节点,我们需要实际提供一个实现,以便 Kapacitor 知道如何处理该节点。pipeline
包中的每个节点在 kapacitor
包中都有一个同名的节点。创建一个名为 housedb_out.go
的文件,并将其放在 repo 的根目录中。将以下内容放入文件中。
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
}
kapacitor
包还定义了一个名为 Node
的接口,并通过 kapacitor.node
类型提供了默认实现。同样,我们使用组合来实现接口。请注意,我们还有一个字段,它将包含我们刚刚完成定义的 pipeline.HouseDBOutNode
的实例。此 pipeline.HouseDBOutNode
就像一个配置结构,告诉 kapacitor.HouseDBOutNode
它需要做什么工作。
现在我们有了一个结构体,让我们定义一个函数来创建我们新结构体的实例。kapacitor
包中的 new*Node
方法遵循以下约定
func newNodeName(et *ExecutingTask, n *pipeline.NodeName) (*NodeName, error) {}
在我们的例子中,我们想要定义一个名为 newHouseDBOutNode
的函数。将以下方法添加到 housedb_out.go
文件中。
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
}
// Set the function to be called when running the node
// more on this in a bit.
h.node.runF = h.runOut
return h
}
为了创建我们节点的实例,我们需要将其与 pipeline
包中的节点关联起来。这可以通过 task.go
文件中 createNode
方法中的 switch 语句来完成。继续我们的示例
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}
现在我们已经关联了我们的两个类型,让我们回到实现输出代码。请注意 newHouseDBOutNode
函数中的行 h.node.runF = h.runOut
。此行设置了 kapacitor.HouseDBOutNode
的方法,该方法将在节点开始执行时被调用。现在我们需要定义 runOut
方法。在文件 housedb_out.go
中添加此方法
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
return nil
}
通过此更改,HouseDBOutNode
在语法上是完整的,但目前还什么都不做。让我们给它一些事情做!
正如我们之前了解到的,节点通过边进行通信。有一个 Go 类型 edge.Edge
处理此通信。我们只想从边读取数据并将其发送到 HouseDB。数据以 edge.Message
类型的形式表示。节点使用 edge.Consumer
读取消息,节点通过实现 edge.Receiver
接口来处理消息。Consumer
和 Receiver
接口都可以在 这里 找到
我们通过组合包含在 HouseDBOutNode
中的 node
类型在名为 ins
的字段中提供了一个边列表。由于 HouseDBOutNode
只能有一个父节点,因此我们关心的边是第 0 条边。我们可以使用 NewConsumerWithReceiver
函数从边使用和处理消息。
// NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewConsumerWithReceiver(e Edge, r Receiver) Consumer {
return &consumer{
edge: e,
r: r,
}
}
让我们更新 runOut
以使用此函数读取和处理消息。
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}
剩下的就是让 HouseDBOutNode
实现 Receiver
接口,并编写一个函数,该函数接受一批点并将它们写入 HouseDB。为了方便起见,我们可以使用 edge.BatchBuffer
来接收批处理消息。我们还可以将单点消息转换为仅包含一个点的批处理消息。
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}
一旦我们实现了 write
方法,我们就完成了。当数据到达 HouseDBOutNode
时,它将被写入指定的 HouseDB 实例。
总结
我们首先在 pipeline
包(文件路径:pipeline/housedb_out.go
)中编写了一个节点,以定义用于将数据发送到 HouseDB 实例的 TICKscript API。然后,我们在 kapacitor
包(文件路径:housedb_out.go
)中编写了该节点的实现。我们还更新了 pipeline/node.go
以添加新的链接方法,并更新了 task.go
以关联这两种类型。
以下是完整的文件内容
pipeline/housedb_out.go
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}
housedb_out.go
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
// Buffer for a batch of points
batchBuffer *edge.BatchBuffer
}
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
// Buffer for a batch of points
batchBuffer: new(edge.BatchBuffer),
}
// Set the function to be called when running the node
h.node.runF = h.runOut
return h
}
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}
pipeline/node.go(仅显示新的链接方法)
...
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}
...
task.go(仅显示新的 case)
...
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}
...
为您的新节点编写文档
由于 TICKscript 是它自己的语言,我们构建了一个类似于 godoc 的小型实用程序,名为 tickdoc。tickdoc
从代码中的注释生成文档。tickdoc
实用程序理解两个特殊的注释,以帮助它生成清晰的文档。
tick:ignore
:可以添加到任何字段、方法、函数或结构体。tickdoc
将跳过它,并且不会为其生成任何文档。这对于忽略通过属性方法设置的字段最有用。tick:property
:仅添加到方法。告知tickdoc
该方法是property method
而不是chaining method
。
将这些注释之一单独放在一行上,tickdoc
将找到它并做出相应的行为。否则,正常记录您的代码,tickdoc
将完成其余的工作。
贡献非输出节点。
编写任何节点(不仅仅是输出节点)的过程都非常相似,留给读者作为练习。有几件事可能有所不同
第一个不同之处在于,如果您的新节点可以将数据发送到子节点,则它将希望在 pipeline
包中使用 pipeline.Node
接口的 pipeline.chainnode
实现。例如
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}
第二个不同之处在于,可以定义一个方法,该方法在 pipeline 节点上设置字段并返回相同的实例,以便创建属性方法。例如
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
// Mark this field as ignored for docs
// Since it is set via the Names method below
// tick:ignore
NameList []string `tick:"Names"`
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}
// Set the NameList field on the node via this method.
//
// Example:
// node.names('name0', 'name1')
//
// Use the tickdoc comment 'tick:property' to mark this method
// as a 'property method'
// tick:property
func (m *MyCustomNode) Names(name ...string) *MyCustomNode {
m.NameList = name
return m
}
此页是否对您有帮助?
感谢您的反馈!