第 9 章 构建我们自己的锁

来自 《Rust Atomics and Locks》 的翻译文章,英文原文https://marabos.nl/atomics/building-locks.html在新窗口打开

在本章中,我们将构建自己的互斥锁、条件变量和读写锁。对于每个版本,我们将从一个非常基本的版本开始,然后对其进行扩展以使其更加高效。

由于我们不打算使用标准库中的锁类型,我们将不得不使用第8章中的工具,以便能够让线程在没有忙循环的情况下等待。然而,正如我们在那一章中所看到的,操作系统提供的可用工具在每个平台上都有很大的不同,这使得我们很难建立一个能跨平台工作的工具。

幸运的是,大多数现代操作系统都支持类似 futex 的功能,或者至少支持唤醒和等待操作。正如我们在第 8 章中看到的,Linux 自 2003 年以来通过 futex 系统调用支持它们,Windows 自 2012 年以来通过 WaitOnAddress 系列函数支持它们,FreeBSD 自 2016 年以来作为 _umtx_op 系统调用的一部分,等等。

最明显的例外是 macOS。虽然它的内核确实支持这些操作,但它并没有通过我们可以使用的任何稳定的、公开可用的 C 函数公开。然而,macOS 确实附带了最新版本的 libc++,它是 C++ 标准库的实现。该库包含对 C++20 的支持,这是 C++ 的版本,内置了对非常基本的原子等待和唤醒操作(如 std::atomic<T>::wait() )的支持。虽然由于各种原因,使用 Rust 的功能有点棘手,但这当然是可能的,让我们也可以在 macOS 上访问基本的类似 futex 的等待和唤醒功能。

我们不会深入研究肮脏的细节,而是利用 crates.io 的 atomic-wait crate为我们提供锁定原语的构建块。这个crate只提供三个函数: wait()wake_one()wake_all() 。它使用我们上面讨论的各种特定于平台的实现,为所有主要平台实现这些。这意味着我们不再需要考虑任何特定于平台的细节,只要我们坚持这三个功能即可。

这些函数的行为类似于我们在第 8 章的“Futex”中为 Linux 实现的同名函数,但让我们快速回顾一下它们是如何工作的:

  • wait(&AtomicU32, u32)

    该函数用于等待原子变量不再包含给定值。如果存储在原子变量中的值等于给定值,它将阻塞。当另一个线程修改原子变量的值时,该线程需要对同一个原子变量调用下面的唤醒函数之一,以将等待线程从睡眠中唤醒。

    该函数可能会错误地返回,而没有相应的唤醒操作。因此,请确保在原子变量返回后检查其值,并在必要时重复 wait() 调用。

  • wake_one(&AtomicU32)

    这会唤醒当前在同一原子变量的 wait() 上阻塞的单个线程。修改原子变量后立即使用此命令,以通知一个等待线程这一更改。

  • wake_all(&AtomicU32)

    这会唤醒当前在同一原子变量上 wait() 上阻塞的所有线程。修改原子变量后立即使用此命令,以通知等待线程更改。

仅支持 32 位原子,因为这是所有主要平台都支持的唯一大小。

提示

第 8 章的“Futex”中,我们讨论了一个非常简单的示例,展示了如何在实践中使用这些函数。如果您忘记了,请确保在继续之前查看该示例。

要使用 atomic-wait crate,请将 atomic-wait = "1" 添加到 Cargo.toml[dependencies] 部分;或者运行 cargo add atomic-wait@1 ,它​​会为你做到这一点。这三个函数定义在 crate 的根目录中,可以使用 use atomic_wait::{wait, wake_one, wake_all}; 导入。

提示

当您阅读本文时,可能已经有此 crate 的更高版本,但本章仅制作了主要版本 1。更高版本可能没有兼容的接口。

现在我们已经准备好了基本的构建块,让我们开始吧。

互斥锁

在构建Mutex<T>时,我们将以第 4 章中的SpinLock<T>类型作为参考。不涉及阻塞的部分,比如说守护类型的设计,将保持不变。

让我们从类型定义开始。与自旋锁相比,我们必须进行一项更改:我们将使用设置为 0 或 1 的 AtomicU32 ,而不是设置为 false 或 true 的 AtomicBool ,这样我们就可以用原子等待和唤醒函数来使用它。

pub struct Mutex<T> {
    /// 0: unlocked
    /// 1: locked
    state: AtomicU32,
    value: UnsafeCell<T>,
}

就像自旋锁一样,我们需要保证 Mutex<T> 可以在线程之间共享,即使它包含可怕的 UnsafeCell

unsafe impl<T> Sync for Mutex<T> where T: Send {}

我们还将添加一个实现 Deref 特征的 MutexGuard 类型,以提供完全安全的锁定接口,就像我们在第 4 章的“使用锁守卫的安全接口”中所做的那样:

pub struct MutexGuard<'a, T> {
    mutex: &'a Mutex<T>,
}

impl<T> Deref for MutexGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.mutex.value.get() }
    }
}

impl<T> DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.mutex.value.get() }
    }
}

提示

有关锁守卫类型的设计和操作,请参阅第 4 章中的“使用锁守卫的安全接口”

在我们继续有趣的部分之前,让我们先了解一下 Mutex::new 函数。

impl<T> Mutex<T> {
    pub const fn new(value: T) -> Self {
        Self {
            state: AtomicU32::new(0), // unlocked state
            value: UnsafeCell::new(value),
        }
    }}

现在我们已经解决了所有问题,还剩下两部分:锁定( Mutex::lock() )和解锁( Drop for MutexGuard<T> )。

我们为自旋锁实现的锁定函数使用原子交换操作来尝试获取锁,如果成功将状态从“解锁”更改为“锁定”,则返回。如果不成功,它会立即重试。

要锁定互斥锁,我们将执行几乎相同的操作,只不过我们使用 wait() 等待后再尝试:

  pub fn lock(&self) -> MutexGuard<T> {
        // Set the state to 1: locked.
        while self.state.swap(1, Acquire) == 1 {
            // If it was already locked..
            // .. wait, unless the state is no longer 1.
            wait(&self.state, 1);
        }
        MutexGuard { mutex: self }
    }

提示

对于内存顺序,同样的推理适用于我们的自旋锁。有关详细信息,请参阅第 4 章

请注意 wait() 函数只有在我们调用它时状态仍设置为 1(锁定)时才会阻塞,这样我们就不必担心在交换和等待调用之间错过唤醒的可能性。

