编写基于套接字的用户自定义函数 (UDF)
在 另一个示例 中,我们看到了如何为自定义异常检测工作负载编写基于进程的 UDF。在本示例中,我们将学习如何编写一个简单的基于套接字的 UDF。
什么是用户自定义函数 (UDF)?
UDF 是一个用户定义的函数,可以与 Kapacitor 通信以处理数据。Kapacitor 将向其发送数据,UDF 可以响应新的或修改的数据。UDF 可以使用任何具有 协议缓冲区 支持的语言编写。
套接字 UDF 和进程 UDF 之间有什么区别?
- 进程 UDF 是 Kapacitor 的子进程,它通过 STDIN/STDOUT 与 Kapacitor 通信,并且完全由 Kapacitor 管理。
- 套接字 UDF 是 Kapacitor 外部的进程,它通过配置的 Unix 域套接字进行通信。进程本身不由 Kapacitor 管理。
使用进程 UDF 可能比套接字 UDF 更简单,因为 Kapacitor 将生成进程并为您管理一切。另一方面,您可能希望对 UDF 进程本身有更多控制权,而宁愿仅向 Kapacitor 公开一个套接字。一个常见的用例是在 Docker 容器中运行 Kapacitor,并在另一个容器中运行 UDF,该容器通过 Docker 卷公开套接字。
在这两种情况下,协议是相同的,唯一的区别是传输机制。另请注意,由于多个 Kapacitor 任务可以使用同一个 UDF,因此对于基于进程的 UDF,每次使用 UDF 时都会生成一个新的子进程。相比之下,对于基于套接字的 UDF,每次使用 UDF 时都会建立一个新的套接字连接。如果您多次使用同一个 UDF,则最好使用套接字 UDF 以保持运行进程的数量较低。
编写 UDF
UDF 通过协议缓冲区请求/响应系统与 Kapacitor 通信。我们提供了 Go 和 Python 两种语言的通信层实现。由于另一个示例使用了 Python,我们将在此处使用 Go 版本。
我们的示例将实现一个 mirror
UDF,它只是将接收到的所有数据反射回 Kapacitor 服务器。此示例实际上是测试套件的一部分,Python 和 Go 实现都可以在 此处 找到。
生命周期
在我们编写任何代码之前,让我们看一下套接字 UDF 的生命周期
- UDF 进程独立于 Kapacitor 启动。
- 进程在 Unix 域套接字上监听。
- Kapacitor 连接到套接字并查询有关 UDF 选项的基本信息。
- 启用了使用 UDF 的 Kapacitor 任务,Kapacitor 与套接字建立新的连接。
- 任务通过套接字连接读取和写入数据。
- 如果任务因任何原因停止,套接字连接将关闭。
Main 方法
我们需要编写一个程序,该程序启动并在套接字上监听。以下代码是一个 main 函数,它在默认路径或指定为 -socket
标志的自定义路径上的套接字上监听。
package main
import (
"flag"
"log"
"net"
)
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// More to come here...
}
将上面的代码放在名为 main.go
的临时目录中。上面的代码可以通过 go run main.go
运行,但在此时它将在监听套接字后立即退出。
Agent (代理)
如前所述,Kapacitor 提供了 UDF 通信层的实现,称为 agent
(代理)。我们的代码只需要实现一个接口即可利用 agent
逻辑。
我们需要实现的接口如下
// The Agent calls the appropriate methods on the Handler as it receives requests over a socket.
//
// Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent.
// Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
// These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
//
// The Handler is called from a single goroutine, meaning methods will not be called concurrently.
//
// To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
type Handler interface {
// Return the InfoResponse. Describing the properties of this Handler
Info() (*agent.InfoResponse, error)
// Initialize the Handler with the provided options.
Init(*agent.InitRequest) (*agent.InitResponse, error)
// Create a snapshot of the running state of the handler.
Snapshot() (*agent.SnapshotResponse, error)
// Restore a previous snapshot.
Restore(*agent.RestoreRequest) (*agent.RestoreResponse, error)
// A batch has begun.
BeginBatch(*agent.BeginBatch) error
// A point has arrived.
Point(*agent.Point) error
// The batch is complete.
EndBatch(*agent.EndBatch) error
// Gracefully stop the Handler.
// No other methods will be called.
Stop()
}
Handler (处理程序)
让我们定义我们自己的类型,以便我们可以开始实现 Handler
接口。按如下方式更新 main.go
文件
package main
import (
"flag"
"log"
"net"
"github.com/influxdata/kapacitor/udf/agent"
)
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
// We need a reference to the agent so we can write data
// back to Kapacitor.
agent *agent.Agent
}
func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
return &mirrorHandler{agent: agent}
}
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// More to come here...
}
现在让我们添加初始化 UDF 所需的每个方法。接下来的这些方法实现了上面 UDF 生命周期步骤 3 中描述的行为,其中 Kapacitor 连接到套接字以查询有关 UDF 的基本信息。
将这些方法添加到 main.go
文件
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
// We want a stream edge
Wants: agent.EdgeType_STREAM,
// We provide a stream edge
Provides: agent.EdgeType_STREAM,
// We expect no options.
Options: map[string]*agent.OptionInfo{},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
// Since we expected no options this method is trivial
// and we return success.
init := &agent.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
目前,我们简单的镜像 UDF 不需要任何选项,因此这些方法是微不足道的。在本示例的末尾,我们将修改代码以接受自定义选项。
现在 Kapacitor 知道我们的 UDF 使用哪些边缘类型和选项,我们需要实现处理数据的方法。
将此方法添加到 main.go
文件,该方法将它接收到的每个点通过 agent(代理)发送回 Kapacitor
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
请注意,agent
(代理)有一个用于响应的通道,这是因为您的 UDF 可以随时向 Kapacitor 发送数据,因此它不需要响应才能接收点。
因此,我们需要关闭通道以让 agent
(代理)知道我们将不再发送任何数据,这可以通过 Stop
方法完成。一旦 agent
(代理)在 handler
(处理程序)上调用 Stop
,将不会调用其他方法,并且 agent
(代理)在通道关闭之前不会停止。这使 UDF 有机会在关闭之前刷新任何剩余数据
// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
// Close the channel since we won't be sending any more data to Kapacitor
close(h.agent.Responses)
}
即使我们已经实现了处理程序实现的大部分,但仍然缺少一些方法。具体来说,缺少围绕批处理和快照/恢复的方法,但是,由于我们不需要它们,我们将只给出它们简单的实现
// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
return &agent.RestoreResponse{
Success: true,
}, nil
}
// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
return errors.New("batching not supported")
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
return nil
}
Server (服务器)
此时,我们已经完全实现了 Handler
接口。在上面生命周期中的步骤 4 中,Kapacitor 为任务中的每次使用与 UDF 建立新的连接。由于我们的 UDF 进程可以同时处理多个连接,因此我们需要一种机制来为每个连接创建新的 agent
(代理)和 handler
(处理程序)。
为此目的提供了 server
(服务器),它期望实现 Accepter
接口
type Accepter interface {
// Accept new connections from the listener and handle them accordingly.
// The typical action is to create a new Agent with the connection as both its in and out objects.
Accept(net.Conn)
}
这是一个简单的 accepter
(接受器),它为每个新连接创建一个新的 agent
(代理)和 mirrorHandler
(镜像处理程序)。将其添加到 main.go
文件
type accepter struct {
count int64
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
count := acc.count
acc.count++
a := agent.New(conn, conn)
h := newMirrorHandler(a)
a.Handler = h
log.Println("Starting agent for connection", count)
a.Start()
go func() {
err := a.Wait()
if err != nil {
log.Fatal(err)
}
log.Printf("Agent for connection %d finished", count)
}()
}
现在所有部分都已到位,我们可以更新我们的 main
函数以启动 server
(服务器)。将先前提供的 main
函数替换为
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// Create server that listens on the socket
s := agent.NewServer(l, &accepter{})
// Setup signal handler to stop Server on various signals
s.StopOnSignals(os.Interrupt, syscall.SIGTERM)
log.Println("Server listening on", addr.String())
err = s.Serve()
if err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
启动 UDF
此时,我们已准备好启动 UDF。这是完整的 main.go
文件,供参考
package main
import (
"errors"
"flag"
"log"
"net"
"os"
"syscall"
"github.com/influxdata/kapacitor/udf/agent"
)
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
agent *agent.Agent
}
func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
return &mirrorHandler{agent: agent}
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
Wants: agent.EdgeType_STREAM,
Provides: agent.EdgeType_STREAM,
Options: map[string]*agent.OptionInfo{},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
init := &agent.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
return &agent.RestoreResponse{
Success: true,
}, nil
}
// Start working with the next batch
func (*mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
return errors.New("batching not supported")
}
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
func (*mirrorHandler) EndBatch(end *agent.EndBatch) error {
return nil
}
// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
close(h.agent.Responses)
}
type accepter struct {
count int64
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
count := acc.count
acc.count++
a := agent.New(conn, conn)
h := newMirrorHandler(a)
a.Handler = h
log.Println("Starting agent for connection", count)
a.Start()
go func() {
err := a.Wait()
if err != nil {
log.Fatal(err)
}
log.Printf("Agent for connection %d finished", count)
}()
}
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
flag.Parse()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// Create server that listens on the socket
s := agent.NewServer(l, &accepter{})
// Setup signal handler to stop Server on various signals
s.StopOnSignals(os.Interrupt, syscall.SIGTERM)
log.Println("Server listening on", addr.String())
err = s.Serve()
if err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
运行 go run main.go
以启动 UDF。如果您收到有关套接字正在使用的错误,只需删除套接字文件并尝试再次运行 UDF。
配置 Kapacitor 以与 UDF 通信
现在我们的 UDF 已准备就绪,我们需要告诉 Kapacitor 我们的 UDF 套接字在哪里,并为其命名以便我们可以使用它。将其添加到您的 Kapacitor 配置文件中
[udf]
[udf.functions]
[udf.functions.mirror]
socket = "/tmp/mirror.sock"
timeout = "10s"
启动 Kapacitor
启动 Kapacitor,您应该在 Kapacitor 日志和 UDF 进程日志中看到它连接到您的 UDF。
试用一下
获取现有任务并在 TICKscript 管道中的任何位置添加 @mirror()
以查看其运行情况。
这是一个示例 TICKscript,需要保存到文件中
dbrp "telegraf"."autogen"
stream
|from()
.measurement('cpu')
@mirror()
|alert()
.crit(lambda: "usage_idle" < 30)
像这样从终端定义上面的警报
kapacitor define mirror_udf_example -tick path/to/above/script.tick
启动任务
kapacitor enable mirror_udf_example
检查任务的状态
kapacitor show mirror_udf_example
添加自定义字段
现在让我们更改 UDF 以向数据添加字段。我们可以使用 Info/Init
方法来定义和使用 UDF 上的选项,因此让我们指定要添加的字段的名称。
按如下方式更新 mirrorHandler
类型以及 Info
和 Init
方法
// Mirrors all points it receives back to Kapacitor
type mirrorHandler struct {
agent *agent.Agent
name string
value float64
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
info := &agent.InfoResponse{
Wants: agent.EdgeType_STREAM,
Provides: agent.EdgeType_STREAM,
Options: map[string]*agent.OptionInfo{
"field": {ValueTypes: []agent.ValueType{
agent.ValueType_STRING,
agent.ValueType_DOUBLE,
}},
},
}
return info, nil
}
// Initialize the handler based of the provided options.
func (h *mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
init := &agent.InitResponse{
Success: true,
Error: "",
}
for _, opt := range r.Options {
switch opt.Name {
case "field":
h.name = opt.Values[0].Value.(*agent.OptionValue_StringValue).StringValue
h.value = opt.Values[1].Value.(*agent.OptionValue_DoubleValue).DoubleValue
}
}
if h.name == "" {
init.Success = false
init.Error = "must supply field"
}
return init, nil
}
现在我们可以使用其名称和值在点上设置字段。更新 Point
方法
func (h *mirrorHandler) Point(p *agent.Point) error {
// Send back the point we just received
if p.FieldsDouble == nil {
p.FieldsDouble = make(map[string]float64)
}
p.FieldsDouble[h.name] = h.value
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: p,
},
}
return nil
}
重新启动 UDF 进程并再次试用。使用 .field(name, value)
方法指定要使用的字段名称和值。您可以在 mirror
UDF 之后添加 |log()
以查看是否确实已创建新字段。
dbrp "telegraf"."autogen"
stream
|from()
.measurement('cpu')
@mirror()
.field('mycustom_field', 42.0)
|log()
|alert()
.cirt(lambda: "usage_idle" < 30)
总结
至此,您应该能够使用基于套接字或进程的方法编写自定义 UDF。UDF 的用途非常广泛,从作为持续查询一部分的自定义降采样逻辑、自定义异常检测算法,到只是一个稍微“按摩”您的数据的系统。
下一步
如果您想了解更多信息,以下是一些开始的地方
- 修改镜像 UDF,使其功能类似于 DefaultNode。不要总是覆盖字段,而只在字段不存在时才设置它。还要添加对设置标签以及字段的支持。
- 更改镜像 UDF 以在批处理而不是流上工作。这需要在
Info
方法中更改边缘类型,并实现BeginBatch
和EndBatch
方法。 - 查看其他 示例 并修改一个示例以执行类似于您的现有需求的操作。
此页是否对您有帮助?
感谢您的反馈!