把任务队列 delayed 移植到 Go 了
2023 3 24 05:22 PM 195次查看
目前的 delayed 实现是用 pickle 来做序列化的,其实之前也写过用 JSON 来做序列化的版本,但是遇到了几个问题:
- JSON 会丢失一些对象类型,例如无法区分 tuple 和 list。
- JSON 无法直接编码二进制字符串(bytes)。
- JSON 不支持原本 pickle 能支持的很多类型。
先看看序列化方案,主要从兼容性、易用性、速度和空间占用的角度来考虑。参考 go_serialization_benchmarks,性能最好的都是生成代码的。
但是生成代码意味着需要定义 schema,然后执行
go generate
。但 Go 的 struct 实际也协商了 schema,有点重复工作的意思,而且这增加了开发的成本,有可能修改参数类型后忘记改 schema 和重新生成。从调用方的角度来看,肯定是希望定义一个函数,然后直接就能用它比较好,而不是每定义一个函数,还需要维护一份 schema,并且记得生成代码。程序员的惰性决定了,一旦这件事是有成本的,就会尽量避免去做。因此我决定不采用生成代码的方案。
在不生成代码的方案中,MessagePack 是比较好的选择,比 JSON 短,可以传输二进制数据,且 Redis 的 Lua 脚本也内置了 cmsgpack。这样如果后续有什么特殊的需求,也可以直接用 Lua 脚本来处理任务,无需从 Redis 中取出,编辑完再塞回去。
经测试,shamaton/msgpack 和 msgpack-python 是其中性能较好的库。
而序列化
struct
和 []interface{}
的性能差不多,但前者的空间占用更多,主要是会当成 map 处理,字段名会占用空间,而后者则可能丢失类型(例如 int 序列化再反序列化后可能变成 uint8),导致接口参数类型不一致。再看看函数绑定方案。
Go 不像 Python 那么动态,不能从路径字符串生成函数的引用,因此需要一个主动的注册过程。一种方案是像 HTTP 服务一样自定义路径,并与函数绑定;另一种是用
runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
来获取函数路径。后者更方便些。接着是函数调用方案,Go 的函数调用接口有主要三种方案:
type Handler func(arg interface{})
:这种方式会丢失参数类型,MessagePack 在反序列化时不知道真实的类型,只能转成map[string]interface{}
,较难使用。type Handler func(ctx Context)
:需要把参数单独序列化,再调用ctx
的各种Bind()
接口,把参数转换成真正的函数参数类型,较为麻烦,类似大部分 HTTP 框架的 handler 实现。- 任意 func 类型,使用反射来调用:
这种方案可以适配任意函数,非常易用。但性能较差,且因为 MessagePack 在序列化时可能丢失类型,参数func test(a string, b int) int { return len(a) * b } func main() { f := reflect.ValueOf(test) a := reflect.ValueOf("abc") b := reflect.ValueOf(5) fmt.Println(f.Call([]reflect.Value{a, b})[0].Int()) }
b
在反序列化时可能因为节省空间变成了uint8(5)
,导致调用时 panic。
我在看 rpcx 的实现时,发现它为了解决这个问题,同时存储了参数的类型,然后用反射生成对应类型的值:
经测试,原生函数调用大概 0.25ns,加了类型断言大概 8ns,反射调用的性能则与参数和返回值的数量相关(上述函数约 280ns,但去掉返回值后降为 180ns,且减少了一次内存分配)。b := 5 bt := reflect.TypeOf(b) b2 := reflect.New(bt) bp := b2.Interface().(*int)
但是这种方案仍然不适合使用列表来存储参数,因为[]interface{}
或[]reflect.Value
是没有元素类型的,需要针对每个参数调用一次反序列化,实现非常低效。
为了支持任意数目的参数,我又翻了翻shamaton/msgpack
的代码,发现了一对隐藏的接口:msgpack.MarshalAsArray()
和msgpack.UnmarshalAsArray()
。一般在序列化 struct 时,会当成 map 来处理;而这对方法则是当成数组来处理,即第 0 个字段作为第 0 个元素,以此类推。所以我可以动态地构建一个 struct,然后用msgpack.UnmarshalAsArray()
反序列化成对应的 struct,此时 struct 的字段类型和函数接口是符合的,再构建对应的[]reflect.Value
作为参数即可:
这个实现因为增加了太多的反射,性能显而易见地慢。func x(a, b uint8) uint8 { return a + b } func main() { f := reflect.TypeOf(x) a0 := f.In(0) a1 := f.In(1) fields := []reflect.StructField{{ Name: "F0", // 字段名必须是大写(即可导出的)才能被 msgpack.Marshal() 序列化 Type: a0, }, { Name: "F1", Type: a1, }} st := reflect.StructOf(fields) d, _ := msgpack.Marshal([]interface{}{2, 3}) s := reflect.New(st) msgpack.UnmarshalAsArray(d, s.Interface()) args := []reflect.Value{s.Elem().Field(0), s.Elem().Field(1)} fmt.Println(reflect.ValueOf(x).Call(args)[0]) }
此外还需要解决可变参数的问题,reflect
库并没有提供检测的方法,...int
和[]int
类型的参数都会被判定为[]int
,但是函数类型的字符串表示中会包含...
,因此可以用strings.Contains(f.String(), "...")
来判断。当判定为可变参数函数后,调用方法从Call()
改成CallSlice()
,可变参数作为最后一个reflect.Value
类型的参数传入即可。
然后看看子进程方案。
在 Python 中可以简单地使用
os.fork()
来调用 C 语言的 fork()
函数,而 Go 并没有直接提供这样的接口,无论是 os/exec.Cmd()
还是 syscall.ForkExec()
,都是封装成了 fork - exec 的形式,不符合 delayed 的需求。而 delayed 之所以需要
fork()
,原因是可以控制进程的执行时间,在超时后可以强制干掉,而 goroutine 是不能在调用端结束的,需要在协程内主动退出。于是我又找了一番,经测试发现可以这样实现:
func main() {
r1, r2, err := syscall.Syscall(syscall.SYS_FORK, 0, 0, 0)
// r1: 子进程 pid
// r2:0 是父进程,1 是子进程
if err != 0 {
fmt.Println("Failed to fork:", err)
return
}
fmt.Println(r1, r2)
}
需要注意的是,多线程程序使用 fork()
时,只有当前线程会被克隆,而 Go 的 runtime 和第三方库都可能有后台线程,这可能导致奇怪的问题。另外还发现个 bug,子进程调用
os.Getpid()
会返回原进程(即父进程)的 pid,但 os.Getppid()
则是正常的。(某大佬回复说可能是因为缓存,避免每次都执行系统调用,而 syscall.Syscall(syscall.SYS_GETPID, 0, 0, 0)
是正常的。)创建完子进程,还需要等待子进程退出,Go 提供了阻塞的方案:
proc, err := os.FindProcess(pid)
if err != nil {
panic(err)
}
state, err := proc.Wait()
if err != nil {
panic(err)
}
然而 Wait()
方法并不接受 WNOHANG
参数,这使得无法同时监听子进程退出和等待超时。另一个可行方案是监听
SIGCHLD
信号:func main() {
rand.Seed(time.Now().UnixNano())
r1, r2, _ := syscall.Syscall(syscall.SYS_FORK, 0, 0, 0)
if r2 == 1 {
fmt.Println("sleep")
r := rand.Intn(1000)
time.Sleep(time.Millisecond * time.Duration(r))
fmt.Println("wake up")
if r < 500 {
os.Exit(1)
}
} else {
pid := int(r1)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGCHLD)
timer := time.NewTimer(time.Millisecond * time.Duration(rand.Intn(1000)))
for {
select {
case <-sigChan:
proc, err := os.FindProcess(pid)
if err != nil {
fmt.Println("Failed to find process:", err)
return
}
err = proc.Signal(syscall.Signal(0))
if err != nil {
fmt.Println("Failed to signal:", err)
return
}
state, err := proc.Wait() // 结束僵尸进程
if err != nil {
fmt.Println("Failed to wait:", err)
return
}
fmt.Println(state)
return
case <-timer.C:
fmt.Println("timeout")
proc, err := os.FindProcess(pid)
if err != nil {
fmt.Println("Failed to find process:", err)
return
}
err = proc.Kill()
if err != nil {
fmt.Println("Failed to signal:", err)
return
}
proc.Release()
}
}
}
}
这个方案的 bug 是外部进程可以对它发出 SIGCHLD
信号,此时没有办法判断子进程是否真的已经退出,只能阻塞在 Wait()
。最终我发现 Go 提供了
syscall.Wait4()
,相当于 waitpid()
的超集:for {
select {
case <-sigChan:
var status syscall.WaitStatus
p, err := syscall.Wait4(pid, &status, syscall.WNOHANG, nil)
if err != nil {
fmt.Println("Failed to wait4:", err)
return
}
if p == pid && (status.Exited() || status.Signaled()) {
// p 为 0 时表示没有子进程退出
// 子进程退出时,status.Exited() 为 true,status.ExitStatus() 为返回值,被信号退出时不会返回 true
// status.Signaled() 表示是否是信号导致子进程退出,此时 status.Signal() 为接收到的信号
return
}
case <-timer.C:
fmt.Println("timeout")
syscall.Kill(pid, syscall.SIGKILL)
}
}
注意只能用 SIGKILL
信号,Go 会在后台线程里处理 SIGTERM
、SIGQUIT
等信号,但是 fork()
时并不会复制后台线程,导致收到信号时崩溃。我们还需要在管理进程和执行进程间传递任务和执行结果,这种进程间通信使用 pipe 是比较通用的做法。Go 提供了
os.Pipe()
和 io.Pipe()
两种方式创建 pipe,其中后者的内部实现使用了 channel,但是并没有暴露出来,因此都只有同步的 Read()
和 Write()
接口。为了与上述的
select
组合使用,必须起一个新的协程,把读出来的数据写到一个 channel 里,让 select
去选择,这种实现有点丑。综上所述,想把 Python 模型的子进程版本移植到 Go 充满了异味,且引入了太多复杂性和不稳定性(第三方库可能起了后台线程导致
fork()
后崩溃),而子进程唯一的作用是任务执行超时可以被父进程干掉。因此现阶段先不实现该模型,有需要时也可以用一个外部进程来检测日志,或用 Redis 来存储任务的取出时间,分析是否需要干掉。
既然去掉了超时自动干掉的功能,那么任务的超时时间也需要去掉,但是因为 sweeper 依赖它来确定任务是否需要被重新放回队列,因此无法简单地删除这个字段。
其实 sweeper 只需要在回收任务时检查被取出的任务当前是否有 worker 在执行,没有就把它放回去即可。
因此需要维护一个正在执行的任务表,先让 worker 在启动时随机生成一个字符串作为 worker_id,然后在运行时起一个后台线程,每隔一段时间(假设 15 秒)执行
SETEX worker_id 60 1
。假设队列名是 default,使用如下的几个 key:
- default:list 类型,存储任务列表
- default_noti:list 类型,用于通知有新任务
- default_processing:hash 类型,field 为 worker_id,value 为正在处理的任务
default
和 default_noti
插入数据。在拉取任务时,用
BLPOP
来监听 default_noti
,一旦获取到数据,就执行这段 Lua 脚本:从 default
中 LPOP
一个任务,然后 HSET default_processing worker_id task_data
。在回收任务时,sweeper 遍历
default_processing
中的数据,然后检查该 worker 是否仍然存活(GET worker_id
),不存活时就放回任务队列。这里用两个 list 来处理的原因是避免
BLPOP
了任务后,进程挂了导致任务丢失。因为使用了 Lua 脚本,任务数据肯定在 default
或 default_processing
中,顶多只会丢掉一个通知,而 sweeper 是可以把通知补上的。这样优化后,去掉了之前使用的 sorted set,插入和取出任务的速度从
O(log(N))
变成了 O(1)
,但是与 Python 版不兼容,因此需要写两套 Lua 脚本,或者把 Python 版也改成同样的实现。既然是个新版本,这里选择修改 Python 版本的实现。主要的问题解决后,再来看看实现时的一些优化。
对于任务类型,我希望它的字段是私有的,这样就不需要考虑序列化和反序列化时字段被外部修改过,但
shamaton/msgpack
在序列化 struct 时,又要求字段是可导出的才能被序列化。为此我嵌套了一个 struct 来实现:type RawGoTask struct {
FuncPath string
Payload []byte // serialized arg
}
type GoTask struct {
raw RawGoTask // make it unexported but can be serialized by MessagePack
arg interface{}
data []byte // serialized data
}
而在序列化任务时,需要序列化两次,先把
GoTask.arg
序列化成 GoTask.raw.Payload
,再把 GoTask.raw
序列化成 GoTask.data
:func (t *GoTask) Serialize() (data []byte, err error) {
if len(t.data) != 0 {
return t.data, nil
}
if t.arg != nil {
t.raw.Payload, err = msgpack.MarshalAsArray(t.arg)
if err != nil {
log.Errorf("Failed to serialize task.arg: %v", err)
return
}
}
t.data, err = msgpack.MarshalAsArray(&t.raw)
if err != nil {
log.Errorf("Failed to serialize task.data: %v", err)
return
}
return t.data, nil
}
func DeserializeGoTask(data []byte) (task *GoTask, err error) {
t := &GoTask{
data: data,
}
err = msgpack.UnmarshalAsArray(data, &t.raw)
if err != nil {
log.Errorf("Failed to deserialize task: %v", err)
return
}
return t, nil
}
这里的原因是我希望任务的参数 GoTask.arg
可以接受任意类型,因此类型需要定义成 interface{}
。但是如何能让它被反序列化成符合函数接口的类型呢?
如果我将
GoTask.raw.FuncPath
和 GoTask.arg
一次性序列化到 GoTask.data
,那么在反序列化时,我无法知道 GoTask.arg
的实际类型;而分开序列化后,我在第一次反序列化时可以拿到 GoTask.raw.FuncPath
,就可以找到对应的函数,并拿到它的参数类型,然后再根据这个类型来反序列化 GoTask.arg
。而在封装处理函数时,还需要解决前面提到的慢的问题,我的做法是提前构造:
type Handler struct {
fn reflect.Value
path string
argCount int
arg interface{}
args []reflect.Value
isVariadic bool
}
func (h *Handler) Call(payload []byte) (result []reflect.Value, err error) {
if h.argCount > 0 && len(payload) > 0 {
err := msgpack.UnmarshalAsArray(payload, h.arg)
if err != nil {
log.Errorf("Failed to unmarshal payload: %v", err)
return nil, err
}
}
if h.isVariadic {
return h.fn.CallSlice(h.args), nil
}
return h.fn.Call(h.args), nil
}
可以看到,*Handler.Call()
在执行时,中间用到的所有值都提前分配到 Handler
的 struct 里了,除了 msgpack.UnmarshalAsArray()
以外,不会有内存分配和反射的开销。而我们的执行模式是一次取一个任务,执行完再处理下一个,因此不会遇到有两个任务同时使用同一个 Handler
的并发问题。那么如何提前构造好这些参数呢?这就需要用到
NewHandler()
了:func NewHandler(f interface{}) (h *Handler) {
fn := reflect.ValueOf(f)
if fn.Kind() != reflect.Func {
return nil
}
path := runtime.FuncForPC(fn.Pointer()).Name()
if path == "" {
return nil
}
fnType := fn.Type()
h = &Handler{
fn: fn,
path: path,
argCount: fnType.NumIn(),
}
// the rest fields can be reused among tasks, because the worker won't handle tasks concurrently
if h.argCount == 0 {
h.args = []reflect.Value{}
} else {
h.isVariadic = strings.Contains(fnType.String(), "...")
if h.argCount == 1 {
argType := fnType.In(0)
arg := reflect.New(argType)
h.arg = arg.Interface()
h.args = []reflect.Value{arg.Elem()}
} else {
fields := make([]reflect.StructField, h.argCount)
for i := 0; i < h.argCount; i++ {
arg := fnType.In(i)
fields[i] = reflect.StructField{
Name: "F" + strconv.Itoa(i),
Type: arg,
}
}
argType := reflect.StructOf(fields)
arg := reflect.New(argType)
argElem := arg.Elem()
h.arg = arg.Interface()
h.args = make([]reflect.Value, h.argCount)
for i := 0; i < h.argCount; i++ {
h.args[i] = argElem.Field(i)
}
}
}
return
}
这里有 3 个分支:- 对于无参数的函数,构造一个空的
[]reflect.Value
作为参数列表即可。 - 对于 1 个参数的函数,假设参数类型是
int
;arg
是这个参数类型的指针的反射值,即指向*int
的reflect.Value
;h.arg
是arg
的实际值,即*int
类型;arg.Elem()
是arg
内部的*int
所指向的int
的反射值;h.args
则是只包含一个int
反射值的[]reflect.Value
,可以直接作为h.fn.Call()
的参数。 - 对于 2 个或更多参数的函数,假设参数类型是
int
和string
;先构造一个 struct,假设类型名叫Args
,2 个字段分别是int
和string
类型;然后创建一个*Args
类型的反射值作为arg
;argElem
是arg
所指向的Args
变量的反射值;h.arg
是*Args
类型;h.args
是h.argCount
长度的[]reflect.Value
,里面的每个元素都是argElem
的字段。这样在执行msgpack.UnmarshalAsArray(payload, h.arg)
时,h.arg
因为是*Args
类型,可以正确地处理类型;h.args
里的各个元素也指向了h.arg
的各个字段,所以类型也是正确的。
reflect.Value
内部是用 unsafe.Pointer
实现的,因此它也相当于是一个指针。当 h.arg
这个指针和 h.args
这个指针数组指向同一个 struct
时,修改前一个的值,后一个的值也会跟着变化,没有额外开销。其他的部分比较好懂就不解释了,大家可以直接看 go-delayed 源码。
0条评论 你不来一发么↓