在上一节 源码阅读(一):Golang map 我们讲到,map 不是并发安全的,本章我们就来了解下 map 为什么不是并发安全的?并发读写是会发生什么?如何保证 map 并发安全性问题?

为什么说 map 不是并发安全的

默认的 map 没有任何内置的并发控制机制,例如锁、信号量等。当多个 goroutine 同时读写 map 时,会处于竞争状态;例如以下几种情况:

  1. 当某个 goroutine 读/写某个 key 时,刚好有其他 goroutine 也进行读写,产生非预期值;
  2. 在读写 map 时,正处于重新分配内存空间或更新 bucket 哈希桶,导致数据结构异常;

从上节我们了解 map 源码的情况可知,默认提供一个无锁的 map,会显著降低实现的复杂度和提高性能,场景上更加通用化。

我们需要了解并发模型的一个概念: 同步顺序(synchronizes before) 1 在并发编程中内存模型,规定了某些操作(比如写操作 write)必须在其他操作(比如读操作 read)之前完成,就需要一些同步机制(如锁、信号量、通道等)来实现,以确保对共享资源的有序访问。

Why are map operations not defined to be atomic Go 官方博客这里解释了为什么 map 的操作不设计成原子性的。

  1. 在通常情况下使用 map 不需要对多个 goroutine 的并发访问进行并发安全保护,一般 map 都是某个更大的结构体或者计算的一部分,已经考虑了同步措施了;
  2. 如果要求所有的 map 的操作场景都需要使用锁(mutex),会比较影响性能,而只为了考虑少部分情况的安全性问题;
  3. map 的并发读取是安全的,当所有的 goroutine 都只是读取 map 的元素(查找或者是 for-range 遍历),在没有插入、删除等修改操作时并发访问都是安全的;
  4. 标准库也提供了 sync.Map 类型以解决并发读写的安全性问题,它使用在某些场景如静态缓存(static caches),但不适合作为内置 map 类型的通用替代品;

并发读写 map 时会发生什么

func main() {
	m := map[int]int{}
 
	wg := sync.WaitGroup{}
	wg.Add(20)
 
	// 并发写 map
	for i := 0; i < 10; i++ {
		go func(k, v int) {
			defer wg.Done()
			m[k] = v
		}(i, i*10)
	}
 
	// 并发读 map
	for i := 0; i < 10; i++ {
		go func(k int) {
			defer wg.Done()
			if v, ok := m[k]; ok {
				fmt.Printf("Key: %v, Value: %v\n", k, v)
			} else {
				fmt.Printf("Key: %v not found\n", k)
			}
		}(i)
	}
 
	wg.Wait()
}
$ go run ./syncmap
Key: 0, Value: 0
Key: 3, Value: 30
fatal error: concurrent map writes
Key: 4, Value: 40
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	// ...
	if h.flags&hashWriting != 0 {
	    fatal("concurrent map writes")
	}
	// ...
	// 设置 `flags` 的 `hashWriting` 标志位,将标志位进行翻转(如果原来是0就置为1,原来是1就置为0);
	// 目的就是用来检测防止多个 goroutine 并发写入同一个哈希表;
	h.flags &= hashWriting
	// ...
 
done:
	// ...
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	// 清除 `flags` 字段的 `hashWriting` 标记位
	// 先对 `hashWriting` 取反(`^`),然后再和 `flags` 进行相与操作 `&`
	h.flags &^= hashWriting
	// ...
}
 
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	// ...
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}
	// ...
	h.flags ^= hashWriting
	// ...
 
search:
	// ...
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	// 清除 `flags` 字段的 `hashWriting` 标记位
	h.flags &^= hashWriting
}
 
func mapclear(t *maptype, h *hmap) {
	// ...
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags ^= hashWriting
	// ...
}
// src/runtime/map.go
const (
	// flags
	iterator     = 1 // there may be an iterator using buckets
	oldIterator  = 2 // there may be an iterator using oldbuckets
	// 4(2进制为100)
	hashWriting  = 4 // a goroutine is writing to the map
	sameSizeGrow = 8 // the current map growth is to a new map of the same size
)

如何才能保证并发读写是安全的

方案一:Mutex 锁

type ConcurrentMap[K comparable, V any] struct {
	m  map[K]V
	mu sync.Mutex // 锁以保证并发安全
}
 
func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
	m.mu.Lock() // 读数据前进行加锁
	v, ok := m.m[key]
	m.mu.Unlock()
	return v, ok
}
 
func (m *ConcurrentMap[K, V]) Set(key K, value V) {
	m.mu.Lock() // 写数据前进行加锁
	m.m[key] = value
	m.mu.Unlock()
}

这么实现并发问题解决了,但是性能上有比较大的影响;一种常见用法可以通过读写锁 sync.RWMutex2 进一步优化:

type ConcurrentMap[K comparable, V any] struct {
	m  map[K]V
	mu sync.RWMutex // 锁以保证并发安全
}
 
func NewConcurrentMap[K comparable, V any]() *ConcurrentMap[K, V] {
	return &ConcurrentMap[K, V]{
		m: make(map[K]V),
	}
}
 
func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
	m.mu.RLock() // 读数据前进行加锁
	v, ok := m.m[key]
	m.mu.RUnlock()
	return v, ok
}
 
func (m *ConcurrentMap[K, V]) Set(key K, value V) {
	m.mu.Lock() // 写数据前进行加锁
	m.m[key] = value
	m.mu.Unlock()
}
 
func (m *ConcurrentMap[K, V]) Delete(key K) {
	m.mu.Lock() // 写数据前进行加锁
	delete(m.m, key)
	m.mu.Unlock()
}
 
func (m *ConcurrentMap[K, V]) String() string {
	return fmt.Sprintf("%v", m.m)
}
 
func main() {
	m := NewConcurrentMap[string, int]()
	m.Set("foo", 1)
	m.Set("bar", 2)
 
	if v, ok := m.Get("foo"); ok {
		fmt.Printf("foo: %d\n", v)
	}
	m.Delete("bar")
 
	fmt.Printf("%+v\n", m)
}

方案二:sync.Map

让我们来改写下最上面的例子,使用 sync.Map 如何保证并发读写安全:

func main() {
	var m sync.Map
 
	wg := sync.WaitGroup{}
	wg.Add(20)
 
	// 并发读 map
	for i := 0; i < 10; i++ {
		go func(k, v int) {
			defer wg.Done()
			m.Store(k, v)
		}(i, i*10)
 
	}
 
	// 并发写 map
	for i := 0; i < 10; i++ {
		go func(k int) {
			defer wg.Done()
			if value, ok := m.Load(k); ok {
				fmt.Printf("Key: %v, Value: %v\n", k, value)
			} else {
				fmt.Printf("Key: %v not found\n", k)
			}
		}(i)
	}
 
	wg.Wait()
}

可以看到,最核心的两个方法就是 Load(key)Store(key, value),除此之外,sync.Map 还提供如下方法:

// src/sync/map.go
 
type Map struct {
	// contains filtered or unexported fields
}
 
// Delete 删除指定 `key`;
func (m *Map) Delete(key any)
// Load 加载指定 `key`,返回 `key` 对应的 `value` 值,并返回是否 `key` 存在;
func (m *Map) Load(key any) (value any, ok bool)
// LoadAndDelete 加载并删除指定 `key`,如果 `key` 存在的话,则删除并返回对应的 `value` 值和 `true`;
// 否则不进行删除并返回 `nil` 和 `false`;
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
// LoadOrStore 如果 `key` 存在则返回当前 `key` 对应的 `value` 值和 `true`,
// 否则保存对应的键值对,并返回 `false`;
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
// Range 遍历 map,如果对应函数 `f` 返回 `false`,则停止并退出遍历;
func (m *Map) Range(f func(key, value any) bool)
// Store 保存键值对 `key`、`value`;
func (m *Map) Store(key, value any)
 
// go1.20 版本引入的几个方法:
// CompareAndSwap CAS 操作,如果 `key` 对应的 `value` 为 `old` 值,
// 则把值更新为 `new`,并返回 `true`,否则不更新并返回 `false`;
func (m *Map) CompareAndSwap(key, old, new any) bool
// CompareAndDelete 删除之前会比较 `key` 对应的 `value` 是否和 `old` 值相等;
// 如果相等的话,则删除该键值对,并返回 `true`,否则不进行删除并返回 `false`;
func (m *Map) CompareAndDelete(key, old any) (deleted bool)
// Swap 交换 `key` 对应的 `value` 值,返回 `key` 原来对应的值以及 `key` 是否存在;
func (m *Map) Swap(key, value any) (previous any, loaded bool)
// src/sync/map.go
 
// Map 的字段结构大致如下,我们会在下面具体展开说明各字段的用途
type Map struct {
	// mu 互斥锁,用于保护 read 和 dirty 哈希表的并发访问
	mu Mutex
	// read 只读哈希表,包含常用的键值对,通过 atomic.Value 来保证原子性和快速读取的操作;
	read atomic.Pointer[readOnly]
	// dirty 脏 map,包含所有更新后的键值对,查询指定 key 时,会优先读取 read map,找不到的时候再查找 dirty map;
	dirty map[any]*entry
	// misses 记录从 read map 中未命中的次数;当 misses 值达到一定阈值时,会把 dirty map 的数据迁移到 read map;
	misses int
}
 
type readOnly struct {
	m       map[any]*entry
	amended bool
}
 
type entry struct {
	p atomic.Pointer[any]
}

接下来,我们将借助 dlv 调试命令工具(如果想了解 dlv 的详细内容,可以参考这篇文章: 如何使用 dlv 进行 golang 代码调试)一步步看下 sync.Map 是如何保存元素的,具体代码如下:

// main.go
package main
 
import (
	"fmt"
	"sync"
)
 
func main() {
	var m sync.Map
 
	m.Store("foo", "bar")
}
$ ls .
main.go
 
# 调试 main.go 文件
$ dlv debug main.go
Type 'help' for list of commands.
 
# 设置断点
(dlv) break main.main
Breakpoint 1 set at 0x1082c0e for main.main() ./main.go:8
 
# 执行程序,并跳转到断点处
(dlv) continue
> main.main() ./main.go:8 (hits goroutine(1):1 total:1) (PC: 0x1082c0e)
     3: import (
     4:         "fmt"
     5:         "sync"
     6: )
     7:
=>   8: func main() {
     9:         var m sync.Map
    10:
    11:         m.Store("foo", "bar")
    12:
    13:         _, _ = m.Load("foo")
 
# 让我们执行三下 `s`(step) 进入 m.Store 方法
(dlv) s
> sync.(*Map).Store() /Users/vuri/.gvm/gos/go1.21.0/src/sync/map.go:154 (PC: 0x10664ea)
   149:         }
   150:         return *p, true
   151: }
   152:
   153: // Store sets the value for a key.
=> 154: func (m *Map) Store(key, value any) {
   155:         _, _ = m.Swap(key, value)
   156: }
// src/sync/map.go
 
// Store 保存键值对 `key`、`value`;
// 等价于 map[key] = value,设置字典的 key、value 值;
func (m *Map) Store(key, value any) {
	_, _ = m.Swap(key, value)
}
 
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	// 加载 readonly 的哈希表,很明显这里第一次肯定找不到,继续往下走
	read := m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		// ...
	}
 
	// 双检锁逻辑,重新获取 readonly 哈希表
	m.mu.Lock()
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		// ...
	} else if e, ok := m.dirty[key]; ok { // 获取 dirty 哈希表,也找不到指定元素
		// ...
	} else { // 首次时进入该分支
		if !read.amended { // 表示 readonly 和 dirty 两个哈希表是否数据不同步
			// 添加第一个元素到 dirty 哈希表;
			// 需要标记 readonly 哈希表处于数据未同步到 dirty 哈希表的状态(`amended` = true);
			// 这里会第一次进行初始化 dirty 哈希表(`m.dirty = make(map[any]*entry, len(read.m))`);
			m.dirtyLocked()
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		// 设置 `key`、`value` 到 dirty 哈希表中
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
	return previous, loaded
}
// src/sync/map.go
 
// Load 加载指定 `key`,返回 `key` 对应的 `value` 值,并返回是否 `key` 存在;
// 等价于 value, ok := map[key]
func (m *Map) Load(key any) (value any, ok bool) {
	// 加载 readonly 哈希表,此时还没有数据
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		// 再次加载 readonly 哈希表(在并发场景下有可能刚好 dirty 同步到 readonly);
		// `spurious miss`(误报的缓存未命中)说的就是这种情况;
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended { // 再次进入该代码分支
			e, ok = m.dirty[key]
			// 无论有没找到 `key`,都记录下 read 只读 map 未命中的情况;
			// 当满足条件(`m.misses < len(m.dirty)`)时,把 dirty 迁移到 read map;
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

前面我们提到了,当 m.misses 达到一定程度时,会把 dirty map 的数据迁移到 read map,让我们来看看 m.missLocked() 具体是如何实现的:

// src/sync/map.go
 
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	// 这里实现很简单,全量更新 read map,同时置空 dirty map;
	// 当新添加元素时,会走到下一个函数的逻辑;
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}
 
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}
 
	// dirty map 先前被置空了,新分配内存空间;****
	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

Footnotes

  1. The Go Memory Model

  2. Go maps in action - map concurrency