第 5 章 建立我们自己的通道

来自 《Rust Atomics and Locks》 的翻译文章,英文原文https://marabos.nl/atomics/building-channels.html在新窗口打开

通道可以用来在线程之间发送数据,它们有很多变体。有些通道只能在一个发送者和一个接收者之间使用,而其他通道可以从任何数量的线程发送,甚至允许多个接收者。 有些通道是阻塞的,这意味着接收(有时是发送)是一个阻塞的操作,使你的线程睡眠,直到该操作完成。一些通道针对吞吐量进行了优化,而其他通道针对低延迟进行了优化。

变化无穷无尽,没有适合所有情况的通用版本。

在本章中,我们将实现几个相对简单的通道,不仅可以探索原子的更多应用,还可以更多了解我们的需求和假设,如何在Rust的类型系统中得到体现。

一个简单的基于互斥锁的通道

基本的通道实现不需要任何原子知识。我们可以采用 VecDeque ,它基本上是一个 Vec ,允许在两端高效地添加和删除元素,并使用 Mutex 保护它以允许多个线程访问它。然后,我们将 VecDeque 用作已发送但尚未收到的数据队列,通常称为消息。任何想要发送消息的线程都可以简单地将它添加到队列的后面,而任何想要接收消息的线程只需从队列的前面移除一个。

还有一件事要补充,它用于使接收操作阻塞:Condvar (请参阅第 1 章中的“条件变量”),用于通知等待的接收者有新消息。

其实现可以非常简短且相对简单,如下所示:

pub struct Channel<T> {
    queue: Mutex<VecDeque<T>>,
    item_ready: Condvar,
}

impl<T> Channel<T> {
    pub fn new() -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            item_ready: Condvar::new(),
        }
    }

    pub fn send(&self, message: T) {
        self.queue.lock().unwrap().push_back(message);
        self.item_ready.notify_one();
    }

    pub fn receive(&self) -> T {
        let mut b = self.queue.lock().unwrap();
        loop {
            if let Some(message) = b.pop_front() {
                return message;
            }
            b = self.item_ready.wait(b).unwrap();
        }
    }
}

请注意,我们不需要使用任何原子类型或不安全代码,也不需要考虑 SendSync 特性。编译器了解Mutex的接口以及该类型提供的保证,并且会隐含地理解,如果Mutex<T>Condvar都能在线程之间安全地共享,那么我们的Channel<T>也可以。

我们的 send 函数锁定互斥量以将新消息推到队列的后面,并在解锁队列后直接通知一个潜在的等待接收者,方法是使用条件变量。

receive 函数还锁定互斥量,以便从队列的前端弹出下一条消息,但如果没有可用消息,则将使用条件变量等待。

提示

请记住, Condvar::wait 方法将在等待时解锁 Mutex ,并在返回前重新锁定它。因此,我们的 receive 函数不会在等待时保持互斥锁锁定。

虽然这个通道在使用上非常灵活,因为它允许任意数量的发送和接收线程,但在许多情况下它的实现可能不是最佳。即使有大量消息准备接收,任何发送或接收操作都会暂时阻塞任何其他发送或接收操作,因为它们都必须锁定同一个互斥锁。如果 VecDeque::push 必须增加 VecDeque 的容量,所有发送和接收线程将不得不等待那个线程完成重新分配,这在某些情况下可能是不可接受的。

另一个可能不受欢迎的属性是,该通道的队列可能无限制地增长。没有什么能阻止发送者以高于接收者处理新消息的速度不断发送新消息。

不安全的一次性通道

通道的各种用例几乎是无穷无尽的。然而,在本章的其余部分,我们将专注于一种特定类型的用例:从一个线程向另一个线程准确地发送一条消息。为这种用例设计的通道通常被称为 一次性通道(one-shot channel)

我们可以采用上面的基于Mutex<VecDeque>的实现,并将VecDeque代替Option,从而有效地将队列的容量减少到刚好一个消息。这可以避免分配,但仍然有一些与使用Mutex相同的弊端。我们可以通过使用原子从头开始构建我们自己的一次性通道来避免这种情况。

