文档文档

编写基于套接字的用户定义函数 (UDF)

另一个示例中,我们看到了如何编写基于进程的 UDF 以进行自定义异常检测工作负载。在本示例中,我们将学习如何编写一个简单的基于套接字的 UDF。

什么是用户定义函数 (UDF)?

UDF 是一个用户定义的函数,可以与 Kapacitor 通信以处理数据。Kapacitor 将向其发送数据,UDF 可以响应新的或修改后的数据。UDF 可以用任何支持 protocol buffer 的语言编写。

套接字 UDF 和进程 UDF 之间有什么区别?

  • 进程 UDF 是 Kapacitor 的子进程,它通过 STDIN/STDOUT 与 Kapacitor 通信,并且完全由 Kapacitor 管理。
  • 套接字 UDF 是 Kapacitor 外部的进程,它通过配置的 Unix 域套接字进行通信。该进程本身不受 Kapacitor 管理。

使用进程 UDF 可能比套接字 UDF 更简单,因为 Kapacitor 将生成进程并为您管理一切。另一方面,您可能希望更多地控制 UDF 进程本身,而不是仅向 Kapacitor 公开一个套接字。一个常见的用例是在 Docker 容器中运行 Kapacitor,并在另一个容器中运行 UDF,该容器通过 Docker 卷公开套接字。

在这两种情况下,协议是相同的,唯一的区别是传输机制。另请注意,由于多个 Kapacitor 任务可以使用相同的 UDF,因此对于基于进程的 UDF,每次使用 UDF 都会生成一个新的子进程。相比之下,对于基于套接字的 UDF,每次使用 UDF 都会建立一个新的套接字连接。如果您多次使用相同的 UDF,则最好使用套接字 UDF 以保持运行进程的数量较低。

编写 UDF

UDF 通过协议缓冲区请求/响应系统与 Kapacitor 通信。我们提供了 Go 和 Python 两种语言的通信层实现。由于另一个示例使用了 Python,我们将在此处使用 Go 版本。

我们的示例将实现一个 mirror UDF,它只是将接收到的所有数据反射回 Kapacitor 服务器。此示例实际上是测试套件的一部分,Python 和 Go 实现都可以在此处找到。

生命周期

在编写任何代码之前,让我们看一下套接字 UDF 的生命周期

  1. UDF 进程独立于 Kapacitor 启动。
  2. 进程在 Unix 域套接字上监听。
  3. Kapacitor 连接到套接字并查询有关 UDF 选项的基本信息。
  4. 启用了使用 UDF 的 Kapacitor 任务,并且 Kapacitor 建立与套接字的新连接。
  5. 任务通过套接字连接读取和写入数据。
  6. 如果任务因任何原因停止,则套接字连接将关闭。

Main 方法

我们需要编写一个程序来启动并在套接字上监听。以下代码是一个 main 函数,它在默认路径或指定为 -socket 标志的自定义路径上的套接字上监听。

package main

import (
    "flag"
    "log"
    "net"
)


var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

将上面的代码放在名为 main.go 的临时目录中。上面的代码可以通过 go run main.go 运行,但在此时它将在监听套接字后立即退出。

Agent

如前所述,Kapacitor 提供了名为 agent 的 UDF 通信层实现。我们的代码只需要实现一个接口即可利用 agent 逻辑。

我们需要实现的接口如下

// The Agent calls the appropriate methods on the Handler as it receives requests over a socket.
//
// Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent.
// Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
// These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
//
// The Handler is called from a single goroutine, meaning methods will not be called concurrently.
//
// To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
type Handler interface {
    // Return the InfoResponse. Describing the properties of this Handler
    Info() (*agent.InfoResponse, error)
    // Initialize the Handler with the provided options.
    Init(*agent.InitRequest) (*agent.InitResponse, error)
    // Create a snapshot of the running state of the handler.
    Snapshot() (*agent.SnapshotResponse, error)
    // Restore a previous snapshot.
    Restore(*agent.RestoreRequest) (*agent.RestoreResponse, error)

    // A batch has begun.
    BeginBatch(*agent.BeginBatch) error
    // A point has arrived.
    Point(*agent.Point) error
    // The batch is complete.
    EndBatch(*agent.EndBatch) error

    // Gracefully stop the Handler.
    // No other methods will be called.
    Stop()
}

Handler

让我们定义自己的类型,以便我们可以开始实现 Handler 接口。更新 main.go 文件如下

package main

import (
    "flag"
    "log"
    "net"

    "github.com/influxdata/kapacitor/udf/agent"
)



// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    // We need a reference to the agent so we can write data
    // back to Kapacitor.
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

现在让我们添加初始化 UDF 所需的每个方法。接下来的这些方法实现了上面 UDF 生命周期步骤 3 中描述的行为,其中 Kapacitor 连接到套接字以查询有关 UDF 的基本信息。

将这些方法添加到 main.go 文件


// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        // We want a stream edge
        Wants:    agent.EdgeType_STREAM,
        // We provide a stream edge
        Provides: agent.EdgeType_STREAM,
        // We expect no options.
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    // Since we expected no options this method is trivial
    // and we return success.
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

目前,我们简单的镜像 UDF 不需要任何选项,因此这些方法是微不足道的。在本示例的最后,我们将修改代码以接受自定义选项。

现在 Kapacitor 知道我们的 UDF 使用哪些边缘类型和选项,我们需要实现用于处理数据的方法。

将此方法添加到 main.go 文件,该方法通过 agent 将其接收到的每个点发送回 Kapacitor

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

请注意,agent 具有用于响应的通道,这是因为您的 UDF 可以随时向 Kapacitor 发送数据,因此它不需要响应来接收点。

因此,我们需要关闭通道以让 agent 知道我们将不再发送任何数据,这可以通过 Stop 方法完成。一旦 agenthandler 上调用 Stop,将不会调用其他方法,并且 agent 在通道关闭之前不会停止。这使 UDF 有机会在关闭之前刷新任何剩余数据

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    // Close the channel since we won't be sending any more data to Kapacitor
    close(h.agent.Responses)
}

即使我们已经实现了处理程序实现的大部分内容,仍然缺少一些方法。具体来说,缺少围绕批处理和快照/还原的方法,但是,由于我们不需要它们,我们将只给出它们微不足道的实现

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

Server

此时,我们已经完成了 Handler 接口的完整实现。在上面生命周期步骤 #4 中,Kapacitor 为任务中的每次使用建立与 UDF 的新连接。由于我们的 UDF 进程可以同时处理多个连接,因此我们需要一种机制来为每个连接创建新的 agenthandler

为此目的提供了 server,它期望实现 Accepter 接口

type Accepter interface {
    // Accept new connections from the listener and handle them accordingly.
    // The typical action is to create a new Agent with the connection as both its in and out objects.
    Accept(net.Conn)
}

这是一个简单的 accepter,它为每个新连接创建一个新的 agentmirrorHandler。将其添加到 main.go 文件

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

现在所有部分都已就位,我们可以更新我们的 main 函数以启动 server。将先前提供的 main 函数替换为

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

启动 UDF

此时,我们已准备好启动 UDF。以下是完整的 main.go 文件以供参考

package main

import (
    "errors"
    "flag"
    "log"
    "net"
    "os"
    "syscall"

    "github.com/influxdata/kapacitor/udf/agent"
)

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}

// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    close(h.agent.Responses)
}

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

运行 go run main.go 以启动 UDF。如果您收到有关套接字正在使用的错误,只需删除套接字文件并再次尝试运行 UDF。

配置 Kapacitor 以与 UDF 通信

现在我们的 UDF 已准备就绪,我们需要告诉 Kapacitor 我们的 UDF 套接字在哪里,并为其命名以便我们可以使用它。将其添加到您的 Kapacitor 配置文件中

[udf]
[udf.functions]
    [udf.functions.mirror]
        socket = "/tmp/mirror.sock"
        timeout = "10s"

启动 Kapacitor

启动 Kapacitor,您应该在 Kapacitor 日志和 UDF 进程日志中看到它连接到您的 UDF。

试用一下

获取现有任务并在 TICKscript 管道中的任何位置添加 @mirror() 以查看其运行情况。

这是一个示例 TICKscript,需要将其保存到文件中

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
    |alert()
        .crit(lambda: "usage_idle" < 30)

像这样从您的终端定义上述警报

kapacitor define mirror_udf_example -tick path/to/above/script.tick

启动任务

kapacitor enable mirror_udf_example

检查任务的状态

kapacitor show mirror_udf_example

添加自定义字段

现在让我们更改 UDF 以向数据添加字段。我们可以使用 Info/Init 方法来定义和使用 UDF 上的选项,因此让我们指定要添加的字段的名称。

更新 mirrorHandler 类型以及方法 InfoInit 如下

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
    name  string
    value float64
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options: map[string]*agent.OptionInfo{
            "field": {ValueTypes: []agent.ValueType{
                agent.ValueType_STRING,
                agent.ValueType_DOUBLE,
            }},
        },
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (h *mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    for _, opt := range r.Options {
        switch opt.Name {
        case "field":
            h.name = opt.Values[0].Value.(*agent.OptionValue_StringValue).StringValue
            h.value = opt.Values[1].Value.(*agent.OptionValue_DoubleValue).DoubleValue
        }
    }

    if h.name == "" {
        init.Success = false
        init.Error = "must supply field"
    }
    return init, nil
}

现在我们可以使用字段名称和值在点上设置字段。更新 Point 方法

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    if p.FieldsDouble == nil {
        p.FieldsDouble = make(map[string]float64)
    }
    p.FieldsDouble[h.name] = h.value

    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

重新启动 UDF 进程并再次试用。使用 .field(name, value) 方法指定要使用的字段名称和值。您可以在 mirror UDF 之后添加 |log() 以查看新字段是否确实已创建。

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
        .field('mycustom_field', 42.0)
    |log()
    |alert()
        .cirt(lambda: "usage_idle" < 30)

总结

此时,您应该能够使用基于套接字或基于进程的方法编写自定义 UDF。UDF 具有广泛的用途,从作为连续查询一部分的自定义降采样逻辑,自定义异常检测算法,或者只是稍微“按摩”您的数据的系统。

后续步骤

如果您想了解更多信息,以下是一些入门的地方

  • 修改镜像 UDF,使其功能类似于 DefaultNode。不要总是覆盖字段,仅当字段不存在时才设置它。还要添加对设置标签以及字段的支持。
  • 更改镜像 UDF 以处理批处理而不是流。这需要在 Info 方法中更改边缘类型,以及实现 BeginBatchEndBatch 方法。
  • 查看其他示例,并修改一个以执行类似于您的现有需求的操作。

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最新的数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看