守卫类型的 Drop 实现负责解锁互斥锁。解锁我们的自旋锁很简单:只需将状态设置回 false (解锁)。然而,对于我们的互斥锁来说,这还不够。如果有一个线程等待锁定互斥锁,它不会知道互斥锁已被解锁,除非我们使用唤醒操作通知它。如果我们不唤醒它,它很可能会永远沉睡。 (也许它很幸运,在正确的时间被虚假地唤醒,但我们不要指望这一点。)

因此,我们不仅会将状态设置回 0(解锁),还会立即调用 wake_one()

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        // Set the state back to 0: unlocked.
        self.mutex.state.store(0, Release);
        // Wake up one of the waiting threads, if any.
        wake_one(&self.mutex.state);
    }
}

唤醒一个线程就足够了,因为即使有多个线程在等待,也只有其中一个能够声明锁。下一个锁定它的线程将在完成锁定后唤醒另一个线程,依此类推。一次唤醒多个线程只会让这些线程失望,浪费宝贵的处理器时间,因为除了其中一个线程之外,所有线程都意识到自己的锁定机会已被另一个幸运的线程夺走了,然后才再次进入睡眠状态。

请注意,不能保证我们唤醒的一个线程能够获取锁。另一个线程可能仍然在获得机会之前抓住锁。

这里需要注意的是,在没有等待和唤醒函数的情况下,该互斥锁实现在技术上仍然是正确的(即内存安全)。因为 wait() 操作可能会被虚假唤醒,所以我们无法对它何时返回做出任何假设。我们仍然必须自己管理锁定原语的状态。如果我们要删除等待和唤醒函数调用,我们的互斥锁将基本上与我们的自旋锁相同。

一般来说,从内存安全的角度来看,原子等待和唤醒函数永远不会影响正确性。它们只是为了避免忙循环而进行的(非常认真的)优化。这并不意味着按照任何实际标准,无法使用的低效锁都会被视为“正确”,但在尝试推理 unsafe Rust 代码时,这种见解可能会很有帮助。

锁API

如果您打算将实现 Rust 锁作为一种新的爱好,您可能很快就会对提供安全接口所涉及的样板代码感到厌倦。即 UnsafeCellSync 实现、守卫类型、 Deref 实现等等。

crates.io 上的 lock_api crate 可用于自动处理所有这些事情。您只需创建一个表示锁定状态的类型,并通过(不安全) lock_api::RawMutex 特征提供(不安全)锁定和解锁功能。作为回报, lock_api::Mutex 类型将根据您的锁实现为您提供完全安全且符合人体工程学的互斥锁类型,包括互斥锁防护。

避免系统调用

到目前为止,互斥锁中最慢的部分是等待和唤醒,因为它们(可能)导致系统调用,即对操作系统内核的调用。像这样与内核对话是一个相当复杂的过程,而且往往非常慢,特别是与原子操作相比。因此,对于高性能互斥锁实现,我们应该尽可能避免等待和唤醒调用。

幸运的是,我们已经成功了一半。因为我们的锁定函数中的 while 循环会检查 wait() 调用之前的状态,所以在互斥锁未锁定时,在我们不需要等待的情况下,将完全跳过等待操作。但是,我们在解锁时无条件调用 wake_one() 函数。

如果我们知道没有其他线程在等待,我们可以跳过 wake_one() 。要知道是否有等待线程,我们需要自己跟踪这些信息。

我们可以通过将“锁定”状态分为两个单独的状态来做到这一点:“无服务员锁定”和“有服务员锁定”。我们将使用值 1 和 2,并更新结构定义中 state 字段的文档注释:

pub struct Mutex<T> {
    /// 0: unlocked
    /// 1: locked, no other threads waiting
    /// 2: locked, other threads waiting
    state: AtomicU32,
    value: UnsafeCell<T>,
}

现在,对于未锁定的互斥锁,我们的锁定函数仍然需要将状态设置为 1 来锁定它。然而,如果它已经被锁定,我们的锁定函数现在需要在进入睡眠之前将状态设置为 2,以便解锁函数可以知道有一个等待线程。

为此,我们首先使用比较和交换函数尝试将状态从 0 更改为 1。如果成功,我们就锁定了互斥锁,并且我们知道没有其他等待者,因为互斥锁之前没锁。如果失败,则一定是因为互斥锁当前已锁定(处于状态 1 或 2)。在这种情况下,我们将使用原子交换操作将其设置为 2。如果该交换操作返回旧值 1 或 2,则意味着互斥锁确实仍处于锁定状态,只有那时我们才使用 wait() 来阻止直到它发生变化。如果交换操作返回 0,则意味着我们已成功锁定互斥锁,将其状态从 0 更改为 2。

   pub fn lock(&self) -> MutexGuard<T> {
        if self.state.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
            while self.state.swap(2, Acquire) != 0 {
                wait(&self.state, 2);
            }
        }
        MutexGuard { mutex: self }
    }

现在,我们的解锁函数可以在不需要时跳过 wake_one() 调用来利用新信息。现在,我们将使用交换操作来查看其先前的值,而不是仅仅存储 0 来解锁互斥锁。只有当该值为 2 时,我们才会继续唤醒线程:

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        if self.mutex.state.swap(0, Release) == 2 {
            wake_one(&self.mutex.state);
        }
    }
}

请注意,将状态设置回零后,不再指示是否有任何等待线程。被唤醒的线程负责将状态设置回 2,以确保任何其他等待线程不会被忘记。这就是为什么比较和交换操作不是我们的锁定函数中 while 循环的一部分。

这确实意味着每次线程在锁定时必须调用 wait() ,解锁时它也会调用 wake_one() ,即使这不是必要的。然而,最重要的是,在无竞争的情况下,即线程不同时尝试获取锁的理想情况,wait()wake_one() 调用都完全避免。

图 9-1 可视化了两个线程同时尝试锁定 Mutex 的情况下的操作和先行发生关系。第一个线程通过将状态从 0 更改为 1 来锁定互斥锁。此时,第二个线程将无法获取锁,因此在将状态从 1 更改为 2 后进入睡眠状态。当第一个线程解锁互斥锁时, Mutex ,它将状态交换回0。因为它是2,表示正在等待线程,所以它调用 wake_one() 来唤醒第二个线程。请注意我们如何不依赖于唤醒和等待操作之间的任何先行发生关系。虽然唤醒操作很可能是负责唤醒等待线程的操作,但先行发生关系是通过获取交换操作观察释放交换操作存储的值来建立的。

图 9-1。两个线程之间的happens-before关系同时尝试锁定我们的 Mutex 。

进一步优化

在这一点上,我们似乎没有什么可以进一步优化的了。在无竞争的情况下,我们执行零系统调用,剩下的只是两个非常简单的原子操作。

避免等待和唤醒操作的唯一方法是回到我们的旋转锁实现。虽然自旋通常是非常低效的,但它至少可以避免系统调用的潜在开销。只有在等待时间很短的情况下,自旋才会更有效率。

对于锁定互斥体,这种情况仅发生在当前持有锁的线程在不同的处理器内核上并行运行,并且只会非常短暂地保留锁的情况下。然而,这是一个非常常见的情况。

我们可以尝试结合两种方法的优点,在调用 wait() 之前先自旋很短的时间。这样,如果锁释放得非常快,我们根本不需要调用 wait() ,但我们仍然可以避免消耗不合理的处理器时间,而其他线程可以更好地利用这些时间。

实现这一点只需要更改我们的锁定功能。

为了在无竞争的情况下保持尽可能高的性能,我们将在锁定函数开始时保留原始的比较和交换操作。我们将把自旋留给一个单独的函数。

impl<T> Mutex<T> {pub fn lock(&self) -> MutexGuard<T> {
        if self.state.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
            // The lock was already locked. :(
            lock_contended(&self.state);
        }
        MutexGuard { mutex: self }
    }
}

fn lock_contended(state: &AtomicU32) {}

lock_contended 中,我们可以简单地重复相同的比较和交换操作数百次,然后再继续等待循环。 然而,比较和交换操作通常会尝试获得对相关缓存行的独占访问(请参阅第 7 章中的“MESI 协议”),这在重复执行时可能比简单的加载操作更昂贵。

考虑到这一点,我们得出以下 lock_contended 实现:

fn lock_contended(state: &AtomicU32) {
    let mut spin_count = 0;

    while state.load(Relaxed) == 1 && spin_count < 100 {
        spin_count += 1;
        std::hint::spin_loop();
    }

    if state.compare_exchange(0, 1, Acquire, Relaxed).is_ok() {
        return;
    }

    while state.swap(2, Acquire) != 0 {
        wait(state, 2);
    }
}

首先,我们自旋最多 100 次,使用自旋循环提示,就像我们在第 4 章中所做的那样。我们只在互斥锁被锁定并且没有等待者的情况下自旋。如果另一个线程已经在等待,则意味着它放弃了自旋,因为它花费了太长时间,这可能表明自旋对于该线程可能也不是很有用。

提示

100个循环的自旋持续时间大多是任意选择的。迭代所需的时间和系统调用的持续时间(我们试图避免)在很大程度上取决于平台。广泛的基准测试可以帮助选择正确的数字,但不幸的是没有一个正确的答案。

Rust 标准库中 std::sync::Mutex 的 Linux 实现(至少在 Rust 1.66.0 中)使用的旋转计数为 100。

锁定状态更改后,我们再次尝试将其设置为 1 来锁定它,然后放弃并开始等待。正如我们之前讨论的,在调用 wait() 之后,我们不能再通过将互斥锁的状态设置为 1 来锁定互斥锁,因为这可能会导致其他等待者被遗忘。

冷属性和内联属性(Cold and Inline Attributes)

您可以将 #[cold] 属性添加到 lock_contended 函数定义中,以帮助编译器了解在常见(无争用)情况下不会调用此函数,这有助于优化 lock 方法。

此外,您可以将 #[inline] 属性添加到 MutexMutexGuard 方法中,以通知编译器内联它们可能是一个好主意:将直接在调用该方法的地方生成指令。一般来说,这是否会提高性能很难说,但对于像这样的非常小的函数,通常会提高性能。

基准测试

测试一个互斥锁的性能是很难的。写一个基准测试并得到一些数字很容易,但要得到任何有意义的数字却很难。

优化一个互斥锁的实现,使其在特定的基准测试中表现良好,是相对容易的,但不是很有用。毕竟,问题的关键是要让它在实际程序中表现良好,而不仅仅是在测试程序中。

我们将尝试编写两个简单的基准测试,表明我们的优化至少对一些用例产生了一些积极的影响,但请记住,任何结论在不同的场景下都不一定成立。

对于我们的第一个测试,我们将创建一个 Mutex 并锁定和解锁它几百万次,所有这些都在同一个线程上,测量所需的总时间。这是对简单的无竞争场景的测试,其中永远没有任何线程需要被唤醒。希望这将向我们展示 2 状态和 3 状态版本之间的显着差异。

fn main() {
    let m = Mutex::new(0);
    std::hint::black_box(&m);
    let start = Instant::now();
    for _ in 0..5_000_000 {
        *m.lock() += 1;
    }
    let duration = start.elapsed();
    println!("locked {} times in {:?}", *m.lock(), duration);
}

提示

我们使用 std::hint::black_box (就像我们在第 7 章“对性能的影响”中所做的那样)强制编译器假设可能有更多代码访问互斥体,从而防止它优化循环或锁定操作。

结果将根据硬件和操作系统的不同而有很大差异。在一台配备最新 AMD 处理器的特定 Linux 计算机上尝试此操作,未优化的 2 状态互斥体的总时间约为 400 毫秒,而更优化的 3 状态互斥体的总时间约为 40 毫秒。十倍的改进!在另一台配备较旧 Intel 处理器的 Linux 计算机上,差异甚至更大:大约 1800 毫秒对 60 毫秒。这证实了第三状态的添加确实可以是一个非常重要的优化。

然而,在运行 macOS 的计算机上运行它会产生完全不同的结果:两个版本都大约 50 毫秒,这表明它都高度依赖于平台。

事实证明,我们在 macOS 上使用的 libc++ 的 std::atomic<T>::wake() 实现已经执行了自己的记录,独立于内核,以避免不必要的系统调用。 Windows 上的 WakeByAddressSingle() 也是如此。

避免调用这些函数仍然可以带来稍微更好的性能,因为它们的实现远非微不足道,特别是因为它们无法在原子变量本身中存储任何信息。然而,如果我们只针对这些操作系统,我们应该质疑向互斥体添加第三个状态是否真的值得付出努力。

为了看看我们的自旋优化是否产生了任何积极的影响,我们需要一个不同的基准测试:一个有大量争用的测试,多个线程反复尝试锁定已经锁定的互斥体。

让我们尝试一个场景,其中四个线程同时尝试锁定和解锁互斥体数百万次:

fn main() {
    let m = Mutex::new(0);
    std::hint::black_box(&m);
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                for _ in 0..5_000_000 {
                    *m.lock() += 1;
                }
            });
        }
    });
    let duration = start.elapsed();
    println!("locked {} times in {:?}", *m.lock(), duration);
}

请注意,这是一个极端且不切实际的场景。互斥体仅保留极短的时间(仅增加一个整数),并且线程在解锁后将立即尝试再次锁定互斥体。不同的场景很可能会导致截然不同的结果。

让我们像以前一样在两台 Linux 计算机上运行这个基准测试。在使用较旧的英特尔处理器的处理器上,不自旋的互斥体版本大约需要 900 毫秒,而使用自旋版本时大约需要 750 毫秒。这是一种提升!然而,在配备较新 AMD 处理器的计算机上,我们得到相反的结果:不自旋时大约 650 毫秒,自旋时大约 800 毫秒。

总之,不幸的是,关于旋转是否真的能提高性能的答案是“这取决于情况”,即使只考虑一种情况。

条件变量

让我们继续做一些更有趣的事情:实现条件变量。

正如我们在第 1 章的“条件变量”中看到的,条件变量与互斥体一起使用,以等待受互斥体保护的数据匹配某个条件。它有一个等待方法,可以解锁互斥体,等待信号,然后再次锁定同一互斥体。信号由其他线程发送,通常在修改互斥锁保护的数据后立即发送到一个等待线程(通常称为“通知一个”或“信号”)或所有等待线程(通常称为“通知所有”或“广播”) 。

虽然条件变量试图使等待线程保持睡眠状态直到收到信号,但等待线程可能会在没有相应信号的情况下被虚假唤醒。不过,条件变量的等待操作在返回之前仍会重新锁定互斥锁。

请注意,这个接口与我们类似 futex 的 wait()wake_one()wake_all() 函数几乎相同。主要区别在于用于防止丢失信号的机制。条件变量将在解锁互斥锁之前开始“监听”信号,以免错过任何信号,而我们的 futex 风格 wait() 函数依赖于检查原子变量的状态,来确保等待仍然是一个好主意。

这导致了以下条件变量的最小实现思路:如果我们确保每个通知都会更改一个原子变量(如计数器),那么我们的 Condvar::wait() 方法需要做的就是检查该变量的值在解锁互斥锁之前将其传递给 futex 风格的 wait() 函数。这样,如果在解锁互斥体后有任何通知信号到达,它就不会进入睡眠状态。

让我们尝试一下吧!

我们从一个 Condvar 结构开始,它只包含一个 AtomicU32 ,我们将其初始化为零:

pub struct Condvar {
    counter: AtomicU32,
}

impl Condvar {
    pub const fn new() -> Self {
        Self { counter: AtomicU32::new(0) }
    }}

通知方法很简单。他们只需要更改计数器并使用相应的唤醒操作来通知任何等待的线程:

pub fn notify_one(&self) {
    self.counter.fetch_add(1, Relaxed);
    wake_one(&self.counter);
}

pub fn notify_all(&self) {
    self.counter.fetch_add(1, Relaxed);
    wake_all(&self.counter);
}

(我们稍后将讨论内存顺序。)

wait 方法将采用 MutexGuard ,因为它代表锁定互斥体的证明。它还会返回 MutexGuard ,因为它会确保互斥体在返回之前再次锁定。

正如我们上面概述的,该方法将在解锁互斥体之前首先检查计数器的当前值。解锁互斥体后,它应该只在计数器没有改变的情况下等待,以确保我们没有错过任何信号。代码如下:

 pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    let counter_value = self.counter.load(Relaxed);

    // Unlock the mutex by dropping the guard,
    // but remember the mutex so we can lock it again later.
    let mutex = guard.mutex;
    drop(guard);

    // Wait, but only if the counter hasn't changed since unlocking.
    wait(&self.counter, counter_value);

    mutex.lock()
}

提示

这利用了 MutexGuard 的私有 mutex 字段。 Rust 中的隐私基于模块,因此如果您在与 MutexGuard 不同的模块中定义它,则需要将 MutexGuard 的互斥字段标记为 pub(crate),以使其可供其他人使用crate中的模块。

在庆祝我们成功完成条件变量之前,让我们先考虑一下内存顺序。

当互斥体被锁定时,任何其他线程都不能更改受互斥体保护的数据。因此,我们不需要担心在解锁互斥体之前发出的通知,因为只要我们保持互斥体锁定,数据就不会发生任何使我们改变想要休眠和等待的想法的事情。

我们唯一感兴趣的情况是,在我们释放互斥体之后,另一个线程出现并锁定互斥体,更改受保护的数据,并向我们发出信号(希望在解锁互斥体之后)。

在这种情况下,解锁 Condvar::wait() 中的互斥体与锁定通知线程中的互斥体之间存在先行发生关系。这种先行发生关系保证了在解锁之前进行的宽松加载会在通知线程的宽松增量操作之前观察到这个值,而这个操作发生在锁定之后。

我们不知道 wait() 操作是否会在递增之前或之后看到该值,因为没有任何东西可以保证此时的任何顺序。不过,这并不重要,因为 wait() 相对于相应的唤醒操作而言是原子行为的。 要么它看到新值,在这种情况下它根本不会进入睡眠状态,要么它看到旧值,在这种情况下它进入睡眠状态并会被相应的 wake_one()wake_all() 从通知中调用。

图 9-2 显示了一种情况的操作和先行发生关系,其中一个线程使用 Condvar::wait() 等待某些受互斥锁保护的数据发生更改,并被修改数据的第二个线程唤醒,并且调用 Condvar::wake_one() 。请注意,由于解锁和锁定操作,如何保证第一个加载操作在值递增之前观察到该值。

图 9-2。使用 Condvar::wait() 的一个线程的操作和先行发生关系被另一个使用 Condvar::notify_one() 的线程唤醒。

我们还应该考虑如果计数器溢出会发生什么。

计数器的实际值并不重要,只要它在每次通知后都不同即可。不幸的是,在超过40亿条通知之后,计数器将溢出并从零开始,回到之前使用的值。从技术上讲,我们的 Condvar::wait() 实现有可能在不应该进入睡眠状态时进入睡眠状态:如果它恰好错过了 4,294,967,296 条通知(或其任意倍数),计数器就会溢出到以前的值。

我们完全可以认为这种情况发生的几率是可以忽略不计的。与我们在互斥锁方法中所做的相反,我们在这里唤醒后并没有重新检查状态和重复调用wait(),所以我们只需要担心在宽松加载计数器和调用wait()之间的时刻。 如果一个线程可以被中断很长时间,以至于允许(确切地)发生那么多通知,那么很可能已经出现了严重错误,并且程序已经变得没有响应。到那时,人们可能会合理地认为,线程处于休眠状态的微小额外风险不再重要。

提示

在支持有时间限制的 futex 式等待的平台上,可以通过对等待操作使用几秒钟的超时来减轻溢出风险。发送 40 亿条通知将花费更长的时间,此时额外几秒钟的风险影响很小。这完全消除了由于等待线程错误地永远处于休眠状态而导致程序锁定的任何风险。

让我们看看它是否有效!

#[test]
fn test_condvar() {
    let mutex = Mutex::new(0);
    let condvar = Condvar::new();

    let mut wakeups = 0;

    thread::scope(|s| {
        s.spawn(|| {
            thread::sleep(Duration::from_secs(1));
            *mutex.lock() = 123;
            condvar.notify_one();
        });

        let mut m = mutex.lock();
        while *m < 100 {
            m = condvar.wait(m);
            wakeups += 1;
        }

        assert_eq!(*m, 123);
    });

    // Check that the main thread actually did wait (not busy-loop),
    // while still allowing for a few spurious wake ups.
    assert!(wakeups < 10);
}

我们计算条件变量从其等待方法返回的次数,以确保它确实进入了睡眠状态。如果这个数字非常高,那就表明我们不小心在自旋循环了。测试这一点很重要,因为一个从不休眠的条件变量仍然会导致 "正确 "的行为,但会有效地将等待循环变成一个自旋循环。

如果我们运行这个测试,我们会发现它的编译和通过都很好,证实了我们的条件变量确实让主线程进入了睡眠状态。当然,这并不能证明其实现是正确的。如果有必要,可以用一个涉及许多线程的长时间压力测试,最好是在一个弱序处理器架构的计算机上运行,以获得更多的信心。

避免系统调用

正如我们在“互斥体:避免系统调用”中意识到的那样,优化锁定原语主要是为了避免不必要的等待和唤醒操作。

就我们的条件变量而言,尝试避免 Condvar::wait() 实现中的 wait() 调用没有多大用处。当线程决定等待条件变量时,它已经检查过它正在等待的事情尚未发生,并且需要进入睡眠状态。如果不需要等待,则根本不会调用 Condvar::wait()

但是,如果没有等待线程,我们可以避免 wake_one()wake_all() 调用,类似于我们对 Mutex 所做的操作。

执行此操作的一个简单方法是跟踪等待线程的数量。我们的等待方法需要在等待之前增加它,并在完成时减少它。然后,如果该数字为零,我们的通知方法可以跳过发送信号。

因此,我们在 Condvar 结构中添加一个新字段来跟踪活跃等待者的数量:

pub struct Condvar {
    counter: AtomicU32,
    num_waiters: AtomicUsize, // New!
}

impl Condvar {
    pub const fn new() -> Self {
        Self {
            counter: AtomicU32::new(0),
            num_waiters: AtomicUsize::new(0), // New!
        }
    }}

通过使用 AtomicUsize 代替 num_waiters ,我们不必担心它溢出。 usize 足够大,可以计算内存中的每个字节,因此,如果我们假设每个活动线程至少占用一个字节的内存,那么它绝对足够大,可以计算任意数量的并发存在线程。

接下来,我们更新通知函数,如果没有等待者,则不执行任何操作:

  pub fn notify_one(&self) {
        if self.num_waiters.load(Relaxed) > 0 { // New!
            self.counter.fetch_add(1, Relaxed);
            wake_one(&self.counter);
        }
    }

    pub fn notify_all(&self) {
        if self.num_waiters.load(Relaxed) > 0 { // New!
            self.counter.fetch_add(1, Relaxed);
            wake_all(&self.counter);
        }
    }

(我们稍后将讨论内存顺序。)

最后,最重要的是,我们在 wait 方法开始时递增它,并在它唤醒后立即递减:

pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    self.num_waiters.fetch_add(1, Relaxed); // New!

    let counter_value = self.counter.load(Relaxed);

    let mutex = guard.mutex;
    drop(guard);

    wait(&self.counter, counter_value);

    self.num_waiters.fetch_sub(1, Relaxed); // New!

    mutex.lock()
}

我们应该再次仔细地问自己,宽松的内存顺序是否足以满足所有这些原子操作。

我们引入的一个新的潜在风险是,其中一个通知方法在 num_waiters 中观察到零,跳过其唤醒操作,而实际上有一个线程需要唤醒。当通知方法在递增操作之前或递减操作之后观察值时,可能会发生这种情况。

正如计数器的宽松加载一样,等待者在递增 num_waiters 时仍然保持互斥锁锁定,这一事实确保在解锁互斥锁后发生的任何 num_waiters 的加载都不会看到它被增加之前的值。

我们也不需要担心通知线程“太早”观察到递减的值,因为一旦执行了递减操作,也许在虚假唤醒之后,等待线程就不再需要被唤醒了。

换句话说,互斥体建立的先发生关系仍然提供了我们需要的所有保证。

避免虚假唤醒

我们优化条件变量的另一种方法是避免虚假唤醒。每次唤醒线程时,它都会尝试锁定互斥锁,这可能会与其他线程竞争,这会对性能产生很大影响。

底层 wait() 操作虚假唤醒的情况相当罕见,但我们的条件变量实现很容易允许 notify_one() 导致多个线程停止等待。如果线程正在进入睡眠状态,刚刚加载了计数器值,但尚未进入睡眠状态,则调用 notify_one() 将阻止该线程进入睡眠状态,因为更新的计数器值计数器,但它也会由于随后的 wake_one() 操作而导致第二个线程唤醒。然后,这两个线程将竞争锁定互斥体,浪费宝贵的处理器时间。

这听起来像是很少发生的情况,但实际上很容易发生,因为互斥锁最终如何同步线程。对条件变量调用 notify_one() 的线程很可能会在之前锁定和解锁互斥体,以更改等待线程正在等待的数据的某些内容。这意味着一旦 Condvar::wait() 方法解锁互斥体,可能会立即解锁正在等待互斥体的通知线程。此时,两个线程正在竞争:等待线程进入睡眠状态,通知线程锁定和解锁互斥体并通知条件变量。如果通知线程赢得了该竞争,则等待线程将不会因为计数器递增而进入睡眠状态,但通知线程仍将调用 wake_one() 。这正是上面描述的有问题的情况,它可能不必要地唤醒额外的等待线程。

一个相对简单的解决方案是跟踪允许唤醒的线程数(即从 Condvar::wait() 返回)。 notify_one 方法会将其增加 1,如果它不为零, wait 方法将尝试将其减少 1。如果计数器为零,它可能会进入(返回)睡眠状态,而不是尝试重新锁定互斥体并返回。 (可以通过专门为 notify_all 添加另一个永不递减的计数器来通知所有线程。)

这种方法有效,但带来了一个新的、更微妙的问题:通知可能会唤醒一个甚至还没有调用 Condvar::wait() 的线程,包括它自己。对 Condvar::notify_one() 的调用将增加应唤醒的线程数,并使用 wake_one() 唤醒一个等待线程。然后,如果另一个(甚至同一个)线程随后调用 Condvar::wait() ,在已经等待的线程有机会醒来之前,新等待的线程可以看到有一个通知待处理,并通过以下方式声明它:将计数器递减至零,并立即返回。然后,第一个正在等待的线程将返回睡眠状态,因为另一个线程已经收到了通知。

根据用例,这可能完全没问题,也可能是一个大问题,导致某些线程永远无法取得进展。

提示

GNU libc 的 pthread_cond_t 实现曾经遇到过这个问题。经过对 POSIX 规范是否允许这样做的大量讨论后,这个问题最终随着 2017 年 GNU libc 2.25 的发布而得到解决,其中包括全新的条件变量实现。

在许多使用条件变量的情况下,等待者抢走较早的通知是完全可以的。但是,当实现一般用途而非特定类型用例的条件变量时,这种行为可能是不可接受的。

同样,我们必须得出这样的结论:我们是否应该使用优化方法的答案是“视情况而定”,这并不奇怪。

提示

有多种方法可以避免此问题,同时还可以避免虚假唤醒,但这些方法比其他方法复杂得多。

GNU libc 的新条件变量使用的解决方案是将等待者分为两组,只允许第一组使用通知,并在第一组没有等待者时交换组。

这种方法的缺点不仅是算法的复杂性,而且还显着增加了条件变量类型的大小,因为它现在需要跟踪更多信息。

惊群问题(Thundering Herd Problem)

使用条件变量时可能会遇到的另一个性能问题是在使用 notify_all() 唤醒等待同一事件的许多线程时发生的。

问题是,在唤醒之后,所有这些线程都会立即尝试锁定同一个互斥锁。最有可能的是,只有一个线程会成功,而其他所有的线程将不得不继续睡眠。这种许多线程都争相申请同一资源的资源浪费问题,被称为惊群问题。

认为 Condvar::notify_all() 从根本上来说是一种不值得优化的反模式并不是没有道理的。条件变量的目的是解锁互斥体并在收到通知时重新锁定它,因此也许一次通知多个线程永远不会带来任何好处。

即便如此,如果我们想针对这种情况进行优化,我们可以在支持类似 futex 重新排队操作的操作系统上执行此操作,例如 Linux 上的 FUTEX_REQUEUE 。 (请参阅第 8 章中的“Futex 操作”。)

除了一个线程外,其他线程一旦意识到锁已经被夺走,就会立即回到睡眠状态,而不是唤醒许多线程,我们可以将除一个线程之外的所有线程重新排队,使其futex等待操作不再等待条件变量的计数器,而是开始等待互斥锁状态。

重新排队等待线程不会唤醒它。事实上,线程甚至不知道它已被重新排队。不幸的是,这可能会导致一些非常微妙的陷阱。

例如,还记得一个3状态的互斥锁在唤醒后总是必须被锁定到正确的状态("与等待者锁定"),以确保其他等待者不会被遗忘?这意味着我们不应该再在Condvar::wait()实现中使用常规的互斥锁锁定方法,因为它可能会将互斥锁设置为错误的状态。

重新排队条件变量实现需要存储指向等待线程使用的互斥体的指针。否则,通知方法将不知道将等待线程重新排队到哪个原子变量(互斥状态)。这就是为什么条件变量通常不允许两个线程等待不同的互斥体。尽管许多条件变量实现不使用重新排队,但为未来版本保留这样做的可能性可能会很有用。

读写锁

是时候实现读写锁了!

回想一下,与互斥锁不同,读写锁支持两种类型的锁定:读锁定和写锁定,有时称为共享锁定和独占锁定。写锁定的行为与锁定互斥体相同,一次只允许一个锁定,而读锁定允许多个读取者同时持有锁定。换句话说,它与 Rust 中的独占引用 ( &mut T ) 和共享引用 ( &T ) 工作方式紧密匹配,仅允许一个独占引用或任意数量的共享引用同时活跃。

对于我们的互斥体,我们只需要跟踪它是否被锁定。然而,对于我们的读写锁,我们还需要知道当前持有多少读锁,以确保写锁定仅在所有读者释放其锁后发生。

让我们从使用单个 AtomicU32 作为其状态的 RwLock 结构开始。我们将用它来表示当前获取的读锁的数量,零值意味着它已解锁。为了表示写锁定状态,我们使用特殊值 u32::MAX

pub struct RwLock<T> {
    /// The number of readers, or u32::MAX if write-locked.
    state: AtomicU32,
    value: UnsafeCell<T>,
}

对于我们的 Mutex<T> ,我们必须将其 Sync 实现限制为实现 Send 的类型 T ,以确保它不能被用来发送,例如,用于将 Rc 发送到另一个线程。对于我们新的 RwLock<T> ,我们还需要要求 T 也实现 Sync ,因为多个读者将能够同时访问数据:

unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {}

因为我们的 RwLock 可以通过两种不同的方式锁定,所以我们将有两个独立的锁定函数,每个函数都有自己的守卫类型:

impl<T> RwLock<T> {
    pub const fn new(value: T) -> Self {
        Self {
            state: AtomicU32::new(0), // Unlocked.
            value: UnsafeCell::new(value),
        }
    }

    pub fn read(&self) -> ReadGuard<T> {}

    pub fn write(&self) -> WriteGuard<T> {}
}

pub struct ReadGuard<'a, T> {
    rwlock: &'a RwLock<T>,
}

pub struct WriteGuard<'a, T> {
    rwlock: &'a RwLock<T>,
}

写守卫的行为应该类似于独占引用 ( &mut T ),我们通过为其实现 DerefDerefMut 来实现:

impl<T> Deref for WriteGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.rwlock.value.get() }
    }
}

impl<T> DerefMut for WriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.rwlock.value.get() }
    }
}

但是,读守卫应该只实现 Deref ,而不是 DerefMut ,因为它没有对数据的独占访问权限,使其行为类似于共享引用( &T ):

impl<T> Deref for ReadGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.rwlock.value.get() }
    }
}

现在我们已经完成了所有样板代码,让我们开始有趣的部分:锁定和解锁。

要读锁定我们的 RwLock ,我们必须将状态加1,但前提是它尚未被写锁定。我们将使用比较和交换循环(第 2 章中的“比较和交换操作”)来执行此操作。如果状态为 u32::MAX ,意味着 RwLock 被写锁定,我们将使用 wait() 操作来休眠并稍后重试。

pub fn read(&self) -> ReadGuard<T> {
    let mut s = self.state.load(Relaxed);
    loop {
        if s < u32::MAX {
            assert!(s < u32::MAX - 1, "too many readers");
            match self.state.compare_exchange_weak(
                s, s + 1, Acquire, Relaxed
            ) {
                Ok(_) => return ReadGuard { rwlock: self },
                Err(e) => s = e,
            }
        }
        if s == u32::MAX {
            wait(&self.state, u32::MAX);
            s = self.state.load(Relaxed);
        }
    }
}

写锁定更容易;我们只需要将状态从零更改为 u32::MAX ,或者 wait() (如果它已经被锁定):

pub fn write(&self) -> WriteGuard<T> {
    while let Err(s) = self.state.compare_exchange(
        0, u32::MAX, Acquire, Relaxed
    ) {
        // Wait while already locked.
        wait(&self.state, s);
    }
    WriteGuard { rwlock: self }
}

请注意锁定的 RwLock 的确切状态值如何变化,但 wait() 操作希望我们为其提供一个准确的值来与状态进行比较。这就是为什么我们将比较和交换操作的返回值用于 wait() 操作。

解锁一个读取器涉及到将状态递减1。最终解锁RwLock的读取器,即把状态从1变为0,负责唤醒正在等待的写入器,如果有的话。

只需唤醒一个线程就足够了,因为我们知道此时不能有任何等待的读者。读者根本没有理由等待读锁定的 RwLock

impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        if self.rwlock.state.fetch_sub(1, Release) == 1 {
            // Wake up a waiting writer, if any.
            wake_one(&self.rwlock.state);
        }
    }
}

写入器必须将状态重置为零才能解锁,之后它应该唤醒一个等待的写入器或所有等待的读取器。

我们不知道是读者还是写者在等待,我们也没有办法只唤醒写者或只唤醒读者。因此,我们将唤醒所有线程:

impl<T> Drop for WriteGuard<'_, T> {
    fn drop(&mut self) {
        self.rwlock.state.store(0, Release);
        // Wake up all waiting readers and writers.
        wake_all(&self.rwlock.state);
    }
}

就是这样!我们构建了一个非常简单但完全可用的读写锁。

是时候解决一些问题了。

避免忙循环写入器

我们的实现的一个问题是写锁定可能会导致意外的忙循环。

如果我们有一个RwLock,有很多读者反复地锁定和解锁它,那么锁的状态可能会不断地变化,迅速地上升和下降。 对于我们的write来说,这导致在比较和交换操作和随后的wait()操作之间,锁的状态很有可能发生变化,尤其是当wait()操作被直接实现为一个(相对较慢的)系统调用时。这意味着wait()操作经常会立即返回,即使锁从未被解锁;它只是有一个与预期不同的读者数量。

解决方案可以是使用不同的 AtomicU32 让写入器等待,并且仅在我们真正想要唤醒写入器时才更改该原子的值。

让我们尝试一下,向 RwLock 添加一个新的 writer_wake_counter 字段:

pub struct RwLock<T> {
    /// The number of readers, or u32::MAX if write-locked.
    state: AtomicU32,
    /// Incremented to wake up writers.
    writer_wake_counter: AtomicU32, // New!
    value: UnsafeCell<T>,
}

impl<T> RwLock<T> {
    pub const fn new(value: T) -> Self {
        Self {
            state: AtomicU32::new(0),
            writer_wake_counter: AtomicU32::new(0), // New!
            value: UnsafeCell::new(value),
        }
    }}

read 方法保持不变,但 write 方法现在需要等待新的原子变量。为了确保我们在看到 RwLock 被读锁定和实际进入睡眠状态之间不会错过任何通知,我们将使用一种类似于我们用于实现条件变量的模式:在检查我们是否仍要睡眠之前检查writer_wake_counter

pub fn write(&self) -> WriteGuard<T> {
    while self.state.compare_exchange(
        0, u32::MAX, Acquire, Relaxed
    ).is_err() {
        let w = self.writer_wake_counter.load(Acquire);
        if self.state.load(Relaxed) != 0 {
            // Wait if the RwLock is still locked, but only if
            // there have been no wake signals since we checked.
            wait(&self.writer_wake_counter, w);
        }
    }
    WriteGuard { rwlock: self }
}

writer_wake_counter 的获取加载操作将与释放增量操作形成先行发生关系,该操作在解锁状态后、唤醒等待的写入器之前立即执行:

impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        if self.rwlock.state.fetch_sub(1, Release) == 1 {
            self.rwlock.writer_wake_counter.fetch_add(1, Release); // New!
            wake_one(&self.rwlock.writer_wake_counter); // Changed!
        }
    }
}

先行发生关系确保 write 方法无法观察到递增的 writer_wake_counter 值,同时仍能看到尚未递减的 state 值。否则,写锁定线程可能会认为 RwLock 仍然处于锁定状态,但错过了唤醒调用。

和以前一样,写解锁应该唤醒一个等待的写入者或所有等待的读者。由于我们仍然不知道是否有写入者或读取者在等待,因此我们必须唤醒一个等待的写入者(通过 wake_one )和所有等待的读取者(使用 wake_all ):

impl<T> Drop for WriteGuard<'_, T> {
    fn drop(&mut self) {
        self.rwlock.state.store(0, Release);
        self.rwlock.writer_wake_counter.fetch_add(1, Release); // New!
        wake_one(&self.rwlock.writer_wake_counter); // New!
        wake_all(&self.rwlock.state);
    }
}

提示

在某些操作系统上,唤醒操作后面的操作会返回其唤醒的线程数。它可能表明唤醒线程的实际数量低于实际数量(由于虚假唤醒线程),但其返回值仍然可以作为优化有用。

例如,在上面的 drop 实现中,如果 wake_one() 操作表明它实际上唤醒了一个线程,我们可以跳过 wake_all() 调用。

避免写入器饥饿

RwLock的一个常见用例是有许多频繁的读者,但很少,通常只有一个不频繁的写者。例如,一个线程可能负责读出一些传感器的输入,或者定期下载一些其他线程需要使用的新数据。

在这种情况下,我们很快就会遇到一个称为“写入器饥饿”的问题:写入器永远没有机会锁定 RwLock ,因为周围总是有读取器来保留 RwLock 读锁定。

此问题的一种解决方案是,在有写入器等待时,防止任何新的读者获取锁,即使 RwLock 仍处于读锁定状态。这样,所有新读者都必须等到写入器轮到为止,以确保读者能够访问写入器想要分享的最新数据。

让我们来实现它。

要做到这一点,我们需要跟踪是否有任何等待的写入器。为了给状态变量中的这一信息留出空间,我们可以将读者计数乘以2,并在有写入器等待的情况下加1。这意味着6或7的状态都代表有三个活动的读锁的情况: 6没有等待的写者,7有一个等待的写者。

如果我们将奇数 u32::MAX 保留为写锁定状态,那么如果状态是奇数,读者将不得不等待,但如果状态是偶数,则可以自由地通过增加2来获得一个读锁。

pub struct RwLock<T> {
    /// The number of read locks times two, plus one if there's a writer waiting.
    /// u32::MAX if write locked.
    ///
    /// This means that readers may acquire the lock when
    /// the state is even, but need to block when odd.
    state: AtomicU32,
    /// Incremented to wake up writers.
    writer_wake_counter: AtomicU32,
    value: UnsafeCell<T>,
}

我们必须更改 read 方法中的两个 if 语句,以不再将状态与 u32::MAX 进行比较,而是检查状态是否为偶数或奇数的。我们还需要确保通过增加 2 而不是 1 来锁定。

pub fn read(&self) -> ReadGuard<T> {
    let mut s = self.state.load(Relaxed);
    loop {
        if s % 2 == 0 { // Even.
            assert!(s < u32::MAX - 2, "too many readers");
            match self.state.compare_exchange_weak(
                s, s + 2, Acquire, Relaxed
            ) {
                Ok(_) => return ReadGuard { rwlock: self },
                Err(e) => s = e,
            }
        }
        if s % 2 == 1 { // Odd.
            wait(&self.state, s);
            s = self.state.load(Relaxed);
        }
    }
}

我们的 write 方法必须进行更大的改变。我们将使用比较和交换循环,就像上面的 read 方法一样。如果状态为 0 或 1,这意味着 RwLock 已解锁,我们将尝试将状态更改为 u32::MAX 以对其进行写锁定。否则,我们将不得不等待。然而,在此之前,我们需要确保状态是奇数,以阻止新的读者获取锁。确保状态为奇数后,我们等待 writer_wake_counter 变量,同时确保锁尚未解锁。

在代码中,它看起来像这样:

 pub fn write(&self) -> WriteGuard<T> {
    let mut s = self.state.load(Relaxed);
    loop {
        // Try to lock if unlocked.
        if s <= 1 {
            match self.state.compare_exchange(
                s, u32::MAX, Acquire, Relaxed
            ) {
                Ok(_) => return WriteGuard { rwlock: self },
                Err(e) => { s = e; continue; }
            }
        }
        // Block new readers, by making sure the state is odd.
        if s % 2 == 0 {
            match self.state.compare_exchange(
                s, s + 1, Relaxed, Relaxed
            ) {
                Ok(_) => {}
                Err(e) => { s = e; continue; }
            }
        }
        // Wait, if it's still locked
        let w = self.writer_wake_counter.load(Acquire);
        s = self.state.load(Relaxed);
        if s >= 2 {
            wait(&self.writer_wake_counter, w);
            s = self.state.load(Relaxed);
        }
    }
}

由于我们现在跟踪是否有任何等待的写入器,因此读取解锁现在可以在不必要时跳过 wake_one() 调用:

impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        // Decrement the state by 2 to remove one read-lock.
        if self.rwlock.state.fetch_sub(2, Release) == 3 {
            // If we decremented from 3 to 1, that means
            // the RwLock is now unlocked _and_ there is
            // a waiting writer, which we wake up.
            self.rwlock.writer_wake_counter.fetch_add(1, Release);
            wake_one(&self.rwlock.writer_wake_counter);
        }
    }
}

当写锁定( u32::MAX 状态)时,我们不会跟踪有关是否有任何线程正在等待的任何信息。因此,我们没有可用于写入解锁的新信息,这些信息将保持不变:

impl<T> Drop for WriteGuard<'_, T> {
    fn drop(&mut self) {
        self.rwlock.state.store(0, Release);
        self.rwlock.writer_wake_counter.fetch_add(1, Release);
        wake_one(&self.rwlock.writer_wake_counter);
        wake_all(&self.rwlock.state);
    }
}

对于针对“频繁读取和不频繁写入”用例进行优化的读写器锁,这是完全可以接受的,因为写入锁定(以及因此写入解锁)很少发生。

然而,对于更通用的读写锁,绝对值得进一步优化,以使写锁定和解锁的性能接近高效三态互斥体的性能。这留给读者作为一个有趣的练习。

总结

  • atomic-wait crate 提供基本的类似 futex 的功能,适用于所有主要操作系统(最新版本)。

  • 最小的互斥锁实现只需要两个状态,就像第 4 章中的 SpinLock 一样。

  • 更高效的互斥体会跟踪是否有任何等待线程,因此可以避免不必要的唤醒操作。

  • 在某些情况下,睡眠前自旋可能是有益的,但这在很大程度上取决于具体情况、操作系统和硬件。

  • 最小条件变量只需要一个通知计数器, Condvar::wait 必须在解锁互斥锁之前和之后进行检查。

  • 条件变量可以跟踪等待线程的数量,以避免不必要的唤醒操作。

  • 避免从 Condvar::wait 虚假唤醒可能很棘手,需要额外的记录。

  • 最小读写锁只需要一个原子计数器作为其状态。

  • 一个附加的原子变量可用于独立于读取器唤醒写入器。

  • 为了避免写入器饥饿,需要额外的状态来优先考虑等待写入器而不是新读取器。

上次更新: