四时宝库

程序员的知识宝库

多线程必会知识 -- C++11 内存一致性模型

1、多核多线程程序的问题

现代 CPU 都是都是多核架构,每个核心都有自己的 cache。在共享内存时,多线程执行可能产生意料之外的结果。

1.1、乱序

static int a = 0, flag = 0, b;

void thread1() {
  a = 26;   // (1)
  flag = 1; // (2)
}

void thread2() {
  while (flag != 1); // (3)
  b = a; // (4)
}

上述 thread1 和 thread2 并发执行后,b 的值可能为 0 吗?

答案是可能的,应为编译器可以对指令进行乱序,(2) 可能在 (1) 前面执行,全局执行顺序是 (2)(3)(4)(1) 时,b 被赋值为 0。即使编译器不做任何优化,CPU 硬件也可能会乱序执行,产生同样的结果。

1.2、双重检查锁单例真的线程安全吗?

正是由于乱序,C++11 之前,实现线程安全的单例模式变得困难。下述是双重检查锁单例的示例代码

/// ==========singleton.h==============
class Singleton {
 public:
  static Singleton* GetInstance();
  void Method() {
    // ...
  }

 private:
  Singleton() = default;
  ~Singleton() = default;

  Singleton(const Singleton&) = delete;
  Singleton& operator=(const Singleton&) = delete;

  static Singleton* instance_;
  static std::mutex mutex_;
};


/// ==========singleton.cc==============
Singleton* Singleton::instance_ = nullptr;
std::mutex Singleton::mutex_;

Singleton* Singleton::GetInstance() {
  if (instance_ == nullptr) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (instance_ == nullptr) {
      instance_ = new Singleton(); // (a), atomic ???
    }
  }
  return instance_;
}

/// ==========main.cc==============
int main() {
    Singleton::GetInstance()->Method();
}

问题出在语句 (a),这句赋值语句是原子的吗?答案是不是的,(a) 可以拆分为以下三个步骤:

/// instance_ = new Singleton();
Singleton* mem = static_cast<Singleton*>(
      operator new(sizeof(Singleton))); // (1)
mem->Singleton::Singleton(); // (2)
instance_ = mem; // (3)

如果执行顺序是 (1)(3)(2),那么 instance_ 在某段时间内,指向的是未初始化的内存,程序会出现 BUG。

因此,编程语言需要明确定义

  • 线程如何通过内存交互
  • 访问内存 read 可以返回什么值
  • 写入内存 write 的结果何时能被其他线程读取到
  • 编写程序或者编译优化时,内存明确的行为。

内存一致性模型定义了内存共享时,多线程程序执行允许的行为,分为硬件和软件两部分,软件部分是编译器生成硬件代码时行为,硬件部分指 CPU 执行指令的行为。

有两个概念需要区分:

  • 内存序(Memory Order)指的是处理器通过系统总线(System Bus)从内存系统中读取 Read/Load 或者写入 Write/Store 的顺序。
  • 程序序(Program Order)表示指令流(Instruction Stream)中内存读取或者内存写入的顺序。

在实际应用场景中,内存序和程序序一般不相同,处理器可以对指令乱序,内存系统中发生的内存访问顺序,不一定都是按照指令流要求的内存访问顺序进行。

2、memory location

memory location 是 C++ 对内存的一种抽象,它不关心底层计算机系统的寻址方式,它只强调一些对象应该占用自己储存单元,以确保修改一个对象不会影响另一个对象。

C++ 标准委员会给 memory location 的定义是

  • 标量类型的对象(算术类型,指针类型,枚举类型或 std::nullptr_t)
  • 或非零长度的最大连续的位域序列

比如说,下面的类 Data 的对象 obj 包含 8 个分离的memory location

struct Data {
  char a;     // memory location #1
  int b : 5;  // memory location #2
  int c : 11, // memory location #2 (continued)
        : 0,  // 长度为0的匿名位域表示下一个位域重新开始
      d : 8;  // memory location #3
  struct {
    int ee : 8; // memory location #4
  } e;
  char c1, c2;  // memory location #5-#6
  int array[2]; // memory location #7-#8
} obj;

需要注意的是:

  • 每个变量都是一个对象,包括那些属于其他对象数据成员的变量
  • 每个对象至少占用一个memory location
  • 基本数据结构(int、char、double、指针等)不管其数据类型,都占一个memory location
  • 连续的位域共享一个 memory location

如果两个线程访问不同的 memory location 不会有竞争条件(race condition);如果两个线程读取同一个 memory location 也不会产生竞争条件;两个线程同时访问同一 memory location,并且至少其中一个线程是修改操作,会产生竞争条件,其行为是未定义,即使是两个不同对象也不行,例如下面的例子:

struct {
  char a:4, b:4;
} obj;

Thread1

Thread2

-

Memory Value

-

-

-

11110001

Read(value: 11110001)

-

<--

11110001

-

Read(value: 11110001)

<--

11110001

Modify(new value: 10100001)

-

-

11110001

-

Modify(new value: 11110000)

-

11110001

Write

-

-->

10100001

-

Write

-->

11110000

避免竞争条件的方法要么是使用互斥量,要么使用原子操作,将多线程的并发执行强制转换成串行执行。

3、CPU 架构内存模型

程序内存访问可以区分为内存读取 Load 和内存写入 Store。为了叙述方便,约定以下符号

  • L(a) 读取地址 a 的内存
  • S(a) 向地址 a 的内存写入
  • <p 程序序
  • <m 内存序

3.1、SC(Sequential Consistency) 一致性

顺序一致性是由 Lamport 在 1976 年提出来的,它要求

  • 所有操作如同是单线程一样,按照某种全局顺序执行
  • 每个线程都按照程序序执行

也就是对于 a 和 b 两块内存(a/b可以相同,也可以不相同)的操作

  • L(a) <p L(b) ==> L(a) <m L(b) /* Load --> Load */
  • L(a) <p S(b) ==> L(a) <m S(b) /* Load --> Store */
  • S(a) <p S(b) ==> S(a) <m S(b) /* Store --> Store */
  • S(a) <p L(b) ==> L(a) <m L(b) /* Store --> Load */

每个 L(a) 都返回最新(内存访问顺序)S(a) 写入的值,即

  • L(a) = MAX <m { S(a) | S(a) <m L(a) }

Max <m 表示最新的内存访问。

不只是 Load 和 Store 需要满足上述条件,原子指令 test-and-set(read-modify-write, RMW)也满足上述约束

SC 是约束性最强的内存一致性模型,不允许 CPU 有任何乱序的优化,其代价就是性能损失。实际是,如果两条内存访问指令访问的是不同内存地址,这两条内存访问是可以并行执行,以提高性能。

3.2、TSO(Total Store Ordering) 一致性

TSO 放松了 SC 对 Store --> Load 顺序的限制,以便使用 Store 缓存优化性能。TSO 要求

  • L(a) <p L(b) ==> L(a) <m L(b) /* Load --> Load */
  • L(a) <p S(b) ==> L(a) <m S(b) /* Load --> Store */
  • S(a) <p S(b) ==> S(a) <m S(b) /* Store --> Store */

每个 L(a) 都返回最新 S(a) 写入的值,即

  • L(a) = MAX <m { S(a) | S(a) <m L(a) or S(a) <p L(a) }

上式表示 L(a) 返回

  • 内存序在 L(a) 之前的 S(a) 写入的值
  • 程序序在 L(a) 之前的 S(a) 写入的值

TSO 提供 FENCE 操作,FENCE 满足:

  • L(a) <p FENCE ==> L(a) <m FENCE /* Load --> FENCE */
  • S(a) <p FENCE ==> S(a) <m FENCE /* Store --> FENCE */
  • FENCE <p FENCE ==> FENCE <m FENCE /* FENCE--> FENCE */
  • FENCE <p L(a) ==> FENCE <m S(a) /* FENCE--> Load */
  • FENCE <p S(a) ==> FENCE <m S(a) /* FENCE--> Store */

程序可以使用 FENCE,强制越是 Store --> Load 内存访问顺序,只要

  • S(a) <p FENCE ==> S(a) <m FENCE /* Store --> FENCE */
  • FENCE <p L(a) ==> FENCE <m S(a) /* FENCE--> Load */

3.3、弱内存一致性模型(Relaxed/Weak Memory Consistency Model)

前面介绍的 SC 和 TSO 都属于强内存一致性模型,因为基本都保持了每个线程的程序顺序(TSO 放松了 Store --> Load 的限制)。

和强内存一致性模型相对的是弱内存 一致性模型,它们只会保持程序明确指定的内存序,以获得更高的性能。

弱内存一致性模型不同的实现具有较大差别,这里介绍书籍【A Primer on Memory Consistency and Cache Coherence】介绍的用作学习的弱内存一致性模型:Release 一致性模型 (RC),要求一下操作遵循程序序

  • ACQUIRE --> Load/Store (Load/Store --> ACQUIRE 不做要求)
  • Load/Store --> RELEASE (RELEASE --> Load/Store 不做要求)

ACQUIRE/RELEASE 之间满足 SC 要求,也就是下面四种不能乱序,按照程序序执行。

  • ACQUIRE --> ACQUIRE
  • ACQUIRE --> RELEASE
  • RELEASE --> ACQUIRE
  • RELEASE --> RELEASE

3.4、Intel & ARM 内存模型

3.4.1、Intel 内存模型

【Intel? 64 and IA-32 Architectures Software Developer’s Manual】这部文档介绍了 Intel 处理器遵循的内存模型。

从描述上来看,基本遵循 TSO 强一致性内存模型,如果有详细了解的意向,可以参阅 Intel 释放的最新文档,下面引用几个官网文档介绍的几个示例。

首先介绍以下变量的含义

  • r1/r2 这种以 r 开头的变量,表示寄存器
  • x/y/z 这种变量,表示内存地址
  • mov [_x], val 表示将 val 写入到内存 x 位置
  • mov r1, [_x] 表示读取内存 x 位置的内容,保持到寄存器 r1
  • Processor 是一个逻辑概念,可以表示一个核;具有超线程技术的 单核 CPU,也可以任务是 multi-processor

1、Store --> Store,Load --> Load 不能乱序

初始值:intially x = y = 0

Processor0

Processor1

mov [_x], 1
mov [_y], 1

mov r1, [_y]
mov r2, [_x]

结果 r1 = 1, r2 = 0 不被允许,不会出现。

因为如果 r1 = 1,表示 Store(y) 先于 Load(y) 执行,那么 Store(x) 也必然先于 Load(x) 执行。

2、Load --> Store 不能乱序

初始值:intially x = y = 0

Processor0

Processor1

mov r1, [_x]
mov [_y], 1

mov r2, [_y]
mov [_x], 1

结果 r1 = 1, r2 = 1 不被允许,不会出现。

因为如果 r1 = 1,表示 Store(x) 先于 Load(x) 执行,那么 Load(y) 也必然先于 Store(y) 执行。

3、Store --> Load 可以乱序,只要作用于不同内存地址

初始值:intially x = y = 0

Processor0

Processor1

mov [_x], 1
mov r1, [_y]

mov [_x], 1
mov r2, [_x]

结果 r1 = 0, r2 = 0 是允许的,可以出现。

应为 Processor0 和 1 内存访问不同的地址,Load 可以乱序到 Store 前执行。如果是同一地址,不能被乱序,比如下面的例子:

初始值:intially x = 0

Processor0

mov [_x], 1
mov r1, [_x]

结果 r1 = 0 不被允许,不能出现。

4、Store 可以并发执行,每个处理器可以看到不同的顺序

这条翻译不是很直观,文档中是 Intra-Processor Forwarding Is Allowed。

它讲的是:两条在不同 Processor 并发执行的 Store,可以被这两个 Processor 以不同的顺序看见。特别地,两个 Processor 都可以认为,自己的 Store 先于另一个 Processor 的 Store 执行。请看下面的示例:

初始值:x = y = 0

Processor0

Processor1

mov [_x], 1
mov r1, [_x]
mov r2, [_y]

mov [_y], 1
mov r3, [_x]
mov r4, [_y]

结果 r2 = 0, r4 = 0 是被允许的,可以出现。

因为对于 Processor0 而言,由自己发起的 Store(x) 先于由 Processor1 发起的 Store(y);Processor1 也是可以这样认为。

5、Store 传递可见

直译过来就是:对所有处理器来说,与因果关系相关的 Store 是以符合因果关系的顺序出现的。请看下面的示例:

初始值:x = y = 0

Processor0

Processor1

Processor2

mov [_x], 1

mov r1, [_x]
mov [_y], 1

mov r2, [_y]
mov r3, [_x]

结果 r1 = 1, r2 = 1, r3 = 0 不被允许,不能出现。

如果 r1 = 1, 表示全局的顺序是 Store(x) --> Store(y) ;r2 = 1 表示 Processor2 看到了 Store(y),也必然看到 Store(x),所以 r3 = 1。

6、Store 被其他 Processor 看到的顺序是一致的

第 4 条讲的是 Processor 自己看到的 Store 可以不同,但是其他 Processor 看到的 Store 顺序必须是一致的。

初始值:x = y = 0

Processor0

Processor1

Processor2

Processor3

mov [_x], 1

mov [_y], 1

mov r1, [_x]
mov r2, [_y]

mov r3, [_y]
mov r4, [_x]

结果 r1 = 1, r2 = 0, r3 = 1, r4 = 0 不被允许,不能出现。

r1 = 1, r2 = 0 表示在 Processor2 看来,Store(x) 先于 Store(y) 执行,Processor2 还读取不到 y 的新值,这个顺序对于 Process3 也是一样的,r3 = 1 表示 Processor 3 看到了 Store(y),也必然看到 Store(x),所以 r4 = 1。

7、Lock 指令有全局顺序

所有 Processor 都必须看到 lock 指令按照相同的顺序被执行。

初始值:r1 = r2 = 1, x = y = 0

Processor0

Processor1

Processor2

Processor3

xchg [_x], r1

xchg [_y], r2


mov r3, [_x]
mov r4, [_y]


mov r5, [_y]
mov r6, [_x]

结果 r3 = 1, r4 = 0, r5 = 1, r6 = 0 不被允许,不能出现。

假设 xchg(x) 先于 xchg(y) 执行,r5 = 1 表示 Processor3 上 Load(y) 在 xchg(y) 之后执行,相应的 Load(x) 也应该在 xchg(x) 之后执行,所以 r6 = 1

8、Load/Store 和 Lock 指令不能重排

位于 Lock 指令前的 Load/Store 不能乱序到 Lock 指令之后执行;同样的,位于 Lock 指令后的 Load/Store 不能乱序到 Lock 指令之前执行。

初始值:r1 = r2 = 1, x = y = 0

Processor0

Processor1

xchg [_x], r1
mov r2, [_y]

xchg [_y], r3
mov r4, [_x]

结果 r2 = 0, r4 = 0 不被允许,不能出现。

根据第 7 条约定,假定 xchg(x) 先于 xchg(y) 执行。r2 = 0 表示 Load(y) 先于 xchg(y) 执行,也就是 xchg(x) 先于 Load(x) 执行,r4 = 1

初始值:r1 = 1, x = y = 0

Processor0

Processor1

xchg [_x], r1
mov [_y], 1

xchg r2, [_y]
mov r3, [_x]

结果 r2 = 1, r3 = 0 不被允许,不能出现。

Intel 还有 Fence 相关的内容,这里不再讨论了,有意向可以参阅最新的官网文档。

3.4.2、ARM64 内存模型

ARM64 采用的是弱内存一致性模型,没有明确限制,相互没有依赖的指令都可以乱序。官网文档没有示例,这里引用 Ash Wilding 的分享 PPT 上的内容,简单介绍一下,什么情况不能乱序。

依赖可以是寄存器依赖,也可以是内存位置依赖。比如下面的例子是寄存器依赖,x0 被第二个 LDR 指令使用,这种由依赖的两条指令不能乱序。

LDR x0, [x1]
LDR x2, [x0, #4]

由比如下面的例子,访问的内存地址有重叠

STR  x0, [x1]       //  [1][1][1][1][1][1][1][1]
STRH w2, [x1, #0]   //  [-][-][-][-][-][-][2][2]
STR  w3, [x1, #2]   //  [-][-][3][3][3][3][-][-]
STRB w4, [x1, #3]   //  [-][-][-][-][4][-][-][-]
LRD  x5, [x1]       //  [1][1][3][3][4][3][1][1]

第一条指令必须最先执行;第二条和第三条指令可以乱序;第二条指令可以和第四条指令乱序;最后一条指令必须最后执行。

没有依赖的指令间,都可以乱序,需要使用 BARRIER 强制要求内存序。

4、std::memory_order

C++11定义了 6 种 memory_order

typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;

这六种访问顺序代表三种内存模型

  • sequentially constisent ordering -- memory_order_seq_cst
  • acquire-release ordering -- memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel
  • relaxed ordering -- memory_order_relaxed

memory_order_consume 已经在 C++17 不建议使用,本文也不再分析。

C++ 内存模型对于有依赖的语句,尽管操作的是不同的变量,也不会乱序,这是程序正确执行的基本保证。

C++ 内存模型的限制都是针对单个线程,多线程间不会有限制。

4.1、relaxed ordering

relaxed ordering 只保证原子性,对不同变量的操作不限制指令顺序。比如

#include <assert.h>

#include <atomic>
#include <thread>

std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);

void write_x_then_y() {
  x.store(true, std::memory_order_relaxed); // (1)
  y.store(true, std::memory_order_relaxed); // (2)
}

void read_y_then_x() {
  while (!y.load(std::memory_order_relaxed)); // (3)
  if (x.load(std::memory_order_relaxed)) {    // (4)
    ++z;
  }
}

int main() {
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
  b.join();
  assert(z.load() != 0); // (5)
}

语句 (5) assert 可能会触发。因为 (1)(2) 是可以乱序的,导致全局的执行顺序为 (2)(1)(3)(4),z = 1。(3)(4) 是不能乱序的,不然程序正确性都不能保证了。

再看下面的例子

#include <atomic>
#include <iostream>
#include <thread>

struct read_values {
  int x, y, z;
};

std::atomic<int> x(0), y(0), z(0);
std::atomic<bool> go(false);
unsigned const loop_count = 10;
read_values values1[loop_count];
read_values values2[loop_count];
read_values values3[loop_count];
read_values values4[loop_count];
read_values values5[loop_count];

void increment(std::atomic<int>* var_to_inc, read_values* values) {
  while (!go) std::this_thread::yield();
  for (unsigned i = 0; i < loop_count; ++i) {
    values[i].x = x.load(std::memory_order_relaxed);
    values[i].y = y.load(std::memory_order_relaxed);
    values[i].z = z.load(std::memory_order_relaxed);
    var_to_inc->store(i + 1, std::memory_order_relaxed);
    std::this_thread::yield();
  }
}
void read_vals(read_values* values) {
  while (!go) std::this_thread::yield();
  for (unsigned i = 0; i < loop_count; ++i) {
    values[i].x = x.load(std::memory_order_relaxed);
    values[i].y = y.load(std::memory_order_relaxed);
    values[i].z = z.load(std::memory_order_relaxed);
    std::this_thread::yield();
  }
}
void print(read_values* v) {
  for (unsigned i = 0; i < loop_count; ++i) {
    if (i) std::cout << ',';
    std::cout << '(' << v[i].x << ',' << v[i].y << ',' << v[i].z << ')';
  }
  std::cout << std::endl;
}

int main() {
  std::thread t1(increment, &x, values1);
  std::thread t2(increment, &y, values2);
  std::thread t3(increment, &z, values3);
  std::thread t4(read_vals, values4);
  std::thread t5(read_vals, values5);
  go = true;
  t5.join();
  t4.join();
  t3.join();
  t2.join();
  t1.join();
  print(values1);
  print(values2);
  print(values3);
  print(values4);
  print(values5);
}

可能的输出是

(0,0,0),(1,0,0),(2,0,0),(3,0,0),(4,0,0),(5,7,0),(6,7,8),(7,9,8),(8,9,8),(9,9,10)
(0,0,0),(0,1,0),(0,2,0),(1,3,5),(8,4,5),(8,5,5),(8,6,6),(8,7,9),(10,8,9),(10,9,10)
(0,0,0),(0,0,1),(0,0,2),(0,0,3),(0,0,4),(0,0,5),(0,0,6),(0,0,7),(0,0,8),(0,0,9)
(1,3,0),(2,3,0),(2,4,1),(3,6,4),(3,9,5),(5,10,6),(5,10,8),(5,10,10),(9,10,10),(10,10,10)
(0,0,0),(0,0,0),(0,0,0),(6,3,7),(6,5,7),(7,7,7),(7,8,7),(8,8,7),(8,8,9),(8,8,9)

有如下规律

  • t1 中 x 每次递增 1;t1 中 y 每次递增 1;t3 中 z 每次递增 1
  • 在其他 thread,x 不一定每次都递增,或者递增 1;y/z 也是同样的现象
  • t3 只看到 z 的递增,没有看到 x/y 的递增

在 t1 线程中,先 load(x) 再 store(x),并且不能乱序,所以 x 的值每次 load 都是 x 最新的值,t2/t3 也是同样的道理。

4.2、acquire-release ordering

acquire-release ordering 建立在 relaxed ordering 之上,也没有要求指令的顺序。但是对指令乱序做了约束

  • release 之前的 Store (包括普通变量),不能乱序到 release 之后
  • acquire 之后的 Load (包括普通变量),不能乱序到 acquire 之前

但是 acquire-release 必须成对使用,并且作用于同一变量。例如下面的例子,(1) 一定先于 (2) 执行,并且 (3) 结束循环后,(4) 能读取到 x 最新的值。

#include <assert.h>

#include <atomic>
#include <thread>

std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);

void write_x_then_y() {
  x.store(true, std::memory_order_relaxed); // (1)
  y.store(true, std::memory_order_release); // (2)
}

void read_y_then_x() {
  while (!y.load(std::memory_order_acquire)); // (3)
  if (x.load(std::memory_order_relaxed)) {    // (4)
    ++z;
  }
}

int main() {
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
  b.join();
  assert(z.load() != 0); // (5)
}

memory_order_acq_rel 具备 acquire 和 release 的作用,通常是 compare_exchange_strong() 中被使用。

4.3、sequentially constisent ordering

memory_order_seq_cst 约束性最强

  • 要求所有线程内所有 memory_order_seq_cst 约束的 Load/Store 都按照程序指定的顺序执行,
  • 线程间 memory_order_seq_cst 必须有某个全局顺序
  • 并且可以和 acquire-release 结合Load 具有 acquire 语义Store 具有 release 语义read-modify-write 具有 acquire-release 语义

比如下面的例子,尽管没有要求,(1)/(2) 在全局必须按照某个顺序执行,并且 c/d 线程看到 (1)/(2) 以相同的顺序执行。 假定 (1) 先于 (2) 执行,并且 (3) 返回 false,那么 (4) 一定返回 true,z = 1;其他情况也类似,z 也可以为 2,但是不能为 0。

#include <assert.h>

#include <atomic>
#include <thread>
std::atomic<bool> x(false), y(false);
std::atomic<int> z(0);

void write_x() {
  x.store(true, std::memory_order_seq_cst); // (1)
}

void write_y() {
  y.store(true, std::memory_order_seq_cst); // (2)
}

void read_x_then_y() {
  while (!x.load(std::memory_order_seq_cst));
  if (y.load(std::memory_order_seq_cst)) ++z; // (3)
}

void read_y_then_x() {
  while (!y.load(std::memory_order_seq_cst));
  if (x.load(std::memory_order_seq_cst)) ++z; // (4)
}

int main() {
  std::thread a(write_x);
  std::thread b(write_y);
  std::thread c(read_x_then_y);
  std::thread d(read_y_then_x);
  a.join();
  b.join();
  c.join();
  d.join();
  assert(z.load() != 0); // (5)
}

5、LevelDB 跳表 SkipList 案例分析

LevelDB 利用 SkipList 管理内存中的 Key-Value 数据。SkipList 是作为平衡树的替代品(Alternative)被提出,在概率上具有和平衡树相同的查找、插入和删除效率。而且,SkipList 实现以及插入和删除操作都比平衡树或者红黑树简单。

在多线程环境中,利用 SkipList 代替普通的平衡树或者红黑树,可以使临界区代码更加简短,减少多线程间的竞争条件。

SkipList::Node::Next() 使用了 acquire,希望 Next() 返回后,Node 所有的配置都完成。SetNext() 使用了 release,保证 SetNext() 返回后,Node 的所有设置的完成。

/// skiplist.h
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) {}

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return next_[n].load(std::memory_order_acquire);
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].store(x, std::memory_order_release);
  }

  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return next_[n].load(std::memory_order_relaxed);
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].store(x, std::memory_order_relaxed);
  }

 private:
  std::atomic<Node*> next_[1];
};

SetNode() 在 SkipList::Insert() 被调用,并且在最后调用,release 不允许它之前的 Store 乱序到它之后。

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

参考资料

A Primer on Memory Consistency and Cache Coherence

C++ Concurrency in Action 2nd

Intel? 64 and IA-32 Architectures Software Developer’s Manual

arm64’s weakly-ordered memory model and the need for correct, minimally intrusive barriers

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接