sync.Map 不是线程安全的?
2022 10 26 12:35 PM 263次查看
sync.(*Map).Store() 抛了个异常,日志如下:fatal error: sync: unlock of unlocked mutex
goroutine 63756060 [running]:
runtime.throw({0xbeeca9, 0xb45120})
#011/usr/local/go/src/runtime/panic.go:1198 +0x71 fp=0xc0010f13a0 sp=0xc0010f1370 pc=0x4336d1
sync.throw({0xbeeca9, 0xb2dd40})
#011/usr/local/go/src/runtime/panic.go:1184 +0x1e fp=0xc0010f13c0 sp=0xc0010f13a0 pc=0x45ecfe
sync.(*Mutex).unlockSlow(0x11d45a0, 0xffffffff)
#011/usr/local/go/src/sync/mutex.go:196 +0x3c fp=0xc0010f13e8 sp=0xc0010f13c0 pc=0x47851c
sync.(*Mutex).Unlock(...)
#011/usr/local/go/src/sync/mutex.go:190
sync.(*Map).Store(0x11d45a0, {0xae4960, 0xc0012f4350}, {0xae40e0, 0xc000026078})
#011/usr/local/go/src/sync/map.go:163 +0x465 fp=0xc0010f1480 sp=0xc0010f13e8 pc=0x476fa5要知道这可是 Go 的官方库,sync.Map 可是专门为并发安全而设计的,难道直接调用它的接口也会出现线程冲突?我先展示一下简化后的业务代码:
var onlineCache sync.Map
func AddOnlineDevice(uuid string) {
ts := time.Now().Unix()
onlineCache.Store(uuid, ts)
}
func SaveOnlineTimes() {
onlineTimes := map[interface{}]interface{}{}
onlineCache.Range(func(key, value interface{}) bool {
onlineTimes[key] = value
return true
})
onlineCache = sync.Map{}
if len(onlineTimes) == 0 {
return
}
err := dao.Get().Redis.UpdateOnlineTimes(onlineTimes)
if err != nil {
log.Error(err)
}
}
func UpdateClientOnlineTimeCron() {
go func() {
for {
<-time.After(time.Second * 30)
SaveOnlineTimes()
}
}()
}这里用一个全局的 sync.Map 类型的 onlineCache 变量来存储每个设备的在线时间,然后每个请求都调用 AddOnlineDevice() 来更新时间戳,再每隔 30 秒将 onlineCache 中的数据存储到 Redis 中,并将 onlineCache 重新赋值成一个空的 sync.Map(因为没有清空的方法)。这段代码其实有一处明显的错误:调用
onlineCache.Range() 和 onlineCache = sync.Map{} 不是线程安全的,如果这期间同时在调用 AddOnlineDevice(),这些数据可能会丢失。可以想到的是将它赋值给一个临时的
sync.Map 变量,然后遍历这个临时的变量,这样线程不安全的时间会很短,但是这又违背了使用规范:A Map must not be copied after first use.这里先不考虑丢失数据的问题,继续回到锁冲突的问题上。我再把代码简化一下,让它变得容易复现:
package main
import (
"math/rand"
"sync"
"time"
)
func main() {
sm := sync.Map{}
for i := 0; i < 10; i++ {
go func() {
for {
a := rand.Float32()
b := rand.Float32()
sm.Store(a, b)
}
}()
}
for {
sm = sync.Map{}
time.Sleep(time.Second)
}
}基本上运行 1、2 秒就会报错,可以确认的是,sm.Store(a, b) 的调用和 sm = sync.Map{} 的重新赋值在并发执行时确实会引发这个问题。于是我搜索了一番,在 Github 里找到了一个相同的问题,并得到了一个解释:
Although technically that doesn't violate the rule "don't copy a map before first use" because the map you're copying hasn't been used yet. But you're copying onto an in-use Map, which is also bad.也就是说,虽然这里没有违反不能复制
sync.Map 的规定,但把它赋值给一个使用中的 sync.Map 也是不安全的。于是我翻了下 sync/map.go 的代码:
func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}尝试解释一下:在 sync.(*Map).Store() 里会调用 m.mu.Lock() 来加锁,此后,外面的 sm 变量如果被赋值到了一个新的 sync.Map 结构,那在调用 m.mu.Unlock() 时,m 指向的是这个新的 sync.Map 结构,于是会解锁一个未加锁的 sync.Mutex 而出错。为了验证我的猜想,我修改了这段代码,在
m.mu.Lock() 后和 m.mu.Unlock() 前加了行 println(atomic.LoadInt32(&m.mu.state)&1)。正常来说,因为这段代码是在加锁状态下执行的,m.mu.state & 1 的结果应该是 1(sync.mutexLocked),但我在测试时发现结果偶尔会变成 0,也就是未加锁的状态,说明在这个过程中,m.mu 被换掉了,唯一合理的解释就是 m 被指向了一个新创建的 sync.Map。那么怎样才能修复这个 bug 呢?
最简单的方式是用
map + sync.Mutex 的组合,但是这样会影响性能。还有啥办法可以让 sm.Store() 在执行的过程中,m 不会指向新的 sync.Map 结构呢?我灵机一动想到,把
sm 声明成一个指针(即 sm := &sync.Map{}),这样即使在后续的赋值过程中,sm 变化了,但 m 指向的仍然是原来的 sync.Map 结构,不就能解决这个问题了吗?测试了一下发现果然不报错了。而且换成指针后,顺便还缓解了前面提到的数据丢失的问题,因为复制
*sync.Map 是安全的。
0条评论 你不来一发么↓