第 2 章 原子
来自 《Rust Atomics and Locks》 的翻译文章,英文原文:https://marabos.nl/atomics/atomics.html
原子这个词来自希腊语 ἄτομος,意思是不可分割的,不能被切割成更小的部分。在计算机科学中,它被用来描述一个不可分割的操作:它要么完全完成,要么尚未发生。
正如第 1 章“借用和数据竞争”中提到的,多个线程同时读取和修改同一个变量通常会导致未定义的行为。然而,原子操作确实允许不同的线程安全地读取和修改同一个变量。由于这样的操作是不可分割的,它要么完全发生在另一个操作之前,要么完全发生在另一个操作之后,从而避免了未定义的行为。稍后,在第 7 章中,我们将了解它在硬件级别的工作原理。
原子操作是任何涉及多线程的主要构建模块。所有其他并发原语,例如互斥锁和条件变量,都是使用原子操作实现的。
在 Rust 中,原子操作可以作为标准原子类型的方法使用,这些原子类型存在于 std::sync::atomic
中。它们的名称都以 Atomic
开头,例如 AtomicI32
或 AtomicUsize
。哪些是可用的,取决于硬件架构,有时也取决于操作系统,但几乎所有平台都至少提供了所有的原子类型,最大为指针大小。
与大多数类型不同,它们允许通过共享引用(例如, &AtomicU8
)进行修改。这要归功于内部可变性,正如第 1 章“内部可变性”中所讨论的那样。
每个可用的原子类型都有相同的接口,有存储和加载的方法,有原子 "获取和修改 "操作的方法,还有一些更高级的 "比较和交换 "方法。我们将在本章的其余部分详细讨论它们。
但是,在我们深入研究不同的原子操作之前,我们需要简要地谈谈一个叫做内存顺序的概念:
每个原子操作都有一个 std::sync::atomic::Ordering
类型的参数,它决定了我们对操作的相对顺序有什么保证。保证最少的最简单的变体是 Relaxed
。 Relaxed
仍然保证单个原子变量的一致性,但不保证不同变量之间的相对操作顺序。
这意味着,两个线程可能会看到对不同变量的操作以不同的顺序发生。例如,如果一个线程先写入一个变量,然后很快写入第二个变量,则另一个线程可能会看到以相反顺序发生的情况。
在本章中,我们将只关注没有问题的用例,并且在任何地方都简单地使用 Relaxed
,而不会深入细节。我们将在第 3 章中讨论内存顺序和其他可用内存顺序的所有细节。
原子加载和存储操作
我们要看的前两个原子操作是最基本的: load
和 store
。它们的函数签名如下,以 AtomicI32
为例:
impl AtomicI32 {
pub fn load(&self, ordering: Ordering) -> i32;
pub fn store(&self, value: i32, ordering: Ordering);
}
load
方法以原子方式加载存储在原子变量中的值, store
方法以原子方式将新值存储在其中。请注意 store
方法是如何采用共享引用 ( &T
) 而不是独占引用 ( &mut T
),即使它修改了值。
让我们看一下这两种方法的一些实际用例。
示例:停止标志
第一个示例使用 AtomicBool
作为停止标志。这样的标志用于通知其他线程停止运行。
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
fn main() {
static STOP: AtomicBool = AtomicBool::new(false);
// Spawn a thread to do the work.
let background_thread = thread::spawn(|| {
while !STOP.load(Relaxed) {
some_work();
}
});
// Use the main thread to listen for user input.
for line in std::io::stdin().lines() {
match line.unwrap().as_str() {
"help" => println!("commands: help, stop"),
"stop" => break,
cmd => println!("unknown command: {cmd:?}"),
}
}
// Inform the background thread it needs to stop.
STOP.store(true, Relaxed);
// Wait until the background thread finishes.
background_thread.join().unwrap();
}
在这个例子中,后台线程重复运行 some_work()
,而主线程允许用户输入一些命令与程序进行交互。在这个简单的例子中,唯一有用的命令是 stop
来停止程序。
为使后台线程停止,原子 STOP
布尔值用于将此条件传达给后台线程。当前台线程读取 stop
命令时,它将标志设置为 true
,后台线程在每次新迭代之前检查该标志。主线程会等待,直到后台线程使用 join
方法完成其当前的迭代。
只要标志由后台线程定期检查,这个简单的解决方案就可以很好地工作。如果它长时间停留在 some_work()
,可能会导致 stop
命令和程序退出之间出现不可接受的延迟。
示例:进度报告
在我们的下一个示例中,我们在后台线程上一个一个地处理 100 个项目,而主线程向用户定期更新进度:
use std::sync::atomic::AtomicUsize;
fn main() {
let num_done = AtomicUsize::new(0);
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Relaxed);
}
});
// The main thread shows status updates, every second.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
这一次,我们使用作用域线程(第 1 章中的“作用域线程”),它会自动为我们处理线程的join,还允许我们借用局部变量。
每次后台线程处理完一个项目时,它都会将处理的项目数存储在 AtomicUsize
中。同时,主线程向用户显示该数字以通知他们进度,大约每秒一次。一旦主线程看到所有 100 个项目都已处理完毕,它就会退出范围,隐式地join后台线程,并通知用户一切都已完成。
同步化(Synchronization)
一旦处理完最后一个项目,主线程可能需要一整秒的时间才能知道,最后引入了不必要的延迟。为了解决这个问题,我们可以使用线程停放(第 1 章中的“线程停放”)在有可能感兴趣的新信息时将主线程从睡眠中唤醒。
这是相同的示例,但现在使用 thread::park_timeout
而不是 thread::sleep
:
fn main() {
let num_done = AtomicUsize::new(0);
let main_thread = thread::current();
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Relaxed);
main_thread.unpark(); // Wake up the main thread.
}
});
// The main thread shows status updates.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::park_timeout(Duration::from_secs(1));
}
});
println!("Done!");
}
没有太大变化。我们已经通过 thread::current()
获得了主线程的句柄,后台线程在每次状态更新后都会使用这个句柄来取消主线程的停放。主线程现在使用 park_timeout
而不是 sleep
,这样它就可以被中断。
现在,任何状态更新都会立即报告给用户,同时仍然每秒重复最后一次更新以表明程序仍在运行。
示例:惰性初始化(Lazy Initialization)
在我们继续进行更高级的原子操作之前,最后一个示例是关于惰性初始化的。
假设有一个值 x
,我们正在从文件中读取它,从操作系统中获取它,或者以其他方式计算它,我们希望它在程序运行期间保持不变。也许 x
是操作系统的版本,或者是内存总量,或者是tau(π)
的第400位。对于这个例子来说并不重要。
由于我们不希望它发生变化,因此我们可以仅在第一次需要时请求或计算它,并记住结果。第一个需要它的线程必须计算该值,但它可以将它存储在一个原子 static
中,以便所有线程都可以使用它,包括它自己(如果稍后再次需要它)。
让我们看一个例子。为了简单起见,我们假设 x
永远不会为零,这样我们就可以在计算之前使用零作为占位符。
use std::sync::atomic::AtomicU64;
fn get_x() -> u64 {
static X: AtomicU64 = AtomicU64::new(0);
let mut x = X.load(Relaxed);
if x == 0 {
x = calculate_x();
X.store(x, Relaxed);
}
x
}
调用 get_x()
的第一个线程将检查静态 X
并看到它仍然为零,计算它的值,并将结果存储回静态以供将来使用。之后,任何对 get_x()
的调用都会看到静态中的值不为零,并立即返回它而无需再次计算。
但是,如果第二个线程在第一个线程仍在计算 x
时调用 get_x()
,则第二个线程也将看到零,并且也会并行计算 x
。其中一个线程最终将覆盖另一个线程的结果,具体取决于哪个线程先完成。这叫做竞争。不是数据竞争,这是未定义的行为,如果不使用 unsafe
在 Rust 中是不可能的,但仍然是一场无法预测获胜者的竞争。
由于我们期望 x
保持不变,因此无论谁赢得竞争都无关紧要,因为无论如何结果都是一样的。根据我们期望 calculate_x()
花费多少时间,这可能是一个非常好的或非常糟糕的策略。
如果预计 calculate_x()
会花费很长时间,最好让线程在第一个线程仍在初始化 X
时等待,以避免不必要地浪费处理器时间。您可以使用条件变量或线程停放(第 1 章中的“等待:停放和条件变量”)来实现这一点,但这对于一个小示例来说很快就会变得太复杂。 Rust 标准库通过 std::sync::Once
和 std::sync::OnceLock
提供了这个功能,所以通常不需要自己实现这些。
获取和修改操作
现在我们已经看到了基本的 load
和 store
操作的一些用例,让我们继续进行更有趣的操作:获取和修改(fetch-and-modify)操作。这些操作修改原子变量,但也加载(获取)原始值,作为单个原子操作。
最常用的是 fetch_add
和 fetch_sub
,分别执行加法和减法。其他一些可用的操作是用于按位操作的 fetch_or
和 fetch_and
,以及可用于保持运行最大值或最小值的 fetch_max
和 fetch_min
。
它们的函数签名如下,以 AtomicI32
为例:
impl AtomicI32 {
pub fn fetch_add(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_sub(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_or(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_and(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_nand(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_xor(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_max(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_min(&self, v: i32, ordering: Ordering) -> i32;
pub fn swap(&self, v: i32, ordering: Ordering) -> i32; // "fetch_store"
}
一个异常值是仅存储新值的操作,而不考虑旧值。它被称为 swap
,而不是 fetch_store
。
下面是一个快速演示,展示了 fetch_add
如何在操作之前返回值:
use std::sync::atomic::AtomicI32;
let a = AtomicI32::new(100);
let b = a.fetch_add(23, Relaxed);
let c = a.load(Relaxed);
assert_eq!(b, 100);
assert_eq!(c, 123);
fetch_add
操作将 a
从 100 递增到 123,但返回给我们的是旧值 100。任何下一个操作都会看到 123 这个值。
这些操作的返回值并不总是相关的。如果您只需要将操作应用于原子值,但对值本身不感兴趣,则完全可以忽略返回值。
需要牢记的一件重要事情是 fetch_add
和 fetch_sub
实现了溢出的包装行为。增加超过最大可表示值的值将回绕,并导致最小可表示值。这与普通整数的加号和减号运算符的行为不同,后者会在调试模式下溢出时出现恐慌。
在“比较和交换操作”中,我们将看到如何使用溢出检查进行原子加法。
但首先,让我们看看这些方法的一些实际用例。
示例:来自多个线程的进度报告
在“示例:进度报告”中,我们使用 AtomicUsize
来报告后台线程的进度。例如,如果我们将工作拆分为四个线程,每个线程处理 25 个项目,我们就需要了解所有四个线程的进度。
我们可以为每个线程使用单独的 AtomicUsize
,并将它们全部加载到主线程中,然后将它们相加,但更简单的解决方案是使用单个 AtomicUsize
来跟踪所有线程中已处理项目的总数。
为了让它工作,我们不能再使用 store
方法,因为那样会覆盖其他线程的进度。相反,我们可以使用原子添加操作在每个处理过的项目之后递增计数器。
让我们更新“示例:进度报告”中的示例,将工作拆分为四个线程:
fn main() {
let num_done = &AtomicUsize::new(0);
thread::scope(|s| {
// Four background threads to process all 100 items, 25 each.
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
process_item(t * 25 + i); // Assuming this takes some time.
num_done.fetch_add(1, Relaxed);
}
});
}
// The main thread shows status updates, every second.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
有几件事发生了变化。最重要的是,我们现在生成四个后台线程而不是一个,并使用 fetch_add
而不是 store
来修改 num_done
原子变量。
更巧妙地,我们现在为后台线程使用 move
闭包, num_done
现在是一个引用。这与我们使用 fetch_add
无关,而是与我们如何在一个循环中生成四个线程有关。此闭包捕获 t
以了解它是四个线程中的哪一个,从而知道是从项目 0、25、50 还是 75 开始。如果没有 move
关键字,闭包将尝试通过引用捕获 t
。这是不允许的,因为它只在循环中短暂存在。
作为 move
闭包,它移动(或复制)它的捕获而不是借用它们,给它一个 t
的副本。因为它也捕获了 num_done
,所以我们将该变量更改为引用,因为我们仍然想借用相同的 AtomicUsize
。请注意,原子类型没有实现 Copy
特性,因此如果我们试图将一个原子类型移动到多个线程中,我们会得到一个错误。
撇开闭包捕捉微妙之处不谈,这里使用 fetch_add
的更改非常简单。我们不知道线程将以何种顺序递增 num_done
,但由于加法是原子的,因此我们不必担心任何事情,并且可以确定当所有线程都完成时它会正好是 100。
示例:统计
继续这个通过原子报告其他线程正在做什么的概念,让我们扩展我们的示例,以收集和报告一些关于处理一个项目所花费的时间的统计数据。
在 num_done
旁边,我们添加了两个原子变量, total_time
和 max_time
,以跟踪处理项目所花费的时间。我们将使用这些来报告平均和峰值处理时间。
fn main() {
let num_done = &AtomicUsize::new(0);
let total_time = &AtomicU64::new(0);
let max_time = &AtomicU64::new(0);
thread::scope(|s| {
// 四个后台线程处理 100 个项目,每个 25 个。
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
let start = Instant::now();
process_item(t * 25 + i); // 假设这需要一些时间。
let time_taken = start.elapsed().as_micros() as u64;
num_done.fetch_add(1, Relaxed);
total_time.fetch_add(time_taken, Relaxed);
max_time.fetch_max(time_taken, Relaxed);
}
});
}
// 主线程每秒显示一次状态更新。
loop {
let total_time = Duration::from_micros(total_time.load(Relaxed));
let max_time = Duration::from_micros(max_time.load(Relaxed));
let n = num_done.load(Relaxed);
if n == 100 { break; }
if n == 0 {
println!("Working.. nothing done yet.");
} else {
println!(
"Working.. {n}/100 done, {:?} average, {:?} peak",
total_time / n as u32,
max_time,
);
}
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
后台线程现在使用 Instant::now()
和 Instant::elapsed()
来衡量它们在 process_item()
中花费的时间。原子添加操作用于将微秒数添加到 total_time
,原子最大操作用于跟踪 max_time
中的最高测量值。
主线程将总时间除以处理的项目数以获得平均处理时间,然后将其与 max_time
的峰值时间一起报告。
由于三个原子变量是分别更新的,因此主线程可能会在线程递增 num_done
之后但在更新 total_time
之前加载值,从而导致低估平均值。更微妙的是,因为 Relaxed
内存顺序不保证从另一个线程看到的操作的相对顺序,它甚至可能短暂地看到 total_time
的新更新值,同时仍然看到 num_done
的旧值,导致高估平均值。
在我们的示例中,这两者都不是大问题。可能发生的最坏情况是向用户简要报告不准确的平均值。
如果我们想避免这种情况,我们可以将三个统计信息放在一个 Mutex
中。然后我们会在更新三个数字时短暂锁定互斥锁,这三个数字不再必须是原子的。这有效地将三个更新变成了一个原子操作,代价是锁定和解锁互斥锁,并可能暂时阻塞线程。
示例:ID 分配
让我们继续讨论一个实际需要来自 fetch_add
的返回值的用例。
假设我们需要一些函数, allocate_new_id()
,每次调用它时都会给出一个新的唯一数字。我们可能会使用这些数字来识别我们程序中的任务或其他事物;需要通过可以轻松存储并在线程之间传递的小东西(例如整数)来唯一标识的东西。
使用 fetch_add
实现这个函数被证明是微不足道的:
use std::sync::atomic::AtomicU32;
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
NEXT_ID.fetch_add(1, Relaxed)
}
我们只需跟踪下一个要给出的数字,并在每次加载它时递增它。第一个调用者将得到 0,第二个调用者将得到 1,依此类推。
这里唯一的问题是溢出时的包装行为。第 4,294,967,296 次调用将溢出 32 位整数,这样下一次调用将再次返回 0。
这是否是一个问题取决于用例:它经常被调用的可能性有多大,如果数字不是唯一的,最坏的情况是什么?虽然这看起来是一个巨大的数字,但现代计算机可以在几秒钟内轻松地多次执行我们的函数。如果内存安全依赖于这些数字的唯一性,那么我们上面的实现是不可接受的。
为了解决这个问题,我们可以尝试让函数在被调用太多次时出现恐慌,如下所示:
// 这个版本有问题。
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let id = NEXT_ID.fetch_add(1, Relaxed);
assert!(id < 1000, "too many IDs!");
id
}
现在, assert
语句将在调用一千次后出现恐慌。然而,这发生在原子添加操作已经发生之后,这意味着当我们恐慌时 NEXT_ID
已经递增到 1001。如果另一个线程随后调用该函数,它会在 panic
之前将其递增到 1002,依此类推。尽管可能需要更长的时间,但在 4,294,966,296 次恐慌之后, NEXT_ID
再次溢出为零时,我们会遇到同样的问题。
这个问题有三种常见的解决方案。第一个是不要恐慌,而是在溢出时完全中止进程。 std::process::abort
函数将中止整个进程,排除任何继续调用我们函数的可能性。虽然中止进程可能需要一小会儿,在此期间该函数仍可被其他线程调用,但在程序真正中止之前发生数十亿次的可能性微乎其微。
事实上,这就是标准库中 Arc::clone()
中的溢出检查是如何实现的,以防您以某种方式设法克隆它 isize::MAX
次。这在 64 位计算机上需要数百年时间,但如果 isize 只有 32 位,则可以在几秒钟内实现。
处理溢出的第二种方法是在恐慌之前使用 fetch_sub
再次递减计数器,如下所示:
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let id = NEXT_ID.fetch_add(1, Relaxed);
if id >= 1000 {
NEXT_ID.fetch_sub(1, Relaxed);
panic!("too many IDs!");
}
id
}
当多个线程同时执行此函数时,计数器仍有可能非常短暂地增加到 1000 以上,但它受活动线程数的限制。假设永远不会同时有数十亿个活动线程是合理的,尤其是不会在 fetch_add
和 fetch_sub
之间的短暂时刻同时执行相同的功能。
这就是在标准库的 thread::scope
实现中针对正在运行的线程数处理溢出的方式。
第三种处理溢出的方法可以说是唯一真正正确的方法,因为如果加法溢出,它根本不会发生。然而,我们不能用我们目前看到的原子操作来实现它。为此,我们需要比较和交换(compare-and-exchange)操作,我们将在接下来探讨。
比较和交换操作
最先进和灵活的原子操作是比较和交换操作。此操作检查原子值是否等于给定值,只有在这种情况下,它才会用新值替换它,所有操作都以原子方式进行。它会返回之前的值并告诉我们它是否替换了它。
它的签名比我们目前看到的要复杂一些。以 AtomicI32
为例,它看起来是这样的:
impl AtomicI32 {
pub fn compare_exchange(
&self,
expected: i32,
new: i32,
success_order: Ordering,
failure_order: Ordering
) -> Result<i32, i32>;
}
暂时忽略内存顺序,它与以下实现基本相同,除了它都是作为单个不可分割的原子操作发生的:
impl AtomicI32 {
pub fn compare_exchange(&self, expected: i32, new: i32) -> Result<i32, i32> {
// 实际上,加载、比较和存储,
// 所有这些都是作为单个原子操作发生的。
let v = self.load();
if v == expected {
// 值符合预期。
// 替换它并报告成功。
self.store(new);
Ok(v)
} else {
// 该值不符合预期。
// 保持不变并报告失败。
Err(v)
}
}
}
使用它,我们可以从原子变量加载一个值,执行我们喜欢的任何计算,然后如果原子变量在此期间没有改变,则只存储新计算的值。如果我们把它放在一个循环中以在它确实发生变化时重试,我们可以使用它来实现所有其他原子操作,使它成为最通用的操作。
为了演示,让我们在不使用 fetch_add
的情况下将 AtomicU32
递增 1,看看在实践中如何使用 compare_exchange
:
fn increment(a: &AtomicU32) {
let mut current = a.load(Relaxed); // 1
loop {
let new = current + 1; // 2
match a.compare_exchange(current, new, Relaxed, Relaxed) { // 3
Ok(_) => return, // 4
Err(v) => current = v, // 5
}
}
}
(1) 首先,我们加载 a
的当前值。
(2) 我们计算要存储在 a
中的新值,而不考虑其他线程对 a
的潜在并发修改。
(3) 我们使用 compare_exchange
来更新 a
的值,但前提是它的值仍然是我们之前加载的值。
(4) 如果 a
确实和以前一样,它现在被我们的新值取代,我们就完成了。
(5) 如果 a
与之前不同,那么在我们加载它后的短时间内,另一个线程一定已经更改了它。 compare_exchange
操作为我们提供了 a
所具有的更改值,我们将再次尝试使用该值。加载和更新之间的短暂时间是如此之短,以至于它不太可能循环超过几次迭代。
注意
如果原子变量从某个值 A
更改为 B
,然后在 load
操作之后返回到 A
,但在 compare_exchange
操作之前,它仍然会成功,即使原子变量已更改(并改回)同时。在许多情况下,就像我们的 increment
示例一样,这不是问题。然而,对于某些算法,通常涉及原子指针,这可能是一个问题。这被称为 ABA
问题。
在 compare_exchange
旁边,有一个名为 compare_exchange_weak
的类似方法。不同之处在于,即使原子值与预期值匹配,弱版本有时仍可能保留值不变并返回 Err
。在某些平台上,这种方法可以更有效地实现,并且在伪造的比较和交换失败的后果微不足道的情况下应该是首选,例如在我们上面的 increment
函数中。在第 7 章中,我们将深入底层细节,找出为什么弱版本可以更高效。
示例:无溢出的 ID 分配
现在,回到“示例:ID 分配”中 allocate_new_id() 中的溢出问题。
要停止递增 NEXT_ID
超过某个限制以防止溢出,我们可以使用 compare_exchange
来实现具有上限的原子加法。使用这个想法,让我们制作一个始终正确处理溢出的 allocate_new_id
版本,即使在几乎不可能的情况下也是如此:
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let mut id = NEXT_ID.load(Relaxed);
loop {
assert!(id < 1000, "too many IDs!");
match NEXT_ID.compare_exchange_weak(id, id + 1, Relaxed, Relaxed) {
Ok(_) => return id,
Err(v) => id = v,
}
}
}
现在我们在修改 NEXT_ID
之前检查并恐慌,保证它永远不会增加到超过 1000,从而不可能溢出。如果需要,我们现在可以将上限从 1000 提高到 u32::MAX
,而不必担心它可能会增加到超出限制的边缘情况。
获取更新(Fetch-Update)
原子类型有一个名为 fetch_update
的便捷方法,用于比较和交换循环模式。它相当于 load 操作后跟一个重复计算的循环和 compare_exchange_weak
,就像我们上面所做的一样。
使用它,我们可以用一行代码实现我们的 allocate_new_id
函数:
NEXT_ID.fetch_update(Relaxed, Relaxed,
|n| n.checked_add(1)).expect("too many IDs!")
查看该方法的文档以了解详细信息。
我们不会在本书中使用 fetch_update
方法,因此我们可以专注于单个原子操作。
示例:惰性一次性初始化
在“示例:惰性初始化”中,我们查看了一个常量值的惰性初始化示例。我们创建了一个函数,它在第一次调用时延迟初始化一个值,但在以后的调用中重用它。当多个线程在第一次调用时并发运行该函数时,可能会有多个线程执行初始化,它们将以不可预测的顺序覆盖彼此的结果。
这适用于我们希望保持不变的值,或者当我们不关心值的变化时。然而,也有这样的值每次都被初始化为不同值的用例,即使我们需要在程序的单次运行中每次调用函数都返回相同的值。
例如,假设一个函数 get_key()
返回一个随机生成的密钥,该密钥在每次程序运行时只生成一次。它可能是用于与程序通信的加密密钥,每次运行程序时都需要唯一,但在进程中保持不变。
这意味着我们不能在生成密钥后简单地使用 store
操作,因为这可能会覆盖另一个线程刚才生成的密钥,从而导致两个线程使用不同的密钥。相反,我们可以使用 compare_exchange
来确保我们只在没有其他线程已经这样做的情况下存储密钥,否则丢弃我们的密钥并使用存储的密钥。
这是这个想法的实现:
fn get_key() -> u64 {
static KEY: AtomicU64 = AtomicU64::new(0);
let key = KEY.load(Relaxed);
if key == 0 {
let new_key = generate_random_key(); // 1
match KEY.compare_exchange(0, new_key, Relaxed, Relaxed) { // 2
Ok(_) => new_key, // 3
Err(k) => k, // 4
}
} else {
key
}
}
(1) 如果 KEY
尚未初始化,我们只会生成一个新密钥。
(2) 我们用新生成的密钥替换 KEY
,但前提是它仍然为0。
(3) 如果我们用0交换新密钥,我们将返回新生成的密钥。 get_key()
的新调用将返回现在存储在 KEY
中的相同新密钥。
(4) 如果我们输给了另一个在我们之前初始化了 KEY
的线程,我们就会忘记我们新生成的密钥,而是使用来自 KEY
的密钥。
这是 compare_exchange
比其弱变体更合适的情况的一个很好的例子。我们不会在循环中运行我们的比较和交换操作,如果操作虚假地失败,我们也不想返回零。
如“示例:惰性初始化”中所述,如果 generate_random_key()
花费大量时间,则在初始化期间阻塞线程可能更有意义,以避免可能花费时间生成不会使用的密钥。 Rust 标准库通过 std::sync::Once
和 std::sync::OnceLock
提供了这样的功能。
总结
原子操作是不可分割的;他们要么已经完全完成,要么尚未发生。
Rust 中的原子操作是通过
std::sync::atomic
中的原子类型完成的,例如AtomicI32
。并非所有原子类型都适用于所有平台。
当涉及多个变量时,原子操作的相对顺序很棘手。第 3 章中有更多内容。
简单的加载和存储非常适合非常基本的线程间通信,例如停止标志和状态报告。
惰性初始化可以作为一种竞争来完成,而不会导致数据竞争。
获取和修改操作允许进行一小组基本的原子修改,这在多个线程修改同一个原子变量时特别有用。
原子加法和减法在溢出时默默地环绕。
比较和交换操作是最灵活和通用的,并且是进行任何其他原子操作的构建块。
弱的比较和交换操作可能会稍微更有效率。