首先,让我们构建一个一次性通道的最小实现,而不用过多考虑它的接口。在本章的后面,我们将探索如何改进它,以及如何与 Rust 的类型系统配合,为我们通道的用户提供愉快的体验。

我们需要使用的工具与我们用于 SpinLock<T> (来自第 4 章)的工具基本相同: UnsafeCell 用于存储, AtomicBool 用于指示其状态。在这种情况下,我们使用原子布尔值来指示消息是否可以被消费。

在发送消息之前,通道是“空的”并且不包含任何 T 类型的消息。我们可以在单元格内使用 Option<T> 以允许缺少 T 。然而,这可能会浪费宝贵的内存空间,因为我们的原子布尔值已经告诉我们是否有消息。相反,我们可以使用 std::mem::MaybeUninit<T> ,它本质上是 Option<T> 的最简单的不安全版本:它需要用户手动跟踪它是否已被初始化,并且几乎它的整个接口都是不安全的,因为它无法执行自己的检查。

综上所述,我们开始使用这个结构定义进行第一次尝试:

use std::mem::MaybeUninit;

pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

就像我们的 SpinLock<T> ,我们需要告诉编译器,我们的通道可以安全地在线程之间共享,或者至少只要 TSend

unsafe impl<T> Sync for Channel<T> where T: Send {}

新通道为空, ready 设置为 falsemessage 未初始化:

impl<T> Channel<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicBool::new(false),
        }
    }}

要发送消息,首先需要将其存储在单元格中,之后我们可以通过将 ready 标志设置为 true 来将其释放给接收者。尝试多次执行此操作将很危险,因为在设置 ready 标志后,接收方可能会在任何时候读取消息,这可能会与第二次发送消息的尝试竞争。现在,我们通过创建 unsafe 方法并为他们留下注释来让用户负责:

/// Safety: Only call this once!
pub unsafe fn send(&self, message: T) {
    (*self.message.get()).write(message);
    self.ready.store(true, Release);
}

在上面的代码片段中,我们使用 UnsafeCell::get 方法获取指向 MaybeUninit<T> 的指针,并以不安全的方式解除对它的引用以调用 MaybeUninit::write 来初始化它。这可能会在误用时导致未定义的行为,但我们已将责任推给了调用者。

对于内存顺序,我们需要使用释放顺序,因为原子存储有效地将消息释放给接收者。这确保了从接收线程的角度来看,如果它从self.ready中用获取顺序加载true,那么消息的初始化将被完成。

对于接收,我们暂时不会费心提供阻塞接口。相反,我们将提供两种方法:一种用于检查消息是否可用,另一种用于接收消息。如果他们想要阻塞,我们将留给我们通道的用户使用线程停放(第 1 章中的“线程停放”)之类的东西。

这些是完成此版本通道的最后两种方法:

 pub fn is_ready(&self) -> bool {
        self.ready.load(Acquire)
    }

    /// Safety: Only call this once,
    /// and only after is_ready() returns true!
    pub unsafe fn receive(&self) -> T {
        (*self.message.get()).assume_init_read()
    }

虽然 is_ready 方法始终可以安全地调用,但 receive 方法使用了 MaybeUninit::assume_init_read() ,它不安全地假定它已经被初始化了,并且它没有被用于生成非 Copy 对象的多个副本。就像 send 一样,我们只是通过将函数本身设为 unsafe 来解决用户的问题。

结果是一个技术上可用的通道,但它很笨拙且通常令人失望。如果掌握得当,它会做它应该做的事情,但有许多微妙的方法可以滥用它。

多次调用 send 可能会导致数据竞争,因为第二个发送方将覆盖数据,而接收方可能正在尝试读取第一条消息。即使接收已正确同步,从多个线程调用 send 也可能导致两个线程尝试同时写入单元格,再次导致数据竞争。此外,多次调用 receive 会导致消息的两份副本,即使 T 未实现 Copy ,因此无法安全地进行复制。

一个更微妙的问题是我们的通道缺少 Drop 实现。 MaybeUninit 类型不跟踪它是否已被初始化,因此在删除时不会自动删除其内容。这意味着如果消息已发送但从未收到,则该消息永远不会被丢弃。这并非不妥,但仍需避免。虽然泄漏在 Rust 中被普遍认为是安全的,但通常只有作为另一个泄漏的结果才可以接受。例如,泄漏 Vec 也会泄漏其内容,但经常使用 Vec 不会导致任何泄漏。

由于我们让用户对一切负责,因此发生不幸事故只是时间问题。

通过运行时检查确保安全

为了提供更安全的接口,我们可以添加一些检查以防止误用导致恐慌并显示明确的消息,这比未定义的行为更可取。

让我们从消息准备好之前调用 receive 的问题开始。这个处理起来很简单,因为我们所要做的就是让 receive 方法在尝试读取消息之前验证 ready 标志:

/// Panics if no message is available yet.
///
/// Tip: Use `is_ready` to check first.
///
/// Safety: Only call this once!
pub unsafe fn receive(&self) -> T {
    if !self.ready.load(Acquire) {
        panic!("no message available!");
    }
    (*self.message.get()).assume_init_read()
}

该函数仍然不安全,因为用户仍然有责任不多次调用此函数,但未能首先检查 is_ready() 将不再导致未定义的行为。

由于我们现在在 receive 方法中,获得了 ready 标志的获取加载,提供了必要的同步,因此我们可以将 is_ready 中加载的内存顺序降低到 Relaxed ,因为该顺序个现在仅用于指示目的:

pub fn is_ready(&self) -> bool {
    self.ready.load(Relaxed)
}

提示

请记住, ready 上的总修改顺序(请参阅第 3 章中的“宽松顺序”)保证在 is_ready 从它加载 true 之后, receive 也会看到 true 。无论 is_ready 中使用的内存顺序如何, is_ready 都不可能返回 true 并且 receive() 仍然出现恐慌。

下一个要解决的问题是多次调用 receive 时会发生什么。通过在我们的 receive 方法中将 ready 标志设置回 false ,我们也可以很容易地导致恐慌,如下所示:

/// Panics if no message is available yet,
/// or if the message was already consumed.
///
/// Tip: Use `is_ready` to check first.
pub fn receive(&self) -> T {
    if !self.ready.swap(false, Acquire) {
        panic!("no message available!");
    }
    // Safety: We've just checked (and reset) the ready flag.
    unsafe { (*self.message.get()).assume_init_read() }
}

我们只是将 swapload 更改为 false ,突然间, receive 方法在任何情况下都可以完全安全地调用。该函数不再标记为 unsafe 。我们现在不再让用户对所有事情负责,而是对不安全的代码负责,从而减轻用户的压力。

对于 send ,事情稍微复杂一些。为了防止多个 send 调用同时访问单元格(cell),我们需要知道另一个 send 调用是否已经开始。 ready 标志只告诉我们另一个 send 调用是否已经完成,所以这还不够。

让我们添加第二个标志,命名为 in_use ,以指示通道是否已被占用:

pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    in_use: AtomicBool, // New!
    ready: AtomicBool,
}

impl<T> Channel<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            in_use: AtomicBool::new(false), // New!
            ready: AtomicBool::new(false),
        }
    }}

现在我们需要做的就是在访问单元格之前,在 send 方法中将 in_use 设置为 true ,如果它已被另一个调用设置则恐慌:

/// Panics when trying to send more than one message.
pub fn send(&self, message: T) {
    if self.in_use.swap(true, Relaxed) {
        panic!("can't send more than one message!");
    }
    unsafe { (*self.message.get()).write(message) };
    self.ready.store(true, Release);
}

我们可以对原子交换操作使用宽松的内存顺序,因为 in_use 的总修改顺序(参见第 3 章中的“宽松顺序”)保证在 in_use 上只有一个交换操作将返回 false,这是 send 尝试访问单元格的唯一情况。

我们现在有了一个完全安全的接口,但仍然存在一个问题。最后一个问题发生在发送从未收到的消息时:它永远不会被丢弃。虽然这不会导致未定义的行为并且在安全代码中是允许的,但绝对要避免。

由于我们在 receive 方法中重置了 ready 标志,修复此问题很容易: ready 标志指示单元格中是否有需要删除的尚未收到的消息。

在我们的 ChannelDrop 实现中,我们不需要使用原子操作来检查原子 ready 标志,因为一个对象只有在被删除它的线程完全拥有时才能被删除,没有未偿还的借用。这意味着我们可以使用 AtomicBool::get_mut 方法,它采用独占引用 ( &mut self ),证明原子访问是不必要的。 UnsafeCellUnsafeCell::get_mut 也是如此。

使用它,这是我们完全安全且无泄漏通道的最后一部分:

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        if *self.ready.get_mut() {
            unsafe { self.message.get_mut().assume_init_drop() }
        }
    }
}

让我们试试吧!

由于我们的 Channel 还没有提供阻塞接口,我们将手动使用线程停放来等待消息。只要没有消息准备好,接收线程就会 park() 它自己,一旦它发送了一些东西,发送线程就会 unpark() 接收者。

这是一个完整的测试程序,通过我们的 Channel 将字符串文字 "hello world!" 从第二个线程发送回主线程:

fn main() {
    let channel = Channel::new();
    let t = thread::current();
    thread::scope(|s| {
        s.spawn(|| {
            channel.send("hello world!");
            t.unpark();
        });
        while !channel.is_ready() {
            thread::park();
        }
        assert_eq!(channel.receive(), "hello world!");
    });
}

该程序编译、运行并干净地退出,表明我们的 Channel 正常工作。

如果我们复制 send 行,我们还可以看到我们的一项安全检查正在运行,在程序运行时产生以下恐慌消息:

thread '<unnamed>' panicked at 'can't send more than one message!', src/main.rs

虽然 panic 程序不是很好,但对于一个程序来说,可靠地 panic 比接近未定义行为的潜在恐怖要好得多。

使用单个原子作为通道状态

如果你对实现通道还不满足,这里有一个微妙的变化,可以节省一个字节的内存。

我们不使用两个单独的原子布尔值来表示通道的状态,而是使用一个 AtomicU8 来表示所有的四种状态。 我们必须使用 compare_exchange 来原子地检查通道是否处于预期的状态,并将其更改为另一个状态,而不是原子地交换布尔值。

const EMPTY: u8 = 0;
const WRITING: u8 = 1;
const READY: u8 = 2;
const READING: u8 = 3;

pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    state: AtomicU8,
}

unsafe impl<T: Send> Sync for Channel<T> {}

impl<T> Channel<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicU8::new(EMPTY),
        }
    }

    pub fn send(&self, message: T) {
        if self.state.compare_exchange(
            EMPTY, WRITING, Relaxed, Relaxed
        ).is_err() {
            panic!("can't send more than one message!");
        }
        unsafe { (*self.message.get()).write(message) };
        self.state.store(READY, Release);
    }

    pub fn is_ready(&self) -> bool {
        self.state.load(Relaxed) == READY
    }

    pub fn receive(&self) -> T {
        if self.state.compare_exchange(
            READY, READING, Acquire, Relaxed
        ).is_err() {
            panic!("no message available!");
        }
        unsafe { (*self.message.get()).assume_init_read() }
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        if *self.state.get_mut() == READY {
            unsafe { self.message.get_mut().assume_init_drop() }
        }
    }
}

通过类型确保安全

虽然我们已经成功地保护了我们的 Channel 的用户免受未定义行为的影响,但如果他们不小心使用不当,他们仍然会有恐慌的风险。理想情况下,编译器会在程序运行之前检查正确的用法并指出误用。

我们来看看多次调用 sendreceive 的问题。

为了防止一个函数被多次调用,我们可以让它按值接受一个参数,对于非 Copy 类型,它会消耗该对象。一个对象被消耗或移动后,它就从调用者那里消失了,防止它被再次使用。

通过将调用 sendreceive 的能力分别表示为单独的(非 Copy )类型,并在执行操作时使用该对象,我们可以确保每个调用只能发生一次。

这将我们带到以下接口设计中,其中通道由一对 SenderReceiver 表示,而不是单个 Channel 类型,每个通道都有一个按值获取 self 的方法:

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {}

pub struct Sender<T> {}
pub struct Receiver<T> {}

impl<T> Sender<T> {
    pub fn send(self, message: T) {}
}

impl<T> Receiver<T> {
    pub fn is_ready(&self) -> bool {}
    pub fn receive(self) -> T {}
}

用户可以通过调用 channel() 创建一个通道,这将给他们一个 Sender 和一个 Receiver 。他们可以自由地传递这些对象,将它们移动到另一个线程,等等。但是,他们最终不能拥有其中任何一个对象的多个副本,从而保证 sendreceive 只能被调用一次。

要实现这一点,我们需要为我们的 UnsafeCellAtomicBool 找到一个位置。以前,我们只有一个包含这些字段的结构,但现在我们有两个独立的结构,每个结构都可以比另一个生命周期长。

由于发送方和接收方需要共享这些变量的所有权,我们将使用 Arc第 1 章中的“引用计数”)为我们提供引用计数共享分配,我们在其中存储共享的 Channel 对象。如下所示, Channel 类型不必是公共的,因为它的存在只是一个与用户无关的实现细节。

pub struct Sender<T> {
    channel: Arc<Channel<T>>,
}

pub struct Receiver<T> {
    channel: Arc<Channel<T>>,
}

struct Channel<T> { // no longer `pub`
    message: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

unsafe impl<T> Sync for Channel<T> where T: Send {}

和之前一样,我们在 TSend 的条件下,为 Channel<T> 实现 Sync ,使其可以跨线程使用。

请注意,我们不再需要 in_use 原子布尔值,就像我们在之前的通道实现中所做的那样。 send 仅使用它来检查它是否被多次调用,现在通过类型系统静态保证。

用于创建通道和发送方-接收方对的 channel 函数看起来类似于我们之前的 Channel::new 函数,除了它将 Channel 包装在 Arc 中,并将 Arc 和它的克隆包装起来在 SenderReceiver 类型中:

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(Channel {
        message: UnsafeCell::new(MaybeUninit::uninit()),
        ready: AtomicBool::new(false),
    });
    (Sender { channel: a.clone() }, Receiver { channel: a })
}

sendis_readyreceive 方法与我们之前实现的方法基本相同,只有一些不同之处:

  • 它们现在被移动到各自的类型,这样只有(一个)发送者可以发送,并且只有(一个)接收者可以接收。

  • sendreceive 现在通过值而不是引用来获取 self ,以确保它们每个只能被调用一次。

  • send 不再恐慌,因为它的前提条件(只被调用一次)现在是静态保证的。

所以,他们现在看起来像这样:

impl<T> Sender<T> {
    /// This never panics. :)
    pub fn send(self, message: T) {
        unsafe { (*self.channel.message.get()).write(message) };
        self.channel.ready.store(true, Release);
    }
}

impl<T> Receiver<T> {
    pub fn is_ready(&self) -> bool {
        self.channel.ready.load(Relaxed)
    }

    pub fn receive(self) -> T {
        if !self.channel.ready.swap(false, Acquire) {
            panic!("no message available!");
        }
        unsafe { (*self.channel.message.get()).assume_init_read() }
    }
}

receive 函数仍然会出现 panic,因为用户可能仍会在 is_ready() 返回 true 之前调用它。它还使用 swapready 标志设置回 false (而不仅仅是 load ),以便 ChannelDrop 实现知道是否有需要删除的未读消息.

Drop 实现与我们之前实现的完全相同:

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        if *self.ready.get_mut() {
            unsafe { self.message.get_mut().assume_init_drop() }
        }
    }
}

当删除 Sender<T>Receiver<T> 时, Arc<Channel<T>>Drop 实现将递减分配的引用计数器。当丢弃第二个时,该计数器变为零, Channel<T> 本身也被丢弃。这将调用我们上面的 Drop 实现,如果消息已发送但未收到,我们将在其中删除消息。

让我们试试看:

fn main() {
    thread::scope(|s| {
        let (sender, receiver) = channel();
        let t = thread::current();
        s.spawn(move || {
            sender.send("hello world!");
            t.unpark();
        });
        while !receiver.is_ready() {
            thread::park();
        }
        assert_eq!(receiver.receive(), "hello world!");
    });
}

我们仍然必须手动使用线程停放来等待消息,这有点不方便,但我们稍后会处理这个问题。

目前,我们的目标是使至少一种形式的误用在编译时不可能发生。与上次不同,尝试发送两次不会导致程序崩溃,而是根本不会产生一个有效的程序。如果我们在上面的程序中添加另一个 send 调用,编译器现在会捕获问题,并耐心地通知我们的错误:

error[E0382]: use of moved value: `sender`
  --> src/main.rs
   |
   |             sender.send("hello world!");
   |                    --------------------
   |                     `sender` moved due to this method call
   |
   |             sender.send("second message");
   |             ^^^^^^ value used here after move
   |
note: this function takes ownership of the receiver `self`, which moves `sender`
  --> src/lib.rs
   |
   |     pub fn send(self, message: T) {
   |                 ^^^^
   = note: move occurs because `sender` has type `Sender<&str>`,
           which does not implement the `Copy` trait

根据具体情况,设计一个在编译时捕获错误的接口可能会非常棘手。如果这种情况确实适合这样的接口,它不仅可以为用户带来更多便利,还可以减少对现在静态保证的事物的运行时检查次数。例如,我们不再需要 in_use 标志,并从 send 方法中删除了 swap 和检查。

不幸的是,可能会出现新的问题,从而导致更多的运行时开销。在这种情况下,问题在于所有权分割,为此我们不得不去找一个 Arc ,并支付分配成本。

在安全性、便利性、灵活性、简单性和性能之间做出权衡是不幸的,但有时是不可避免的。一般来说,Rust致力于让你在所有这些方面都能轻松胜任,但有时也会让你用其中一点来换取另一点的最大化。

借用以避免分配

我们刚刚设计的基于 Arc 的通道,实现使用起来非常方便——但会牺牲一些性能,因为它必须分配内存。如果我们想优化效率,我们可以通过让用户负责共享的 Channel 对象来牺牲一些便利来换取性能。我们可以强制用户创建一个可以由 SenderReceiver 借用的 Channel ,而不是在幕后处理 Channel 的分配和所有权。这样,他们可以选择简单地将 Channel 放入局部变量中,避免分配内存的开销。

我们还必须以某种简单的方式进行交换,因为我们现在必须处理借用和生命周期。

因此,这三种类型现在看起来如下, Channel 再次公开, SenderReceiver 在一定生命周期内借用它。

pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

unsafe impl<T> Sync for Channel<T> where T: Send {}

pub struct Sender<'a, T> {
    channel: &'a Channel<T>,
}

pub struct Receiver<'a, T> {
    channel: &'a Channel<T>,
}

我们没有使用 channel() 函数来创建 (Sender, Receiver) 对,而是回到本章前面的 Channel::new ,允许用户创建这样的对象作为局部变量。

此外,我们需要一种方法,让用户创建一个 SenderReceiver 对象来借用这个通道。这将需要独占借用 ( &mut Channel ),以确保同一通道不能有多个发送者或接收者。通过同时提供 SenderReceiver ,我们可以将独占借用分成两个共享借用,这样发送方和接收方都可以引用通道,同时防止其他任何东西接触通道。

这使我们实现以下操作:

impl<T> Channel<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicBool::new(false),
        }
    }

    pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
        *self = Self::new();
        (Sender { channel: self }, Receiver { channel: self })
    }
}

split 方法的签名有些复杂,值得仔细研究一下。它通过独占引用专门借用 self ,但它将其拆分为两个共享引用,包装在 SenderReceiver 类型中。 'a 生命周期清楚地表明这两个对象都借用了有限生命周期的东西;在这种情况下, 就是 Channel 本身。由于 Channel 是独占借用的,只要 SenderReceiver 对象存在,调用者就不能借用或移动它。

然而,一旦这两个对象都不复存在,可变借用就会过期,编译器很乐意让 Channel 对象通过第二次调用 split() 再次借用。虽然我们可以假设当 SenderReceiver 仍然存在时,不能再次调用 split() ,但我们无法阻止在这些对象被删除或遗忘后对 split() 的第二次调用。我们需要确保不会意外地为已经设置了 ready 标志的通道创建新的 SenderReceiver 对象,因为这会破坏防止未定义行为的假设。

通过在 split() 中用新的空通道覆盖 *self ,我们确保它在创建 SenderReceiver 状态时处于预期状态。这也会调用旧的 *self 上的 Drop 实现,它将负责删除之前发送但未收到的消息。

提示

由于 split 签名中的生命周期来自 self ,所以可以省略。上面代码片段中 split 的签名与这个不太冗长的版本相同:

pub fn split(&mut self) -> (Sender<T>, Receiver<T>) {}

虽然这个版本没有明确表明返回的对象借用了 self ,但编译器仍然检查生命周期的正确使用,这与它在更详细的版本中所做的完全一样。

除了 SenderReceiver 类型的附加 '_ 生命周期参数外,其余方法和 Drop 实现与我们基于 Arc 的实现相同。 (如果您忘记了这些,编译器会建议添加它们。)

为了完整起见,这里是剩余的代码:

impl<T> Sender<'_, T> {
    pub fn send(self, message: T) {
        unsafe { (*self.channel.message.get()).write(message) };
        self.channel.ready.store(true, Release);
    }
}

impl<T> Receiver<'_, T> {
    pub fn is_ready(&self) -> bool {
        self.channel.ready.load(Relaxed)
    }

    pub fn receive(self) -> T {
        if !self.channel.ready.swap(false, Acquire) {
            panic!("no message available!");
        }
        unsafe { (*self.channel.message.get()).assume_init_read() }
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        if *self.ready.get_mut() {
            unsafe { self.message.get_mut().assume_init_drop() }
        }
    }
}

让我们测试一下!

fn main() {
    let mut channel = Channel::new();
    thread::scope(|s| {
        let (sender, receiver) = channel.split();
        let t = thread::current();
        s.spawn(move || {
            sender.send("hello world!");
            t.unpark();
        });
        while !receiver.is_ready() {
            thread::park();
        }
        assert_eq!(receiver.receive(), "hello world!");
    });
}

与基于 Arc 的版本相比,便利性的降低是非常小的:我们只需要多写一行来手动创建一个 Channel 对象。但是请注意,通道必须在作用域之前被创建,以向编译器证明它的存在将超过发送者和接收者。

要查看编​​译器的借用检查器的运行情况,请尝试在不同位置添加对 channel.split() 的第二次调用。 你会发现,在线程作用域内第二次调用它将导致错误,而在作用域之后调用它是可以接受的。甚至在作用域之前调用 split() 也是可以的,只要你在作用域开始之前停止使用返回的 SenderReceiver 即可。

阻塞

最后让我们来处理我们的 Channel 剩下的最后一个主要的不便之处,即缺少一个阻塞接口。我们已经在每次测试我们的通道的新变体时使用了线程停放。把这种模式整合到通道本身应该不会太难。

为了能够取消接收方的停放,发送方需要知道哪个线程要取消停放。 std::thread::Thread 类型表示线程句柄,这正是我们调用 unpark() 所需要的。我们将把接收线程的句柄存储在 Sender 对象中,如下所示:

use std::thread::Thread;

pub struct Sender<'a, T> {
    channel: &'a Channel<T>,
    receiving_thread: Thread, // New!
}

但是,如果在线程之间发送 Receiver 对象,则此句柄将指向错误的线程。 Sender 不会意识到这一点,并且仍会引用最初持有 Receiver 的线程。

我们可以通过使 Receiver 更具限制性,不再允许它在线程之间发送来解决这个问题。正如第 1 章“线程安全:发送和同步”中所讨论的,我们可以使用特殊的 PhantomData 标记类型将此限制添加到我们的结构中。 PhantomData<*const ()> 可以完成这项工作,因为原始指针(例如 *const () )不会实现 Send

pub struct Receiver<'a, T> {
    channel: &'a Channel<T>,
    _no_send: PhantomData<*const ()>, // New!
}

接下来,我们必须修改 Channel::split 方法来填写新字段,如下所示:

  pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
        *self = Self::new();
        (
            Sender {
                channel: self,
                receiving_thread: thread::current(), // New!
            },
            Receiver {
                channel: self,
                _no_send: PhantomData, // New!
            }
        )
    }

我们将当前线程的句柄用于 receiving_thread 字段,因为我们返回的 Receiver 对象将保留在当前线程中。

send 方法没有太大变化,如下所示。我们只需要在 receiving_thread 上调用 unpark() 来唤醒正在等待的接收者:

impl<T> Sender<'_, T> {
    pub fn send(self, message: T) {
        unsafe { (*self.channel.message.get()).write(message) };
        self.channel.ready.store(true, Release);
        self.receiving_thread.unpark(); // New!
    }
}

receive 函数发生了稍微大一点的变化。如果还没有消息,新版本不会恐慌,而是会耐心等待使用 thread::park() 的消息,然后重试,必要时多次。

impl<T> Receiver<'_, T> {
    pub fn receive(self) -> T {
        while !self.channel.ready.swap(false, Acquire) {
            thread::park();
        }
        unsafe { (*self.channel.message.get()).assume_init_read() }
    }
}

提示

请记住, thread::park() 可能会虚假返回。 (或者因为我们的 send 方法之外的其他东西调用了 unpark() 。)这意味着我们不能假设 park() 返回时已经设置了 ready 标志。因此,我们需要使用一个循环在 unparked 之后再次检查标志。

Channel<T> 结构、它的 Sync 实现、它的 new 函数和它的 Drop 实现保持不变。

让我们试试吧!

fn main() {
    let mut channel = Channel::new();
    thread::scope(|s| {
        let (sender, receiver) = channel.split();
        s.spawn(move || {
            sender.send("hello world!");
        });
        assert_eq!(receiver.receive(), "hello world!");
    });
}

显然,这个 Channel 比上一个使用起来更方便,至少在这个简单的测试程序中是这样。我们不得不通过牺牲一些灵活性来换取这种便利:只有调用 split() 的线程才能调用 receive() 。如果交换了 sendreceive ,该程序将不再编译。根据不同的使用情况,这可能是完全好的,有用的,或者非常不方便的。

有多种方法可以解决该问题,其中许多方法会增加我们的复杂性并影响性能。总的来说,我们可以继续探索的变化和权衡的数量几乎是无穷无尽的。

我们可以很容易地花费不健康的时间来实现另外 20 个一次性通道的变体,每个变体的属性都略有不同,适用于每个可以想象的用例等等。虽然这听起来很有趣,但我们或许应该避开那个兔子洞,并在事情失控之前结束本章。

总结

  • 通道用于在线程之间发送消息。

  • 一个简单灵活但可能效率低下的通道相对容易实现,只需一个 Mutex 和一个 Condvar 。

  • 一次性通道是设计为仅发送一条消息的通道。

  • MaybeUninit<T> 类型可用于表示可能尚未初始化的 T 。它的接口大多是不安全的,它的用户有责任跟踪它是否已被初始化,而不是复制非 Copy 数据,并在必要时删除其内容。

  • Not dropping objects (also called leaking or forgetting) is safe, but frowned upon when done without good reason.

  • 恐慌是创建安全接口的重要工具。

  • 按值获取非 Copy 对象可用于防止多次执行某些操作。

  • 独占借用和拆分借用可以成为强制正确性的有力工具。

  • 我们可以通过确保对象的类型不实现 Send 来确保对象保持在同一个线程上,这可以通过 PhantomData 标记类型来实现。

  • 每个设计和实施决策都涉及权衡,最好在考虑特定用例的情况下做出。

  • 在没有用例的情况下设计一些东西可能既有趣又有教育意义,但结果可能是一项无休止的任务。

上次更新: