第 2 章 原子

来自 《Rust Atomics and Locks》 的翻译文章,英文原文https://marabos.nl/atomics/atomics.html在新窗口打开

原子这个词来自希腊语 ἄτομος,意思是不可分割的,不能被切割成更小的部分。在计算机科学中,它被用来描述一个不可分割的操作:它要么完全完成,要么尚未发生。

正如第 1 章“借用和数据竞争”中提到的,多个线程同时读取和修改同一个变量通常会导致未定义的行为。然而,原子操作确实允许不同的线程安全地读取和修改同一个变量。由于这样的操作是不可分割的,它要么完全发生在另一个操作之前,要么完全发生在另一个操作之后,从而避免了未定义的行为。稍后,在第 7 章中,我们将了解它在硬件级别的工作原理。

原子操作是任何涉及多线程的主要构建模块。所有其他并发原语,例如互斥锁和条件变量,都是使用原子操作实现的。

在 Rust 中,原子操作可以作为标准原子类型的方法使用,这些原子类型存在于 std::sync::atomic 中。它们的名称都以 Atomic 开头,例如 AtomicI32AtomicUsize 。哪些是可用的,取决于硬件架构,有时也取决于操作系统,但几乎所有平台都至少提供了所有的原子类型,最大为指针大小。

与大多数类型不同,它们允许通过共享引用(例如, &AtomicU8 )进行修改。这要归功于内部可变性,正如第 1 章“内部可变性”中所讨论的那样。

每个可用的原子类型都有相同的接口,有存储和加载的方法,有原子 "获取和修改 "操作的方法,还有一些更高级的 "比较和交换 "方法。我们将在本章的其余部分详细讨论它们。

但是,在我们深入研究不同的原子操作之前,我们需要简要地谈谈一个叫做内存顺序的概念:

每个原子操作都有一个 std::sync::atomic::Ordering 类型的参数,它决定了我们对操作的相对顺序有什么保证。保证最少的最简单的变体是 RelaxedRelaxed 仍然保证单个原子变量的一致性,但不保证不同变量之间的相对操作顺序。

这意味着,两个线程可能会看到对不同变量的操作以不同的顺序发生。例如,如果一个线程先写入一个变量,然后很快写入第二个变量,则另一个线程可能会看到以相反顺序发生的情况。

在本章中,我们将只关注没有问题的用例,并且在任何地方都简单地使用 Relaxed ,而不会深入细节。我们将在第 3 章中讨论内存顺序和其他可用内存顺序的所有细节。

原子加载和存储操作

我们要看的前两个原子操作是最基本的: loadstore 。它们的函数签名如下,以 AtomicI32 为例:

impl AtomicI32 {
    pub fn load(&self, ordering: Ordering) -> i32;
    pub fn store(&self, value: i32, ordering: Ordering);
}

load 方法以原子方式加载存储在原子变量中的值, store 方法以原子方式将新值存储在其中。请注意 store 方法是如何采用共享引用 ( &T ) 而不是独占引用 ( &mut T ),即使它修改了值。

让我们看一下这两种方法的一些实际用例。

示例:停止标志

第一个示例使用 AtomicBool 作为停止标志。这样的标志用于通知其他线程停止运行。

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;

fn main() {
    static STOP: AtomicBool = AtomicBool::new(false);

    // Spawn a thread to do the work.
    let background_thread = thread::spawn(|| {
        while !STOP.load(Relaxed) {
            some_work();
        }
    });

    // Use the main thread to listen for user input.
    for line in std::io::stdin().lines() {
        match line.unwrap().as_str() {
            "help" => println!("commands: help, stop"),
            "stop" => break,
            cmd => println!("unknown command: {cmd:?}"),
        }
    }

    // Inform the background thread it needs to stop.
    STOP.store(true, Relaxed);

    // Wait until the background thread finishes.
    background_thread.join().unwrap();
}

在这个例子中,后台线程重复运行 some_work() ,而主线程允许用户输入一些命令与程序进行交互。在这个简单的例子中,唯一有用的命令是 stop 来停止程序。

为使后台线程停止,原子 STOP 布尔值用于将此条件传达给后台线程。当前台线程读取 stop 命令时,它将标志设置为 true,后台线程在每次新迭代之前检查该标志。主线程会等待,直到后台线程使用 join 方法完成其当前的迭​​代。

只要标志由后台线程定期检查,这个简单的解决方案就可以很好地工作。如果它长时间停留在 some_work() ,可能会导致 stop 命令和程序退出之间出现不可接受的延迟。

示例:进度报告

在我们的下一个示例中,我们在后台线程上一个一个地处理 100 个项目,而主线程向用户定期更新进度:

use std::sync::atomic::AtomicUsize;

fn main() {
    let num_done = AtomicUsize::new(0);

    thread::scope(|s| {
        // A background thread to process all 100 items.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Assuming this takes some time.
                num_done.store(i + 1, Relaxed);
            }
        });

        // The main thread shows status updates, every second.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::sleep(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

这一次,我们使用作用域线程(第 1 章中的“作用域线程”),它会自动为我们处理线程的join,还允许我们借用局部变量。

每次后台线程处理完一个项目时,它都会将处理的项目数存储在 AtomicUsize 中。同时,主线程向用户显示该数字以通知他们进度,大约每秒一次。一旦主线程看到所有 100 个项目都已处理完毕,它就会退出范围,隐式地join后台线程,并通知用户一切都已完成。

同步化(Synchronization)

一旦处理完最后一个项目,主线程可能需要一整秒的时间才能知道,最后引入了不必要的延迟。为了解决这个问题,我们可以使用线程停放(第 1 章中的“线程停放”)在有可能感兴趣的新信息时将主线程从睡眠中唤醒。

这是相同的示例,但现在使用 thread::park_timeout 而不是 thread::sleep

fn main() {
    let num_done = AtomicUsize::new(0);

    let main_thread = thread::current();

    thread::scope(|s| {
        // A background thread to process all 100 items.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Assuming this takes some time.
                num_done.store(i + 1, Relaxed);
                main_thread.unpark(); // Wake up the main thread.
            }
        });

        // The main thread shows status updates.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::park_timeout(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

没有太大变化。我们已经通过 thread::current() 获得了主线程的句柄,后台线程在每次状态更新后都会使用这个句柄来取消主线程的停放。主线程现在使用 park_timeout 而不是 sleep ,这样它就可以被中断。

现在,任何状态更新都会立即报告给用户,同时仍然每秒重复最后一次更新以表明程序仍在运行。

示例:惰性初始化(Lazy Initialization)

在我们继续进行更高级的原子操作之前,最后一个示例是关于惰性初始化的。

假设有一个值 x ,我们正在从文件中读取它,从操作系统中获取它,或者以其他方式计算它,我们希望它在程序运行期间保持不变。也许 x 是操作系统的版本,或者是内存总量,或者是tau(π)的第400位。对于这个例子来说并不重要。

由于我们不希望它发生变化,因此我们可以仅在第一次需要时请求或计算它,并记住结果。第一个需要它的线程必须计算该值,但它可以将它存储在一个原子 static 中,以便所有线程都可以使用它,包括它自己(如果稍后再次需要它)。

让我们看一个例子。为了简单起见,我们假设 x 永远不会为零,这样我们就可以在计算之前使用零作为占位符。

use std::sync::atomic::AtomicU64;

fn get_x() -> u64 {
    static X: AtomicU64 = AtomicU64::new(0);
    let mut x = X.load(Relaxed);
    if x == 0 {
        x = calculate_x();
        X.store(x, Relaxed);
    }
    x
}

调用 get_x() 的第一个线程将检查静态 X 并看到它仍然为零,计算它的值,并将结果存储回静态以供将来使用。之后,任何对 get_x() 的调用都会看到静态中的值不为零,并立即返回它而无需再次计算。

但是,如果第二个线程在第一个线程仍在计算 x 时调用 get_x() ,则第二个线程也将看到零,并且也会并行计算 x 。其中一个线程最终将覆盖另一个线程的结果,具体取决于哪个线程先完成。这叫做竞争。不是数据竞争,这是未定义的行为,如果不使用 unsafe 在 Rust 中是不可能的,但仍然是一场无法预测获胜者的竞争。

由于我们期望 x 保持不变,因此无论谁赢得竞争都无关紧要,因为无论如何结果都是一样的。根据我们期望 calculate_x() 花费多少时间,这可能是一个非常好的或非常糟糕的策略。

如果预计 calculate_x() 会花费很长时间,最好让线程在第一个线程仍在初始化 X 时等待,以避免不必要地浪费处理器时间。您可以使用条件变量或线程停放(第 1 章中的“等待:停放和条件变量”)来实现这一点,但这对于一个小示例来说很快就会变得太复杂。 Rust 标准库通过 std::sync::Oncestd::sync::OnceLock 提供了这个功能,所以通常不需要自己实现这些。

获取和修改操作

现在我们已经看到了基本的 loadstore 操作的一些用例,让我们继续进行更有趣的操作:获取和修改(fetch-and-modify)操作。这些操作修改原子变量,但也加载(获取)原始值,作为单个原子操作。

最常用的是 fetch_addfetch_sub ,分别执行加法和减法。其他一些可用的操作是用于按位操作的 fetch_orfetch_and ,以及可用于保持运行最大值或最小值的 fetch_maxfetch_min

它们的函数签名如下,以 AtomicI32 为例:

impl AtomicI32 {
    pub fn fetch_add(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_sub(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_or(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_and(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_nand(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_xor(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_max(&self, v: i32, ordering: Ordering) -> i32;
    pub fn fetch_min(&self, v: i32, ordering: Ordering) -> i32;
    pub fn swap(&self, v: i32, ordering: Ordering) -> i32; // "fetch_store"
}

一个异常值是仅存储新值的操作,而不考虑旧值。它被称为 swap,而不是 fetch_store

下面是一个快速演示,展示了 fetch_add 如何在操作之前返回值:

use std::sync::atomic::AtomicI32;

let a = AtomicI32::new(100);
let b = a.fetch_add(23, Relaxed);
let c = a.load(Relaxed);

assert_eq!(b, 100);
assert_eq!(c, 123);

fetch_add 操作将 a 从 100 递增到 123,但返回给我们的是旧值 100。任何下一个操作都会看到 123 这个值。

这些操作的返回值并不总是相关的。如果您只需要将操作应用于原子值,但对值本身不感兴趣,则完全可以忽略返回值。

需要牢记的一件重要事情是 fetch_addfetch_sub 实现了溢出的包装行为。增加超过最大可表示值的值将回绕,并导致最小可表示值。这与普通整数的加号和减号运算符的行为不同,后者会在调试模式下溢出时出现恐慌。

“比较和交换操作”中,我们将看到如何使用溢出检查进行原子加法。

但首先,让我们看看这些方法的一些实际用例。

示例:来自多个线程的进度报告

“示例:进度报告”中,我们使用 AtomicUsize 来报告后台线程的进度。例如,如果我们将工作拆分为四个线程,每个线程处理 25 个项目,我们就需要了解所有四个线程的进度。

我们可以为每个线程使用单独的 AtomicUsize ,并将它们全部加载到主线程中,然后将它们相加,但更简单的解决方案是使用单个 AtomicUsize 来跟踪所有线程中已处理项目的总数。

为了让它工作,我们不能再使用 store 方法,因为那样会覆盖其他线程的进度。相反,我们可以使用原子添加操作在每个处理过的项目之后递增计数器。

让我们更新“示例:进度报告”中的示例,将工作拆分为四个线程:

fn main() {
    let num_done = &AtomicUsize::new(0);

    thread::scope(|s| {
        // Four background threads to process all 100 items, 25 each.
        for t in 0..4 {
            s.spawn(move || {
                for i in 0..25 {
                    process_item(t * 25 + i); // Assuming this takes some time.
                    num_done.fetch_add(1, Relaxed);
                }
            });
        }

        // The main thread shows status updates, every second.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::sleep(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

有几件事发生了变化。最重要的是,我们现在生成四个后台线程而不是一个,并使用 fetch_add 而不是 store 来修改 num_done 原子变量。

更巧妙地,我们现在为后台线程使用 move 闭包, num_done 现在是一个引用。这与我们使用 fetch_add 无关,而是与我们如何在一个循环中生成四个线程有关。此闭包捕获 t 以了解它是四个线程中的哪一个,从而知道是从项目 0、25、50 还是 75 开始。如果没有 move 关键字,闭包将尝试通过引用捕获 t 。这是不允许的,因为它只在循环中短暂存在。

作为 move 闭包,它移动(或复制)它的捕获而不是借用它们,给它一个 t 的副本。因为它也捕获了 num_done ,所以我们将该变量更改为引用,因为我们仍然想借用相同的 AtomicUsize 。请注意,原子类型没有实现 Copy 特性,因此如果我们试图将一个原子类型移动到多个线程中,我们会得到一个错误。

撇开闭包捕捉微妙之处不谈,这里使用 fetch_add 的更改非常简单。我们不知道线程将以何种顺序递增 num_done ,但由于加法是原子的,因此我们不必担心任何事情,并且可以确定当所有线程都完成时它会正好是 100。

示例:统计

继续这个通过原子报告其他线程正在做什么的概念,让我们扩展我们的示例,以收集和报告一些关于处理一个项目所花费的时间的统计数据。

num_done 旁边,我们添加了两个原子变量, total_timemax_time ,以跟踪处理项目所花费的时间。我们将使用这些来报告平均和峰值处理时间。

fn main() {
    let num_done = &AtomicUsize::new(0);
    let total_time = &AtomicU64::new(0);
    let max_time = &AtomicU64::new(0);

    thread::scope(|s| {
        // 四个后台线程处理 100 个项目,每个 25 个。
        for t in 0..4 {
            s.spawn(move || {
                for i in 0..25 {
                    let start = Instant::now();
                    process_item(t * 25 + i); // 假设这需要一些时间。
                    let time_taken = start.elapsed().as_micros() as u64;
                    num_done.fetch_add(1, Relaxed);
                    total_time.fetch_add(time_taken, Relaxed);
                    max_time.fetch_max(time_taken, Relaxed);
                }
            });
        }

        // 主线程每秒显示一次状态更新。
        loop {
            let total_time = Duration::from_micros(total_time.load(Relaxed));
            let max_time = Duration::from_micros(max_time.load(Relaxed));
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            if n == 0 {
                println!("Working.. nothing done yet.");
            } else {
                println!(
                    "Working.. {n}/100 done, {:?} average, {:?} peak",
                    total_time / n as u32,
                    max_time,
                );
            }
            thread::sleep(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

后台线程现在使用 Instant::now()Instant::elapsed() 来衡量它们在 process_item() 中花费的时间。原子添加操作用于将微秒数添加到 total_time ,原子最大操作用于跟踪 max_time 中的最高测量值。

主线程将总时间除以处理的项目数以获得平均处理时间,然后将其与 max_time 的峰值时间一起报告。

由于三个原子变量是分别更新的,因此主线程可能会在线程递增 num_done 之后但在更新 total_time 之前加载值,从而导致低估平均值。更微妙的是,因为 Relaxed 内存顺序不保证从另一个线程看到的操作的相对顺序,它甚至可能短暂地看到 total_time 的新更​​新值,同时仍然看到 num_done 的旧值,导致高估平均值。

在我们的示例中,这两者都不是大问题。可能发生的最坏情况是向用户简要报告不准确的平均值。

如果我们想避免这种情况,我们可以将三个统计信息放在一个 Mutex 中。然后我们会在更新三个数字时短暂锁定互斥锁,这三个数字不再必须是原子的。这有效地将三个更新变成了一个原子操作,代价是锁定和解锁互斥锁,并可能暂时阻塞线程。

示例:ID 分配

让我们继续讨论一个实际需要来自 fetch_add 的返回值的用例。

假设我们需要一些函数, allocate_new_id() ,每次调用它时都会给出一个新的唯一数字。我们可能会使用这些数字来识别我们程序中的任务或其他事物;需要通过可以轻松存储并在线程之间传递的小东西(例如整数)来唯一标识的东西。

使用 fetch_add 实现这个函数被证明是微不足道的:

use std::sync::atomic::AtomicU32;

fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    NEXT_ID.fetch_add(1, Relaxed)
}

我们只需跟踪下一个要给出的数字,并在每次加载它时递增它。第一个调用者将得到 0,第二个调用者将得到 1,依此类推。

这里唯一的问题是溢出时的包装行为。第 4,294,967,296 次调用将溢出 32 位整数,这样下一次调用将再次返回 0。

这是否是一个问题取决于用例:它经常被调用的可能性有多大,如果数字不是唯一的,最坏的情况是什么?虽然这看起来是一个巨大的数字,但现代计算机可以在几秒钟内轻松地多次执行我们的函数。如果内存安全依赖于这些数字的唯一性,那么我们上面的实现是不可接受的。

为了解决这个问题,我们可以尝试让函数在被调用太多次时出现恐慌,如下所示:

// 这个版本有问题。
fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    let id = NEXT_ID.fetch_add(1, Relaxed);
    assert!(id < 1000, "too many IDs!");
    id
}

现在, assert 语句将在调用一千次后出现恐慌。然而,这发生在原子添加操作已经发生之后,这意味着当我们恐慌时 NEXT_ID 已经递增到 1001。如果另一个线程随后调用该函数,它会在 panic 之前将其递增到 1002,依此类推。尽管可能需要更长的时间,但在 4,294,966,296 次恐慌之后, NEXT_ID 再次溢出为零时,我们会遇到同样的问题。

这个问题有三种常见的解决方案。第一个是不要恐慌,而是在溢出时完全中止进程。 std::process::abort 函数将中止整个进程,排除任何继续调用我们函数的可能性。虽然中止进程可能需要一小会儿,在此期间该函数仍可被其他线程调用,但在程序真正中止之前发生数十亿次的可能性微乎其微。

事实上,这就是标准库中 Arc::clone() 中的溢出检查是如何实现的,以防您以某种方式设法克隆它 isize::MAX 次。这在 64 位计算机上需要数百年时间,但如果 isize 只有 32 位,则可以在几秒钟内实现。

处理溢出的第二种方法是在恐慌之前使用 fetch_sub 再次递减计数器,如下所示:

fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    let id = NEXT_ID.fetch_add(1, Relaxed);
    if id >= 1000 {
        NEXT_ID.fetch_sub(1, Relaxed);
        panic!("too many IDs!");
    }
    id
}

当多个线程同时执行此函数时,计数器仍有可能非常短暂地增加到 1000 以上,但它受活动线程数的限制。假设永远不会同时有数十亿个活动线程是合理的,尤其是不会在 fetch_addfetch_sub 之间的短暂时刻同时执行相同的功能。

这就是在标准库的 thread::scope 实现中针对正在运行的线程数处理溢出的方式。

第三种处理溢出的方法可以说是唯一真正正确的方法,因为如果加法溢出,它根本不会发生。然而,我们不能用我们目前看到的原子操作来实现它。为此,我们需要比较和交换(compare-and-exchange)操作,我们将在接下来探讨。

比较和交换操作

最先进和灵活的原子操作是比较和交换操作。此操作检查原子值是否等于给定值,只有在这种情况下,它才会用新值替换它,所有操作都以原子方式进行。它会返回之前的值并告诉我们它是否替换了它。

它的签名比我们目前看到的要复杂一些。以 AtomicI32 为例,它看起来是这样的:

impl AtomicI32 {
    pub fn compare_exchange(
        &self,
        expected: i32,
        new: i32,
        success_order: Ordering,
        failure_order: Ordering
    ) -> Result<i32, i32>;
}

暂时忽略内存顺序,它与以下实现基本相同,除了它都是作为单个不可分割的原子操作发生的:

impl AtomicI32 {
    pub fn compare_exchange(&self, expected: i32, new: i32) -> Result<i32, i32> {
        // 实际上,加载、比较和存储,
        // 所有这些都是作为单个原子操作发生的。
        let v = self.load();
        if v == expected {
            // 值符合预期。
            // 替换它并报告成功。
            self.store(new);
            Ok(v)
        } else {
            // 该值不符合预期。
            // 保持不变并报告失败。
            Err(v)
        }
    }
}

使用它,我们可以从原子变量加载一个值,执行我们喜欢的任何计算,然后如果原子变量在此期间没有改变,则只存储新计算的值。如果我们把它放在一个循环中以在它确实发生变化时重试,我们可以使用它来实现所有其他原子操作,使它成为最通用的操作。

为了演示,让我们在不使用 fetch_add 的情况下将 AtomicU32 递增 1,看看在实践中如何使用 compare_exchange

fn increment(a: &AtomicU32) {
    let mut current = a.load(Relaxed); // 1
    loop {
        let new = current + 1; // 2
        match a.compare_exchange(current, new, Relaxed, Relaxed) { // 3
            Ok(_) => return, // 4
            Err(v) => current = v, // 5
        }
    }
}

 

 
 
 
 



(1) 首先,我们加载 a 的当前值。

(2) 我们计算要存储在 a 中的新值,而不考虑其他线程对 a 的潜在并发修改。

(3) 我们使用 compare_exchange 来更新 a 的值,但前提是它的值仍然是我们之前加载的值。

(4) 如果 a 确实和以前一样,它现在被我们的新值取代,我们就完成了。

(5) 如果 a 与之前不同,那么在我们加载它后的短时间内,另一个线程一定已经更改了它。 compare_exchange 操作为我们提供了 a 所具有的更改值,我们将再次尝试使用该值。加载和更新之间的短暂时间是如此之短,以至于它不太可能循环超过几次迭代。

注意

如果原子变量从某个值 A 更改为 B ,然后在 load 操作之后返回到 A ,但在 compare_exchange 操作之前,它仍然会成功,即使原子变量已更改(并改回)同时。在许多情况下,就像我们的 increment 示例一样,这不是问题。然而,对于某些算法,通常涉及原子指针,这可能是一个问题。这被称为 ABA 问题。

compare_exchange 旁边,有一个名为 compare_exchange_weak 的类似方法。不同之处在于,即使原子值与预期值匹配,弱版本有时仍可能保留值不变并返回 Err 。在某些平台上,这种方法可以更有效地实现,并且在伪造的比较和交换失败的后果微不足道的情况下应该是首选,例如在我们上面的 increment 函数中。在第 7 章中,我们将深入底层细节,找出为什么弱版本可以更高效。

示例:无溢出的 ID 分配

现在,回到“示例:ID 分配”中 allocate_new_id() 中的溢出问题。

要停止递增 NEXT_ID 超过某个限制以防止溢出,我们可以使用 compare_exchange 来实现具有上限的原子加法。使用这个想法,让我们制作一个始终正确处理溢出的 allocate_new_id 版本,即使在几乎不可能的情况下也是如此:

fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    let mut id = NEXT_ID.load(Relaxed);
    loop {
        assert!(id < 1000, "too many IDs!");
        match NEXT_ID.compare_exchange_weak(id, id + 1, Relaxed, Relaxed) {
            Ok(_) => return id,
            Err(v) => id = v,
        }
    }
}

现在我们在修改 NEXT_ID 之前检查并恐慌,保证它永远不会增加到超过 1000,从而不可能溢出。如果需要,我们现在可以将上限从 1000 提高到 u32::MAX ,而不必担心它可能会增加到超出限制的边缘情况。

获取更新(Fetch-Update)

原子类型有一个名为 fetch_update 的便捷方法,用于比较和交换循环模式。它相当于 load 操作后跟一个重复计算的循环和 compare_exchange_weak ,就像我们上面所做的一样。

使用它,我们可以用一行代码实现我们的 allocate_new_id 函数:

 NEXT_ID.fetch_update(Relaxed, Relaxed,
        |n| n.checked_add(1)).expect("too many IDs!")

查看该方法的文档以了解详细信息。

我们不会在本书中使用 fetch_update 方法,因此我们可以专注于单个原子操作。

示例:惰性一次性初始化

“示例:惰性初始化”中,我们查看了一个常量值的惰性初始化示例。我们创建了一个函数,它在第一次调用时延迟初始化一个值,但在以后的调用中重用它。当多个线程在第一次调用时并发运行该函数时,可能会有多个线程执行初始化,它们将以不可预测的顺序覆盖彼此的结果。

这适用于我们希望保持不变的值,或者当我们不关心值的变化时。然而,也有这样的值每次都被初始化为不同值的用例,即使我们需要在程序的单次运行中每次调用函数都返回相同的值。

例如,假设一个函数 get_key() 返回一个随机生成的密钥,该密钥在每次程序运行时只生成一次。它可能是用于与程序通信的加密密钥,每次运行程序时都需要唯一,但在进程中保持不变。

这意味着我们不能在生成密钥后简单地使用 store 操作,因为这可能会覆盖另一个线程刚才生成的密钥,从而导致两个线程使用不同的密钥。相反,我们可以使用 compare_exchange 来确保我们只在没有其他线程已经这样做的情况下存储密钥,否则丢弃我们的密钥并使用存储的密钥。

这是这个想法的实现:

fn get_key() -> u64 {
    static KEY: AtomicU64 = AtomicU64::new(0);
    let key = KEY.load(Relaxed);
    if key == 0 {
        let new_key = generate_random_key(); // 1
        match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) { // 2
            Ok(_) => new_key, // 3
            Err(k) => k, // 4
        }
    } else {
        key
    }
}




 
 
 
 





(1) 如果 KEY 尚未初始化,我们只会生成一个新密钥。

(2) 我们用新生成的密钥替换 KEY ,但前提是它仍然为0。

(3) 如果我们用0交换新密钥,我们将返回新生成的密钥。 get_key() 的新调用将返回现在存储在 KEY 中的相同新密钥。

(4) 如果我们输给了另一个在我们之前初始化了 KEY 的线程,我们就会忘记我们新生成的密钥,而是使用来自 KEY 的密钥。

这是 compare_exchange 比其弱变体更合适的情况的一个很好的例子。我们不会在循环中运行我们的比较和交换操作,如果操作虚假地失败,我们也不想返回零。

“示例:惰性初始化”中所述,如果 generate_random_key() 花费大量时间,则在初始化期间阻塞线程可能更有意义,以避免可能花费时间生成不会使用的密钥。 Rust 标准库通过 std::sync::Oncestd::sync::OnceLock 提供了这样的功能。

总结

  • 原子操作是不可分割的;他们要么已经完全完成,要么尚未发生。

  • Rust 中的原子操作是通过 std::sync::atomic 中的原子类型完成的,例如 AtomicI32

  • 并非所有原子类型都适用于所有平台。

  • 当涉及多个变量时,原子操作的相对顺序很棘手。第 3 章中有更多内容。

  • 简单的加载和存储非常适合非常基本的线程间通信,例如停止标志和状态报告。

  • 惰性初始化可以作为一种竞争来完成,而不会导致数据竞争。

  • 获取和修改操作允许进行一小组基本的原子修改,这在多个线程修改同一个原子变量时特别有用。

  • 原子加法和减法在溢出时默默地环绕。

  • 比较和交换操作是最灵活和通用的,并且是进行任何其他原子操作的构建块。

  • 弱的比较和交换操作可能会稍微更有效率。

上次更新: