c++

第十一章:并发

Posted by lili on

目录

在上一章涵盖了惰性求值(Lazy Evaluation)和代理对象(Proxy Objects)之后,我们现在将探索如何使用带有共享内存的线程来编写 C++ 并发程序。我们将研究如何通过编写没有数据竞争(Data Races)和死锁(Deadlocks)的程序,来确保并发程序的正确性。本章还将包含有关如何使并发程序以低延迟和高吞吐量运行的建议。

在继续之前,您应该知道本章既不是对并发编程的完整介绍,也不会涵盖 C++ 并发性的所有细节。相反,本章是对使用 C++ 编写并发程序的核心构建块的介绍,并结合了一些与性能相关的指南。如果您以前没有编写过并发程序,建议您先阅读一些入门材料,以涵盖并发编程的理论方面。死锁(Deadlocks)、临界区(Critical Sections)、条件变量(Condition Variables)和互斥锁(Mutexes)等概念将进行非常简短的讨论,但这更多是作为回顾而非彻底的介绍。

本章涵盖以下内容:

  • 并发编程的基础知识,包括并行执行、共享内存、数据竞争和死锁
  • C++ 线程支持库、原子库和 C++ 内存模型的介绍
  • 一个简短的无锁编程(Lock-free Programming)示例
  • 性能指南

理解并发编程的基础

一个并发程序可以同时执行多个任务。通常,并发编程比顺序编程要困难得多,但程序可以从并发中受益的原因有以下几点:

  • 效率 (Efficiency): 如今的智能手机和台式电脑都拥有多个 CPU 核心,可以并行执行多个任务。如果您能将一个大型任务分解成可以并行运行的子任务,理论上可以将该大型任务的运行时间除以 CPU 核心的数量。对于运行在单核机器上的程序,如果一个任务是 I/O 密集型(I/O bound),仍然可以获得性能提升。当一个子任务正在等待 I/O 时,其他子任务仍然可以在 CPU 上执行有用的工作。

  • 响应能力和低延迟上下文 (Responsiveness and low latency contexts): 对于带有图形用户界面(GUI)的应用程序,重要的是永远不要阻塞 UI,以免应用程序变得无响应。为了防止无响应,通常的做法是让耗时较长的任务(例如从磁盘加载文件或从网络获取数据)在单独的后台线程中执行,这样负责 UI 的线程就不会被这些耗时任务阻塞。另一个重视低延迟的例子是实时音频。负责生成音频数据缓冲区的函数在一个单独的高优先级线程中执行,而程序的其余部分可以在较低优先级线程中运行,以处理 UI 等。

  • 模拟 (Simulation): 并发可以使模拟现实世界中并发发生的系统变得更容易。毕竟,我们周围的大多数事物都是并发发生的,有时使用顺序编程模型很难对并发流程进行建模。我们不会在本书中重点关注模拟,而是将重点放在并发中与性能相关的方面。

并发为我们解决了许多问题,但也带来了新的问题,我们将在接下来讨论。

为什么并发编程很难?

并发编程之所以困难,有很多原因。如果您以前编写过并发程序,很可能已经遇到过以下列出的问题:

  • 安全地在多个线程之间共享状态非常困难。每当数据可以被同时读取和写入时,我们就需要某种方式来保护这些数据免受数据竞争(Data Races)的影响。您稍后会看到许多这样的示例。

  • 并发程序通常更复杂,由于存在多个并行执行流,推理起来更加困难。

  • 并发使调试复杂化。由于依赖于线程的调度方式,由数据竞争引起的错误可能非常难以调试。这类错误难以重现,在最坏的情况下,它们甚至可能在使用调试器运行程序时停止出现。有时,一个看似无害的控制台调试跟踪(Debug Trace)就可能改变多线程程序的行为方式,使错误暂时消失。您已被警告!

在我们开始研究使用 C++ 进行并发编程之前,我们将介绍一些与并发和并行编程相关的一般概念。

并发与并行 (Concurrency and Parallelism)

并发(Concurrency)和并行(Parallelism)是两个有时可互换使用的术语。然而,它们并不相同,理解它们之间的区别非常重要。

  • 如果一个程序有多个独立的控制流在重叠的时间段内运行,则称该程序是并发运行的。在 C++ 中,每个独立的控制流都由一个线程(Thread)表示。
  • 这些线程可能在同一时刻执行,也可能不在同一时刻执行。如果它们在同一时刻执行,则称它们是并行执行的。

要使一个并发程序并行运行,它需要在支持指令并行执行的机器上执行;即,一台具有多个 CPU 核心的机器。

乍一看,出于效率考虑,似乎我们总是希望并发程序在可能的情况下并行运行。然而,事实并非总是如此。本章介绍的许多同步原语(Synchronization Primitives,例如互斥锁 Mutex Locks)仅为支持线程的并行执行而必需。不并行运行的并发任务不需要相同的锁定机制,因此推理起来要容易得多。

时间分片 (Time Slicing)

您可能会问:“在只有单个 CPU 核心的机器上,并发线程是如何执行的?” 答案是时间分片(Time Slicing)。这与操作系统用于支持进程并发执行的机制相同。

为了理解时间分片,假设我们有两个单独的指令序列需要并发执行:

编号的方框代表指令。每个指令序列在一个单独的线程中执行,分别标记为 T1 和 T2。操作系统会调度每个线程,让它在 CPU 上获得一段有限的时间,然后执行上下文切换(Context Switch)。上下文切换会存储当前运行线程的状态,并加载下一个要执行线程的状态。这个过程发生的频率足够高,使得线程看起来好像在同时运行。

然而,上下文切换是耗时的,并且每次新线程在 CPU 核心上执行时,很可能会产生大量的缓存未命中(Cache Misses)。因此,我们不希望上下文切换发生得过于频繁。

下图展示了两个线程在单个 CPU 上被调度的可能执行序列:

线程 T1 的第一条指令开始执行,然后是一个上下文切换,让线程 T2 执行前两条指令。作为程序员,我们必须确保程序能够按预期运行,无论操作系统调度程序如何调度任务。如果某个序列出于某种原因无效,可以使用锁来控制指令的执行顺序,我们将在后面介绍。

如果一台机器有多个 CPU 核心,则可以并行执行这两个线程。然而,不能保证(甚至不太可能)这两个线程在程序的整个生命周期内都会在各自的专用核心上执行。整个系统共享 CPU 时间,因此调度程序也会允许其他进程执行。这是线程不会被调度在专用核心上的原因之一。

下图展示了相同两个线程的执行,但现在它们运行在具有两个 CPU 核心的机器上。您可以看到,第一个线程的第二条和第三条指令(白色方框)与另一个线程的指令同时执行——这两个线程正在并行执行:

接下来,我们讨论共享内存。

共享内存 (Shared Memory)

在同一个进程中创建的线程共享相同的虚拟内存。这意味着一个线程可以访问该进程内可寻址的任何数据。

操作系统使用虚拟内存来保护进程间的内存,但不会保护我们免受意外访问进程内部不打算在不同线程之间共享的内存。虚拟内存只保护我们不访问分配在我们自己进程之外的内存。

在多个线程之间共享内存是处理线程间通信的一种非常有效的方式。然而,以安全的方式在线程间共享内存是编写 C++ 并发程序的主要挑战之一。我们应该始终努力最小化线程之间共享资源的数量。

幸运的是,并非所有内存默认都是共享的。每个线程都有自己的栈(Stack),用于存储局部变量和处理函数调用所需的其他数据。除非一个线程将局部变量的引用或指针传递给其他线程,否则没有其他线程能够访问该线程的栈。这是尽可能多地使用栈的另一个原因。

此外,还有线程局部存储(Thread Local Storage,缩写为 TLS),可用于存储在线程上下文中是全局的,但在线程之间不共享的变量。线程局部变量可以被视为一个全局变量,但每个线程都有自己的副本。

其他所有东西默认都是共享的:即在堆(Heap)上分配的动态内存、全局变量和静态局部变量。每当您有被某个线程修改的共享数据时,您都需要确保没有其他线程同时访问该数据,否则就会发生数据竞争。

该示例中,进程包含三个线程。默认情况下,堆内存由所有线程共享。

数据竞争 (Data Races)

当两个线程同时访问相同的内存,并且至少一个线程正在修改数据时,就会发生数据竞争(Data Race)。如果您的程序存在数据竞争,则意味着您的程序具有未定义行为(Undefined Behavior)。

编译器和优化器会假设您的代码中没有数据竞争,并在此假设下进行优化。这可能导致程序崩溃或产生完全出乎意料的行为。换句话说,您在任何情况下都不能允许程序中出现数据竞争。编译器通常不会警告您数据竞争,因为它们很难在编译时检测到。

调试数据竞争可能是一个真正的挑战,有时需要借助 ThreadSanitizer(来自 Clang)或 Concurrency Visualizer(Visual Studio 扩展)等工具。这些工具通常会对代码进行插桩(instrument),以便运行时库能够在程序运行时检测、警告或可视化潜在的数据竞争。

示例:数据竞争

下图展示了两个线程将要更新一个名为 counter 的整数。想象一下,这两个线程都通过指令 ++counter 递增一个全局计数器变量。事实证明,递增一个 int 可能涉及多条 CPU 指令。这在不同的 CPU 上可能以不同的方式完成,但我们假设 ++counter 生成以下虚构的机器指令:

  • R: 从内存中读取 counter
  • $+1$: 递增 counter
  • W: 将新的 counter 值写入内存

现在,如果两个线程都要更新初始值为 42 的计数器值,我们预期在两个线程运行后它会变成 44。然而,如下图所示,无法保证指令会顺序执行以保证对计数器变量的正确递增。

在没有数据竞争的情况下,计数器将达到值 44,但由于数据竞争,它只达到了 43。

在这个示例中,两个线程都读取了值 42 并将该值递增到 43。然后,它们都写入了新值 43,这意味着我们永远没有达到正确的答案 44。如果第一个线程能够在第二个线程开始读取之前写入值 43,那么我们最终会得到 44。请注意,即使只有一个 CPU 核心,这也可能发生。调度程序可以以类似的方式调度这两个线程,使得两个读取指令都在任何写入指令之前执行。

再次强调,这只是一种可能的情况,但重要的是其行为是未定义的。当您的程序存在数据竞争时,任何事情都可能发生。其中一个例子是撕裂(tearing),它是读撕裂(torn reads)和写撕裂(torn writes)的通用术语。这发生在当一个线程将一个值的一部分写入内存时,另一个线程同时读取该值,从而导致最终得到一个损坏的值。

避免数据竞争

我们如何避免数据竞争?有两种主要选择:

  1. 使用原子数据类型(Atomic Data Type)而不是 int。这将告诉编译器原子地执行读取、递增和写入操作。我们将在本章后面花更多时间讨论原子数据类型。
  2. 使用互斥锁(Mutually Exclusive Lock,Mutex),它保证多个线程永远不会同时执行临界区(Critical Section)。临界区是代码中不得同时执行的部分,因为它会更新或读取共享内存,从而可能产生数据竞争。

另外值得强调的是,不可变数据结构(Immutable Data Structures)——即永远不会被更改的数据结构——可以被多个线程访问,而不会有任何数据竞争的风险。最小化可变对象的使用有许多好处,但在编写并发程序时变得更加重要。一个常见的模式是始终创建新的不可变对象,而不是修改现有对象。当新对象完全构建并代表新状态时,可以将其与旧对象交换(swap)。通过这种方式,我们可以最小化代码中的临界区。只有交换操作本身是一个临界区,因此需要通过原子操作或互斥锁来保护。

互斥锁 (Mutex)

互斥锁(Mutex),是互斥锁(Mutual Exclusion Lock)的缩写,是用于避免数据竞争的同步原语。

需要进入临界区的线程首先需要锁定(lock)互斥锁(锁定有时也称为获取互斥锁 acquire a mutex lock)。这意味着在持有锁的第一个线程解锁(unlock)互斥锁之前,其他线程不能锁定相同的互斥锁。通过这种方式,互斥锁保证在任何给定时间只有一个线程位于临界区内。

下图展示了如何使用互斥锁来避免数据竞争示例中演示的竞态条件(Race Condition)。标记为 L 的指令是锁定指令,标记为 U 的指令是解锁指令。

在 Core 0 上执行的第一个线程首先到达临界区,并在读取计数器值之前锁定互斥锁。然后,它将计数器加 1,并将其写回内存。之后,它释放锁。

在 Core 1 上执行的第二个线程在第一个线程获取互斥锁后立即到达临界区。由于互斥锁已被锁定,该线程被阻塞(blocked),直到第一个线程不受干扰地更新计数器并释放互斥锁。

最终结果是,这两个线程可以安全且正确地更新可变的共享变量。然而,这也意味着这两个线程不再能够并行运行。如果一个线程的大部分工作无法在不序列化(serializing)工作的情况下完成,那么从性能的角度来看,使用线程就没有意义了。

第二个线程被第一个线程阻塞的状态称为争用(Contention)。这是我们努力最小化的事情,因为它会损害并发程序的可伸性(Scalability)。如果争用程度很高,增加更多的 CPU 核心将无法改善性能。

死锁 (Deadlock)

当使用互斥锁来保护共享资源时,存在陷入死锁(Deadlock)状态的风险。当两个线程相互等待对方释放锁时,可能会发生死锁。两个线程都无法继续执行,并陷入死锁状态。

死锁发生需要满足的一个条件是:一个已经持有锁的线程尝试获取额外的锁。

当系统增长和变大时,跟踪系统中运行的所有线程可能使用的所有锁变得越来越困难。这也是始终尝试最小化共享资源使用的原因之一,并说明了互斥锁定的必要性。

下图显示了两个线程处于等待状态,试图获取对方持有的锁:

接下来,我们讨论同步和异步任务。

同步任务与异步任务 (Synchronous and Asynchronous Tasks)

在本章中,我将提及同步任务(Synchronous Tasks)和异步任务(Asynchronous Tasks)。

  • 同步任务就像普通的 C++ 函数。当同步任务完成它应该做的任何事情时,它将控制权返回给任务的调用者。任务的调用者会等待或阻塞,直到同步任务完成。

  • 异步任务则会立即将控制权返回给调用者,而是并发地执行其工作。

下图展示了分别调用同步任务和异步任务的区别:

如果您以前没有见过异步任务,它们乍一看可能很奇怪,因为 C++ 中的普通函数在遇到 return 语句或到达函数体末尾时总是停止执行。然而,异步 API 越来越常见,您很可能以前遇到过它们,例如在处理异步 JavaScript 时。

有时,我们使用术语阻塞(blocking)来指代那些阻塞调用者的操作;也就是说,使调用者等待直到操作完成。

在对并发进行了总体介绍之后,是时候探索 C++ 中对线程编程的支持了。

C++ 中的并发编程

C++ 中的并发支持使得程序能够并发执行多个任务。如前所述,编写正确的并发 C++ 程序通常比编写在一个线程中按顺序执行所有任务的程序要困难得多。本节还将演示一些常见的陷阱,以便让您了解编写并发程序所涉及的所有困难。

并发支持最早是在 C++11 中引入的,此后在 C++14、C++17 和 C++20 中得到了扩展。在并发成为语言的一部分之前,它是通过操作系统的原生并发支持、POSIX 线程(pthreads)或其他库实现的。

有了 C++ 语言中直接的并发支持,我们可以编写跨平台的并发程序,这非常棒!然而,在处理特定平台上的并发问题时,有时仍然需要依赖特定平台的功能。例如,C++ 标准库中不支持设置线程优先级、配置 CPU 亲和性(CPU 绑定),或设置新线程的栈大小。

还需要指出的是,随着 C++20 的发布,线程支持库得到了相当大的扩展,未来版本的语言中可能会增加更多功能。由于硬件的发展方式,对良好并发支持的需求正在增加,在高效性、可扩展性和高度并发程序的正确性方面,还有很多有待发现。

线程支持库

我们现在将对 C++ 线程支持库进行一次概览,并介绍其最重要的组成部分。

线程 (Threads)

一个运行中的程序至少包含一个线程。当您的 main 函数被调用时,它在一个通常被称为主线程的线程上执行。每个线程都有一个标识符,这在调试并发程序时非常有用。下面的程序打印出主线程的线程标识符:

int main() {
    std::cout << "Thread ID: " << std::this_thread::get_id() << '\n';
}

运行上述程序可能会产生类似如下的输出:

Thread ID: 0x1001553c0

可以使线程休眠。休眠很少用于生产代码,但在调试期间非常有用。例如,如果您有一个只有在极少数情况下才会发生的数据竞争,那么在代码中添加休眠可能会使其更频繁地出现。下面是使当前运行的线程休眠一秒钟的方法:

std::this_thread::sleep_for(std::chrono::seconds{1});

您的程序在插入随机休眠后,不应该暴露出任何数据竞争。添加休眠后,您的程序可能无法令人满意地工作;缓冲区可能会满,UI 可能会滞后等等,但它应该始终以可预测和定义好的方式运行。我们无法控制线程的调度,随机休眠只是模拟了不太可能但可能发生的调度场景。

现在,让我们使用 <thread> 头文件中的 std::thread 类创建一个额外的线程。它代表一个单一的执行线程,通常是对操作系统线程的封装。下面的 print() 函数将由我们显式创建的线程调用:

void print() {
    std::this_thread::sleep_for(std::chrono::seconds{1});
    std::cout << "Thread ID: "<< std::this_thread::get_id() << '\n';
}
int main() {
    auto t1 = std::thread{print};
    t1.join();
    std::cout << "Thread ID: "<< std::this_thread::get_id() << '\n';
}

创建线程时,我们传入一个可调用对象(函数、lambda 或函数对象),线程在 CPU 上获得调度时间后将开始执行该对象。我添加了一个对 sleep 的调用,以使我们为什么需要调用线程的 join() 变得更明显。当一个 std::thread 对象被销毁时,它必须已经被 joineddetached,否则程序将调用 std::terminate(),默认情况下,如果我们没有安装自定义的 std::terminate_handler,它将调用 std::abort()

在上面的示例中,join() 函数是阻塞的——它会一直等待直到线程完成运行。因此,在上面的示例中,main() 函数直到线程 t1 完成运行才会返回。考虑以下一行代码:

t1.join();

假设我们通过将上述行替换为以下行来分离线程 t1:

t1.detach();

在这种情况下,我们的 main 函数将在线程 t1 醒来打印消息之前结束,结果是程序(很可能)只输出主线程的线程 ID。请记住,我们无法控制线程的调度,尽管可能性很小,但主线程可能会在 print() 函数有时间休眠、醒来并打印其线程 ID 之后才输出其消息。

在这个示例中使用 detach() 代替 join() 也引入了另一个问题。我们在两个线程中都使用了 std::cout 而没有任何同步,而且由于 main() 不再等待线程 t1 完成,它们在理论上可以并行使用 std::cout。幸运的是,std::cout 是线程安全的,可以在多个线程中使用而不会引入数据竞争,因此没有未定义行为。然而,线程生成的输出仍有可能交错,导致类似如下的结果:

Thread ID: Thread ID: 0x1003a93400x700004fd4000

如果我们想避免交错输出,我们需要将字符输出视为一个临界区并同步对 std::cout 的访问。我们稍后会讨论更多关于临界区和竞态条件的内容,但首先,让我们讨论一些关于 std::thread 的细节。

线程状态 (Thread states)

在我们深入之前,您应该对 std::thread 对象真正代表什么以及它可能处于什么状态有一个很好的理解。我们还没有讨论在一个执行 C++ 程序的系统中通常有哪些类型的线程。

在下图中,您可以看到一个假设的运行系统的快照。

从底部开始,该图显示了 CPU 及其硬件线程。它们是 CPU 上的执行单元。在这个示例中,CPU 提供了四个硬件线程。通常这意味着它有四个核心,但它可能是其他配置;例如,一些核心可以执行两个硬件线程。这通常被称为超线程(hyperthreading)。硬件线程的总数可以在运行时通过以下方式打印:

std::cout << std::thread::hardware_concurrency() << '\n';
// Possible output: 4

如果无法确定运行平台上的硬件线程数量,上述代码也可能输出 0。

硬件线程上方的层包含操作系统线程。这些是实际的软件线程。操作系统调度程序决定何时以及多长时间执行一个操作系统线程。在图中,目前有六个软件线程中的三个正在执行。

图中最顶层包含 std::thread 对象。一个 std::thread 对象不过是一个普通的 C++ 对象,它可能或可能不与底层操作系统线程关联。std::thread 的两个实例不能与同一个底层线程关联。在图中,您可以看到程序目前有三个 std::thread 实例;其中两个与线程关联,一个没有。可以使用 std::thread::joinable 属性来查明一个 std::thread 对象处于什么状态。

如果一个线程满足以下任一条件,则它不是 joinable 的:

  • 默认构造;即,它没有要执行的东西。
  • 被移走(其关联的运行线程已转移到另一个 std::thread 对象)。
  • 通过调用 detach() 被分离。
  • 已经通过调用 join() 被加入。

否则,std::thread 对象处于 joinable 状态。请记住,当一个 std::thread 对象被销毁时,它必须不再处于 joinable 状态,否则程序将终止。

可join的线程 (std::jthread)

C++20 引入了一个新的线程类,名为 std::jthread。它与 std::thread 非常相似,但有一些重要的补充:

  • std::jthread 支持使用 停止令牌(stop token)来停止线程。这是我们在 C++20 之前使用 std::thread 时必须手动实现的功能。
  • std::jthread 的析构函数在非 joinable 状态下被销毁时,不会导致应用程序终止,而是会发送一个停止请求并加入线程(join the thread on destruction)。

我将在接下来说明后一点。首先,我们将使用 print() 函数,它定义如下:

void print() {
    std::this_thread::sleep_for(std::chrono::seconds{1});
    std::cout << "Thread ID: "<< std::this_thread::get_id() << '\n';
}

它休眠一秒钟,然后打印当前的线程标识符:

int main() {
    std::cout << "main begin\n";
    auto joinable_thread = std::jthread{print};
    std::cout << "main end\n";
} // OK: jthread will join automatically

在我的机器上运行代码时产生了以下输出:

main begin
main end
Thread ID: 0x1004553c0

现在让我们修改我们的 print() 函数,使其在循环中连续输出消息。然后我们需要某种方式来通知 print() 函数何时停止。std::jthread(与 std::thread 不同)通过使用停止令牌(stop token)内置了对这一功能的支持。当 std::jthread 调用 print() 函数时,如果 print() 函数接受这样一个参数,它可以传递一个 std::stop_token 的实例。下面是一个如何使用停止令牌实现新的 print() 函数的示例:

void print(std::stop_token stoken) {
    while (!stoken.stop_requested()) {
        std::cout << std::this_thread::get_id() << '\n';
        std::this_thread::sleep_for(std::chrono::seconds{1});
    }
    std::cout << "Stop requested\n";
}

while 循环在每次迭代时通过调用 stop_requested() 检查是否已请求停止该函数。现在,在我们的 main() 函数中,可以通过调用 std::jthread 实例上的 request_stop() 来请求停止:

int main() {
    auto joinable_thread = std::jthread(print);
    std::cout << "main: goes to sleep\n";
    std::this_thread::sleep_for(std::chrono::seconds{3});
    std::cout << "main: request jthread to stop\n";
    joinable_thread.request_stop();
}

当我运行这个程序时,它生成了以下输出:

main: goes to sleep
Thread ID: 0x70000f7e1000
Thread ID: 0x70000f7e1000
Thread ID: 0x70000f7e1000
main: request jthread to stop
Stop requested

在这个示例中,我们可以省略显式调用 request_stop(),因为 jthread 在析构时会自动调用 request_stop()

新的 jthread 类是 C++ 线程库的一个受欢迎的补充,在 C++ 中选择线程类时,它应该是首选。

保护临界区 (Protecting critical sections)

正如我已经提到的,我们的代码中不能包含任何数据竞争(data races)。不幸的是,编写带有数据竞争的代码非常容易。在使用线程的这种风格编写并发程序时,找到临界区(critical sections)并用锁保护它们是我们不断需要思考的问题。

C++ 为我们提供了一个 std::mutex 类,可用于保护临界区并避免数据竞争。我将通过一个经典的示例来演示如何使用互斥锁,该示例使用一个由多个线程更新的共享可变计数器变量。

首先,我们定义一个全局可变变量和递增计数器的函数:

auto counter = 0; // Warning! Global mutable variable
void increment_counter(int n) {
    for (int i = 0; i < n; ++i)
        ++counter;
}

接下来的 main() 函数创建了两个线程,它们都将执行 increment_counter() 函数。注意在这个例子中我们如何向线程调用的函数传递参数。我们可以向线程构造函数传递任意数量的参数,以匹配要调用函数签名中的参数。最后,我们断言计数器的值应该与程序没有竞态条件时我们期望的值相同:

int main() {
    constexpr auto n = int{100'000'000};
    {
        auto t1 = std::jthread{increment_counter, n};
        auto t2 = std::jthread{increment_counter, n};
    }
    std::cout << counter << '\n';
    // If we don't have a data race, this assert should hold:
    assert(counter == (n * 2));
}

这个程序很可能会失败。assert() 函数不成立,因为程序当前包含一个竞态条件(race condition)。当我重复运行程序时,我得到了不同的计数器值。例如,我曾得到一个不高于 137182234 的值,而不是达到 200000000。这个例子与本章前面说明的数据竞争示例非常相似。

表达式 ++counter 所在的行是一个临界区——它使用了共享可变变量,并由多个线程执行。为了保护临界区,我们现在将使用包含在 <mutex> 头文件中的 std::mutex。稍后,您将看到如何通过使用原子类型(atomics)来避免这个例子中的数据竞争,但现在,我们将使用锁。

首先,我们在计数器旁边添加全局 std::mutex 对象:

auto counter = 0; // Counter will be protected by counter_mutex
auto counter_mutex = std::mutex{};

但是 std::mutex 对象本身不也是一个可变的共享变量,如果被多个线程使用,会不会产生数据竞争呢?是的,它是一个可变共享变量,但不会产生数据竞争。C++ 线程库中的同步原语,如 std::mutex,就是为此特定目的而设计的。在这方面,它们非常特殊,并使用硬件指令,或我们平台上必要的任何其他机制,来保证它们自身不会产生数据竞争。

现在我们需要在读取和更新计数器变量的临界区中使用互斥锁。我们可以使用 counter_mutex 上的 lock()unlock() 成员函数,但更推荐和更安全的方法是始终使用 RAII 来处理互斥锁。将互斥锁视为一种资源,当我们使用完毕后总是需要解锁它。线程库为我们提供了一些有用的 RAII 类模板来处理锁定。在这里,我们将使用 std::scoped_lock<Mutex> 模板来确保我们安全地释放互斥锁。下面是更新后的 increment_counter() 函数,它现在受到互斥锁保护:

void increment_counter(int n) {
    for (int i = 0; i < n; ++i) {
        auto lock = std::scoped_lock{counter_mutex};
        ++counter;
    }
}

该程序现在没有数据竞争,并且按预期工作。如果我们再次运行它,assert() 函数中的条件现在将成立。

避免死锁 (Avoiding deadlocks)

只要一个线程从不同时获取多于一个锁,就没有死锁的风险。然而,有时有必要在已经持有一个先前获取的锁的同时,获取另一个锁。可以通过同时获取这两个锁来避免这些情况下的死锁风险。C++ 有一种方法可以通过使用 std::lock() 函数来做到这一点,该函数接受任意数量的锁并阻塞,直到所有锁都被获取。

下面是一个在账户之间转账的示例。交易期间需要保护两个账户,因此我们需要同时获取两个锁。它是这样工作的:

struct Account {
    Account() {}
    int balance_{0};
    std::mutex m_{};
};
void transfer_money(Account& from, Account& to, int amount) {
    auto lock1 = std::unique_lock<std::mutex>{from.m_, std::defer_lock};
    auto lock2 = std::unique_lock<std::mutex>{to.m_, std::defer_lock};
    // Lock both unique_locks at the same time
    std::lock(lock1, lock2);
    from.balance_ -= amount;
    to.balance_ += amount;
}

我们再次使用 RAII 类模板来确保每当此函数返回时,我们都会释放锁。在这种情况下,我们使用 std::unique_lock,它为我们提供了延迟锁定互斥锁的可能性。然后,我们通过使用 std::lock() 函数同时显式地锁定了两个互斥锁。

条件变量 (Condition variables)

条件变量(A condition variable)使得线程能够等待直到满足某些特定的条件。线程还可以使用条件变量来通知其他线程条件已发生变化。

并发程序中的一个常见模式是有一个或多个线程正在等待数据以某种方式被消费。这些线程通常被称为消费者(consumers)。另一组线程则负责生产准备好被消费的数据。这些生产数据的线程被称为生产者(producers),或者如果只有一个线程则称为一个生产者。

生产者和消费者模式可以使用条件变量来实现。我们可以结合使用 std::condition_variablestd::unique_lock 来实现此目的。让我们看一个生产者和消费者的示例,使它们不那么抽象:

auto cv = std::condition_variable{};
auto q = std::queue<int>{};
auto mtx = std::mutex{}; // Protects the shared queue
constexpr int sentinel = -1; // Value to signal that we are done

void print_ints() {
    auto i = 0;
    while (i != sentinel) {
        {
            auto lock = std::unique_lock<std::mutex>{mtx};
            while (q.empty()) {
                cv.wait(lock); // The lock is released while waiting
            }
            i = q.front();
            q.pop();
        }
        if (i != sentinel) {
            std::cout << "Got: " << i << '\n';
        }
    }
}

auto generate_ints() {
    for (auto i : {1, 2, 3, sentinel}) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        {
            auto lock = std::scoped_lock{mtx};
            q.push(i);
        }
        cv.notify_one();
    }
}

int main() {
    auto producer = std::jthread{generate_ints};
    auto consumer = std::jthread{print_ints};
}

我们创建了两个线程:一个消费者线程和一个生产者线程。生产者线程生成一系列整数,并每秒将它们推送到全局 std::queue<int> 一次。每当一个元素被添加到队列时,生产者会使用 notify_one() 发出信号,表明条件已更改。

程序检查队列中是否有可供消费者线程消费的数据。另请注意,在通知条件变量时不需要持有锁。

消费者线程负责将数据(即整数)打印到控制台。它使用条件变量来等待空队列发生变化。当消费者调用 cv.wait(lock) 时,线程进入休眠并让出 CPU 供其他线程执行。理解为什么我们需要在调用 wait() 时传递变量 lock 是很重要的。除了让线程进入休眠之外,wait() 在休眠时还会解锁互斥锁,并在返回之前获取互斥锁。如果 wait() 没有释放互斥锁,生产者将无法向队列添加元素。

为什么消费者等待条件变量时使用 while 循环而不是 if 语句?这是一种常见的模式,有时我们需要这样做,因为可能有其他消费者也被唤醒并在我们之前清空了队列。然而,在我们的程序中只有一个消费者线程,所以这种情况不会发生。但是,即使生产者线程没有发出信号,消费者也有可能从等待中被唤醒。这种现象被称为虚假唤醒(spurious wakeup),发生这种情况的原因超出了本书的范围。

作为使用 while 循环的替代方案,我们可以使用接受谓词(predicate)的重载版本 wait()。这个版本的 wait() 会检查谓词是否满足,并将为我们完成循环。在我们的示例中,它看起来像这样:

// ...
auto lock = std::unique_lock<std::mutex>{mtx};
cv.wait(lock, [] { return !q.empty(); });
// ...

您可以在 Anthony Williams 撰写的《C++ 并发实战(第二版)》中找到有关虚假唤醒的更多信息。您现在至少知道如何处理可能发生虚假唤醒的情况:始终在 while 循环中检查条件,或使用接受谓词的重载版本 wait()

条件变量和互斥锁是自 C++ 引入线程以来就可用的同步原语。C++20 带来了额外的有用类模板用于同步线程,即 std::counting_semaphorestd::barrierstd::latch。我们将在后面介绍这些新原语。首先,我们将花一些时间在返回值和错误处理上。

返回数据和处理错误 (Returning data and handling errors)

本章到目前为止介绍的示例都使用共享变量在线程之间通信状态。我们使用了互斥锁来确保避免数据竞争。正如我们一直在做的,当程序规模增加时,使用带互斥锁的共享数据可能很难正确实现。维护使用散布在代码库中的显式锁的代码也需要大量工作。跟踪共享内存和显式锁定使我们偏离了编写程序时真正想要完成和花费时间的事情。

此外,我们还没有处理任何错误处理。如果一个线程需要向另一个线程报告错误怎么办?当函数需要报告运行时错误时,我们习惯于使用异常,那么我们如何使用异常来做这件事呢?

在标准库的 <future> 头文件中,我们可以找到一些类模板,它们可以帮助我们编写不使用全局变量和锁的并发代码,此外,还可以在线程之间通信异常以进行错误处理。我现在将介绍 futurepromise,它们代表一个值的两个方面。future 是值的接收端,promise 是值的返回端。

下面是使用 std::promise 将结果返回给调用者的示例:

auto divide(int a, int b, std::promise<int>& p) {
    if (b == 0) {
        auto e = std::runtime_error{"Divide by zero exception"};
        p.set_exception(std::make_exception_ptr(e));
    }
    else {
        const auto result = a / b;
        p.set_value(result);
    }
}
int main() {
    auto p = std::promise<int>{};
    std::thread(divide, 45, 5, std::ref(p)).detach();
    auto f = p.get_future();
    try {
        const auto& result = f.get(); // Blocks until ready
        std::cout << "Result: " << result << '\n';
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << '\n';
    }
}

调用者(main() 函数)创建了 std::promise 对象并将其传递给 divide() 函数。我们需要使用 <functional> 中的 std::ref,以便引用可以通过 std::thread 正确转发到 divide() 函数。

divide() 函数计算出结果后,它通过调用 set_value() 函数将返回值传递给 promise。如果 divide() 函数发生错误,它会转而调用 promise 上的 set_exception() 函数。

future 代表可能尚未计算或正在计算中的计算结果值。由于 future 是一个普通对象,我们可以,例如,将其传递给需要计算值的其他对象。最后,当某个客户端需要该值时,它调用 get() 来获取实际的值。如果那时还没有计算出来,调用 get() 将会阻塞直到完成。

另请注意,我们如何在不使用任何共享全局数据和任何显式锁的情况下,通过适当的错误处理来回传递数据。promise 为我们处理了这些,我们可以专注于实现程序的基本逻辑。

任务 (Tasks)

有了 futurepromise,我们设法摆脱了显式锁和共享全局数据。在可能的情况下,我们的代码将受益于使用更高级别的抽象,尤其是在代码库增长时。在这里,我们将进一步探索自动为我们设置 futurepromise 的类。您还将看到如何摆脱手动管理线程,并将这部分工作留给库。

在许多情况下,我们不需要管理线程;相反,我们真正需要的是能够异步执行一个任务,并让该任务与程序的其余部分并发执行,然后最终将结果或错误传达给程序中需要它的部分。该任务应该隔离执行,以最大限度地减少竞争和数据竞争的风险。

我们将从重写我们之前划分两个数字的示例开始。这次,我们将使用 <future> 中的 std::packaged_task,它为我们完成了设置 promise 的所有工作:

int divide(int a, int b) { // No need to pass a promise ref here!
    if (b == 0) {
        throw std::runtime_error{"Divide by zero exception"};
    }
    return a / b;
}

int main() {
    auto task = std::packaged_task<decltype(divide)>{divide};
    auto f = task.get_future();
    std::thread{std::move(task), 45, 5}.detach();

    // The code below is unchanged from the previous example
    try {
        const auto& result = f.get(); // Blocks until ready
        std::cout << "Result: " << result << '\n';
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << '\n';
    }
    return 0;
}

std::packaged_task 本身是一个可调用对象,可以移动到我们正在创建的 std::thread 对象中。正如您所看到的,std::packaged_task 现在为我们完成了大部分工作:我们不必自己创建 promise。但更重要的是,我们可以像编写普通函数一样编写 divide() 函数,而无需通过 promise 显式地返回值或异常;std::packaged_task 将为我们完成这项工作。

作为本节的最后一步,我们还希望摆脱手动线程管理。创建线程并非没有成本,您稍后会看到程序中的线程数量会影响性能。看来是否应该为我们的 divide() 函数创建一个新线程的问题不一定由 divide() 的调用者决定。库在这里再次帮助了我们,提供了一个名为 std::async() 的另一个有用函数模板。在我们的 divide() 示例中,我们唯一需要做的就是将创建 std::packaged_taskstd::thread 对象的代码替换为一个简单的 std::async() 调用:

auto f = std::async(divide, 45, 5);

我们现在已从基于线程的编程模型切换到基于任务的模型(task-based model)。完整的基于任务的示例如下所示:

int divide(int a, int b) {
    if (b == 0) {
        throw std::runtime_error{"Divide by zero exception"};
    }
    return a / b;
}

int main() {
    auto future = std::async(divide, 45, 5);
    try {
        const auto& result = future.get();
        std::cout << "Result: " << result << '\n';
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << '\n';
    }
}

这里剩下的用于处理并发的代码量非常少。调用函数异步执行的推荐方法是使用 std::async()。关于为什么以及何时优先使用 std::async() 的更深入讨论,我强烈推荐 Scott Meyers 撰写的《Effective Modern C++》中的并发章节。

C++20 中的额外同步原语

C++20 带来了一些额外的同步原语,即 std::latchstd::barrierstd::counting_semaphore(以及模板特化 std::binary_semaphore)。本节将概述这些新类型及其可能有用的一些典型场景。我们将从 std::latch 开始。

使用闩锁 (Using latches)

闩锁(A latch)是一种可用于同步多个线程的同步原语。它创建了一个所有线程都必须到达的同步点。您可以将闩锁视为一个递减计数器。通常,所有线程递减计数器一次,然后等待闩锁达到零后才能继续前进。

闩锁是通过传入内部计数器的初始值来构造的:

auto lat = std::latch{8}; // Construct a latch initialized with 8

线程可以使用 count_down() 来递减计数器:

lat.count_down(); // Decrement but don't wait

线程可以等待闩锁达到零:

lat.wait(); // Block until zero

也可以检查(不阻塞)计数器是否已达到零:

if (lat.try_wait()) {
// All threads have arrived ...
}

通常在递减计数器后立即等待闩锁达到零,如下所示:

lat.count_down();
lat.wait();

事实上,这种用法非常常见,值得拥有一个量身定制的成员函数;arrive_and_wait() 递减闩锁,然后等待闩锁达到零:

lat.arrive_and_wait(); // Decrement and block while not zero

在处理并发时,加入一组分叉任务(Joining a set of forked tasks)是一个常见场景。如果只需要在最后加入任务,我们可以使用一个 future 对象的数组(来等待)或只是等待所有线程完成。但在其他情况下,我们希望一组异步任务到达一个共同的同步点,然后任务继续运行。这些情况通常发生在多线程工作之前需要进行某种初始化时。

示例:使用 std::latch 初始化线程

下面的示例演示了当多个工作线程需要在开始工作之前运行一些初始化代码时,如何使用 std::latch

当创建线程时,会为栈分配一个连续的内存块。通常,这段内存最初在虚拟地址空间中分配时并不驻留在物理内存中。相反,当栈被使用时,会生成页错误(page faults),以便将虚拟内存映射到物理内存。操作系统为我们处理映射,这是一种在需要时延迟映射内存的有效方式。通常,这正是我们想要的:我们尽可能晚地支付映射内存的成本,并且只有在需要时才支付。然而,在低延迟至关重要的情况下(例如在实时代码中),可能需要完全避免页错误。栈内存不太可能被操作系统分页换出,因此通常只需运行一些会生成页错误的代码,从而将虚拟栈内存映射到物理内存。这个过程称为预取页(prefaulting)。

没有可移植的方法来设置或获取 C++ 线程的栈大小,所以在这里我们假设栈至少有 500 KB。下面的代码试图预取栈的前 500 KB:

void prefault_stack() {
    // We don't know the size of the stack
    constexpr auto stack_size = 500u * 1024u;
    // Make volatile to avoid optimization
    volatile unsigned char mem[stack_size];
    std::fill(std::begin(mem), std::end(mem), 0);
}

这里的想法是在栈上分配一个数组,它将占据大量的栈内存。然后,为了生成页错误,我们使用 std::fill() 写入数组中的每个元素。关键字 volatile 之前没有提到,它是 C++ 中一个有些令人困惑的关键字。它与并发无关;它被添加到这里只是为了防止编译器优化掉这段代码。通过将 mem 数组声明为 volatile,编译器不允许忽略对数组的写入。

现在,让我们关注实际的 std::latch。假设我们想要创建一些工作线程,它们应该只在所有线程栈都被预取页后才开始工作。我们可以使用 std::latch 实现这种同步,如下所示:

auto do_work() { /* ... */ }
int main() {
    constexpr auto n_threads = 2;
    auto initialized = std::latch{n_threads};
    auto threads = std::vector<std::thread>{};
    for (auto i = 0; i < n_threads; ++i) {
        threads.emplace_back([&] {
            prefault_stack();
            initialized.arrive_and_wait();
            do_work();
        });
    }
    initialized.wait();
    std::cout << "Initialized, starting to work\n";
    for (auto&& t : threads) {
        t.join();
    }
}

在所有线程到达之后,主线程可以开始向工作线程提交工作。在这个示例中,所有线程都通过在闩锁上调用 arrive_and_wait() 来等待其他线程到达。一旦闩锁达到零,它就不能再被重用。没有重置闩锁的函数。如果我们需要多个同步点的场景,我们可以转而使用 std::barrier

使用屏障 (Using barriers)

屏障(Barriers)与闩锁相似,但有两个主要附加功能:屏障可以被重用,并且它可以在所有线程到达屏障时运行一个完成函数(completion function)。

屏障是通过传入内部计数器的初始值和一个完成函数来构造的:

auto bar = std::barrier{8, [] {
    // Completion function
    std::cout << "All threads arrived at barrier\n";
}};

线程可以以与使用闩锁相同的方式到达并等待:

bar.arrive_and_wait(); // Decrement and block while not zero

每当所有线程都到达时(即,当屏障的内部计数器达到零时),会发生两件事:

  • 屏障会调用提供给构造函数的完成函数。
  • 在完成函数返回后,内部计数器会重置为其初始值。

屏障在基于分叉-加入(fork-join)模型的并行编程算法中非常有用。通常,迭代算法包含一部分可以并行运行,而另一部分需要按顺序运行。多个任务被分叉并并行运行。然后,当所有任务完成并加入后,会执行一些单线程代码来确定算法是否应该继续或结束。

遵循分叉-加入模型的并发算法将受益于使用屏障,并且可以优雅高效地避免其他显式锁定机制。让我们看看如何使用屏障解决一个简单问题。

示例:使用 std::barrier 进行分叉-加入

我们的下一个示例是一个演示分叉-加入模型的玩具问题。我们将创建一个小程序,模拟一组骰子被掷出,并计算掷出所有 6 所需要的次数。掷一组骰子是我们可以并发完成的事情(分叉)。加入步骤,在单个线程中执行,检查结果并确定是再次掷骰子还是结束。

首先,我们需要实现掷六面骰子的代码。为了生成一个介于 1 到 6 之间的数字,我们可以使用 <random> 头文件中类组合,如下所示:

auto engine =
    std::default_random_engine{std::random_device{}()};
auto dist = std::uniform_int_distribution<>{1, 6};
auto result = dist(engine);

在这里,std::random_device 负责为引擎生成一个种子,该引擎将产生伪随机数。为了以相等的概率选择 1 到 6 之间的整数,我们使用了 std::uniform_int_distribution。变量 result 是掷骰子的结果。

现在我们想将这段代码封装到一个将生成随机整数的函数中。生成种子和创建引擎通常很慢,是我们希望避免在每次调用时都执行的操作。一种常见的做法是将随机引擎声明为静态持续期,以便它在程序的整个生命周期内都存在。然而,<random> 中的类不是线程安全的,所以我们需要以某种方式保护静态引擎。与其使用互斥锁同步访问(这将使随机数生成器按顺序运行),我将借此机会演示如何使用线程局部存储(thread-local storage)。

下面是如何将引擎声明为静态线程局部(static thread_local)对象:

auto random_int(int min, int max) {
    // One engine instance per thread
    static thread_local auto engine =
        std::default_random_engine{std::random_device{}()};
    auto dist = std::uniform_int_distribution<>{min, max};
    return dist(engine);
}

具有 thread_local 存储持续期的静态变量将为每个线程创建一次;因此,在不使用任何同步原语的情况下,从多个线程并发调用 random_int() 是安全的。有了这个小的辅助函数,我们可以继续使用 std::barrier 实现程序的其余部分:

int main() {
    constexpr auto n = 5; // Number of dice
    auto done = false;
    auto dice = std::array<int, n>{};
    auto threads = std::vector<std::thread>{};
    auto n_turns = 0;
    
    auto check_result = [&] { // Completion function
        ++n_turns;
        auto is_six = [](auto i) { return i == 6; };
        done = std::all_of(dice.begin(), dice.end(), is_six);
    };

    auto bar = std::barrier{n, check_result};
    
    for (int i = 0; i < n; ++i) {
        threads.emplace_back([&, i] {
            while (!done) {
                dice [i] = random_int(1, 6); // Roll dice
                bar.arrive_and_wait(); // Join
            }});
    }

    for (auto&& t : threads) {
        t.join();
    }
    std::cout << n_turns << '\n';
}

lambda check_result() 是完成函数,它将在所有线程到达屏障时被调用。完成函数检查每个骰子的值,并确定是否应该进行新一轮的掷骰子,或者是否已完成。

传递给 std::thread 对象的 lambda 通过值捕获了索引 i,以便所有线程都有一个唯一的索引。其他变量 donedicebar 都是通过引用捕获的。

另请注意,由于屏障执行的协调,我们可以在不引入任何数据竞争的情况下,从不同的线程更改和读取通过引用捕获的变量。

使用信号量进行信号通知和资源计数 (Signalling and resource counting using semaphores)

信号量(The word semaphore)一词意为可用于发出信号的事物,例如旗帜或灯光。在接下来的示例中,您将看到我们如何使用信号量来发出其他线程可以等待的不同状态的信号。

信号量也可用于控制对资源的访问,类似于 std::mutex 限制对临界区的访问方式:

class Server {
public:
    void handle(const Request& req) {
        sem_.acquire();
        // Restricted section begins here.
        // Handle at most 4 requests concurrently.
        do_handle(req);
        sem_.release();
    }
private:
    void do_handle(const Request& req) { /* ... */ }
    std::counting_semaphore<4> sem_{4};
};

在这种情况下,信号量初始化值为 4,这意味着最多可以同时处理四个并发请求。不同于对代码中某个部分的互斥访问,多个线程可以访问同一部分,但对当前在该部分中的线程数量有限制。

如果信号量大于零,成员函数 acquire() 会递减信号量。否则,acquire() 会阻塞,直到信号量允许它递减并进入受限部分。release() 会递增计数器而不阻塞。如果信号量在被 release() 递增之前为零,等待的线程将被通知。

除了 acquire() 函数之外,还可以使用 try_acquire() 函数尝试在不阻塞的情况下递减计数器。如果它成功递减了计数器,则返回 true,否则返回 falsetry_acquire_for()try_acquire_until() 函数也可以以类似的方式使用。但它们不是在计数器已经为零时立即返回 false,而是在返回给调用者之前,自动尝试在指定的时间内递减计数器。这三个函数遵循与标准库中其他类型相同的模式,例如 std::timed_mutex 及其成员函数 try_lock()try_lock_for()try_lock_until()

std::counting_semaphore 是一个模板,带有一个模板参数,接受信号量的最大值。将信号量递增(release)到超过其最大值被认为是编程错误。

最大大小为 1 的 std::counting_semaphore 称为二值信号量(binary semaphore)。<semaphore> 头文件包含二值信号量的别名声明:

std::binary_semaphore = std::counting_semaphore<1>;

二值信号量保证比最大值更高的计数信号量实现得更高效。

信号量的另一个重要属性是释放信号量的线程可能不是获取它的线程。这与 std::mutex 形成对比,std::mutex 要求获取互斥锁的线程也必须释放它。然而,对于信号量,通常有一种类型的任务进行等待(acquire),而另一种类型的任务进行信号通知(release)。这将在我们的下一个示例中演示。

示例:使用信号量的有界缓冲区 (A bounded buffer using semaphores)

下面的示例演示了一个有界缓冲区(bounded buffer)。它是一个固定大小的缓冲区,可以有多个线程从中读取和写入。同样,这个示例演示了您已经使用条件变量看到的生产者-消费者模式。生产者线程是写入缓冲区的线程,读取器线程是从缓冲区读取(并弹出元素)的线程。

上图显示了缓冲区(一个固定大小的数组)以及跟踪读取和写入位置的两个变量。

我们将一步一步来,从一个关注有界缓冲区内部逻辑的版本开始。使用信号量进行信号通知将在下一个版本中添加。在这里,最初的尝试演示了如何使用读取和写入位置:

template <class T, int N>
class BoundedBuffer {
    std::array<T, N> buf_;
    std::size_t read_pos_{};
    std::size_t write_pos_{};
    std::mutex m_;

    void do_push(auto&& item) {
        /* Missing: Should block if buffer is full */
        auto lock = std::unique_lock{m_};
        buf_[write_pos_] = std::forward<decltype(item)>(item);
        write_pos_ = (write_pos_ + 1) % N;
    }
public:
    void push(const T& item) { do_push(item); }
    void push(T&& item) { do_push(std::move(item)); }

    auto pop() {
        /* Missing: Should block if buffer is empty */
        auto item = std::optional<T>{};
        {
            auto lock = std::unique_lock{m_};
            item = std::move(buf_[read_pos_]);
            read_pos_ = (read_pos_ + 1) % N;
        }
        return std::move(*item);
    }
};

第一次尝试包含了固定大小的缓冲区、读取和写入位置,以及用于保护数据成员免受数据竞争的互斥锁。此实现应该能够让任意数量的线程并发调用 push()pop()

push() 函数重载了 const T&T&&。这是一种标准库容器使用的优化技术。当调用者传递一个右值时,T&& 版本避免了复制参数。

为了避免重复 push 操作的逻辑,辅助函数 do_push() 包含了实际的逻辑。通过使用转发引用(forwarding reference,auto&& item)结合 std::forwarditem 参数将被移动赋值或复制赋值,具体取决于客户端是使用右值还是左值调用 push()

然而,这个版本的有界缓冲区并不完整,因为它没有保护我们免受 write_pos 指向(或超过)read_pos 的情况。同样,read_pos 绝不能指向(或超过)write_pos。我们想要的是一个当缓冲区满时生产者线程阻塞,当缓冲区空时消费者线程阻塞的缓冲区。

这是使用计数信号量的完美应用。信号量会阻塞一个试图在信号量已为零时减少它的线程。当值为零的信号量递增时,信号量会通知阻塞的线程。

对于有界缓冲区,我们需要两个信号量:

  • 第一个信号量 n_empty_slots:跟踪缓冲区中空槽位的数量。它将以缓冲区大小的值开始。
  • 第二个信号量 n_full_slots:跟踪缓冲区中满槽位的数量。

请确保您理解为什么需要两个计数信号量(而不是一个)。原因是需要发出信号的两种不同状态:缓冲区满和缓冲区空。

在添加了使用两个计数信号量进行信号处理后,有界缓冲区现在看起来像这样(此版本中添加的行标记为 “new”):

template <class T, int N>
class BoundedBuffer {
    std::array<T, N> buf_;
    std::size_t read_pos_{};
    std::size_t write_pos_{};
    std::mutex m_;
    std::counting_semaphore<N> n_empty_slots_{N}; // New
    std::counting_semaphore<N> n_full_slots_{0}; // New

    void do_push(auto&& item) {
        // Take one of the empty slots (might block)
        n_empty_slots_.acquire(); // New
        try {
            auto lock = std::unique_lock{m_};
            buf_[write_pos_] = std::forward<decltype(item)>(item);
            write_pos_ = (write_pos_ + 1) % N;
        } catch (...) {
            n_empty_slots_.release(); // New
            throw;
        }
        // Increment and signal that there is one more full slot
        n_full_slots_.release(); // New
    }
public:
    void push(const T& item) { do_push(item); }
    void push(T&& item) { do_push(std::move(item)); }

    auto pop() {
        // Take one of the full slots (might block)
        n_full_slots_.acquire(); // New
        auto item = std::optional<T>{};
        try {
            auto lock = std::unique_lock{m_};
            item = std::move(buf_[read_pos_]);
            read_pos_ = (read_pos_ + 1) % N;
        } catch (...) {
            n_full_slots_.release(); // New
            throw;
        }
        // Increment and signal that there is one more empty slot
        n_empty_slots_.release(); // New
        return std::move(*item);
    }
};

此版本支持多个生产者和消费者。两个信号量的使用保证了两个信号量都不会达到大于缓冲区中最大元素数量的值。例如,生产者线程不可能在没有首先检查至少有一个空槽位的情况下添加值并递增 n_full_slots 信号量。

另请注意,acquire()release() 是从不同的线程调用的。例如,消费者线程正在等待(acquire()n_full_slots 信号量,而生产者线程正在对同一个信号量发出信号(release())。

添加到 C++20 的新同步原语是线程库中常见的已知构造。与 std::mutexstd::condition_variable 相比,它们提供了方便且通常更高效的替代方案来同步对共享资源的访问。

C++ 中的原子支持

标准库支持原子变量(atomic variables),有时简称为原子(atomics)。原子变量是一种可以安全地从多个线程中使用和修改,而不会引入数据竞争的变量。

您是否还记得我们之前看过的两个线程更新一个全局计数器的数据竞争示例?我们通过添加互斥锁 (mutex) 来配合计数器解决了这个问题。现在,我们可以使用 std::atomic<int> 来替代显式锁:

std::atomic<int> counter;

auto increment_counter(int n) {
    for (int i = 0; i < n; ++i)
        ++counter; // Safe, counter is now an atomic<int>
}

++counter 是一种方便的写法,等同于 counter.fetch_add(1)。所有可以在原子类型上调用的成员函数都可以在多个线程中并发安全地调用。

原子类型来自 <atomic> 头文件。对于所有标量数据类型,都有形如 std::atomic_int 的类型别名。这与 std::atomic<int> 是完全相同的。

可以将自定义类型包装在 std::atomic 模板中,但前提是该自定义类型是平凡可复制的(trivially copyable)。基本上,这意味着一个类对象完全由其数据成员的位(bits)来描述。这样,对象就可以通过复制原始字节(raw bytes)的方式(例如使用 std::memcpy())进行复制。因此,如果一个类包含虚函数、指向动态内存的指针等,就不再可能仅复制对象的原始位并期望其能正常工作,因此它就不是平凡可复制的。这可以在编译时检查,所以如果您尝试创建不是平凡可复制类型的原子变量,将会得到一个编译错误:

struct Point {
    int x_{};
    int y_{};
};

auto p = std::atomic<Point>{};       // OK: Point is trivially copyable
auto s = std::atomic<std::string>{}; // Error: Not trivially copyable

也可以创建原子指针。这使得指针本身是原子的,但它所指向的对象不是原子的。我们稍后会更多地讨论原子指针和引用。

无锁属性

使用原子变量而不是使用互斥锁来保护对变量的访问,一个重要的原因是避免使用 std::mutex 引入的性能开销。此外,互斥锁可能会以非确定性的时间长度阻塞线程,并可能引入优先级反转(参见线程优先级一节),因此在低延迟上下文中排除了互斥锁的使用。换句话说,您的代码的某些部分可能有延迟要求,从而完全禁止使用互斥锁。在这些情况下,了解原子变量是否使用了互斥锁来保护数据至关重要。

原子变量可能使用,也可能不使用锁来保护数据;这取决于变量的类型和平台。如果原子变量不使用锁,则称其为无锁(lock-free)的。您可以在运行时查询变量是否为无锁:

auto variable = std::atomic<int>{1};
assert(variable.is_lock_free()); // Runtime assert

这很好,因为现在我们至少在运行程序时断言了使用该变量对象是无锁的。通常,所有相同类型的原子对象要么都是无锁的,要么都不是,但在某些特殊的平台上,两个原子对象可能会给出不同的答案。

通常,人们更感兴趣的是某个原子类型(std::atomic<T>)是否保证在特定平台上是无锁的,并且最好我们希望在编译时而不是运行时知道这一点。自 C++17 起,还可以使用 is_always_lock_free() 在编译时验证原子特化是否为无锁,如下所示:

static_assert(std::atomic<int>::is_always_lock_free);

如果 atomic<int> 在我们目标平台上不是无锁的,这段代码将生成一个编译错误。这样,如果我们编译一个假设 std::atomic<int> 不使用锁的程序,它就会编译失败,这正是我们想要的。

在现代平台上,任何适合原生字大小(native word size)的 std::atomic<T> 类型通常都是始终无锁(always lock-free)的。而在现代 x64 芯片上,您甚至可以获得双倍的大小。例如,在现代 Intel CPU 上编译的 libc++ 中,std::atomic<std::complex<double>> 始终是无锁的。

原子标志

std::atomic_flag 是一种保证始终是无锁的原子类型(无论目标平台如何)。因此,std::atomic_flag 不提供 is_always_lock_free() / is_lock_free() 函数,因为它们总是会返回 true

原子标志可以用作保护临界区的替代方案,而不是使用 std::mutex。由于锁的概念很容易理解,我将用它作为这里的示例。不过需要注意的是,我在这本书中演示的锁实现不是可用于生产环境的代码,而是概念性实现。下面的示例演示了如何概念性地实现一个简单的自旋锁(spinlock):

class SimpleMutex {
    std::atomic_flag is_locked_{}; // Cleared by default
public:
    auto lock() noexcept {
        while (is_locked_.test_and_set()) {
            while (is_locked_.test()); // Spin here
        }
    }
    auto unlock() noexcept {
        is_locked_.clear();
    }
};

lock() 函数调用 test_and_set() 来设置标志,同时获取标志的先前值。如果 test_and_set() 返回 false,则意味着调用者成功获取了锁(在标志先前被清除时设置了标志)。否则,内部的 while 循环将使用 test() 在一个自旋循环中不断轮询标志的状态。我们在额外的内部循环中使用 test() 是出于性能原因:test() 不会使缓存行失效,而 test_and_set() 会。这种锁定协议被称为 Test and Test-and-Set。

【译注:

性能优势的核心:缓存一致性

这里的性能差异来源于现代 CPU 的缓存一致性协议(Cache Coherency Protocol),例如 MESI 协议。

test_and_set() 是一个原子写操作。

  • 操作: 它尝试修改内存中的标志(将其设置为 true)。
  • 缓存影响: 根据缓存一致性协议,当一个核心(线程)执行写操作时,它必须通知(或使)所有其他核心中包含该内存地址的缓存行(Cache Line)失效(Invalidate)。
  • 性能开销: 这个缓存失效和同步的过程需要消耗总线带宽,并引入显著的延迟。如果多个核心不断地执行 test_and_set(),它们会持续争夺并失效彼此的缓存,导致性能急剧下降,这种现象称为缓存颠簸(Cache Thrashing)。

test() 只是一个原子读操作(读取标志的当前值)。

  • 操作: 它只读取内存中的标志值。
  • 缓存影响: 如果该标志所在的缓存行已经在核心的本地缓存中处于共享(Shared)或独占(Exclusive)状态,那么读取操作可以直接从本地缓存中完成,不会导致缓存行失效,也不会通知其他核心。
  • 性能优势: 线程可以在不引起总线流量和缓存颠簸的情况下,快速重复检查本地缓存中的标志状态。

通过结合这两种操作,自旋锁实现了高效的等待:

  1. 外层循环 (while (is_locked_.test_and_set())):
    • 目的在于尝试获取锁。
    • 只有在第一次进入循环或从内层退出时才执行,次数较少。
  2. 内层循环 (while (is_locked_.test())):
    • 目的在于高效地等待锁被释放。
    • 一旦线程发现锁已被占用,它会进入内层循环,持续快速地读取本地缓存中的标志,直到看到标志被清除(false)为止。
    • 只有当持有锁的线程调用 is_locked_.clear() 时,内存才会被写入,并最终导致等待线程的缓存行失效或更新,使其退出内层循环,再次尝试外层的 test_and_set()

这种设计将高开销的写操作(test_and_set())的使用降到最低,而将等待过程转移到了低开销的读操作(test())上,从而显著提高了多线程争用时的性能。

这个自旋锁有效,但它对资源不友好:当线程执行时,它不断地使用 CPU 一次又一次地检查相同的条件。我们可以在每次迭代中添加一个带有指数退避(exponential backoff)的短暂休眠,但针对各种平台和场景进行微调是很困难的。

幸运的是,C++20 为 std::atomic 添加了 等待和通知 (wait and notify) API,这使得线程可以(以资源友好的方式)等待原子变量的值发生变化。

原子等待和通知

自 C++20 起,std::atomicstd::atomic_flag 提供了等待和通知的功能。wait() 函数会阻塞当前线程,直到原子变量的值发生变化,并且有其他线程通知等待的线程。线程可以通过调用 notify_one()notify_all() 来通知发生了变化。

有了这个新功能,我们可以避免持续轮询原子的状态,而是以更资源友好的方式等待直到值发生变化;这类似于 std::condition_variable 允许我们等待和通知状态变化的方式。

通过使用等待和通知,上一节实现的 SimpleMutex 可以改写成这样:

class SimpleMutex {
    std::atomic_flag is_locked_{};
public:
    auto lock() noexcept {
        while (is_locked_.test_and_set())
            is_locked_.wait(true); // Don't spin, wait
    }
    auto unlock() noexcept {
        is_locked_.clear();
        is_locked_.notify_one(); // Notify blocked thread
    }
};

我们将旧值 (true) 传递给 wait()。当 wait() 返回时,原子变量保证已经发生了变化,因此它不再是 true。然而,不能保证我们能捕获到变量的所有变化。变量可能从状态 A 变为状态 B,然后再变回状态 A,而没有通知等待的线程。这在无锁编程中被称为 ABA 问题。

这个示例演示了使用 std::atomic_flag 的等待和通知函数。相同的等待和通知 API 也可用于 std::atomic 类模板。

请注意,本章介绍的自旋锁不是可用于生产环境的代码。实现一个高效的锁通常涉及正确使用内存顺序(稍后讨论)和用于让步的非可移植代码,这超出了本书的范围。

现在,我们将继续讨论原子指针和原子引用。

在多线程环境中使用 shared_ptr

那么 std::shared_ptr 呢?它可以在多线程环境中使用吗?当多个线程访问由多个共享指针引用的对象时,引用计数是如何处理的?

要理解共享指针和线程安全,我们需要回顾 std::shared_ptr 通常是如何实现的。考虑以下代码:

// Thread 1
auto p1 = std::make_shared<int>(42);

该代码在堆上创建了一个 int 和一个指向该 int 对象的引用计数智能指针。当使用 std::make_shared() 创建共享指针时,会在 int 旁边创建一个控制块(control block)。控制块除其他外,包含一个用于引用计数的变量,每当创建指向 int 的新指针时,该变量就会增加;每当指向 int 的指针被销毁时,该变量就会减少。总而言之,当执行上面的代码行时,创建了三个独立的实体:

  • 实际的 std::shared_ptr 对象 p1(栈上的局部变量)
  • 一个控制块(堆对象)
  • 一个 int(堆对象)

现在,考虑如果第二个线程执行以下代码会发生什么:

// Thread 2
auto p2 = p1;

我们正在创建一个指向 int(和控制块)的新指针。在创建 p2 指针时,我们读取了 p1,但在更新引用计数器时,我们也需要修改控制块。控制块位于堆上,并在两个线程之间共享,因此它需要同步以避免数据竞争。由于控制块是隐藏在 std::shared_ptr 接口后面的实现细节,我们无法知道如何保护它,结果是它已经被实现处理了。

它通常会使用一个可变的原子计数器。换句话说,引用计数器的更新是线程安全的,因此我们可以从不同的线程中使用多个共享指针,而无需担心同步引用计数器。这是一种很好的实践,也是设计类时需要考虑的问题。如果您在从客户端角度看来是语义只读(const)的方法中修改变量,您应该使修改的变量线程安全。另一方面,客户端可以检测到的修改函数应该留给类的客户端来同步。

总结:shared_ptr 的线程安全

  • 共享对象(int): 不是线程安全的,如果从多个线程访问,需要显式加锁。
  • 控制块(引用计数): 已经是线程安全的,因此引用计数机制在多线程环境中可以正常工作。

让我们继续保护 shared_ptr 实例本身。

保护 shared_ptr 实例

现在只剩下一个部分:在前面的示例中,实际的 std::shared_ptr 对象 p1p2 怎么办?为了理解这一点,让我们来看一个只使用一个全局 std::shared_ptr 对象 p 的示例:

// Global, how to protect?
auto p = std::shared_ptr<int>{};

我们如何从多个线程修改 p 而不引入数据竞争?一种选择是在每次使用 p 时,用显式互斥锁保护它。或者,我们可以使用 std::atomicstd::shared_ptr 的模板特化(在 C++20 中引入)。换句话说,可以将 p 声明为原子共享指针,如下所示:

// Global, protect using atomic
auto p = std::atomic<std::shared_ptr<int>>{};

这种模板特化可能或可能不是无锁的。您可以使用 is_lock_free() 成员函数来验证这一点。另一点需要注意的是,特化 std::atomic<std::shared_ptr<T>> 是一个例外,它打破了 std::atomic 只能特化平凡可复制类型的规则。无论如何,我们很高兴标准库最终有了这个有用的类型。

下面的示例演示了如何从多个线程原子地加载和存储共享指针对象:

// Thread T1 calls this function
auto f1() {
    auto new_p = std::make_shared<int>(std::rand());
    // ...
    p.store(new_p);
}

// Thread T2 calls this function
auto f2() {
    auto local_p = p.load();
    // Use local_p...
}

在前面的示例中,我们假设有两个线程 T1 和 T2 分别调用函数 f1()f2()。线程 T1 通过调用 std::make_shared<int>() 创建新的堆分配的 int 对象。

在这个示例中需要考虑一个微妙的细节:堆分配的 int 是在哪个线程中删除的?当 local_pf2() 函数中超出作用域时,它可能是指向 int 的最后一个引用(引用计数达到零)。在这种情况下,堆分配的 int 的删除将发生在线程 T2 中。否则,删除将发生在调用 std::atomic_store() 时所在的线程 T1 中。因此,答案是 int 的删除可以发生在两个线程中。

原子引用

到目前为止,您已经看到了 std::atomic_flag 和具有许多有用特化的 std::atomic<>std::atomic 可以特化为指针(例如 std::atomic<T*>),但您还没有看到如何将原子与引用类型一起使用。

不可能写出 std::atomic<T&>;相反,标准库为我们提供了一个名为 std::atomic_ref 的模板。

std::atomic_ref 模板是在 C++20 中引入的。它的接口与 std::atomic 相同,之所以使用单独的名称是为了避免影响使用 std::atomic<T> 的现有泛型代码。

原子引用允许我们对我们引用的非原子对象执行原子操作。当我们引用由客户端或某些不提供内部同步对象的第三方代码提供的对象时,这会很方便。我们将通过一个示例来演示原子引用的用处。

示例:使用原子引用

假设我们正在编写一个函数,用于抛掷硬币指定的次数:

void flip_coin(std::size_t n, Stats& outcomes);

结果累计在 Stats 类型的 outcomes 对象中,它看起来像这样:

struct Stats {
    int heads_{};
    int tails_{};
};

std::ostream& operator<<(std::ostream& os, const Stats &s) {
    os << "heads: " << s.heads_ << ", tails: " << s.tails_;
    return os;
}

客户端可以使用相同的 Stats 实例多次调用 flip_coins(),抛掷的结果会添加到 Stats 中:

auto outcomes = Stats{};
flip_coin(30, outcomes);
flip_coin(10, outcomes);

假设我们想要并行化 flip_coin() 的实现,并让多个线程修改 Stats 对象。此外,我们可以假设以下几点:

  • Stats 结构不能被修改(可能来自第三方库)。
  • 我们希望客户端不知道我们的实用函数 flip_coin() 是并发的;即 flip_coin() 函数的并发性应该对调用者完全透明。

对于这个示例,我们将重用我们先前定义的用于生成随机数的函数:

int random_int(int min, int max); // See implementation above

现在我们准备定义 flip_coin() 函数,它将使用两个线程来抛掷硬币 $n$ 次:

void flip_coin(std::size_t n, Stats &outcomes) {
    auto flip = [&outcomes](auto n) {
        auto heads = std::atomic_ref<int>{outcomes.heads_};
        auto tails = std::atomic_ref<int>{outcomes.tails_};
        for (auto i = 0u; i < n; ++i) {
            random_int(0, 1) == 0 ? ++heads : ++tails;
        }
    };
    auto t1 = std::jthread{flip, n / 2};      // First half
    auto t2 = std::jthread{flip, n - (n / 2)}; // The rest
}

两个线程在抛掷硬币后都会更新非原子的 outcome 对象。我们没有使用 std::mutex,而是创建了两个 std::atomic_ref<int> 变量来原子地更新 outcome 对象的成员。重要的是要记住,为了保护 heads 和 tails 计数器免受数据竞争,所有对计数器的并发访问都需要使用 std::atomic_ref 来保护。

下面的小程序演示了可以调用 flip_coin() 函数,而无需了解 flip_coin() 的并发实现:

int main() {
    auto stats = Stats{};
    flip_coin(5000, stats); // Flip 5000 times
    std::cout << stats << '\n';
    assert((stats.tails_ + stats.heads_) == 5000);
}

在我的机器上运行此程序产生了以下输出:

heads: 2592, tails: 2408

这个示例结束了我们关于 C++ 中各种原子类模板的部分。原子自 C++11 以来一直是标准库的一部分,并持续发展。C++20 引入了:

  • 特化 std::atomic<std::shared_ptr<T>>
  • 原子引用;即 std::atomic_ref<T> 模板
  • 等待和通知 API,这是使用条件变量的轻量级替代方案

现在,我们将继续讨论 C++ 内存模型及其与原子和并发编程的关系。

C++ 内存模型 (The C++ Memory Model)

为什么在讨论并发的章节中要谈论 C++ 内存模型?内存模型与并发紧密相关,因为它定义了内存的读写操作如何在线程之间保持可见性。这是一个相当复杂的主题,涉及编译器优化和多核计算机架构。不过,好消息是,如果您的程序没有数据竞争,并且您使用了原子库默认提供的内存顺序,那么您的并发程序的行为将遵循一个直观、易于理解的内存模型。尽管如此,了解内存模型是什么以及默认内存顺序保证了什么仍然非常重要。

本节涵盖的概念在 Herb Sutter 的演讲《原子武器:C++ 内存模型和现代硬件 1 \& 2》(Atomic Weapons: The C++ Memory Model and Modern Hardware 1 & 2)中得到了透彻的解释。这些演讲可以在其网站上免费获取,如果您需要深入了解这个主题,强烈推荐观看。

指令重排序 (Instruction Reordering)

要理解内存模型的重要性,首先需要一些关于我们编写的程序实际如何执行的背景知识。

当我们编写和运行一个程序时,我们自然会假设源代码中的指令将按照它们在源代码中出现的顺序执行。事实并非如此。 我们编写的代码在最终执行之前会经过多个阶段的优化。编译器和硬件都会对指令进行重排序,目的是更高效地执行程序。这不是新技术:编译器长期以来一直在这样做,这也是优化构建比非优化构建运行得更快的原因之一。只要重排序在程序运行时是不可观察的,编译器(和硬件)就可以自由地对指令进行重排序。程序运行起来就像所有事情都按照程序顺序发生一样。

我们来看一个示例代码片段:

int a = 10;      // 1
std::cout << a;  // 2
int b = a;       // 3
std::cout << b;  // 4
// 观察到的输出: 1010

在这里,很明显第 2 行和第 3 行可以互换,而不会引入任何可观察到的效果:

int a = 10;      // 1
int b = a;       // 3 这行上移了
std::cout << a;  // 2 这行下移了
std::cout << b;  // 4
// 观察到的输出: 1010

这是另一个示例,类似于(但不完全相同)第 4 章《数据结构》中的示例,其中编译器可以优化迭代二维矩阵时对缓存不友好的版本:

constexpr auto ksize = size_t{100};
using MatrixType = std::array<std::array<int, ksize>, ksize>;

auto cache_thrashing(MatrixType& matrix, int v) { // 1
    for (size_t i = 0; i < ksize; ++i)           // 2
        for (size_t j = 0; j < ksize; ++j)       // 3
            matrix[j][i] = v;                    // 4
}

您在第 4 章《数据结构》中看到,类似于此的代码会产生大量的缓存未命中,从而损害性能。编译器可以自由地通过重排序 for 语句来优化它,如下所示:

auto cache_thrashing(MatrixType& matrix, int v) { // 1
    for (size_t j = 0; j < ksize; ++j)           // 3 行上移了
        for (size_t i = 0; i < ksize; ++i)       // 2 行下移了
            matrix[j][i] = v;                    // 4
}

在执行程序时,这两种版本之间无法观察到差异,但后者运行得更快。

由编译器和硬件执行的优化(包括指令流水线、分支预测和缓存层次结构)是非常复杂且不断发展的技术。幸运的是,所有这些对原始程序的转换都可以看作是对源代码中读写操作的重排序。这也意味着无论是编译器还是硬件的某些部分执行了转换并不重要。对于 C++ 程序员来说,重要的是要知道指令可以被重排序,但不会产生任何可观察到的副作用。

如果您曾尝试调试程序的优化构建,您可能已经注意到,由于重排序,单步调试可能会很困难。因此,通过调试器,重排序在某种意义上是可观察的,但在以正常方式运行程序时,它们是不可观察的。

原子操作与内存顺序 (Atomics and Memory Orders)

在编写单线程 C++ 程序时,没有发生数据竞争的风险。我们可以愉快地编写程序,而无需意识到指令重排序。然而,对于多线程程序中的共享变量来说,情况就完全不同了。编译器(和硬件)所有优化的依据是仅对一个线程而言是真实且可观察到的情况。编译器无法知道其他线程通过共享变量能够观察到什么,因此我们程序员的工作就是通知编译器允许哪些重排序。

事实上,这正是我们使用原子变量或互斥锁来保护我们免受数据竞争时所做的事情。

当使用互斥锁保护临界区时,可以保证只有当前拥有锁的线程才能执行临界区。但是,互斥锁也在临界区周围创建了内存栅栏(Memory Fences),以通知系统在临界区边界处不允许某些重排序。在获取锁时,会添加一个获取栅栏(acquire fence);在释放锁时,会添加一个释放栅栏(release fence)。

我将用一个示例来演示这一点。想象我们有四条指令:i1、i2、i3 和 i4。它们之间没有依赖关系,因此系统可以任意重排序指令,而不会产生任何可观察到的副作用。指令 i2 和 i3 使用共享数据,因此是需要由互斥锁保护的临界区。在添加互斥锁的获取和释放后,现在有些重排序不再有效。显然,我们不能将属于临界区的指令移到临界区之外,否则它们将不再受互斥锁的保护。单向栅栏(one-way fences)确保没有指令可以移出临界区。i1 指令可以通过获取栅栏移入临界区,但不能移到释放栅栏之后。i4 指令也可以通过释放栅栏移入临界区,但不能移到获取栅栏之前。

下图显示了单向栅栏如何限制指令的重排序。没有读写指令可以越过获取栅栏之上,也没有指令可以越过释放栅栏之下:

当我们获取互斥锁时,我们创建了一个获取内存栅栏。它告诉系统,没有内存访问(读或写)可以移动到获取栅栏所在的线之上。系统可以将 i4 指令移到释放栅栏之上,越过 i3 和 i2 指令,但不能再进一步,因为有获取栅栏的限制。

现在,让我们看看原子变量而不是互斥锁。当我们在程序中使用共享原子变量时,它给我们两样东西:

  • 防止撕裂写入(torn writes): 原子变量总是原子地更新,因此读取者不可能读取到部分写入的值。
  • 通过添加足够的内存栅栏实现内存同步: 这可以防止某些指令重排序,以保证原子操作指定的特定内存顺序。

如果我们的程序没有数据竞争,并且在使用原子操作时使用了默认的内存顺序,那么 C++ 内存模型保证顺序一致性(sequential consistency)。那么什么是顺序一致性?顺序一致性保证执行结果与操作按照原始程序中指定的顺序执行的结果相同。线程之间的指令交错是任意的;也就是说,我们无法控制线程的调度。这听起来可能很复杂,但这可能正是您已经思考并发程序执行方式的方式。

顺序一致性的缺点是它会损害性能。因此,可以使用具有宽松内存模型(relaxed memory model)的原子操作来代替。这意味着您只能获得防止撕裂写入的保护,而没有顺序一致性提供的内存顺序保证。

我强烈建议您不要使用默认顺序一致性内存顺序以外的任何东西,除非您对较弱内存模型可能引入的影响有非常透彻的理解。

我们在此将不再进一步讨论宽松内存顺序,因为它超出了本书的范围。但作为旁注,您可能感兴趣的是,std::shared_ptr 中的引用计数器在递增计数器时使用宽松模型(但在递减计数器时不使用)。这就是为什么 std::shared_ptr 成员函数 use_count() 在多线程环境中使用时只报告近似而非实际的引用数量。

内存模型和原子操作高度相关的一个领域是无锁编程(lock-free programming)。下一节将让您初步了解什么是无锁编程及其一些应用。

无锁编程

无锁编程非常困难。我们不会在本书中花费大量时间讨论无锁编程,而是会为您提供一个非常简单的无锁数据结构如何实现的示例。在网络和书籍中(例如前面提到的 Anthony Williams 的书),有大量专门关于无锁编程的资源,它们将解释您在编写自己的无锁数据结构之前需要理解的概念。您可能听说过的一些概念,例如比较并交换(Compare-and-Swap, CAS)和 ABA 问题,将不会在本书中进一步讨论。

示例:一个无锁队列

在这里,您将看到一个无锁队列的示例,它是一个相对简单但很有用的无锁数据结构。无锁队列可用于与那些不能使用锁来同步访问共享数据的线程进行单向通信。

它的实现非常简单,因为要求有限:它只支持一个读取线程和一个写入线程。队列的容量也是固定的,不能在运行时改变。

无锁队列是在通常放弃异常的环境中可能使用的组件的一个例子。因此,下面的队列设计为没有异常,这使得 API 与本书中的其他示例有所不同。

类模板 LockFreeQueue<T> 具有以下公共接口:

  • push(): 将一个元素添加到队列并成功返回 true。此函数必须仅由(唯一一个)写入线程调用。为了避免客户端提供右值时进行不必要的复制,push()const T&T&& 进行了重载。这种技术也用于本章前面介绍的 BoundedBuffer 类中。
  • pop(): 返回一个包含队列前端元素的 std::optional<T>,除非队列为空。此函数必须仅由(唯一一个)读取线程调用。
  • size(): 返回队列的当前大小。此函数可以由两个线程并发调用。

以下是该队列的完整实现:

template <class T, size_t N>
class LockFreeQueue {
    std::array<T, N> buffer_{};         // Used by both threads
    std::atomic<size_t> size_{0};       // Used by both threads
    size_t read_pos_{0};                // Used by reader thread
    size_t write_pos_{0};               // Used by writer thread

    static_assert(std::atomic<size_t>::is_always_lock_free);

    bool do_push(auto&& t) { // Helper function
        if (size_.load() == N) {
            return false;
        }
        buffer_[write_pos_] = std::forward<decltype(t)>(t);
        write_pos_ = (write_pos_ + 1) % N;
        size_.fetch_add(1);
        return true;
    }

public:
    // Writer thread
    bool push(T&& t) { return do_push(std::move(t)); }
    bool push(const T& t) { return do_push(t); }

    // Reader thread
    auto pop() -> std::optional<T> {
        auto val = std::optional<T>{};
        if (size_.load() > 0) {
            val = std::move(buffer_[read_pos_]);
            read_pos_ = (read_pos_ + 1) % N;
            size_.fetch_sub(1);
        }
        return val;
    }

    // Both threads can call size()
    auto size() const noexcept { return size_.load(); }
};

唯一需要原子访问的数据成员是 size_ 变量。read_pos_ 成员仅由读取线程使用,而 write_pos_ 仅由写入线程使用。那么类型为 std::arraybuffer_ 呢?它是可变的并且被两个线程访问?这不需要同步吗?由于该算法确保两个线程永远不会同时访问数组中的同一元素,C++ 保证数组中的单个元素可以在没有数据竞争的情况下被访问。元素有多小并不重要;即使是 char 数组也持有这个保证。

像这样的非阻塞队列何时有用?一个例子是在音频编程中,当主线程上运行的用户界面需要从实时音频线程发送或接收数据时,而音频线程在任何情况下都不能被阻塞。实时线程不能使用互斥锁、分配/释放内存,或做任何可能导致线程等待优先级较低线程的事情。对于像这样的场景,无锁数据结构是必需的。

LockFreeQueue 中的读取器和写入器都是无锁的,因此我们可以使用队列的两个实例在主线程和音频线程之间进行双向通信,如下图所示:

如前所述,本书只是触及了无锁编程的表面。现在是时候结束本章,并提供一些编写并发程序时的性能指南了。

优化性能指南

我怎么强调都不为过的是,在尝试提高性能之前,必须确保您的并发程序正确运行。此外,在应用任何这些与性能相关的指南之前,您首先需要建立一个可靠的测量方法来衡量您试图改进的内容。

避免竞争

每当多个线程使用共享数据时,就会发生竞争(Contention)。竞争会损害性能,有时竞争导致的开销会使并行算法比单线程替代方案运行得更慢。

使用会导致等待和上下文切换的锁是明显的性能损失,但不那么明显的是,锁和原子操作都会禁用编译器生成的代码中的优化,并且在 CPU 执行代码时在运行时也会这样做。这是保证顺序一致性所必需的。但请记住,解决此类问题的办法绝不是忽略同步,从而引入数据竞争。数据竞争意味着未定义行为,一个快速但不正确的程序不会让任何人满意。

相反,我们需要最小化花费在临界区的时间。我们可以通过减少进入临界区的次数来实现,并通过最小化临界区本身,以便一旦进入,就尽快离开。

避免阻塞操作

要编写一个始终流畅运行的现代响应式 UI 应用程序,绝对有必要主线程阻塞时间不超过几毫秒。一个流畅运行的应用程序每秒更新其界面 60 次。这意味着,如果您正在执行的操作阻塞了 UI 线程超过 16 ms,帧率(FPS)就会下降。

您可以在应用程序中设计内部 API 时考虑到这一点。无论何时编写执行 I/O 或其他可能需要几毫秒以上时间的操作的函数,都需要将其实现为异步函数。这种模式在 iOS 和 Windows 中变得非常普遍,例如,所有网络 API 都已成为异步的。

线程数量/CPU 核心数

机器拥有的 CPU 核心越多,您可以拥有的活动运行线程就越多。如果您设法将一个顺序的 CPU 密集型任务拆分成并行版本,您可以通过让多个核心并行处理该任务来获得性能提升。

在最好的情况下,从单线程算法到可以由两个线程运行的算法,性能可以提高一倍。但是,在添加越来越多的线程之后,您最终会达到一个极限,届时将不再有性能提升。超过该极限添加更多线程实际上会降低性能,因为随着添加的线程越多,上下文切换导致的开销就变得越显著。

I/O 密集型任务,例如网络爬虫,会花费大量时间等待网络数据,它们需要在达到 CPU 资源超额分配的极限之前需要大量线程。一个等待 I/O 的线程很可能会从 CPU 中切换出去,为准备执行的其他线程腾出空间。对于 CPU 密集型任务,使用的线程数通常没有必要超过机器上的核心数。

控制大型程序中的线程总数可能很困难。控制线程数量的一个好方法是使用线程池(thread pool),其大小可以调整以匹配当前的硬件。

在第 14 章《并行算法》中,您将看到如何并行化算法以及如何根据 CPU 核心数调整并发量的示例。

线程优先级

线程的优先级会影响线程的调度方式。优先级高的线程比优先级低的线程更有可能被更频繁地调度。线程优先级对于降低任务的延迟很重要。

操作系统提供的线程通常具有优先级。目前,C++ 线程 API 无法设置线程的优先级。但是,通过使用 std::thread::native_handle,您可以获取底层操作系统线程的句柄,并使用原生 API 来设置优先级。

一个与线程优先级相关且可能损害性能并应避免的现象称为优先级反转(priority inversion)。它发生在高优先级线程正在等待获取一个当前被低优先级线程持有的锁时。这种依赖关系损害了高优先级线程,该线程会被阻塞,直到低优先级线程下次被调度,以便它可以释放锁。

对于实时应用程序,这是一个大问题。实际上,这意味着您不能使用锁来保护需要被实时线程访问的任何共享资源。例如,一个生成实时音频的线程以最高的优先级运行,为了避免优先级反转,音频线程不能调用任何可能阻塞并导致上下文切换的函数(包括 std::malloc())。

线程亲和性

线程亲和性(Thread affinity)使您可以向调度程序提供提示,告知哪些线程可以从共享相同的 CPU 缓存中受益。换句话说,这是向调度程序提出的一个请求,即如果可能,某些线程应在特定核心上执行,以最小化缓存未命中。

为什么会希望一个线程在特定核心上执行?答案(又是)是缓存。操作相同内存的线程可以从在同一核心上运行中受益,从而利用热缓存(warm caches)。对于调度程序来说,这只是在将线程分配给核心时要考虑的众多参数之一,因此这几乎不能保证,而且不同操作系统之间的行为差异很大。线程优先级,甚至所有核心的利用率(以避免过热),都是现代调度程序需要考虑的要求之一。

目前无法使用 C++ API 以可移植的方式设置线程亲和性,但大多数平台都支持通过某种方式在线程上设置亲和性掩码(affinity mask)。为了访问平台特定的功能,您需要获取原生线程的句柄。下面的示例演示了如何在 Linux 上设置线程亲和性掩码:

#include <pthreads> // Non-portable header

auto set_affinity(const std::thread& t, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);

    pthread_t native_thread = t.native_handle();
    pthread_set_affinity(native_thread, sizeof(cpu_set_t), &cpuset);
}

请注意,这不是可移植的 C++,但如果您正在进行性能关键的并发编程,您可能需要进行一些不可移植的线程配置。

伪共享

伪共享(False sharing),或称破坏性干扰(destructive interference),可以显著降低性能。它发生在两个线程使用一些数据(这些数据在逻辑上不共享),但碰巧位于同一个缓存行(cache line)中。想象一下,如果这两个线程在不同的核心上执行,并不断更新驻留在共享缓存行上的变量,会发生什么。尽管线程之间没有真正的数据共享,但线程会互相使彼此的缓存行失效。

伪共享最有可能在使用全局数据或动态分配的、在线程之间共享的数据时发生。一个可能发生伪共享的例子是分配一个在线程之间共享的数组,但每个线程只使用数组中的一个元素。

解决这个问题的方法是填充(pad)数组中的每个元素,以便两个相邻的元素不能驻留在同一个缓存行上。自 C++17 起,有一种可移植的方法可以做到这一点,即使用 <new> 中定义的 std::hardware_destructive_interference_size 常量,并结合 alignas 说明符。下面的示例演示了如何创建一个防止伪共享的元素:

struct alignas(std::hardware_destructive_interference_size) Element {
    int counter_{};
};

auto elements = std::vector<Element>(num_threads);

现在,可以保证 vector 中的元素驻留在不同的缓存行上。

总结

在本章中,您了解了如何创建可以并发执行多个线程的程序。我们还介绍了如何通过使用锁或原子操作保护临界区来避免数据竞争。您了解到 C++20 带来了一些有用的同步原语:栅栏(latches)、屏障(barriers)和信号量(semaphores)。然后,我们研究了执行顺序和 C++ 内存模型,这对于编写无锁程序变得非常重要。您还发现不可变数据结构是线程安全的。本章以一些关于改进并发应用程序性能的指南作为结束。

接下来的两章将致力于一个全新的 C++20 功能——协程(coroutines),它允许我们以顺序的方式编写异步代码。