1、多核多线程程序的问题
现代 CPU 都是都是多核架构,每个核心都有自己的 cache。在共享内存时,多线程执行可能产生意料之外的结果。
1.1、乱序
static int a = 0, flag = 0, b;
void thread1() {
a = 26; // (1)
flag = 1; // (2)
}
void thread2() {
while (flag != 1); // (3)
b = a; // (4)
}
上述 thread1 和 thread2 并发执行后,b 的值可能为 0 吗?
答案是可能的,应为编译器可以对指令进行乱序,(2) 可能在 (1) 前面执行,全局执行顺序是 (2)(3)(4)(1) 时,b 被赋值为 0。即使编译器不做任何优化,CPU 硬件也可能会乱序执行,产生同样的结果。
1.2、双重检查锁单例真的线程安全吗?
正是由于乱序,C++11 之前,实现线程安全的单例模式变得困难。下述是双重检查锁单例的示例代码
/// ==========singleton.h==============
class Singleton {
public:
static Singleton* GetInstance();
void Method() {
// ...
}
private:
Singleton() = default;
~Singleton() = default;
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
static Singleton* instance_;
static std::mutex mutex_;
};
/// ==========singleton.cc==============
Singleton* Singleton::instance_ = nullptr;
std::mutex Singleton::mutex_;
Singleton* Singleton::GetInstance() {
if (instance_ == nullptr) {
std::lock_guard<std::mutex> guard(mutex_);
if (instance_ == nullptr) {
instance_ = new Singleton(); // (a), atomic ???
}
}
return instance_;
}
/// ==========main.cc==============
int main() {
Singleton::GetInstance()->Method();
}
问题出在语句 (a),这句赋值语句是原子的吗?答案是不是的,(a) 可以拆分为以下三个步骤:
/// instance_ = new Singleton();
Singleton* mem = static_cast<Singleton*>(
operator new(sizeof(Singleton))); // (1)
mem->Singleton::Singleton(); // (2)
instance_ = mem; // (3)
如果执行顺序是 (1)(3)(2),那么 instance_ 在某段时间内,指向的是未初始化的内存,程序会出现 BUG。
因此,编程语言需要明确定义
- 线程如何通过内存交互
- 访问内存 read 可以返回什么值
- 写入内存 write 的结果何时能被其他线程读取到
- 编写程序或者编译优化时,内存明确的行为。
内存一致性模型定义了内存共享时,多线程程序执行允许的行为,分为硬件和软件两部分,软件部分是编译器生成硬件代码时行为,硬件部分指 CPU 执行指令的行为。
有两个概念需要区分:
- 内存序(Memory Order)指的是处理器通过系统总线(System Bus)从内存系统中读取 Read/Load 或者写入 Write/Store 的顺序。
- 程序序(Program Order)表示指令流(Instruction Stream)中内存读取或者内存写入的顺序。
在实际应用场景中,内存序和程序序一般不相同,处理器可以对指令乱序,内存系统中发生的内存访问顺序,不一定都是按照指令流要求的内存访问顺序进行。
2、memory location
memory location 是 C++ 对内存的一种抽象,它不关心底层计算机系统的寻址方式,它只强调一些对象应该占用自己储存单元,以确保修改一个对象不会影响另一个对象。
C++ 标准委员会给 memory location 的定义是
- 标量类型的对象(算术类型,指针类型,枚举类型或 std::nullptr_t)
- 或非零长度的最大连续的位域序列
比如说,下面的类 Data 的对象 obj 包含 8 个分离的memory location
struct Data {
char a; // memory location #1
int b : 5; // memory location #2
int c : 11, // memory location #2 (continued)
: 0, // 长度为0的匿名位域表示下一个位域重新开始
d : 8; // memory location #3
struct {
int ee : 8; // memory location #4
} e;
char c1, c2; // memory location #5-#6
int array[2]; // memory location #7-#8
} obj;
需要注意的是:
- 每个变量都是一个对象,包括那些属于其他对象数据成员的变量
- 每个对象至少占用一个memory location
- 基本数据结构(int、char、double、指针等)不管其数据类型,都占一个memory location
- 连续的位域共享一个 memory location
如果两个线程访问不同的 memory location 不会有竞争条件(race condition);如果两个线程读取同一个 memory location 也不会产生竞争条件;两个线程同时访问同一 memory location,并且至少其中一个线程是修改操作,会产生竞争条件,其行为是未定义,即使是两个不同对象也不行,例如下面的例子:
struct {
char a:4, b:4;
} obj;
Thread1 | Thread2 | - | Memory Value |
- | - | - | 11110001 |
Read(value: 11110001) | - | <-- | 11110001 |
- | Read(value: 11110001) | <-- | 11110001 |
Modify(new value: 10100001) | - | - | 11110001 |
- | Modify(new value: 11110000) | - | 11110001 |
Write | - | --> | 10100001 |
- | Write | --> | 11110000 |
避免竞争条件的方法要么是使用互斥量,要么使用原子操作,将多线程的并发执行强制转换成串行执行。
3、CPU 架构内存模型
程序内存访问可以区分为内存读取 Load 和内存写入 Store。为了叙述方便,约定以下符号
- L(a) 读取地址 a 的内存
- S(a) 向地址 a 的内存写入
- <p 程序序
- <m 内存序
3.1、SC(Sequential Consistency) 一致性
顺序一致性是由 Lamport 在 1976 年提出来的,它要求
- 所有操作如同是单线程一样,按照某种全局顺序执行
- 每个线程都按照程序序执行
也就是对于 a 和 b 两块内存(a/b可以相同,也可以不相同)的操作
- L(a) <p L(b) ==> L(a) <m L(b) /* Load --> Load */
- L(a) <p S(b) ==> L(a) <m S(b) /* Load --> Store */
- S(a) <p S(b) ==> S(a) <m S(b) /* Store --> Store */
- S(a) <p L(b) ==> L(a) <m L(b) /* Store --> Load */
每个 L(a) 都返回最新(内存访问顺序)S(a) 写入的值,即
- L(a) = MAX <m { S(a) | S(a) <m L(a) }
Max <m 表示最新的内存访问。
不只是 Load 和 Store 需要满足上述条件,原子指令 test-and-set(read-modify-write, RMW)也满足上述约束
SC 是约束性最强的内存一致性模型,不允许 CPU 有任何乱序的优化,其代价就是性能损失。实际是,如果两条内存访问指令访问的是不同内存地址,这两条内存访问是可以并行执行,以提高性能。
3.2、TSO(Total Store Ordering) 一致性
TSO 放松了 SC 对 Store --> Load 顺序的限制,以便使用 Store 缓存优化性能。TSO 要求
- L(a) <p L(b) ==> L(a) <m L(b) /* Load --> Load */
- L(a) <p S(b) ==> L(a) <m S(b) /* Load --> Store */
- S(a) <p S(b) ==> S(a) <m S(b) /* Store --> Store */
每个 L(a) 都返回最新 S(a) 写入的值,即
- L(a) = MAX <m { S(a) | S(a) <m L(a) or S(a) <p L(a) }
上式表示 L(a) 返回
- 内存序在 L(a) 之前的 S(a) 写入的值
- 程序序在 L(a) 之前的 S(a) 写入的值
TSO 提供 FENCE 操作,FENCE 满足:
- L(a) <p FENCE ==> L(a) <m FENCE /* Load --> FENCE */
- S(a) <p FENCE ==> S(a) <m FENCE /* Store --> FENCE */
- FENCE <p FENCE ==> FENCE <m FENCE /* FENCE--> FENCE */
- FENCE <p L(a) ==> FENCE <m S(a) /* FENCE--> Load */
- FENCE <p S(a) ==> FENCE <m S(a) /* FENCE--> Store */
程序可以使用 FENCE,强制越是 Store --> Load 内存访问顺序,只要
- S(a) <p FENCE ==> S(a) <m FENCE /* Store --> FENCE */
- FENCE <p L(a) ==> FENCE <m S(a) /* FENCE--> Load */
3.3、弱内存一致性模型(Relaxed/Weak Memory Consistency Model)
前面介绍的 SC 和 TSO 都属于强内存一致性模型,因为基本都保持了每个线程的程序顺序(TSO 放松了 Store --> Load 的限制)。
和强内存一致性模型相对的是弱内存 一致性模型,它们只会保持程序明确指定的内存序,以获得更高的性能。
弱内存一致性模型不同的实现具有较大差别,这里介绍书籍【A Primer on Memory Consistency and Cache Coherence】介绍的用作学习的弱内存一致性模型:Release 一致性模型 (RC),要求一下操作遵循程序序
- ACQUIRE --> Load/Store (Load/Store --> ACQUIRE 不做要求)
- Load/Store --> RELEASE (RELEASE --> Load/Store 不做要求)
ACQUIRE/RELEASE 之间满足 SC 要求,也就是下面四种不能乱序,按照程序序执行。
- ACQUIRE --> ACQUIRE
- ACQUIRE --> RELEASE
- RELEASE --> ACQUIRE
- RELEASE --> RELEASE
3.4、Intel & ARM 内存模型
3.4.1、Intel 内存模型
【Intel? 64 and IA-32 Architectures Software Developer’s Manual】这部文档介绍了 Intel 处理器遵循的内存模型。
从描述上来看,基本遵循 TSO 强一致性内存模型,如果有详细了解的意向,可以参阅 Intel 释放的最新文档,下面引用几个官网文档介绍的几个示例。
首先介绍以下变量的含义
- r1/r2 这种以 r 开头的变量,表示寄存器
- x/y/z 这种变量,表示内存地址
- mov [_x], val 表示将 val 写入到内存 x 位置
- mov r1, [_x] 表示读取内存 x 位置的内容,保持到寄存器 r1
- Processor 是一个逻辑概念,可以表示一个核;具有超线程技术的 单核 CPU,也可以任务是 multi-processor
1、Store --> Store,Load --> Load 不能乱序
初始值:intially x = y = 0
Processor0 | Processor1 |
mov [_x], 1 | mov r1, [_y] |
结果 r1 = 1, r2 = 0 不被允许,不会出现。
因为如果 r1 = 1,表示 Store(y) 先于 Load(y) 执行,那么 Store(x) 也必然先于 Load(x) 执行。
2、Load --> Store 不能乱序
初始值:intially x = y = 0
Processor0 | Processor1 |
mov r1, [_x] | mov r2, [_y] |
结果 r1 = 1, r2 = 1 不被允许,不会出现。
因为如果 r1 = 1,表示 Store(x) 先于 Load(x) 执行,那么 Load(y) 也必然先于 Store(y) 执行。
3、Store --> Load 可以乱序,只要作用于不同内存地址
初始值:intially x = y = 0
Processor0 | Processor1 |
mov [_x], 1 | mov [_x], 1 |
结果 r1 = 0, r2 = 0 是允许的,可以出现。
应为 Processor0 和 1 内存访问不同的地址,Load 可以乱序到 Store 前执行。如果是同一地址,不能被乱序,比如下面的例子:
初始值:intially x = 0
Processor0 |
mov [_x], 1 |
结果 r1 = 0 不被允许,不能出现。
4、Store 可以并发执行,每个处理器可以看到不同的顺序
这条翻译不是很直观,文档中是 Intra-Processor Forwarding Is Allowed。
它讲的是:两条在不同 Processor 并发执行的 Store,可以被这两个 Processor 以不同的顺序看见。特别地,两个 Processor 都可以认为,自己的 Store 先于另一个 Processor 的 Store 执行。请看下面的示例:
初始值:x = y = 0
Processor0 | Processor1 |
mov [_x], 1 | mov [_y], 1 |
结果 r2 = 0, r4 = 0 是被允许的,可以出现。
因为对于 Processor0 而言,由自己发起的 Store(x) 先于由 Processor1 发起的 Store(y);Processor1 也是可以这样认为。
5、Store 传递可见
直译过来就是:对所有处理器来说,与因果关系相关的 Store 是以符合因果关系的顺序出现的。请看下面的示例:
初始值:x = y = 0
Processor0 | Processor1 | Processor2 |
mov [_x], 1 | mov r1, [_x] | mov r2, [_y] |
结果 r1 = 1, r2 = 1, r3 = 0 不被允许,不能出现。
如果 r1 = 1, 表示全局的顺序是 Store(x) --> Store(y) ;r2 = 1 表示 Processor2 看到了 Store(y),也必然看到 Store(x),所以 r3 = 1。
6、Store 被其他 Processor 看到的顺序是一致的
第 4 条讲的是 Processor 自己看到的 Store 可以不同,但是其他 Processor 看到的 Store 顺序必须是一致的。
初始值:x = y = 0
Processor0 | Processor1 | Processor2 | Processor3 |
mov [_x], 1 | mov [_y], 1 | mov r1, [_x] | mov r3, [_y] |
结果 r1 = 1, r2 = 0, r3 = 1, r4 = 0 不被允许,不能出现。
r1 = 1, r2 = 0 表示在 Processor2 看来,Store(x) 先于 Store(y) 执行,Processor2 还读取不到 y 的新值,这个顺序对于 Process3 也是一样的,r3 = 1 表示 Processor 3 看到了 Store(y),也必然看到 Store(x),所以 r4 = 1。
7、Lock 指令有全局顺序
所有 Processor 都必须看到 lock 指令按照相同的顺序被执行。
初始值:r1 = r2 = 1, x = y = 0
Processor0 | Processor1 | Processor2 | Processor3 |
xchg [_x], r1 | xchg [_y], r2 |
|
|
结果 r3 = 1, r4 = 0, r5 = 1, r6 = 0 不被允许,不能出现。
假设 xchg(x) 先于 xchg(y) 执行,r5 = 1 表示 Processor3 上 Load(y) 在 xchg(y) 之后执行,相应的 Load(x) 也应该在 xchg(x) 之后执行,所以 r6 = 1
8、Load/Store 和 Lock 指令不能重排
位于 Lock 指令前的 Load/Store 不能乱序到 Lock 指令之后执行;同样的,位于 Lock 指令后的 Load/Store 不能乱序到 Lock 指令之前执行。
初始值:r1 = r2 = 1, x = y = 0
Processor0 | Processor1 |
xchg [_x], r1 | xchg [_y], r3 |
结果 r2 = 0, r4 = 0 不被允许,不能出现。
根据第 7 条约定,假定 xchg(x) 先于 xchg(y) 执行。r2 = 0 表示 Load(y) 先于 xchg(y) 执行,也就是 xchg(x) 先于 Load(x) 执行,r4 = 1
初始值:r1 = 1, x = y = 0
Processor0 | Processor1 |
xchg [_x], r1 | xchg r2, [_y] |
结果 r2 = 1, r3 = 0 不被允许,不能出现。
Intel 还有 Fence 相关的内容,这里不再讨论了,有意向可以参阅最新的官网文档。
3.4.2、ARM64 内存模型
ARM64 采用的是弱内存一致性模型,没有明确限制,相互没有依赖的指令都可以乱序。官网文档没有示例,这里引用 Ash Wilding 的分享 PPT 上的内容,简单介绍一下,什么情况不能乱序。
依赖可以是寄存器依赖,也可以是内存位置依赖。比如下面的例子是寄存器依赖,x0 被第二个 LDR 指令使用,这种由依赖的两条指令不能乱序。
LDR x0, [x1]
LDR x2, [x0, #4]
由比如下面的例子,访问的内存地址有重叠
STR x0, [x1] // [1][1][1][1][1][1][1][1]
STRH w2, [x1, #0] // [-][-][-][-][-][-][2][2]
STR w3, [x1, #2] // [-][-][3][3][3][3][-][-]
STRB w4, [x1, #3] // [-][-][-][-][4][-][-][-]
LRD x5, [x1] // [1][1][3][3][4][3][1][1]
第一条指令必须最先执行;第二条和第三条指令可以乱序;第二条指令可以和第四条指令乱序;最后一条指令必须最后执行。
没有依赖的指令间,都可以乱序,需要使用 BARRIER 强制要求内存序。
4、std::memory_order
C++11定义了 6 种 memory_order
typedef enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
} memory_order;
这六种访问顺序代表三种内存模型
- sequentially constisent ordering -- memory_order_seq_cst
- acquire-release ordering -- memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel
- relaxed ordering -- memory_order_relaxed
memory_order_consume 已经在 C++17 不建议使用,本文也不再分析。
C++ 内存模型对于有依赖的语句,尽管操作的是不同的变量,也不会乱序,这是程序正确执行的基本保证。
C++ 内存模型的限制都是针对单个线程,多线程间不会有限制。
4.1、relaxed ordering
relaxed ordering 只保证原子性,对不同变量的操作不限制指令顺序。比如
#include <assert.h>
#include <atomic>
#include <thread>
std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);
void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // (1)
y.store(true, std::memory_order_relaxed); // (2)
}
void read_y_then_x() {
while (!y.load(std::memory_order_relaxed)); // (3)
if (x.load(std::memory_order_relaxed)) { // (4)
++z;
}
}
int main() {
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load() != 0); // (5)
}
语句 (5) assert 可能会触发。因为 (1)(2) 是可以乱序的,导致全局的执行顺序为 (2)(1)(3)(4),z = 1。(3)(4) 是不能乱序的,不然程序正确性都不能保证了。
再看下面的例子
#include <atomic>
#include <iostream>
#include <thread>
struct read_values {
int x, y, z;
};
std::atomic<int> x(0), y(0), z(0);
std::atomic<bool> go(false);
unsigned const loop_count = 10;
read_values values1[loop_count];
read_values values2[loop_count];
read_values values3[loop_count];
read_values values4[loop_count];
read_values values5[loop_count];
void increment(std::atomic<int>* var_to_inc, read_values* values) {
while (!go) std::this_thread::yield();
for (unsigned i = 0; i < loop_count; ++i) {
values[i].x = x.load(std::memory_order_relaxed);
values[i].y = y.load(std::memory_order_relaxed);
values[i].z = z.load(std::memory_order_relaxed);
var_to_inc->store(i + 1, std::memory_order_relaxed);
std::this_thread::yield();
}
}
void read_vals(read_values* values) {
while (!go) std::this_thread::yield();
for (unsigned i = 0; i < loop_count; ++i) {
values[i].x = x.load(std::memory_order_relaxed);
values[i].y = y.load(std::memory_order_relaxed);
values[i].z = z.load(std::memory_order_relaxed);
std::this_thread::yield();
}
}
void print(read_values* v) {
for (unsigned i = 0; i < loop_count; ++i) {
if (i) std::cout << ',';
std::cout << '(' << v[i].x << ',' << v[i].y << ',' << v[i].z << ')';
}
std::cout << std::endl;
}
int main() {
std::thread t1(increment, &x, values1);
std::thread t2(increment, &y, values2);
std::thread t3(increment, &z, values3);
std::thread t4(read_vals, values4);
std::thread t5(read_vals, values5);
go = true;
t5.join();
t4.join();
t3.join();
t2.join();
t1.join();
print(values1);
print(values2);
print(values3);
print(values4);
print(values5);
}
可能的输出是
(0,0,0),(1,0,0),(2,0,0),(3,0,0),(4,0,0),(5,7,0),(6,7,8),(7,9,8),(8,9,8),(9,9,10)
(0,0,0),(0,1,0),(0,2,0),(1,3,5),(8,4,5),(8,5,5),(8,6,6),(8,7,9),(10,8,9),(10,9,10)
(0,0,0),(0,0,1),(0,0,2),(0,0,3),(0,0,4),(0,0,5),(0,0,6),(0,0,7),(0,0,8),(0,0,9)
(1,3,0),(2,3,0),(2,4,1),(3,6,4),(3,9,5),(5,10,6),(5,10,8),(5,10,10),(9,10,10),(10,10,10)
(0,0,0),(0,0,0),(0,0,0),(6,3,7),(6,5,7),(7,7,7),(7,8,7),(8,8,7),(8,8,9),(8,8,9)
有如下规律
- t1 中 x 每次递增 1;t1 中 y 每次递增 1;t3 中 z 每次递增 1
- 在其他 thread,x 不一定每次都递增,或者递增 1;y/z 也是同样的现象
- t3 只看到 z 的递增,没有看到 x/y 的递增
在 t1 线程中,先 load(x) 再 store(x),并且不能乱序,所以 x 的值每次 load 都是 x 最新的值,t2/t3 也是同样的道理。
4.2、acquire-release ordering
acquire-release ordering 建立在 relaxed ordering 之上,也没有要求指令的顺序。但是对指令乱序做了约束
- release 之前的 Store (包括普通变量),不能乱序到 release 之后
- acquire 之后的 Load (包括普通变量),不能乱序到 acquire 之前
但是 acquire-release 必须成对使用,并且作用于同一变量。例如下面的例子,(1) 一定先于 (2) 执行,并且 (3) 结束循环后,(4) 能读取到 x 最新的值。
#include <assert.h>
#include <atomic>
#include <thread>
std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);
void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // (1)
y.store(true, std::memory_order_release); // (2)
}
void read_y_then_x() {
while (!y.load(std::memory_order_acquire)); // (3)
if (x.load(std::memory_order_relaxed)) { // (4)
++z;
}
}
int main() {
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load() != 0); // (5)
}
memory_order_acq_rel 具备 acquire 和 release 的作用,通常是 compare_exchange_strong() 中被使用。
4.3、sequentially constisent ordering
memory_order_seq_cst 约束性最强
- 要求所有线程内所有 memory_order_seq_cst 约束的 Load/Store 都按照程序指定的顺序执行,
- 线程间 memory_order_seq_cst 必须有某个全局顺序
- 并且可以和 acquire-release 结合Load 具有 acquire 语义Store 具有 release 语义read-modify-write 具有 acquire-release 语义
比如下面的例子,尽管没有要求,(1)/(2) 在全局必须按照某个顺序执行,并且 c/d 线程看到 (1)/(2) 以相同的顺序执行。 假定 (1) 先于 (2) 执行,并且 (3) 返回 false,那么 (4) 一定返回 true,z = 1;其他情况也类似,z 也可以为 2,但是不能为 0。
#include <assert.h>
#include <atomic>
#include <thread>
std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);
void write_x() {
x.store(true, std::memory_order_seq_cst); // (1)
}
void write_y() {
y.store(true, std::memory_order_seq_cst); // (2)
}
void read_x_then_y() {
while (!x.load(std::memory_order_seq_cst));
if (y.load(std::memory_order_seq_cst)) ++z; // (3)
}
void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst));
if (x.load(std::memory_order_seq_cst)) ++z; // (4)
}
int main() {
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0); // (5)
}
5、LevelDB 跳表 SkipList 案例分析
LevelDB 利用 SkipList 管理内存中的 Key-Value 数据。SkipList 是作为平衡树的替代品(Alternative)被提出,在概率上具有和平衡树相同的查找、插入和删除效率。而且,SkipList 实现以及插入和删除操作都比平衡树或者红黑树简单。
在多线程环境中,利用 SkipList 代替普通的平衡树或者红黑树,可以使临界区代码更加简短,减少多线程间的竞争条件。
SkipList::Node::Next() 使用了 acquire,希望 Next() 返回后,Node 所有的配置都完成。SetNext() 使用了 release,保证 SetNext() 返回后,Node 的所有设置的完成。
/// skiplist.h
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}
private:
std::atomic<Node*> next_[1];
};
SetNode() 在 SkipList::Insert() 被调用,并且在最后调用,release 不允许它之前的 Store 乱序到它之后。
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
参考资料
A Primer on Memory Consistency and Cache Coherence
C++ Concurrency in Action 2nd
Intel? 64 and IA-32 Architectures Software Developer’s Manual
arm64’s weakly-ordered memory model and the need for correct, minimally intrusive barriers