贡献新的 Kapacitor 输出节点
如果您还没有这样做,请查看 Kapacitor 贡献指南,了解如何开始贡献。
目标
向 Kapacitor 添加一个可以将数据输出到自定义终结点的新节点。在本指南中,我们假设要将数据输出到一个虚构的内部数据库,名为 HouseDB。
概述
Kapacitor 通过管道处理数据。管道本质上是一个有向无环图(DAG)。基本思想是,图中的每个节点代表对数据进行某种形式的处理,每个边将数据在节点之间传递。为了添加一种新类型的节点,需要编写两个组件:
- 用于创建和配置节点的 API(TICKscript),以及
- 数据处理实现的步骤。
在我们的示例中,数据处理步骤是将数据输出到 HouseDB。
代码通过两个 Go 包镜像了这些要求。
pipeline:此包定义了可用的节点类型及其配置方式。kapacitor:此包提供了pipeline包中定义的每个节点的实现。
为了使 API(即 TICKscript)清晰易读,节点定义与其实现是分开的。
更新 TICKscript
首先,我们需要更新 TICKscript,以便用户可以定义我们的新节点。为了将数据发送到 HouseDB,TICKscript 应该是什么样的?要连接到 HouseDB 实例,我们需要 URL 和数据库名称,因此我们需要一种提供这些信息的方式。这样如何?
node
|houseDBOut()
.url('house://housedb.example.com')
.database('metrics')为了更新 TICKscript 以支持这些新方法,我们需要编写一个实现 pipeline.Node 接口的 Go 类型。该接口可以在 此处找到,并且通过 pipeline.node 类型提供了完整的实现。由于 Node 的实现已为我们完成,我们只需使用它。首先,我们需要一个名称。 HouseDBOutNode 遵循命名约定。让我们定义一个 Go struct,它将通过组合来实现该接口。在 pipeline 目录中创建一个名为 housedb_out.go 的文件,其中包含以下内容:
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
}就这样,我们在 Go 中有了一个实现所需接口的类型。为了允许我们需要的 .url 和 .database 方法,只需在类型上定义同名的字段。首字母必须大写,以便导出。导出字段很重要,因为它们将被 kapacitor 包中的节点使用。名称的其余部分应与方法名称的大小写保持一致。TICKscript 将在运行时处理大小写匹配。更新 housedb_out.go 文件。
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}接下来,我们需要一种一致的方式来创建我们节点的新实例。但在此之前,我们需要考虑该节点如何与其他节点连接。由于 Kapacitor 将其视为输出节点,因此它是管道的末端。我们不会提供任何出向边,图在此节点结束。我们想象中的 HouseDB 非常灵活,可以按批次或单个数据点存储数据。因此,我们不关心 HouseDBOutNode 节点接收什么类型的数据。考虑到这些事实,我们可以定义一个函数来创建新的 HouseDBOutNode。将此函数添加到 housedb_out.go 文件的末尾:
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}通过显式声明节点想要和提供的边类型,Kapacitor 将进行必要的类型检查,以防止出现无效的管道。
最后,我们需要添加一个新的链式方法,以便用户可以将 HouseDBOutNodes 连接到他们现有的管道。链式方法是一种创建新节点并将其添加为调用节点子节点的方法。实际上,该方法将节点链接在一起。pipeline.chainnode 类型包含可用于链接节点的所有方法的集合。一旦我们将我们的方法添加到该类型,任何其他节点现在都可以与 HouseDBOutNode 链接。将此函数添加到 pipeline/node.go 文件的末尾:
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}现在,我们已经定义了所有必要的部分,以便 TICKscript 可以定义 HouseDBOutNodes。
node
|houseDBOut() // added as a method to the 'chainnode' type
.url('house://housedb.example.com') // added as a field to the HouseDBOutNode
.database('metrics') // added as a field to the HouseDBOutNode
实现 HouseDB 输出
现在 TICKscript 可以定义我们的新输出节点,我们需要实际提供一个实现,以便 Kapacitor 知道如何处理该节点。pipeline 包中的每个节点在 kapacitor 包中都有一个同名的节点。创建一个名为 housedb_out.go 的文件,并将其放在仓库的根目录下。将以下内容放入文件中。
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
}kapacitor 包还定义了一个名为 Node 的接口,并通过 kapacitor.node 类型提供了默认实现。我们再次使用组合来实现接口。请注意,我们还有一个字段,它将包含我们刚刚完成定义的 pipeline.HouseDBOutNode 的实例。这个 pipeline.HouseDBOutNode 充当配置结构,告诉 kapacitor.HouseDBOutNode 需要做什么来完成其工作。
现在我们有了一个结构,让我们定义一个函数来创建我们新结构的一个实例。kapacitor 包中的 new*Node 方法遵循以下约定:
func newNodeName(et *ExecutingTask, n *pipeline.NodeName) (*NodeName, error) {}在我们的例子中,我们想定义一个名为 newHouseDBOutNode 的函数。将以下方法添加到 housedb_out.go 文件:
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
}
// Set the function to be called when running the node
// more on this in a bit.
h.node.runF = h.runOut
return h
}为了创建我们节点的一个实例,我们需要将其与 pipeline 包中的节点关联起来。这可以通过 task.go 文件中的 createNode 方法中的 switch 语句来完成。继续我们的示例:
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}现在我们已经关联了我们的两种类型,让我们回到实现输出代码。请注意 newHouseDBOutNode 函数中的行 h.node.runF = h.runOut。当节点开始执行时,此行设置了 kapacitor.HouseDBOutNode 将被调用的方法。现在我们需要定义 runOut 方法。在 housedb_out.go 文件中,添加此方法:
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
return nil
}通过此更改,HouseDBOutNode 在语法上是完整的,但尚未执行任何操作。让我们让它做点什么!
正如我们之前所学,节点通过边进行通信。有一个名为 edge.Edge 的 Go 类型处理这种通信。我们所要做的就是从边读取数据并将其发送到 HouseDB。数据以 edge.Message 类型表示。节点使用 edge.Consumer 读取消息,并通过实现 edge.Receiver 接口来处理消息。Consumer 和 Receiver 接口都可以在 此处找到。
我们在 HouseDBOutNode 中通过组合包含的 node 类型在名为 ins 的字段中提供了边的列表。由于 HouseDBOutNode 只有一个父节点,我们关心的边是第 0 条边。我们可以使用 NewConsumerWithReceiver 函数从边消耗和处理消息。
// NewConsumerWithReceiver creates a new consumer for the edge e and receiver r.
func NewConsumerWithReceiver(e Edge, r Receiver) Consumer {
return &consumer{
edge: e,
r: r,
}
}让我们更新 runOut 以使用此函数读取和处理消息:
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}剩下的是让 HouseDBOutNode 实现 Receiver 接口,并编写一个函数来接收一批点并将其写入 HouseDB。为了方便起见,我们可以使用 edge.BatchBuffer 来接收批处理消息。我们还可以将单个点消息转换为只包含一个点的批处理消息。
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}一旦我们实现了 write 方法,我们就完成了。当数据到达 HouseDBOutNode 时,它将被写入指定的 HouseDB 实例。
总结
我们首先在 pipeline 包中编写了一个节点(文件路径:pipeline/housedb_out.go),以定义将数据发送到 HouseDB 实例的 TICKscript API。然后,我们在 kapacitor 包中编写了该节点的实现(文件路径:housedb_out.go)。我们还更新了 pipeline/node.go 以添加新的链式方法,并更新了 task.go 以关联这两种类型。
以下是完整的文件内容:
pipeline/housedb_out.go
package pipeline
// A HouseDBOutNode will take the incoming data stream and store it in a
// HouseDB instance.
type HouseDBOutNode struct {
// Include the generic node implementation.
node
// URL for connecting to HouseDB
Url string
// Database name
Database string
}
// Create a new HouseDBOutNode that accepts any edge type.
func newHouseDBOutNode(wants EdgeType) *HouseDBOutNode {
return &HouseDBOutNode{
node: node{
desc: "housedb",
wants: wants,
provides: NoEdge,
}
}
}housedb_out.go
package kapacitor
import (
"github.com/influxdb/kapacitor/pipeline"
)
type HouseDBOutNode struct {
// Include the generic node implementation
node
// Keep a reference to the pipeline node
h *pipeline.HouseDBOutNode
// Buffer for a batch of points
batchBuffer *edge.BatchBuffer
}
func newHouseDBOutNode(et *ExecutingTask, n *pipeline.HouseDBOutNode, d NodeDiagnostic) (*HouseDBOutNode, error) {
h := &HouseDBOutNode{
// pass in necessary fields to the 'node' struct
node: node{Node: n, et: et, diag: d},
// Keep a reference to the pipeline.HouseDBOutNode
h: n,
// Buffer for a batch of points
batchBuffer: new(edge.BatchBuffer),
}
// Set the function to be called when running the node
h.node.runF = h.runOut
return h
}
func (h *HouseDBOutNode) runOut(snapshot []byte) error {
consumer := edge.NewConsumerWithReceiver(
n.ins[0],
h,
)
return consumer.Consume()
}
func (h *HouseDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
return nil, h.batchBuffer.BeginBatch(begin)
}
func (h *HouseDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
return nil, h.batchBuffer.BatchPoint(bp)
}
func (h *HouseDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
msg := h.batchBuffer.BufferedBatchMessage(end)
return msg, h.write(msg)
}
func (h *HouseDBOutNode) Point(p edge.PointMessage) (edge.Message, error) {
batch := edge.NewBufferedBatchMessage(
edge.NewBeginBatchMessage(
p.Name(),
p.Tags(),
p.Dimensions().ByName,
p.Time(),
1,
),
[]edge.BatchPointMessage{
edge.NewBatchPointMessage(
p.Fields(),
p.Tags(),
p.Time(),
),
},
edge.NewEndBatchMessage(),
)
return p, h.write(batch)
}
func (h *HouseDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
return b, nil
}
func (h *HouseDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
return d, nil
}
func (h *HouseDBOutNode) Done() {}
// Write a batch of data to HouseDB
func (h *HouseDBOutNode) write(batch edge.BufferedBatchMessage) error {
// Implement writing to HouseDB here...
return nil
}pipeline/node.go(仅显示新的链式方法)
...
// Create a new HouseDBOutNode as a child of the calling node.
func (c *chainnode) HouseDBOut() *HouseDBOutNode {
h := newHouseDBOutNode(c.Provides())
c.linkChild(h)
return h
}
...task.go(仅显示新的 case)
...
// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, err error) {
switch t := p.(type) {
...
case *pipeline.HouseDBOutNode:
n, err = newHouseDBOutNode(et, t, d)
...
}
...记录您的新节点
由于 TICKscript 是一种独立的语言,我们构建了一个类似于 godoc 的小工具,名为 tickdoc。tickdoc 从代码中的注释生成文档。tickdoc 工具理解两个特殊注释,以帮助它生成清晰的文档。
tick:ignore:可以添加到任何字段、方法、函数或结构。tickdoc将会跳过它,并且不会生成任何文档。这对于忽略通过属性方法设置的字段最有价值。tick:property:仅添加到方法。通知tickdoc该方法是属性方法而不是链式方法。
将其中一个注释放在单独的一行上,tickdoc 将找到它并相应地进行处理。否则,正常记录您的代码,tickdoc 将完成其余工作。
贡献非输出节点。
编写任何节点(不仅仅是输出节点)都是一个非常类似的过程,留给读者作为练习。有几个地方可能不同:
第一个区别是,如果新节点可以将其数据发送到子节点,那么它将在 pipeline 包中使用 pipeline.chainnode 实现 pipeline.Node 接口。例如:
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}第二个区别是,可以定义一个设置管道节点字段并返回同一实例的方法,以创建属性方法。例如:
package pipeline
type MyCustomNode struct {
// Include pipeline.chainnode so we have all the chaining methods available
// to our new node
chainnode
// Mark this field as ignored for docs
// Since it is set via the Names method below
// tick:ignore
NameList []string `tick:"Names"`
}
func newMyCustomNode(e EdgeType, n Node) *MyCustomNode {
m := &MyCustomNode{
chainnode: newBasicChainNode("mycustom", e, e),
}
n.linkChild(m)
return m
}
// Set the NameList field on the node via this method.
//
// Example:
// node.names('name0', 'name1')
//
// Use the tickdoc comment 'tick:property' to mark this method
// as a 'property method'
// tick:property
func (m *MyCustomNode) Names(name ...string) *MyCustomNode {
m.NameList = name
return m
}此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: