第 1 章 Rust 并发基础

来自 《Rust Atomics and Locks》 的翻译文章,英文原文https://marabos.nl/atomics/basics.html在新窗口打开

早在多核处理器普及之前,操作系统就允许一台计算机同时运行多个程序。这是通过在进程之间快速切换来实现的,允许每个进程一个接一个地重复地取得一点点进展。如今,我们几乎所有的电脑,甚至我们的手机和手表都都配备了多核处理器,可以真正实现多个进程的并行执行。

操作系统尽可能地将进程彼此隔离,允许程序在完全不知道其他进程在做什么的情况下做自己的事情。例如,如果不事先询问操作系统内核,一个进程通常不能访问另一个进程的内存,或以任何方式与其通信。

但是,一个程序可以产生额外的执行线程,作为同一个进程的一部分。同一进程内的线程并不是相互隔离的。线程共享内存并可以通过该内存相互交互。

本章将解释线程是如何在 Rust 中产生的,以及所有关于它们的基本概念,比如如何在多个线程之间安全地共享数据。本章解释的概念是本书其余部分的基础。

提示

如果你已经熟悉Rust的这些部分,请随时跳过。但是,在你继续下一章之前,确保你对线程(thread)、内部可变性(interior mutability)、SendSync 有很好的理解,并且知道互斥量(mutex)、条件变量(condition variable)和线程停放(thread parking)是什么。

Rust中的线程

每个程序都从一个线程开始:主线程。该线程将执行您的 main 函数,必要时可以用来生成更多的线程。

在Rust中,使用标准库中的 std::thread::spawn 函数来生成新线程。它只有一个参数:新线程将要执行的函数。一旦这个函数返回,线程就会停止。

让我们来看一个例子:

use std::thread;

fn main() {
    thread::spawn(f);
    thread::spawn(f);

    println!("Hello from the main thread.");
}

fn f() {
    println!("Hello from another thread!");

    let id = thread::current().id();
    println!("This is my thread id: {id:?}");
}

我们生成两个线程,它们都将执行 f 作为它们的主要功能。这两个线程都会打印一条消息并显示它们的 线程 ID,而主线程也会打印自己的消息。

Thread ID

Rust标准库为每个线程分配了一个唯一的标识符。这个标识符可以通过Thread::id()访问,其类型为ThreadId。除了复制它并检查是否相等之外,你对ThreadId能做的并不多。不能保证这些ID是连续分配的,只能保证它们对每个线程都是不同的。

如果多次运行上面的示例程序,您可能会注意到每次运行的输出都不同。这是在我的机器上得到的输出:

Hello from the main thread.
Hello from another thread!
This is my thread id:

令人惊讶的是,部分输出似乎丢失了。

这里发生的情况是,主线程在新生成的线程完成执行其函数之前就完成了 main 函数的执行。

main 返回将退出整个程序,即使其他线程仍在运行。

在这个特殊的例子中,在程序被主线程关闭之前,其中一个新产生的线程只有足够的时间来完成第二条信息的一半。

如果我们想确保线程在我们从 main 返回之前完成,我们可以通过 join 来等待它们。为此,我们必须使用 spawn 函数返回的 JoinHandle

fn main() {
    let t1 = thread::spawn(f);
    let t2 = thread::spawn(f);

    println!("Hello from the main thread.");

    t1.join().unwrap();
    t2.join().unwrap();
}

.join() 方法会等待线程执行完毕,并返回一个 std::thread::Result。如果该线程因为恐慌而没有成功完成其功能,这将包含恐慌信息。我们可以尝试处理这种情况,或者直接调用 .unwrap() 进行惊慌处理。

运行我们程序的这个版本将不再导致截断输出:

Hello from the main thread.
Hello from another thread!
This is my thread id: ThreadId(3)
Hello from another thread!
This is my thread id: ThreadId(2)

在不同的运行过程中,唯一仍然改变的是信息的打印顺序:

Hello from the main thread.
Hello from another thread!
Hello from another thread!
This is my thread id: ThreadId(2)
This is my thread id: ThreadId(3)

输出锁定(Output Locking)

println 宏使用 std::io::Stdout::lock() 来确保其输出不会被中断。 println!() 表达式将等待任何并发运行的表达式完成,然后再写入任何输出。如果不是这种情况,我们可以获得更多的交错输出,例如:

Hello fromHello from another thread! another This is my threthreadHello fromthread id: ThreadId! ( the main thread. 2)This is my thread id: ThreadId(3)

不是像上面的示例那样将函数名称传递给 std::thread::spawn ,而是将闭包传递给它,这更为常见。这使我们能够捕获要移入新线程的值:

let numbers = vec![1, 2, 3];

thread::spawn(move || {
    for n in &numbers {
        println!("{n}");
    }
}).join().unwrap();

在这里, numbers 的所有权被转移到新生成的线程,因为我们使用了 move 闭包。如果我们没有使用 move 关键字,闭包将通过引用捕获 numbers 。这将导致编译器错误,因为新线程可能比该变量存在时间更长。

由于线程可能会一直运行到程序执行结束, spawn 函数的参数类型有一个 'static 生命周期。换句话说,它只接受可能永远保留的函数。通过引用捕获局部变量的闭包可能不会永远保留,因为当局部变量不复存在时,该引用将变得无效。

从线程中取回一个值是通过从闭包中返回它来完成的。这个返回值可以从 join 方法返回的 Result 中获取:

let numbers = Vec::from_iter(0..=1000);

let t = thread::spawn(move || {
    let len = numbers.len();
    let sum = numbers.iter().sum::<usize>();
    sum / len  // 1
});

let average = t.join().unwrap(); // 2

println!("average: {average}");





 


 


这里,线程的闭包返回值 (1),通过 join 方法 (2) 传回主线程。

如果 numbers 为空,则线程在尝试除以零 (1) 时会出现恐慌,而 join 会返回该恐慌消息,导致主线程也因为 unwrap 而恐慌 (2).

Thread Builder

std::thread::spawn 函数实际上只是 std::thread::Builder::new().spawn().unwrap() 的一个方便的简写。

