sync.Map 不是线程安全的?

标签:Go

昨天在压测公司的项目时发现,sync.(*Map).Store() 抛了个异常,日志如下:
fatal error: sync: unlock of unlocked mutex

goroutine 63756060 [running]:
runtime.throw({0xbeeca9, 0xb45120})
#011/usr/local/go/src/runtime/panic.go:1198 +0x71 fp=0xc0010f13a0 sp=0xc0010f1370 pc=0x4336d1
sync.throw({0xbeeca9, 0xb2dd40})
#011/usr/local/go/src/runtime/panic.go:1184 +0x1e fp=0xc0010f13c0 sp=0xc0010f13a0 pc=0x45ecfe
sync.(*Mutex).unlockSlow(0x11d45a0, 0xffffffff)
#011/usr/local/go/src/sync/mutex.go:196 +0x3c fp=0xc0010f13e8 sp=0xc0010f13c0 pc=0x47851c
sync.(*Mutex).Unlock(...)
#011/usr/local/go/src/sync/mutex.go:190
sync.(*Map).Store(0x11d45a0, {0xae4960, 0xc0012f4350}, {0xae40e0, 0xc000026078})
#011/usr/local/go/src/sync/map.go:163 +0x465 fp=0xc0010f1480 sp=0xc0010f13e8 pc=0x476fa5
要知道这可是 Go 的官方库,sync.Map 可是专门为并发安全而设计的,难道直接调用它的接口也会出现线程冲突?

我先展示一下简化后的业务代码:
var onlineCache sync.Map

func AddOnlineDevice(uuid string) {
	ts := time.Now().Unix()
	onlineCache.Store(uuid, ts)
}

func SaveOnlineTimes() {
	onlineTimes := map[interface{}]interface{}{}

	onlineCache.Range(func(key, value interface{}) bool {
		onlineTimes[key] = value
		return true
	})
	onlineCache = sync.Map{}

	if len(onlineTimes) == 0 {
		return
	}

	err := dao.Get().Redis.UpdateOnlineTimes(onlineTimes)
	if err != nil {
		log.Error(err)
	}
}

func UpdateClientOnlineTimeCron() {
	go func() {
		for {
			<-time.After(time.Second * 30)

			SaveOnlineTimes()
		}
	}()
}
这里用一个全局的 sync.Map 类型的 onlineCache 变量来存储每个设备的在线时间,然后每个请求都调用 AddOnlineDevice() 来更新时间戳,再每隔 30 秒将 onlineCache 中的数据存储到 Redis 中,并将 onlineCache 重新赋值成一个空的 sync.Map(因为没有清空的方法)。
这段代码其实有一处明显的错误:调用 onlineCache.Range()onlineCache = sync.Map{} 不是线程安全的,如果这期间同时在调用 AddOnlineDevice(),这些数据可能会丢失。
可以想到的是将它赋值给一个临时的 sync.Map 变量,然后遍历这个临时的变量,这样线程不安全的时间会很短,但是这又违背了使用规范:A Map must not be copied after first use.

这里先不考虑丢失数据的问题,继续回到锁冲突的问题上。我再把代码简化一下,让它变得容易复现:
package main

import (
	"math/rand"
	"sync"
	"time"
)


func main() {
	sm := sync.Map{}
	for i := 0; i < 10; i++ {
		go func() {
			for {
				a := rand.Float32()
				b := rand.Float32()
				sm.Store(a, b)
			}
		}()
	}

	for {
		sm = sync.Map{}
		time.Sleep(time.Second)
	}
}
基本上运行 1、2 秒就会报错,可以确认的是,sm.Store(a, b) 的调用和 sm = sync.Map{} 的重新赋值在并发执行时确实会引发这个问题。

于是我搜索了一番,在 Github 里找到了一个相同的问题,并得到了一个解释
Although technically that doesn't violate the rule "don't copy a map before first use" because the map you're copying hasn't been used yet. But you're copying onto an in-use Map, which is also bad.
也就是说,虽然这里没有违反不能复制 sync.Map 的规定,但把它赋值给一个使用中的 sync.Map 也是不安全的。

于是我翻了下 sync/map.go 的代码:
func (m *Map) Store(key, value any) {
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// The entry was previously expunged, which implies that there is a
			// non-nil dirty map and this entry is not in it.
			m.dirty[key] = e
		}
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		e.storeLocked(&value)
	} else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}
尝试解释一下:在 sync.(*Map).Store() 里会调用 m.mu.Lock() 来加锁,此后,外面的 sm 变量如果被赋值到了一个新的 sync.Map 结构,那在调用 m.mu.Unlock() 时,m 指向的是这个新的 sync.Map 结构,于是会解锁一个未加锁的 sync.Mutex 而出错。
为了验证我的猜想,我修改了这段代码,在 m.mu.Lock() 后和 m.mu.Unlock() 前加了行 println(atomic.LoadInt32(&m.mu.state)&1)。正常来说,因为这段代码是在加锁状态下执行的,m.mu.state & 1 的结果应该是 1(sync.mutexLocked),但我在测试时发现结果偶尔会变成 0,也就是未加锁的状态,说明在这个过程中,m.mu 被换掉了,唯一合理的解释就是 m 被指向了一个新创建的 sync.Map

那么怎样才能修复这个 bug 呢?
最简单的方式是用 map + sync.Mutex 的组合,但是这样会影响性能。还有啥办法可以让 sm.Store() 在执行的过程中,m 不会指向新的 sync.Map 结构呢?
我灵机一动想到,把 sm 声明成一个指针(即 sm := &sync.Map{}),这样即使在后续的赋值过程中,sm 变化了,但 m 指向的仍然是原来的 sync.Map 结构,不就能解决这个问题了吗?
测试了一下发现果然不报错了。而且换成指针后,顺便还缓解了前面提到的数据丢失的问题,因为复制 *sync.Map 是安全的。

0条评论 你不来一发么↓

    想说点什么呢?