文档文档

使用基于套接字的用户定义函数 (UDF)

另一个示例中,我们学习了如何为自定义异常检测工作负载编写基于进程的用户定义函数。在本示例中,我们将学习如何编写一个简单的基于套接字的用户定义函数。

什么是用户定义函数 (UDF)?

UDF 是一个用户定义的函数,它可以与 Kapacitor 通信以处理数据。Kapacitor 会发送数据给它,UDF 可以响应新的或修改后的数据。UDF 可以用任何支持协议缓冲区的语言编写。

基于套接字的 UDF 和基于进程的 UDF 有什么区别?

  • 基于进程的 UDF 是 Kapacitor 的一个子进程,它通过 STDIN/STDOUT 与 Kapacitor 通信,并由 Kapacitor 完全管理。
  • 基于套接字的 UDF 是 Kapacitor 外部的一个进程,它通过配置的 Unix 域套接字进行通信。该进程本身不受 Kapacitor 管理。

使用基于进程的 UDF 可能比使用基于套接字的 UDF 更简单,因为 Kapacitor 会启动进程并为您管理所有内容。另一方面,您可能希望对 UDF 进程本身有更多的控制,并且只向 Kapacitor 公开一个套接字。一种常见的用例是在 Docker 容器中运行 Kapacitor,并在另一个容器中运行 UDF,该容器通过 Docker 卷公开套接字。

在这两种情况下,协议都是相同的,唯一的区别是传输机制。另请注意,由于多个 Kapacitor 任务可以使用同一个 UDF,对于基于进程的 UDF,每次使用 UDF 时都会启动一个新的子进程。相比之下,对于基于套接字的 UDF,每次使用 UDF 时都会建立一个新的套接字连接。如果您大量使用同一个 UDF,最好使用基于套接字的 UDF 来减少运行的进程数量。

编写 UDF

UDF 通过协议缓冲区请求/响应系统与 Kapacitor 通信。我们提供了 Go 和 Python 的通信层实现。由于上一个示例使用了 Python,这里我们将使用 Go 版本。

我们的示例将实现一个 mirror UDF,它只是将接收到的所有数据反射回 Kapacitor 服务器。此示例实际上是测试套件的一部分,您可以在此处找到 Python 和 Go 的实现。

生命周期

在编写任何代码之前,我们先来看看基于套接字的 UDF 的生命周期。

  1. UDF 进程已启动,独立于 Kapacitor。
  2. 进程在 Unix 域套接字上监听。
  3. Kapacitor 连接到套接字并查询有关 UDF 选项的基本信息。
  4. Kapacitor 任务已启用,该任务使用 UDF,Kapacitor 会建立新的套接字连接。
  5. 任务通过套接字连接读写数据。
  6. 如果任务因任何原因停止,套接字连接将被关闭。

Main 方法

我们需要编写一个程序,该程序启动并监听套接字。以下代码是一个 main 函数,它在默认路径上的套接字上监听,或者在指定的 -socket 标志指定的自定义路径上监听。

package main

import (
    "flag"
    "log"
    "net"
)


var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

将上面的代码放在一个名为 main.go 的临时目录中。上面的代码可以通过 go run main.go 运行,但此时它在监听套接字后会立即退出。

Agent

如前所述,Kapacitor 为 UDF 提供了一个名为 agent 的通信层实现。我们的代码只需要实现一个接口即可利用 agent 的逻辑。

我们需要实现的接口如下:

// The Agent calls the appropriate methods on the Handler as it receives requests over a socket.
//
// Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent.
// Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
// These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
//
// The Handler is called from a single goroutine, meaning methods will not be called concurrently.
//
// To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
type Handler interface {
    // Return the InfoResponse. Describing the properties of this Handler
    Info() (*agent.InfoResponse, error)
    // Initialize the Handler with the provided options.
    Init(*agent.InitRequest) (*agent.InitResponse, error)
    // Create a snapshot of the running state of the handler.
    Snapshot() (*agent.SnapshotResponse, error)
    // Restore a previous snapshot.
    Restore(*agent.RestoreRequest) (*agent.RestoreResponse, error)

    // A batch has begun.
    BeginBatch(*agent.BeginBatch) error
    // A point has arrived.
    Point(*agent.Point) error
    // The batch is complete.
    EndBatch(*agent.EndBatch) error

    // Gracefully stop the Handler.
    // No other methods will be called.
    Stop()
}

Handler

让我们定义我们自己的类型,以便开始实现 Handler 接口。更新 main.go 文件如下:

package main

import (
    "flag"
    "log"
    "net"

    "github.com/influxdata/kapacitor/udf/agent"
)



// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    // We need a reference to the agent so we can write data
    // back to Kapacitor.
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // More to come here...
}

现在,让我们添加实现 UDF 初始化所需的每个方法。接下来的方法实现了 UDF 生命周期第 3 步中描述的行为,其中 Kapacitor 连接到套接字以查询有关 UDF 的基本信息。

将这些方法添加到 main.go 文件中。


// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        // We want a stream edge
        Wants:    agent.EdgeType_STREAM,
        // We provide a stream edge
        Provides: agent.EdgeType_STREAM,
        // We expect no options.
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    // Since we expected no options this method is trivial
    // and we return success.
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

目前,我们的简单镜像 UDF 不需要任何选项,因此这些方法很简单。在本示例的末尾,我们将修改代码以接受自定义选项。

现在 Kapacitor 知道了我们的 UDF 使用的边缘类型和选项,我们需要实现处理数据的方法。

将此方法添加到 main.go 文件中,该方法通过 agent 将接收到的每个点发送回 Kapacitor。

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

请注意,agent 有一个响应通道,这是因为您的 UDF 可以随时向 Kapacitor 发送数据,因此它不需要作为响应来接收点。

因此,我们需要关闭通道以告知 agent 我们将不再发送任何数据,这可以通过 Stop 方法完成。一旦 agent 调用 Stop 来处理 handler,将不再调用其他方法,并且 agent 不会停止,直到通道关闭。这给了 UDF 一个机会在关闭之前冲刷掉任何剩余数据。

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    // Close the channel since we won't be sending any more data to Kapacitor
    close(h.agent.Responses)
}

尽管我们已经实现了大部分 handler 实现,但仍有一些方法缺失。特别是,关于批处理和快照/恢复的方法缺失,但由于我们不需要它们,我们将只给出它们的简单实现。

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

Server

此时,我们已经完成了 Handler 接口的完整实现。在上面的生命周期第 4 步中,Kapacitor 会为每次在任务中使用 UDF 而建立新的连接。由于我们的 UDF 进程可以同时处理多个连接,因此我们需要一种机制为每个连接创建新的 agenthandler

为此提供了一个 server,它需要一个 Accepter 接口的实现。

type Accepter interface {
    // Accept new connections from the listener and handle them accordingly.
    // The typical action is to create a new Agent with the connection as both its in and out objects.
    Accept(net.Conn)
}

这是一个简单的 accepter,它为每个新连接创建一个新的 agentmirrorHandler。将其添加到 main.go 文件中。

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

现在所有组件都已就位,我们可以更新我们的 main 函数来启动 server。用以下内容替换之前提供的 main 函数:

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

启动 UDF

此时,我们已准备好启动 UDF。以下是完整的 main.go 文件供参考:

package main

import (
    "errors"
    "flag"
    "log"
    "net"
    "os"
    "syscall"

    "github.com/influxdata/kapacitor/udf/agent"
)

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
    return &mirrorHandler{agent: agent}
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options:  map[string]*agent.OptionInfo{},
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    return init, nil
}

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
}

// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
        Success: true,
    }, nil
}

// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
}

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
    return nil
}

// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
    close(h.agent.Responses)
}

type accepter struct {
    count int64
}

// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
    count := acc.count
    acc.count++
    a := agent.New(conn, conn)
    h := newMirrorHandler(a)
    a.Handler = h

    log.Println("Starting agent for connection", count)
    a.Start()
    go func() {
        err := a.Wait()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Agent for connection %d finished", count)
    }()
}

var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")