std::thread::Builder 允许您在生成新线程之前为它添加一些设置。您可以使用它来配置新线程的堆栈大小并为新线程命名。线程的名称可通过 std::thread::current().name() 获得,将在恐慌消息中使用,并将在大多数平台的监控和调试工具中可见。

此外,Builder 的 spawn 函数返回一个 std::io::Result,允许您处理生成新线程失败的情况。如果操作系统内存不足,或者如果您的程序应用了资源限制,则可能会发生这种情况。如果 std::thread::spawn 函数无法生成新线程,它就会发生恐慌。

作用域线程(Scoped Threads)

如果我们确定生成的线程肯定不会超过某个作用域,那么该线程可以安全地借用那些不会永远存在的东西,例如局部变量,只要它们超过该作用域。

Rust 标准库提供了 std::thread::scope 函数来生成这样的作用域线程。它允许我们生成不能超过我们传递给该函数的闭包作用域的线程,从而可以安全地借用局部变量。

它是如何工作的,最好用一个例子来说明:

let numbers = vec![1, 2, 3];

thread::scope(|s| { //1
    s.spawn(|| { //2
        println!("length: {}", numbers.len());
    });
    s.spawn(|| { //2
        for n in &numbers {
            println!("{n}");
        }
    });
}); //3


 
 


 




 

(1) 我们用闭包调用 std::thread::scope 函数。我们的闭包被直接执行,并得到一个代表范围的参数 s

(2) 我们使用 s 来生成线程。闭包可以借用局部变量,如 numbers

(3) 当作用域结束时,所有尚未join的线程都会自动join

此模式保证作用域内生成的线程都不会超过作用域。因此,这个作用域 spawn 方法在其参数类型上没有绑定 'static ,允许我们引用任何超出作用域的东西,例如 numbers

在上面的示例中,两个新线程都并发访问 numbers 。这很好,因为它们都没有修改它。如果我们把第一个线程改为修改 numbers ,如下所示,编译器就不允许我们生成另一个也使用 numbers 的线程:

let mut numbers = vec![1, 2, 3];

thread::scope(|s| {
    s.spawn(|| {
        numbers.push(1);
    });
    s.spawn(|| {
        numbers.push(2); // Error!
    });
});

确切的错误消息取决于 Rust 编译器的版本,因为它经常被改进以产生更好的提示,但尝试编译上面的代码将导致类似这样的结果:

error[E0499]: cannot borrow `numbers` as mutable more than once at a time
 --> example.rs:7:13
  |
4 |     s.spawn(|| {
  |             -- first mutable borrow occurs here
5 |         numbers.push(1);
  |         ------- first borrow occurs due to use of `numbers` in closure
  |
7 |     s.spawn(|| {
  |             ^^ second mutable borrow occurs here
8 |         numbers.push(2);
  |         ------- second borrow occurs due to use of `numbers` in closure

泄露事件(The Leakpocalypse)

在 Rust 1.0 之前,标准库有一个名为 std::thread::scoped 的函数,它会直接生成一个线程,就像 std::thread::spawn 一样。它允许非静态捕获,因为它返回的不是 JoinHandle,而是一个 JoinGuard,它在删除时join了线程。任何借用的数据只需要超过这个 JoinGuard 的寿命。这似乎是安全的,只要 JoinGuard 在某个时候被放弃。

就在 Rust 1.0 发布之前,人们慢慢意识到无法保证某些东西会被丢弃。有很多方法,例如创建引用计数节点的循环,可以在不丢弃的情况下忘记或泄漏某些东西。

最终,在一些人所说的 "泄露事件" 中,人们得出了这样的结论:(安全)接口的设计不能依赖于这样的假设:对象在其生命周期结束时总是会被丢弃。泄露一个对象可能会合理地导致泄露更多的对象(例如,泄露一个Vec也会泄露其元素),但它可能不会导致未定义的行为。因为这个结论,std::thread::scoped 不再被认为是安全的,并被从标准库中删除。此外,std::mem::forget 也从一个不安全的函数升级为安全函数,以强调遗忘(或泄露)始终是可能的。

只是在很久以后,在 Rust 1.63 中,添加了一个新的 std::thread::scope 函数,它采用了新的设计,不依赖Drop 来保证正确性。

共享所有权和引用计数(Shared Ownership and Reference Counting)

到目前为止,我们已经研究了使用 move 闭包(Rust 中的线程)将值的所有权转移到线程, 并从寿命更长的父线程(作用域线程)借用数据。当在两个线程之间共享数据时,两个线程都不能保证比另一个线程长寿,那么它们都不能成为该数据的所有者。它们之间共享的任何数据都需要与最长存活线程一样长。

静态(Statics)

有几种方法可以创建不属于单个线程的东西。最简单的一个是 static 值,它由整个程序“拥有”,而不是一个单独的线程。在下面的示例中,两个线程都可以访问 X ,但都不拥有它:

static X: [i32; 3] = [1, 2, 3];

thread::spawn(|| dbg!(&X));
thread::spawn(|| dbg!(&X));

static 项有一个常量初始值设定项,永远不会被丢弃,甚至在程序的主函数启动之前就已经存在。每个线程都可以借用它,因为它保证永远存在。

泄露(Leaking)

共享所有权的另一种方法是泄露分配。使用 Box::leak,可以释放 Box 的所有权,承诺永远不会丢弃它。从那时起,Box 将永远存在,没有所有者,只要程序运行,任何线程都可以借用它。

let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));

thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));

move 闭包看起来好像我们正在将所有权转移到线程中,但仔细观察 x 的类型就会发现我们只是为线程提供了对数据的引用。

提示

引用是 Copy ,这意味着当你 move 它们时,原来的仍然存在,就像整数或布尔值一样。

请注意, 'static 生命周期并不意味着该值从程序开始就一直存在,而只是它一直存在到程序结束。

泄漏 Box 的缺点是我们正在泄漏内存。我们分配了一些东西,但从不丢弃和释放它。如果只发生有限次数,这可能没问题。但是如果我们一直这样做下去,程序就会慢慢耗尽内存。

引用计数

为了确保共享数据被删除和释放,我们不能完全放弃它的所有权。相反,我们可以共享所有权。通过跟踪所有者的数量,我们可以确保只有在没有所有者时才删除该值。

