sync.Map 不是线程安全的?
2022 10 26 12:35 PM 220次查看
sync.(*Map).Store()
抛了个异常,日志如下:fatal error: sync: unlock of unlocked mutex
goroutine 63756060 [running]:
runtime.throw({0xbeeca9, 0xb45120})
#011/usr/local/go/src/runtime/panic.go:1198 +0x71 fp=0xc0010f13a0 sp=0xc0010f1370 pc=0x4336d1
sync.throw({0xbeeca9, 0xb2dd40})
#011/usr/local/go/src/runtime/panic.go:1184 +0x1e fp=0xc0010f13c0 sp=0xc0010f13a0 pc=0x45ecfe
sync.(*Mutex).unlockSlow(0x11d45a0, 0xffffffff)
#011/usr/local/go/src/sync/mutex.go:196 +0x3c fp=0xc0010f13e8 sp=0xc0010f13c0 pc=0x47851c
sync.(*Mutex).Unlock(...)
#011/usr/local/go/src/sync/mutex.go:190
sync.(*Map).Store(0x11d45a0, {0xae4960, 0xc0012f4350}, {0xae40e0, 0xc000026078})
#011/usr/local/go/src/sync/map.go:163 +0x465 fp=0xc0010f1480 sp=0xc0010f13e8 pc=0x476fa5
要知道这可是 Go 的官方库,sync.Map
可是专门为并发安全而设计的,难道直接调用它的接口也会出现线程冲突?我先展示一下简化后的业务代码:
var onlineCache sync.Map
func AddOnlineDevice(uuid string) {
ts := time.Now().Unix()
onlineCache.Store(uuid, ts)
}
func SaveOnlineTimes() {
onlineTimes := map[interface{}]interface{}{}
onlineCache.Range(func(key, value interface{}) bool {
onlineTimes[key] = value
return true
})
onlineCache = sync.Map{}
if len(onlineTimes) == 0 {
return
}
err := dao.Get().Redis.UpdateOnlineTimes(onlineTimes)
if err != nil {
log.Error(err)
}
}
func UpdateClientOnlineTimeCron() {
go func() {
for {
<-time.After(time.Second * 30)
SaveOnlineTimes()
}
}()
}
这里用一个全局的 sync.Map
类型的 onlineCache
变量来存储每个设备的在线时间,然后每个请求都调用 AddOnlineDevice()
来更新时间戳,再每隔 30 秒将 onlineCache
中的数据存储到 Redis 中,并将 onlineCache
重新赋值成一个空的 sync.Map
(因为没有清空的方法)。这段代码其实有一处明显的错误:调用
onlineCache.Range()
和 onlineCache = sync.Map{}
不是线程安全的,如果这期间同时在调用 AddOnlineDevice()
,这些数据可能会丢失。可以想到的是将它赋值给一个临时的
sync.Map
变量,然后遍历这个临时的变量,这样线程不安全的时间会很短,但是这又违背了使用规范:A Map must not be copied after first use.这里先不考虑丢失数据的问题,继续回到锁冲突的问题上。我再把代码简化一下,让它变得容易复现:
package main
import (
"math/rand"
"sync"
"time"
)
func main() {
sm := sync.Map{}
for i := 0; i < 10; i++ {
go func() {
for {
a := rand.Float32()
b := rand.Float32()
sm.Store(a, b)
}
}()
}
for {
sm = sync.Map{}
time.Sleep(time.Second)
}
}
基本上运行 1、2 秒就会报错,可以确认的是,sm.Store(a, b)
的调用和 sm = sync.Map{}
的重新赋值在并发执行时确实会引发这个问题。于是我搜索了一番,在 Github 里找到了一个相同的问题,并得到了一个解释:
Although technically that doesn't violate the rule "don't copy a map before first use" because the map you're copying hasn't been used yet. But you're copying onto an in-use Map, which is also bad.也就是说,虽然这里没有违反不能复制
sync.Map
的规定,但把它赋值给一个使用中的 sync.Map
也是不安全的。于是我翻了下 sync/map.go 的代码:
func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
尝试解释一下:在 sync.(*Map).Store()
里会调用 m.mu.Lock()
来加锁,此后,外面的 sm
变量如果被赋值到了一个新的 sync.Map
结构,那在调用 m.mu.Unlock()
时,m
指向的是这个新的 sync.Map
结构,于是会解锁一个未加锁的 sync.Mutex
而出错。为了验证我的猜想,我修改了这段代码,在
m.mu.Lock()
后和 m.mu.Unlock()
前加了行 println(atomic.LoadInt32(&m.mu.state)&1)
。正常来说,因为这段代码是在加锁状态下执行的,m.mu.state & 1
的结果应该是 1(sync.mutexLocked
),但我在测试时发现结果偶尔会变成 0,也就是未加锁的状态,说明在这个过程中,m.mu
被换掉了,唯一合理的解释就是 m
被指向了一个新创建的 sync.Map
。那么怎样才能修复这个 bug 呢?
最简单的方式是用
map
+ sync.Mutex
的组合,但是这样会影响性能。还有啥办法可以让 sm.Store()
在执行的过程中,m
不会指向新的 sync.Map
结构呢?我灵机一动想到,把
sm
声明成一个指针(即 sm := &sync.Map{}
),这样即使在后续的赋值过程中,sm
变化了,但 m
指向的仍然是原来的 sync.Map
结构,不就能解决这个问题了吗?测试了一下发现果然不报错了。而且换成指针后,顺便还缓解了前面提到的数据丢失的问题,因为复制
*sync.Map
是安全的。
0条评论 你不来一发么↓