func main() {
    flag.Parse()

    // Create unix socket
    addr, err := net.ResolveUnixAddr("unix", *socketPath)
    if err != nil {
        log.Fatal(err)
    }
    l, err := net.ListenUnix("unix", addr)
    if err != nil {
        log.Fatal(err)
    }

    // Create server that listens on the socket
    s := agent.NewServer(l, &accepter{})

    // Setup signal handler to stop Server on various signals
    s.StopOnSignals(os.Interrupt, syscall.SIGTERM)

    log.Println("Server listening on", addr.String())
    err = s.Serve()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Server stopped")
}

运行 go run main.go 来启动 UDF。如果您收到关于套接字被占用的错误,只需删除套接字文件,然后重试运行 UDF。

配置 Kapacitor 与 UDF 通信

现在我们的 UDF 已准备就绪,我们需要告诉 Kapacitor 我们的 UDF 套接字在哪里,并给它一个名称以便我们使用它。将此添加到您的 Kapacitor 配置文件中。

[udf]
[udf.functions]
    [udf.functions.mirror]
        socket = "/tmp/mirror.sock"
        timeout = "10s"

启动 Kapacitor

启动 Kapacitor,您应该会在 Kapacitor 日志和 UDF 进程日志中看到它连接到您的 UDF。

尝试一下

选择一个现有的任务,并在 TICKscript 管道的任何位置添加 @mirror() 来查看它的效果。

这是一个 TICKscript 示例,需要将其保存到文件中。

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
    |alert()
        .crit(lambda: "usage_idle" < 30)

像这样从终端定义上面的警报:

kapacitor define mirror_udf_example -tick path/to/above/script.tick

启动任务

kapacitor enable mirror_udf_example

检查任务状态

kapacitor show mirror_udf_example

添加自定义字段

现在让我们修改 UDF 以向数据添加一个字段。我们可以使用 Info/Init 方法来定义和消耗 UDF 上的选项,所以让我们指定要添加的字段名称。

更新 mirrorHandler 类型和 InfoInit 方法如下:

// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
    agent *agent.Agent
    name  string
    value float64
}

// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
    info := &agent.InfoResponse{
        Wants:    agent.EdgeType_STREAM,
        Provides: agent.EdgeType_STREAM,
        Options: map[string]*agent.OptionInfo{
            "field": {ValueTypes: []agent.ValueType{
                agent.ValueType_STRING,
                agent.ValueType_DOUBLE,
            }},
        },
    }
    return info, nil
}

// Initialize the handler based of the provided options.
func (h *mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
        Success: true,
        Error:   "",
    }
    for _, opt := range r.Options {
        switch opt.Name {
        case "field":
            h.name = opt.Values[0].Value.(*agent.OptionValue_StringValue).StringValue
            h.value = opt.Values[1].Value.(*agent.OptionValue_DoubleValue).DoubleValue
        }
    }

    if h.name == "" {
        init.Success = false
        init.Error = "must supply field"
    }
    return init, nil
}

现在我们可以在点上设置字段及其名称和值。更新 Point 方法:

func (h *mirrorHandler) Point(p *agent.Point) error {
    // Send back the point we just received
    if p.FieldsDouble == nil {
        p.FieldsDouble = make(map[string]float64)
    }
    p.FieldsDouble[h.name] = h.value

    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: p,
        },
    }
    return nil
}

重启 UDF 进程并再次尝试。使用 .field(name, value) 方法指定要使用的字段名称和值。您可以在 mirror UDF 后面添加 |log() 来查看新字段是否已成功创建。

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('cpu')
    @mirror()
        .field('mycustom_field', 42.0)
    |log()
    |alert()
        .cirt(lambda: "usage_idle" < 30)

总结

此时,您应该能够使用基于套接字或基于进程的方法编写自定义 UDF。UDF 有多种用途,从作为连续查询一部分的自定义降采样逻辑、自定义异常检测算法,或者仅仅是一个“处理”数据的系统。

后续步骤

如果您想了解更多,这里有一些起点:

  • 修改 mirror UDF,使其功能类似于 DefaultNode。不是始终覆盖一个字段,而是在字段不存在时才设置它。另外,增加对设置标签和字段的支持。
  • 将 mirror UDF 从处理流更改为处理批次。这需要在 Info 方法中更改边缘类型,并实现 BeginBatchEndBatch 方法。
  • 查看其他示例,并修改其中一个以执行与您现有要求类似的操作。

此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2