Rust 标准库通过 std::rc::Rc 类型(“reference counted”的缩写)提供此功能。它与 Box 非常相似,除了克隆它不会分配任何新的东西,而是递增存储在包含值旁边的计数器。原始的和克隆的 Rc 将引用同一个分配;他们共享所有权。

use std::rc::Rc;

let a = Rc::new([1, 2, 3]);
let b = a.clone();

assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation!

删除 Rc 将使计数器递减。只有最后一个 Rc ,它会看到计数器降为零,才会丢弃和释放所包含的数据。

但是,如果我们尝试将 Rc 发送到另一个线程,我们将遇到以下编译器错误:

error[E0277]: `Rc` cannot be sent between threads safely
    |
8   |     thread::spawn(move || dbg!(b));
    |                   ^^^^^^^^^^^^^^^       

事实证明, Rc 不是线程安全的(更多信息请参见“线程安全:Send和Sync”)。如果多个线程对同一个分配有一个 Rc ,它们可能会同时尝试修改引用计数器,这会产生不可预知的结果。

相反,我们可以使用 std::sync::Arc ,它代表“原子引用计数”。它与 Rc 相同,除了它保证对引用计数器的修改是不可分割的原子操作,使其可以安全地用于多线程。 (更多内容在第 2 章。)

use std::sync::Arc;

let a = Arc::new([1, 2, 3]); // 1
let b = a.clone(); // 2

thread::spawn(move || dbg!(a)); // 3
thread::spawn(move || dbg!(b)); // 3


 
 

 
 

(1) 我们将一个数组和一个从 1 开始的引用计数器一起放入一个新的分配中。

(2) 克隆 Arc 会将引用计数增加到两个,并为我们提供第二个 Arc 到相同的分配。

(3) 两个线程都有自己的 Arc ,通过它可以访问共享数组。两者都在删除 Arc 时递减引用计数器。最后一个删除其 Arc 的线程将看到计数器降为零,并且将是删除和释放数组的线程。

命名克隆(Naming Clones)

必须给 Arc 的每个克隆一个不同的名称,会很快使代码变得非常混乱且难以理解。虽然 Arc 的每个克隆都是一个单独的对象,但每个克隆都代表相同的共享值,通过不同的命名并不能很好地反映这一点。

Rust 允许(并鼓励)您通过定义一个具有相同名称的新变量来隐藏变量。如果您在同一范围内执行此操作,则无法再命名原始变量。但是通过打开一个新的作用域,像 let a = a.clone(); 这样的语句可用于在该范围内重用相同的名称,同时在范围外保留原始变量。

通过将闭包包装在一个新的作用域中(使用 {}),我们可以在将变量移入闭包之前克隆变量,而不必重命名它们。

Arc 的克隆存在于同一范围内。每个线程都有自己的具有不同名称的克隆。

let a = Arc::new([1, 2, 3]);

let b = a.clone();

thread::spawn(move || {
    dbg!(b);
});

dbg!(a);

Arc 的克隆存活在不同的范围内。我们可以在每个线程中使用相同的名称。

let a = Arc::new([1, 2, 3]);

thread::spawn({
    let a = a.clone();
    move || {
        dbg!(a);
    }
});

dbg!(a);

因为所有权是共享的,所以引用计数指针( Rc<T>Arc<T> )与共享引用( &T )具有相同的限制。它们不会为您提供对其包含的值的可变访问,因为该值可能同时被其他代码借用。

例如,如果我们试图对 Arc<[i32]> 中的整数切片进行排序,编译器会阻止我们这样做,告诉我们不允许改变数据:

error[E0596]: cannot borrow data in an `Arc` as mutable
  |
6 |     a.sort();
  |     ^^^^^^^^

借用和数据竞争

在 Rust 中,可以通过两种方式借用值:

  • 不可变借用

    & 借用的东西会给出一个不可变的引用。这样的引用是可以复制的。对它所引用的数据的访问是在这种引用的所有副本之间共享的。顾名思义,编译器通常不允许您通过此类引用改变某些东西,因为这可能会影响其他正在借用相同数据的代码。

  • 可变借用

    &mut 借用的东西会给出一个可变的引用。可变借用保证它是该数据的唯一有效借用。这确保改变数据不会改变其他代码当前正在查看的任何内容。

这两个概念一起完全防止了数据竞争:一个线程正在改变数据而另一个线程正在并发访问它的情况。数据竞争通常是未定义的行为,这意味着编译器不需要考虑这些情况。它会简单地假设它们不会发生。

为了澄清这一点,让我们来看一个例子,在这个例子中,编译器可以利用借用规则做出一个有用的假设:

fn f(a: &i32, b: &mut i32) {
    let before = *a;
    *b += 1;
    let after = *a;
    if before != after {
        x(); // never happens
    }
}

在这里,我们获得了一个对整数的不可变引用,并在递增 b 引用的整数之前和之后存储该整数的值。编译器可以自由假设关于借用和数据竞争的基本规则得到维护,这意味着 b 不可能引用与 a 相同的整数。事实上,只要 a 正在借用它,整个程序中的任何内容都不能可变地借用 a 引用的整数。因此,编译器可以很容易地断定 *a 不会改变, if 语句的条件永远不会为真,可以从程序中完全去掉对 x 的调用作为优化。

除了使用 unsafe 块来禁用编译器的一些安全检查之外,不可能编写破坏编译器假设的 Rust 程序。

未定义行为(Undefined Behavior)

C、C++ 和 Rust 等语言有一组规则,需要遵循这些规则以避免出现所谓的未定义行为。例如,Rust 的规则之一是对任何对象的可变引用永远不会超过一个。

在 Rust 中,只有在使用不安全代码时才有可能违反这些规则。 “不安全”并不意味着代码不正确或永远不能安全使用,而是编译器没有为您验证代码是否安全。如果代码确实违反了这些规则,则称为不健全(unsound)

允许编译器假设这些规则永远不会被破坏,而无需检查。当被破坏时,这会导致称为未定义行为的事情,我们需要不惜一切代价避免这种行为。如果我们允许编译器做出实际上不正确的假设,它很容易导致对代码的不同部分做出更多错误的结论,从而影响整个程序。

作为一个具体的例子,让我们看一个在切片上使用 get_unchecked 方法的小片段:

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

get_unchecked 方法在给定索引的情况下为我们提供切片的元素,就像 a[index] 一样,但允许编译器假定索引始终在边界内,无需任何检查。

这意味着在此代码片段中,由于 a 的长度为 3,编译器可能会假定 index 小于 3。我们有责任确保其假设成立。

如果我们打破这个假设,例如,如果我们在索引等于 3 的情况下运行它,任何事情都可能发生。它可能会导致从内存中读取 a 之后字节中存储的任何内容。它可能会导致程序崩溃。它可能最终会执行程序中一些完全不相关的部分。它会造成各种破坏。

也许令人惊讶的是,未定义的行为甚至可以“回到过去(travel back in time)”,从而导致在它之前的代码中出现问题。要理解这是如何发生的,想象一下我们在前面的代码片段之前有一个 match 语句,如下所示:

match index {
   0 => x(),
   1 => y(),
   _ => z(index),
}

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

由于代码不安全,编译器被允许假定 index 仅是 0、1 或 2。它可能逻辑上得出结论,我们的 match 语句的最后一个分支只会匹配 2,因此 z 只会被调用作为 z(2)。该结论不仅可以用于优化匹配,还可以用于优化 z 本身。这可以包括丢弃代码中未使用的部分。

如果我们使用 index 为 3 执行它,我们的程序可能会尝试执行已经优化掉的部分,导致完全不可预测的行为,在我们到达最后一行的不安全块之前就已经出现了。就这样,未定义的行为可以在整个程序中传播,不管是向前还是向后,通常都是以非常意外的方式。

在调用任何不安全的函数时,请仔细阅读其文档,并确保你完全理解其安全要求:作为调用者,你需要坚持的假设,以避免未定义行为。

内部可变性

上一节介绍的借用规则很简单,但可能有很大的局限性--特别是当涉及到多个线程时。遵循这些规则使得线程之间的通信非常有限,几乎不可能,因为多个线程都可以访问的数据都不能被改变。

幸运的是,有一个逃生口:内部可变性(interior mutability)。具有内部可变性的数据类型稍微改变了借用规则。在某些条件下,这些类型可以允许通过“不可变(immutable)”引用进行修改。

在“引用计数”中,我们已经看到了一个涉及内部可变性的微妙示例。 RcArc 都会改变一个引用计数器,即使可能有多个克隆都使用相同的引用计数器。

一旦涉及内部可变类型,将引用称为“不可变(immutable)”或“可变(mutable)”就会变得混乱和不准确,因为有些东西可以通过这两者发生变异。更准确的术语是“共享(shared)”和“独占(exclusive)”:共享引用 ( &T ) 可以复制并与他人共享,而独占引用 ( &mut T ) 保证它是 T 的唯一独占借用。对于大多数类型,共享引用不允许改变,但也有例外。由于在本书中我们将主要处理这些异常,因此我们将在本书的其余部分使用更准确的术语。

注意

请记住,内部可变性只会改变共享借用的规则,以允许在共享时发生变化。它不会改变有关独占借用的任何内容。独占借用仍然保证没有其他活跃的借用。不安全的代码会导致对某事物的多个活动独占引用总是会调用未定义的行为,而不管内部可变性如何。

让我们来看看一些具有内部可变性的类型,以及它们如何允许通过共享引用进行修改而不会导致未定义的行为。

Cell

std::cell::Cell<T> 简单地包装了 T ,但允许通过共享引用进行更改。为避免未定义的行为,它只允许您将值复制出来(如果 TCopy ),或将其整体替换为另一个值。此外,它只能在单线程内使用。

让我们看一个与上一节中的示例类似的示例,但这次使用 Cell<i32> 而不是 i32

use std::cell::Cell;

fn f(a: &Cell<i32>, b: &Cell<i32>) {
    let before = a.get();
    b.set(b.get() + 1);
    let after = a.get();
    if before != after {
        x(); // might happen
    }
}

与上次不同,现在 if 条件可能为真。因为 Cell<i32> 具有内部可变性,所以编译器不能再假定只要我们有对它的共享引用,它的值就不会改变。 ab 可能引用相同的值,这样通过 b 的变化也可能影响 a 。然而,它可能仍然假定没有其他线程正在同时访问这些单元。

Cell 上的限制并不总是很容易处理。由于它不能直接让我们借用它持有的值,我们需要移出一个值(在原处留下一些东西),修改它,然后再放回去,以改变它的内容:

fn f(v: &Cell<Vec<i32>>) {
    let mut v2 = v.take(); // 用一个空的 Vec 替换 Cell 的内容
    v2.push(1);
    v.set(v2); // 把修改后的 Vec 放回去
}

RefCell

与常规的 Cell 不同, std::cell::RefCell 允许您以较小的运行时成本借用其内容。 RefCell<T> 不仅包含 T ,而且还包含一个计数器,用于跟踪任何未完成的借用。如果你试图在它已经被可变地借用时借用它,它会发生恐慌,从而避免未定义的行为。就像 Cell 一样, RefCell 只能在单个线程中使用。

通过调用 borrowborrow_mut 来借用 RefCell 的内容:

use std::cell::RefCell;

fn f(v: &RefCell<Vec<i32>>) {
    v.borrow_mut().push(1); // 我们可以直接修改`Vec`
}

虽然 CellRefCell 可能非常有用,但当我们需要用多线程做一些事情时,它们就变得毫无用处。那么让我们继续讨论与并发相关的类型。

Mutex 和 RwLock

RwLock 或读写锁是 RefCell 的并发版本。 RwLock<T> 持有 T 并跟踪任何未完成的借用。然而,与 RefCell 不同的是,它不会对冲突的借用产生恐慌。相反,它会阻塞当前线程——使其进入睡眠状态——同时等待冲突借用消失。在其他线程处理完数据后,我们只需要耐心等待轮到我们处理数据。

借用 RwLock 的内容称为锁定。通过锁定它,我们暂时阻止了并发的冲突借用,允许我们在不引起数据竞争的情况下借用它。

Mutex 非常相似,但概念上稍微简单一些。它不像 RwLock 那样跟踪共享和独占借用的数量,它只允许独占借用。

我们将在“锁:互斥锁和读写锁”中更详细地介绍这些类型。

Atomics

原子类型表示 Cell 的并发版本,是第 2 章第 3 章的主题。与 Cell 一样,它们通过让我们把值作为一个整体复制进去,而不是让我们直接借用内容来避免未定义行为。

但是,与 Cell 不同的是,它们不能是任意大小。因此,任何 T 都没有通用的 Atomic<T> 类型,只有 AtomicU32AtomicPtr<T> 等特定的原子类型。哪些可用取决于平台,因为它们需要处理器的支持以避免数据竞争。 (我们将在第 7 章深入探讨。)

由于它们的大小非常有限,原子通常不直接包含需要在线程之间共享的信息。相反,它们经常被用作一种工具,可以在线程之间共享其他(通常是更大的)东西。当原子被用来描述其他数据时,事情会变得异常复杂。

UnsafeCell

UnsafeCell 是内部可变性的原始构建块。

UnsafeCell<T> 包装 T ,但没有任何条件或限制来避免未定义的行为。相反,它的 get() 方法只是提供一个指向它包装的值的原始指针,它只能在 unsafe 块中有意义地使用。它让用户以不会导致任何未定义行为的方式使用它。

最常见的是, UnsafeCell 不直接使用,而是包装在另一种通过有限接口提供安全性的类型中,例如 CellMutex 。所有具有内部可变性的类型——包括上面讨论的所有类型——都建立在 UnsafeCell 之上。

线程安全:Send和Sync

在本章中,我们看到了几种非线程安全的类型,它们只能在单个线程上使用,例如 RcCell 等。由于该限制是避免未定义行为所必需的,因此编译器需要了解并为您检查,因此您可以使用这些类型而无需使用 unsafe 块。

该语言使用两个特殊特征来跟踪可以跨线程安全使用的类型:

  • Send

    如果一个类型可以被发送到另一个线程,它就是 Send 。换句话说,如果该类型的值的所有权可以转移到另一个线程。例如, Arc<i32>Send ,而 Rc<i32> 不是。

  • Sync

    如果一个类型可以与另一个线程共享,则它是 Sync 。换句话说,类型 TSync 当且仅当对该类型 &T 的共享引用是 Send 时。例如, i32Sync ,而 Cell<i32> 不是。 (但是, Cell<i32>Send 。)

i32boolstr 等所有原始类型都是 SendSync

这两个特征都是自动特征,这意味着它们会根据它们的字段自动为您的类型实现。字段全为 SendSyncstruct 本身也是 SendSync

选择不使用这两种方法的方法是给你的类型添加一个不实现该特征的字段。为此,特殊的 std::marker::PhantomData<T> 类型通常会派上用场。该类型被编译器视为 T ,但它在运行时实际上并不存在。它是零大小的类型,不占用空间。

我们来看看下面的 struct

use std::marker::PhantomData;

struct X {
    handle: i32,
    _not_sync: PhantomData<Cell<()>>,
}

在此示例中,如果 handle 是它的唯一字段,则 X 将同时是 SendSync 。但是,我们添加了一个零大小的 PhantomData<Cell<()>> 字段,它被视为 Cell<()> 。由于 Cell<()> 不是 Sync ,所以 X 也不是。然而,它仍然是 Send ,因为它的所有字段都实现了 Send

原始指针( *const T*mut T )既不是 Send 也不是 Sync ,因为编译器不太了解它们代表什么。

选择加入任一特征的方式与选择任何其他特征的方式相同;使用 impl 块为您的类型实现特征:

struct X {
    p: *mut i32,
}

unsafe impl Send for X {}
unsafe impl Sync for X {}

请注意如何实现这些特征需要 unsafe 关键字,因为编译器无法检查它是否正确。这是您对编译器做出的承诺,它只需要相信它。

如果你试图将某些东西移动到另一个不是 Send 的线程中,编译器会礼貌地阻止你这样做。这是一个小例子来证明:

fn main() {
    let a = Rc::new(123);
    thread::spawn(move || { // Error!
        dbg!(a);
    });
}

在这里,我们尝试将 Rc<i32> 发送到新线程,但是 Rc<i32>Arc<i32> 不同,它没有实现 Send

如果我们尝试编译上面的示例,我们会遇到如下所示的错误:

error[E0277]: `Rc<i32>` cannot be sent between threads safely
   --> src/main.rs:3:5
    |
3   |     thread::spawn(move || {
    |     ^^^^^^^^^^^^^ `Rc<i32>` cannot be sent between threads safely
    |
    = help: within `[closure]`, the trait `Send` is not implemented for `Rc<i32>`
note: required because it's used within this closure
   --> src/main.rs:3:19
    |
3   |     thread::spawn(move || {
    |                   ^^^^^^^
note: required by a bound in `spawn`

thread::spawn 函数要求它的参数是 Send ,而一个闭包只有在它的所有捕获都是 Send 时才是。如果我们试图捕获不是 Send 的东西,我们的错误就会被发现,从而保护我们不受未定义行为的影响。

锁:互斥锁和读写锁

在线程之间共享(可变)数据的最常用工具是互斥锁(mutex),它是“mutual exclusion”的缩写。互斥锁的作用是通过暂时阻塞同时尝试访问它的其他线程,来确保线程可以独占访问某些数据。

从概念上讲,互斥锁只有两种状态:锁定和解锁。当线程锁定未锁定的互斥体时,互斥锁被标记为已锁定并且线程可以立即继续。当一个线程随后试图锁定一个已经锁定的互斥锁时,该操作将被阻塞。线程在等待互斥体解锁时进入休眠状态。解锁只能在锁定的互斥锁上进行,并且应该由锁定它的同一个线程完成。如果其他线程正在等待锁定互斥锁,解锁将导致其中一个线程被唤醒,因此它可以再次尝试锁定互斥锁并继续其过程。

使用互斥锁保护数据只是所有线程之间的协议,即它们只有在锁定互斥锁时才会访问数据。这样,任何两个线程都不能同时访问该数据并导致数据竞争。

Rust 的互斥锁

Rust 标准库通过 std::sync::Mutex<T> 提供了这个功能。它在 T 类型上是通用的,这是互斥体保护的数据类型。通过使 T 成为互斥锁的一部分,数据只能通过互斥锁访问,从而提供一个安全接口,可以保证所有线程都遵守协议。

为确保锁定的互斥锁只能由锁定它的线程解锁,它没有 unlock() 方法。相反,它的 lock() 方法返回一个称为 MutexGuard 的特殊类型。这个守卫代表我们已经锁定互斥锁的保证。它的行为类似于通过 DerefMut 特征的独占引用,使我们能够独占访问互斥锁保护的数据。解锁互斥锁是通过放弃守卫来完成的。当我们放弃守卫时,我们就放弃了访问数据的能力,守卫的 Drop 实现将解锁互斥锁。

让我们看一个例子,看看实践中的互斥锁:

use std::sync::Mutex;

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

在这里,我们有一个 Mutex<i32> ,一个保护整数的互斥锁,我们生成 10 个线程,每个线程将整数递增 100 次。每个线程首先会锁定互斥锁以获得一个 MutexGuard ,然后使用那个 guard 来访问整数并修改它。当该变量超出范围时, guard 会立即隐式删除。

线程完成后,我们可以通过 into_inner() 安全地移除对整数的保护。 into_inner 方法获取互斥量的所有权,这保证没有其他任何东西可以再引用互斥量,从而不需要锁定。

即使增量以 1 为步长发生,观察整数的线程也只会看到 100 的倍数,因为它只能在互斥量解锁时查看整数。实际上,多亏了互斥锁,一百个增量加在一起现在是一个不可分割的——原子的——操作。

为了清楚地看到互斥锁的效果,我们可以让每个线程在解锁互斥锁之前等待一秒钟:

use std::time::Duration;

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
                thread::sleep(Duration::from_secs(1)); // New!
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

现在运行该程序时,您会看到它大约需要 10 秒才能完成。每个线程只等待一秒钟,但互斥锁确保一次只有一个线程可以这样做。

如果我们在休眠一秒钟之前放弃守卫——因此解锁互斥锁——我们将看到它并行发生:

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
                drop(guard); // New: drop the guard before sleeping!
                thread::sleep(Duration::from_secs(1));
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

有了这个改变,这个程序只需要大约一秒钟,因为现在 10 个线程可以同时执行他们的一秒钟睡眠。这表明保持互斥体锁定时间尽可能短的重要性。将互斥体保持锁定的时间超过必要的时间可以完全抵消并行的任何好处,有效地迫使一切以串行方式发生。

锁中毒(Lock Poisoning)

上面示例中的 unwrap() 调用与锁中毒有关。

当线程在持有锁时发生恐慌时,Rust 中的 Mutex 会被标记为中毒。发生这种情况时, Mutex 将不再被锁定,但调用它的 lock 方法将导致 Err 表明它已中毒。

这是一种防止使互斥锁保护的数据处于不一致状态的机制。在我们上面的示例中,如果一个线程在将整数递增少于 100 次后出现恐慌,则互斥锁将解锁并且整数将处于意外状态,即它不再是 100 的倍数,这可能会破坏其他线程所做的假设。在这种情况下自动将互斥锁标记为中毒会强制用户处理这种可能性。

在中毒的互斥锁上调用 lock() 仍然会锁定互斥锁。 lock() 返回的 Err 包含 MutexGuard ,允许我们在必要时纠正不一致的状态。

虽然锁中毒似乎是一种强大的机制,但实际上并不经常从潜在的不一致状态中恢复。大多数代码要么忽略中毒,要么在锁中毒时使用 unwrap() 来恐慌,有效地将恐慌传播给互斥锁的所有用户。

MutexGuard 的生命周期

虽然隐式删除守卫来解锁互斥锁很方便,但有时会导致微妙的意外。如果我们用 let 语句为守卫分配一个名称(如上面的示例所示),则可以相对简单地查看何时删除它,因为局部变量在定义它们的范围的末尾被删除。不过,如上面的示例所示,不显式删除守卫可能会导致互斥锁锁定的时间超过必要的时间。

不指定名称也可以使用守卫,有时会非常方便。由于 MutexGuard 的行为类似于对受保护数据的独占引用,我们可以直接使用它而无需先为守卫分配名称。例如,如果您有一个 Mutex<Vec<i32>> ,您可以锁定互斥锁,将一个项目压入 Vec ,然后再次解锁互斥锁,只需一条语句:

list.lock().unwrap().push(1);

在较大的表达式中生成的任何临时变量,例如 lock() 返回的守卫,都将在语句末尾删除。虽然这看起来显而易见且合理,但它会导致一个常见的陷阱,该陷阱通常涉及 matchif letwhile let 语句。这是一个遇到这个陷阱的例子:

if let Some(item) = list.lock().unwrap().pop() {
    process_item(item);
}

如果我们的意图是锁定列表,弹出一个项目,解锁列表,然后在列表解锁后处理该项目,我们在这里犯了一个微妙但重要的错误。临时守卫直到整个 if let 语句结束才会被删除,这意味着我们在处理项目时不必要地持有锁。

也许令人惊讶的是,类似的 if 语句不会发生这种情况,例如在本例中:

if list.lock().unwrap().pop() == Some(1) {
    do_something();
}

在这里,临时守卫在 if 语句的主体执行之前确实被删除了。原因是常规 if 语句的条件始终是一个普通的布尔值,不能借用任何东西。没有理由将临时对象的生命周期从条件延长到语句结束。但是,对于 if let 语句,情况可能并非如此。 例如,如果我们使用了 front() 而不是 pop() ,那么 item 就会从列表中借用,因此有必要保留守卫。由于借用检查器只是一个真正的检查,不会影响何时或以什么顺序丢弃东西,所以当我们使用 pop() 时也会发生同样的情况,即使那不是必需的。

我们可以通过将 pop 操作移动到单独的 let 语句来避免这种情况。然后 guard 被删除在该语句的末尾, if let 之前:

let item = list.lock().unwrap().pop();
if let Some(item) = item {
    process_item(item);
}

读写锁

互斥锁只涉及独占访问。 MutexGuard 将为我们提供一个受保护数据的独占引用 ( &mut T ),即使我们只想查看一下数据,一个共享引用 ( &T ) 就足够了。

读写锁是互斥锁的稍微复杂的版本,它理解独占访问和共享访问之间的区别,并且可以提供其中任何一种。它具有三种状态:未锁定、由单个写入者锁定(用于独占访问)和由任意数量的读取器锁定(用于共享访问)。它常用于经常被多个线程读取,但只是偶尔更新一次的数据。

Rust 标准库通过 std::sync::RwLock<T> 类型提供这种锁。它的工作方式类似于标准的 Mutex ,除了它的接口主要分为两部分。它不是单一的 lock() 方法,而是一个 read()write() 方法用于锁定为读取器或写入器。它带有两种守卫类型,一种用于读,一种用于写: RwLockReadGuardRwLockWriteGuard 。前者仅实现 Deref ,使其表现得像对受保护数据的共享引用,而后者还实现 DerefMut ,表现得像独占引用。

它实际上是 RefCell 的多线程版本,动态跟踪引用数量以确保遵守借用规则。

Mutex<T>RwLock<T> 都要求 TSend ,因为它们可用于将 T 发送到另一个线程。 RwLock<T> 还要求 T 也实现 Sync ,因为它允许多个线程持有对受保护数据的共享引用 ( &T )。 (严格来说,您可以为不满足这些要求的 T 创建一个锁,但您不能在线程之间共享它,因为锁本身不会实现 Sync 。)

Rust 标准库只提供了一种通用的 RwLock 类型,但它的实现取决于操作系统。读写锁实现之间存在许多微妙的差异。大多数实现会在有写入者等待时阻止新的读取者,即使锁已经被读取锁定。这样做是为了防止写入者饥饿,在这种情况下,许多读者共同阻止锁解锁,永远不允许任何写入者更新数据。

其他语言中的互斥锁

Rust 的标准 MutexRwLock 类型看起来与您在其他语言(如 C 或 C++)中找到的有点不同。

最大的区别是 Rust 的 Mutex<T> 包含它正在保护的数据。例如,在 C++ 中, std::mutex 不包含它保护的数据,它甚至不知道它在保护什么。这意味着用户有责任记住哪些数据受保护以及由哪个互斥锁保护,并确保每次访问“受保护”数据时锁定正确的互斥锁。在阅读其他语言中涉及互斥锁的代码时,或者在与不熟悉 Rust 的程序员交流时,记住这一点很有用。 Rust 程序员可能会谈论“互斥锁中的数据”,或者说“将其包装在互斥锁中”之类的话,这可能会让那些只熟悉其他语言中的互斥锁的人感到困惑。

如果你真的需要一个不包含任何东西的独立互斥锁,例如保护一些外部硬件,你可以使用 Mutex<()> 。但即使在这种情况下,您最好定义一个(可能为零大小的)类型来与该硬件接口并将其包装在 Mutex 中。这样,在与硬件交互之前,您仍然被迫锁定互斥锁。

等待: 停放和条件变量

当数据被多个线程改变时,在很多情况下,它们需要等待某个事件,等待数据的某些条件变为真。例如,如果我们有一个保护 Vec 的互斥锁,我们可能希望等到它包含任何东西。

虽然互斥锁确实允许线程等待直到它解锁,但它不提供等待任何其他条件的功能。如果我们只有一个互斥锁,我们就必须一直锁定互斥锁以反复检查 Vec 中是否还有任何东西。

线程停放(Thread Praking)

等待来自另一个线程的通知的一种方法称为线程停放(Thread parking)。线程可以自行停放(park),使其进入睡眠状态,从而停止消耗任何 CPU 周期。然后另一个线程可以取消停放的线程,将其从睡眠中唤醒。

线程停放可通过 std::thread::park() 函数获得。对于 unparking,您可以在表示要 unpark 的线程的 Thread 对象上调用 unpark() 方法。这样的对象可以从 spawn 返回的join handle中获取,也可以通过 std::thread::current() 由线程自己获取。

让我们深入研究一个使用互斥锁在两个线程之间共享队列的示例。在下面的示例中,一个新生成的线程将使用队列中的项目,而主线程将每秒向队列中插入一个新项目。线程停放用于在队列为空时让消费线程等待。

use std::collections::VecDeque;

fn main() {
    let queue = Mutex::new(VecDeque::new());

    thread::scope(|s| {
        // Consuming thread
        let t = s.spawn(|| loop {
            let item = queue.lock().unwrap().pop_front();
            if let Some(item) = item {
                dbg!(item);
            } else {
                thread::park();
            }
        });

        // Producing thread
        for i in 0.. {
            queue.lock().unwrap().push_back(i);
            t.thread().unpark();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

消费线程运行一个无限循环,在该循环中它从队列中弹出项目以使用 dbg 宏显示它们。当队列为空时,它会停止并使用 park() 函数进入休眠状态。如果未停放, park() 调用返回, loop 继续,再次从队列中弹出项目,直到它为空。等等。

生产线程通过将其推入队列来每秒产生一个新数字。每次它添加一个项目时,它都会在引用消费线程的 Thread 对象上使用 unpark() 方法来取消它。这样,消费线程就会被唤醒以处理新元素。

这里要注意的是,如果我们取消停放,这个程序在理论上仍然是正确的,尽管效率低下。这很重要,因为 park() 不保证它只会因为匹配的 unpark() 而返回。虽然有点罕见,但它可能会有虚假的唤醒。我们的示例处理得很好,因为消费线程将锁定队列,看到它是空的,然后直接解锁并再次停放自己。

线程停放的一个重要属性是在线程停放自身之前对 unpark() 的调用不会丢失。取消停放的请求仍然被记录下来,下次线程尝试停放自己时,它会清除该请求并直接继续,而不会真正进入睡眠状态。为了了解为什么这对正确操作至关重要,让我们来看看两个线程执行的步骤的可能顺序:

  1. 消费线程——我们称它为 C——锁定队列。

  2. C 试图从队列中弹出一个项目,但它是空的,导致 None 。

  3. C 解锁队列。

  4. 生产线程,我们称之为 P,锁定队列。

  5. P 将一个新项目推送到队列中。

  6. P 再次解锁队列。

  7. P 调用 unpark() 通知 C 有新项目。

  8. C 调用 park() 进入睡眠状态,等待更多项目。

虽然在第 3 步释放队列和第 8 步停放之间很可能只有很短的时间,但第 4 步到第 7 步可能发生在线程停放自身之前的那一刻。如果线程未停放, unpark() 将不执行任何操作,则通知将丢失。消费线程仍将等待,即使队列中有一个项目。由于保存了 unpark 请求以供将来调用 park() ,我们不必担心这一点。

但是,unpark 请求不会叠加。调用 unpark() 两次然后再调用 park() 两次仍然会导致线程进入休眠状态。第一个 park() 清除请求直接返回,但是第二个照常休眠。

这意味着在我们上面的例子中,重要的是我们只在我们看到队列为空时才停放线程,而不是在每个处理的项目之后停放它。虽然由于长时间(一秒)的休眠,此示例极不可能发生这种情况,但多个 unpark() 调用可能只唤醒一个 park() 调用。

不幸的是,这确实意味着如果在 park() 返回之后立即调用 unpark() ,但在队列被锁定和清空之前, unpark() 调用是不必要的,但仍会导致下一个 park() 调用立即返回。这导致(空)队列被额外锁定和解锁。虽然这不会影响程序的正确性,但会影响其效率和性能。

这种机制适用于我们示例中的简单情况,但当事情变得更复杂时很快就会崩溃。例如,如果我们有多个消费者线程从同一个队列中获取项目,生产者线程将无法知道哪个消费者实际上正在等待并且应该被唤醒。生产者必须确切地知道消费者何时在等待,以及它在等待什么条件。

条件变量

条件变量是一个更常用的选项,用于等待受互斥锁保护的数据发生某些事情。它们有两个基本操作:等待和通知。线程可以等待一个条件变量,之后当另一个线程通知同一个条件变量时它们可以被唤醒。多个线程可以等待同一个条件变量,通知可以发送给一个等待线程,也可以发送给所有线程。

这意味着我们可以为我们感兴趣的特定事件或条件创建条件变量,例如队列非空,并等待该条件。任何导致该事件或条件发生的线程都会通知条件变量,而不必知道哪个或有多少线程对该通知感兴趣。

为了避免在解锁互斥锁和等待条件变量之间的短暂时刻丢失通知的问题,条件变量提供了一种以原子方式解锁互斥锁并开始等待的方法。这意味着通知根本不可能丢失。

Rust 标准库提供了一个条件变量 std::sync::Condvar 。它的 wait 方法接受一个 MutexGuard 来证明我们已经锁定了互斥锁。它首先解锁互斥锁并进入休眠状态。稍后,当被唤醒时,它会重新锁定互斥锁并返回一个新的 MutexGuard (这证明互斥锁再次被锁定)。

它有两个通知函数: notify_one 只唤醒一个等待线程(如果有的话), notify_all 唤醒所有线程。

让我们修改用于线程停放的示例,改为使用 Condvar

use std::sync::Condvar;

let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();

thread::scope(|s| {
    s.spawn(|| {
        loop {
            let mut q = queue.lock().unwrap();
            let item = loop {
                if let Some(item) = q.pop_front() {
                    break item;
                } else {
                    q = not_empty.wait(q).unwrap();
                }
            };
            drop(q);
            dbg!(item);
        }
    });

    for i in 0.. {
        queue.lock().unwrap().push_back(i);
        not_empty.notify_one();
        thread::sleep(Duration::from_secs(1));
    }
});

我们必须改变一些事情:

  • 我们现在不仅有一个包含队列的 Mutex ,还有一个用于传达“非空”条件的 Condvar

  • 我们不再需要知道唤醒哪个线程,所以我们不再存储 spawn 的返回值。相反,我们使用 notify_one 方法通过条件变量通知消费者。

  • 解锁、等待、重锁都是 wait 方法完成的。我们不得不稍微重组控制流,以便能够将守卫传递给 wait 方法,同时在处理项目之前仍然将其丢弃。

现在我们可以生成任意数量的消费线程,甚至可以在以后生成更多线程,而无需进行任何更改。条件变量负责将通知传递给任何感兴趣的线程。

如果我们有一个更复杂的系统,其中包含对不同条件感兴趣的线程,我们可以为每个条件定义一个 Condvar 。例如,我们可以定义一个表示队列非空,另一个表示队列为空。然后每个线程将等待与他们正在做的事情相关的任何条件。

通常, Condvar 仅与单个 Mutex 一起使用。如果两个线程尝试使用两个不同的互斥量在条件变量上并发 wait ,则可能会导致恐慌。

Condvar 的一个缺点是它仅在与 Mutex 一起使用时才有效,但对于大多数用例来说这完全没问题,因为这正是已经用来保护数据的东西。

thread::park()Condvar::wait() 还有一个有时间限制的变体: thread::park_timeout()Condvar::wait_timeout() 。这些将 Duration 作为一个额外的参数,这是它应该放弃等待通知并无条件唤醒的时间。

总结

  • 多个线程可以在同一个程序中并发运行,并且可以在任何时候生成。

  • 当主线程结束时,整个程序就结束了。

  • 数据竞争是未定义的行为,Rust 的类型系统完全阻止了(在安全代码中)。

  • Send 的数据可以发送给其他线程, Sync 的数据可以在线程之间共享。

  • 常规线程可能会在程序运行时运行,因此只能借用 'static 数据,例如静态和泄漏分配。

  • 引用计数 ( Arc ) 可用于共享所有权,以确保只要至少有一个线程在使用数据,数据就会一直存在。

  • 作用域线程对于限制线程的生命周期以允许它借用非 'static 数据(例如局部变量)很有用。

  • &T 是共享引用。 &mut T 是独占引用。常规类型不允许通过共享引用进行修改。

  • 由于 UnsafeCell ,某些类型具有内部可变性,它允许通过共享引用进行改变。

  • CellRefCell 是单线程内部可变性的标准类型。 Atomics、 MutexRwLock 是它们的多线程等价物。

  • Cell 和原子只允许替换整个值,而 RefCellMutexRwLock 允许您通过动态执行访问规则直接改变值。

  • 线程停放是等待某些条件的便捷方式。

  • 当条件是关于受 Mutex 保护的数据时,使用 Condvar 比线程停放更方便,也更有效。

上次更新: