第 6 章 构建我们自己的“Arc”

来自 《Rust Atomics and Locks》 的翻译文章,英文原文https://marabos.nl/atomics/building-arc.html在新窗口打开

第 1 章的“引用计数”中,我们看到了允许通过引用计数共享所有权的 std::sync::Arc<T> 类型。 Arc::new 函数创建一个新的分配,就像 Box::new 一样。但是,与 Box 不同的是,克隆的 Arc 将共享原始分配,而无需创建新分配。只有在 Arc 及其所有克隆都被删除后,共享分配才会被删除。

这种类型的实现中涉及的内存顺序注意事项可能会非常有趣。在本章中,我们将通过实现自己的 Arc<T> 将更多的理论付诸实践。我们将从一个基本版本开始,然后扩展它以支持循环结构的弱指针,并以一个与标准库中的实现几乎相同的优化版本结束本章。

基本引用计数

我们的第一个版本将使用单个 AtomicUsize 来计算共享分配的 Arc 对象的数量。让我们从一个包含这个计数器和 T 对象的结构开始:

struct ArcData<T> {
    ref_count: AtomicUsize,
    data: T,
}

请注意,此结构不是公开的。这是我们 Arc 实现的内部实现细节。

接下来是 Arc<T> 结构本身,它实际上只是一个指向(共享) ArcData<T> 对象的指针。

我们可能很想让它变成一个 Box<ArcData<T>> 的包装器,使用标准 Box 来处理 ArcData<T> 的分配。但是,Box 代表独占所有权,而不是共享所有权。我们也不能使用引用,因为我们不只是借用其他东西拥有的数据,而且它的生命周期(“直到这个 Arc 的最后一个克隆被删除”)不能直接用 Rust 生命周期表示。

相反,我们将不得不求助于使用指针并手动处理分配和所有权的概念。我们将使用 std::ptr::NonNull<T> 代替 *mut T*const T,它代表一个指向永远不为空的 T 的指针。这样,Option<Arc<T>> 将与 Arc<T> 大小相同,使用 None 的空指针表示。

use std::ptr::NonNull;

pub struct Arc<T> {
    ptr: NonNull<ArcData<T>>,
}

使用引用或 Box,编译器会自动理解它应该为哪个 T 进行 SendSync 。然而,当使用原始指针或 NonNull 时,它会保守地假设它永远不会是 SendSync,除非我们明确地告诉它。

跨线程发送 Arc<T> 会导致 T 对象被共享,要求 TSync 。类似地,跨线程发送 Arc<T> 可能会导致另一个线程丢弃该 T ,有效地将它传输到另一个线程,要求 TSend 。换句话说,当且仅当 T 既是 Send 又是 Sync 时, Arc<T> 应该是 Send 。 同样的道理也适用于 Sync,因为可以将共享的 &Arc<T> 克隆到新的 Arc<T> 中。

unsafe impl<T: Send + Sync> Send for Arc<T> {}
unsafe impl<T: Send + Sync> Sync for Arc<T> {}

对于 Arc<T>::new 来说,我们必须使用引用计数为 1 的 ArcData<T> 创建一个新分配。我们将使用 Box::new 来创建一个新的分配, Box::leak 放弃我们对该分配的独占所有权,并使用 NonNull::from 将它变成一个指针:

impl<T> Arc<T> {
    pub fn new(data: T) -> Arc<T> {
        Arc {
            ptr: NonNull::from(Box::leak(Box::new(ArcData {
                ref_count: AtomicUsize::new(1),
                data,
            }))),
        }
    }}

我们知道,只要 Arc 对象存在,指针将始终指向有效的 ArcData<T> 。然而,这并不是编译器所知道的或为我们检查的,因此通过指针访问 ArcData 需要不安全的代码。我们将添加一个私有辅助函数来获取从 ArcArcData 的数据,因为这是我们必须做的操作:

fn data(&self) -> &ArcData<T> {
    unsafe { self.ptr.as_ref() }
}

使用它,我们现在可以实现 Deref 特性,使我们的 Arc<T> 透明地表现得像对 T 的引用:

impl<T> Deref for Arc<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.data().data
    }
}

请注意,我们没有实现 DerefMut 。由于 Arc<T> 代表共享所有权,我们不能无条件地提供 &mut T

接下来: Clone 的实现。在递增引用计数器后,克隆的 Arc 将使用相同的指针:

impl<T> Clone for Arc<T> {
    fn clone(&self) -> Self {
        // TODO: Handle overflows.
        self.data().ref_count.fetch_add(1, Relaxed);
        Arc {
            ptr: self.ptr,
        }
    }
}

我们可以使用 Relaxed 内存顺序来增加引用计数器,因为在这个原子操作之前或之后,对其他变量的操作都不需要严格发生。在此操作之前,我们已经可以访问包含的 T (通过原始的 Arc ),之后保持不变(但现在至少通过两个 Arc 对象)。

在计数器有任何溢出的机会之前, Arc 需要被克隆多次,但在循环中运行 std::mem::forget(arc.clone()) 可以使它发生。我们可以使用第 2 章“示例:ID 分配”和第 2 章“示例:无溢出的 ID 分配”中讨论的任何技术来处理此问题。

为了在正常(非溢出)情况下尽可能保持高效,我们将保留原始的 fetch_add 并在我们感到不安地接近溢出时,简单地中止整个过程:

if self.data().ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
    std::process::abort();
}

提示

中止这个过程并不是即时的,要留出一些时间,在此期间,另一个线程也可以调用Arc::clone,进一步增加引用计数器。因此,仅仅检查usize::MAX-1是不够的。然而,使用usize::MAX / 2作为限制,效果很好:假设每个线程至少占用几个字节的内存空间,usize::MAX / 2线程不可能同时存在。

就像我们在克隆时增加计数器一样,我们需要在删除 Arc 时减少它。看到计数器从 1 变为 0 的线程知道它丢弃了最后一个 Arc<T> ,并负责丢弃和释放 ArcData<T>

我们将使用 Box::from_raw 来收回分配的独占所有权,然后使用 drop() 立即将其删除:

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        // TODO: Memory ordering.
        if self.data().ref_count.fetch_sub(1,) == 1 {
            unsafe {
                drop(Box::from_raw(self.ptr.as_ptr()));
            }
        }
    }
}

对于这个操作,我们不能使用 Relaxed 顺序,因为我们需要确保在我们丢弃数据时,没有任何东西仍在访问数据。换句话说,以前的 Arc 克隆中的每一次丢弃都必须在最后一次丢弃之前发生。因此,最后的 fetch_sub 必须与之前的每个 fetch_sub 操作建立先行发生关系,我们可以使用 release 和 acquire 顺序来做到这一点:例如,将它从2减1有效地“释放”数据,而将它从1递减到0“获得”它的所有权。

我们可以使用 AcqRel 内存顺序来涵盖这两种情况,但只有最后递减到0需要 Acquire ,而其他只需要 Release 。为了提高效率,我们将仅在 fetch_sub 操作中使用 Release ,并仅在必要时使用单独的 Acquire 栅栏:

if self.data().ref_count.fetch_sub(1, Release) == 1 {
    fence(Acquire);
    unsafe {
        drop(Box::from_raw(self.ptr.as_ptr()));
    }
}

测试它

为了测试我们的 Arc 是否按预期运行,我们可以编写一个单元测试来创建一个包含特殊对象的 Arc ,让我们知道它何时被丢弃:

#[test]
fn test() {
    static NUM_DROPS: AtomicUsize = AtomicUsize::new(0);

    struct DetectDrop;

    impl Drop for DetectDrop {
        fn drop(&mut self) {
            NUM_DROPS.fetch_add(1, Relaxed);
        }
    }

    // Create two Arcs sharing an object containing a string
    // and a DetectDrop, to detect when it's dropped.
    let x = Arc::new(("hello", DetectDrop));
    let y = x.clone();

    // Send x to another thread, and use it there.
    let t = std::thread::spawn(move || {
        assert_eq!(x.0, "hello");
    });

    // In parallel, y should still be usable here.
    assert_eq!(y.0, "hello");

    // Wait for the thread to finish.
    t.join().unwrap();

    // One Arc, x, should be dropped by now.
    // We still have y, so the object shouldn't have been dropped yet.
    assert_eq!(NUM_DROPS.load(Relaxed), 0);

    // Drop the remaining `Arc`.
    drop(y);

    // Now that `y` is dropped too,
    // the object should've been dropped.
    assert_eq!(NUM_DROPS.load(Relaxed), 1);
}

这编译并运行良好,所以我们的 Arc 似乎按预期运行!虽然这令人鼓舞,但并不能证明实现是完全正确的。建议使用涉及多个线程的长时间压力测试以获得更多信心。

Miri

使用 Miri 运行测试也非常有用。 Miri 是一个实验性但非常有用和强大的工具,用于检查不安全代码是否存在各种形式的未定义行为。

Miri 是 Rust 编译器的中级中间表示的解释器。这意味着它运行您的代码不是通过将代码编译为本机处理器指令,而是通过在类型和生命周期等信息仍然可用时解释它。因此,Miri 运行程序的速度比正常编译和运行时要慢得多,但能够检测到许多会导致未定义行为的错误。

它包括对检测数据竞争的实验性支持,这使得它能够检测内存顺序问题。

有关如何使用 Miri 的更多详细信息和指南,请参阅其 GitHub 页面在新窗口打开

Mutation

如前所述,我们不能为我们的 Arc 实现 DerefMut 。我们不能无条件地承诺对数据的独占访问 ( &mut T ),因为它可能会通过其他 Arc 对象访问。

但是,我们能做的是有条件地允许它。我们可以创建一个仅在引用计数器为 1 时,才给出 &mut T 的方法,证明没有其他 Arc 对象可用于访问相同的数据。

这个函数,我们称之为get_mut,将必须接受一个&mut Self,以确保没有其他任何东西可以使用同一个 Arc 来访问 T 。如果 Arc 仍然可以共享,那么知道只有一个 Arc 将毫无意义。

我们需要使用获取内存顺序来确保以前拥有 Arc 克隆的线程不再访问数据。我们需要与每个导致引用计数器为 1 的 drop 建立一个先行发生关系。

这只有在引用计数器实际为1时才重要;如果它更高,我们将不提供&mut T,并且内存顺序无关紧要。因此,我们可以使用一个宽松的加载,然后是一个有条件的获取栅栏,如下所示:

 pub fn get_mut(arc: &mut Self) -> Option<&mut T> {
    if arc.data().ref_count.load(Relaxed) == 1 {
        fence(Acquire);
        // Safety: Nothing else can access the data, since
        // there's only one Arc, to which we have exclusive access.
        unsafe { Some(&mut arc.ptr.as_mut().data) }
    } else {
        None
    }
}

此函数不采用 self 参数,而是采用常规参数(名为 arc )。这意味着它只能被称为 Arc::get_mut(&mut a) ,而不能被称为 a.get_mut() 。这对于实现 Deref 的类型是可取的,以避免与底层 T 上类似命名的方法产生歧义。

返回的可变引用,隐式地从参数中借用了生命周期,这意味着只要返回的 &mut T 仍然存在,就没有任何东西可以使用原始的 Arc ,从而允许安全变异。

&mut T 的生命周期到期时, Arc 可以再次使用并与其他线程共享。有人可能想知道我们是否需要担心之后访问数据的线程的内存顺序。然而,这是用于与另一个线程共享 Arc (或它的新克隆)的任何机制的责任。 (例如,互斥锁、通道或生成新线程。)

弱指针

当在内存中表示由多个对象组成的结构时,引用计数非常有用。例如,树结构中的每个节点都可以包含指向其每个子节点的 Arc 。这样,当一个节点被删除时,其不再使用的子节点也将全部(递归地)删除。

然而,这对于循环结构来说是不成立的。如果一个子节点也包含一个指向其父节点的 Arc ,那么两者都不会被删除,因为总是至少有一个 Arc 仍然引用它。

标准库的 Arc 提供了针对该问题的解决方案: Weak<T>Weak<T>也称为弱指针,其行为有点像 Arc<T> ,但不会阻止对象被删除。 T 可以在多个 Arc<T>Weak<T> 对象之间共享,但是当所有 Arc<T> 对象都消失时, T 将被删除,无论是否还有任何 Weak<T> 对象。

这意味着 Weak<T> 可以在没有 T 的情况下存在,因此不能像 Arc<T> 那样无条件地提供 &T 。但是,要访问给定 Weak<T>T ,可以通过其 upgrade() 方法将其升级为 Arc<T> 。此方法返回 Option<Arc<T>> ,如果 T 已被删除,则返回 None

在基于 Arc 的结构中, Weak 可用于中断循环。例如,树结构中的子节点可以使用 Weak 而不是 Arc 作为它们的父节点。然后,父节点的删除不会因为它的子节点的存在而被阻止。

让我们来实现它。

就像之前一样,当 Arc 对象的数量为零时,我们可以删除包含的 T 对象。但是,我们还不能删除和释放 ArcData ,因为可能仍然有弱指针引用它。只有最后一个 Weak 指针也消失了,我们才能删除并释放 ArcData

因此,我们将使用两个计数器:一个用于“引用 T 的事物的数量”,另一个用于“引用 ArcData<T> 的事物的数量”。换句话说,第一个计数器与之前相同:它计算 Arc 个对象,而第二个计数器同时计算 ArcWeak 个对象。

我们还需要一些东西,允许我们在 ArcData<T> 仍在被弱指针使用时删除包含的对象 ( T )。我们将使用 Option<T> ,以便我们可以在数据被删除时使用 None ,并将其包装在 UnsafeCell 中以实现内部可变性(第 1 章中的“内部可变性”),以允许在 ArcData<T> 不是独占时发生这种情况:

struct ArcData<T> {
    /// Number of `Arc`s.
    data_ref_count: AtomicUsize,
    /// Number of `Arc`s and `Weak`s combined.
    alloc_ref_count: AtomicUsize,
    /// The data. `None` if there's only weak pointers left.
    data: UnsafeCell<Option<T>>,
}

如果我们将 Weak<T> 视为负责保持 ArcData<T> 存活的对象,那么将 Arc<T> 实现为包含 Weak<T> 的结构是有意义的,因为 Arc<T> 需要做同样的事情,甚至更多。

pub struct Arc<T> {
    weak: Weak<T>,
}

pub struct Weak<T> {
    ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Sync + Send> Send for Weak<T> {}
unsafe impl<T: Sync + Send> Sync for Weak<T> {}

new 函数与以前基本相同,除了它现在有两个计数器要同时初始化:

impl<T> Arc<T> {
    pub fn new(data: T) -> Arc<T> {
        Arc {
            weak: Weak {
                ptr: NonNull::from(Box::leak(Box::new(ArcData {
                    alloc_ref_count: AtomicUsize::new(1),
                    data_ref_count: AtomicUsize::new(1),
                    data: UnsafeCell::new(Some(data)),
                }))),
            },
        }
    }}

就像以前一样,我们假设 ptr 字段始终指向有效的 ArcData<T> 。这一次,我们将把这个假设编码为 Weak<T> 上的私有 data() 辅助方法:

impl<T> Weak<T> {
    fn data(&self) -> &ArcData<T> {
        unsafe { self.ptr.as_ref() }
    }}

Arc<T>Deref 实现中,我们现在必须使用 UnsafeCell::get() 获取指向单元格内容的指针,并使用不安全代码来保证此时可以安全地共享它。我们还需要 as_ref().unwrap() 来获取对 Option<T> 的引用。我们不必担心这种恐慌,因为当没有 Arc 对象时, Option 只会是 None

impl<T> Deref for Arc<T> {
    type Target = T;

    fn deref(&self) -> &T {
        let ptr = self.weak.data().data.get();
        // Safety: Since there's an Arc to the data,
        // the data exists and may be shared.
        unsafe { (*ptr).as_ref().unwrap() }
    }
}

Weak<T>Clone 实现非常简单;它与我们之前对 Arc<T>Clone 实现几乎相同:

impl<T> Clone for Weak<T> {
    fn clone(&self) -> Self {
        if self.data().alloc_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
            std::process::abort();
        }
        Weak { ptr: self.ptr }
    }
}

在我们新的 Arc<T>Clone 实现中,我们需要递增两个计数器。我们将简单地使用 self.weak.clone() 为第一个计数器重用上面的代码,因此我们只需要手动增加第二个计数器:

impl<T> Clone for Arc<T> {
    fn clone(&self) -> Self {
        let weak = self.weak.clone();
        if weak.data().data_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
            std::process::abort();
        }
        Arc { weak }
    }
}

删除 Weak 应该减少其计数器,并在计数器从 1 变为零时删除并释放 ArcData 。这与我们之前 ArcDrop 实现所做的相同。

impl<T> Drop for Weak<T> {
    fn drop(&mut self) {
        if self.data().alloc_ref_count.fetch_sub(1, Release) == 1 {
            fence(Acquire);
            unsafe {
                drop(Box::from_raw(self.ptr.as_ptr()));
            }
        }
    }
}

删除 Arc 应该减少两个计数器。请注意,其中一个已被自动处理,因为每个 Arc 都包含一个 Weak ,这样删除一个 Arc 也会导致删除一个 Weak 。我们只需要处理另一个计数器:

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        if self.weak.data().data_ref_count.fetch_sub(1, Release) == 1 {
            fence(Acquire);
            let ptr = self.weak.data().data.get();
            // Safety: The data reference counter is zero,
            // so nothing will access it.
            unsafe {
                (*ptr) = None;
            }
        }
    }
}

提示

在 Rust 中删除一个对象将首先运行它的 Drop::drop 函数(如果它实现了 Drop ),然后递归地一个接一个地删除它的所有字段。

get_mut 方法中的检查基本保持不变,只是它现在需要考虑弱指针。看起来它在检查排他性时可以忽略弱指针,但 Weak<T> 可以随时升级为 Arc<T> 。因此, get_mut 必须先检查没有其他的 Arc<T>Weak<T> 指针,然后才能发出 &mut T

impl<T> Arc<T> {pub fn get_mut(arc: &mut Self) -> Option<&mut T> {
        if arc.weak.data().alloc_ref_count.load(Relaxed) == 1 {
            fence(Acquire);
            // Safety: Nothing else can access the data, since
            // there's only one Arc, to which we have exclusive access,
            // and no Weak pointers.
            let arcdata = unsafe { arc.weak.ptr.as_mut() };
            let option = arcdata.data.get_mut();
            // We know the data is still available since we
            // have an Arc to it, so this won't panic.
            let data = option.as_mut().unwrap();
            Some(data)
        } else {
            None
        }
    }}

接下来:升级弱指针。只有当数据仍然存在时,才能将 Weak 升级为 Arc 。如果只剩下弱指针,那么就没有可以通过 Arc 共享的数据了。因此,我们必须增加 Arc 计数器,但只有当它尚未为零时才能这样做。我们将使用比较和交换循环(第 2 章中的“比较和交换操作”)来执行此操作。

就像以前一样,宽松的内存顺序非常适合递增引用计数器。在这个原子操作之前或之后,没有对其他变量的操作需要严格发生。

impl<T> Weak<T> {pub fn upgrade(&self) -> Option<Arc<T>> {
        let mut n = self.data().data_ref_count.load(Relaxed);
        loop {
            if n == 0 {
                return None;
            }
            assert!(n <= usize::MAX / 2);
            if let Err(e) =
                self.data()
                    .data_ref_count
                    .compare_exchange_weak(n, n + 1, Relaxed, Relaxed)
            {
                n = e;
                continue;
            }
            return Some(Arc { weak: self.clone() });
        }
    }
}

相反,从 Arc<T> 获取 Weak<T> 则要简单得多:

impl<T> Arc<T> {pub fn downgrade(arc: &Self) -> Weak<T> {
        arc.weak.clone()
    }
}

测试它

为了快速测试我们的创建,我们将修改之前的单元测试以使用弱指针并验证它们是否可以在预期时升级:

#[test]
fn test() {
    static NUM_DROPS: AtomicUsize = AtomicUsize::new(0);

    struct DetectDrop;

    impl Drop for DetectDrop {
        fn drop(&mut self) {
            NUM_DROPS.fetch_add(1, Relaxed);
        }
    }

    // Create an Arc with two weak pointers.
    let x = Arc::new(("hello", DetectDrop));
    let y = Arc::downgrade(&x);
    let z = Arc::downgrade(&x);

    let t = std::thread::spawn(move || {
        // Weak pointer should be upgradable at this point.
        let y = y.upgrade().unwrap();
        assert_eq!(y.0, "hello");
    });
    assert_eq!(x.0, "hello");
    t.join().unwrap();

    // The data shouldn't be dropped yet,
    // and the weak pointer should be upgradable.
    assert_eq!(NUM_DROPS.load(Relaxed), 0);
    assert!(z.upgrade().is_some());

    drop(x);

    // Now, the data should be dropped, and the
    // weak pointer should no longer be upgradable.
    assert_eq!(NUM_DROPS.load(Relaxed), 1);
    assert!(z.upgrade().is_none());
}

这也可以毫无问题地编译和运行,这给我们留下了一个非常有用的自定义 Arc 实现。

优化

虽然弱指针可能很有用,但 Arc 类型通常在没有任何弱指针的情况下使用。我们上一个实现的一个缺点是,克隆和删除 Arc 现在都需要两个原子操作,因为它们必须递增或递减两个计数器。这使得所有 Arc 用户“支付”弱指针的成本,即使他们没有使用它们。

解决方案似乎是分别计算 Arc<T>Weak<T> 指针,但这样我们就无法自动检查两个计数器是否为零。要理解这是怎么回事,想象一下我们有一个线程正在执行以下烦人的函数:

fn annoying(mut arc: Arc<Something>) {
    loop {
        let weak = Arc::downgrade(&arc);
        drop(arc);
        println!("I have no Arc!"); // 1
        arc = weak.upgrade().unwrap();
        drop(weak);
        println!("I have no Weak!"); // 2
    }
}

该线程不断地降级和升级 Arc ,这样它就反复循环不持有 Arc (1) 的时刻和不持有 Weak (2) 的时刻。如果我们检查两个计数器以查看是否有任何线程仍在使用分配,如果我们不走运,该线程可能会隐藏其存在并在其第一个打印语句 (1) 期间检查 Arc 计数器,但检查 Weak 在其第二个打印语句 (2) 期间计数器。

在我们最后的实现中,我们通过将每个 Arc 也算作 Weak 来解决这个问题。解决这个问题的一种更巧妙的方法是将所有 Arc 指针组合为一个 Weak 指针。这样,只要仍然有至少一个 Arc 对象,弱指针计数器 ( alloc_ref_count ) 就永远不会达到零,就像在我们上一个实现中一样,但是克隆一个 Arc 根本不需要触动这个计数器。只有丢弃最后一个 Arc 才会使弱指针计数器递减。

让我们试试看。

这一次,我们不能只将 Arc<T> 实现为 Weak<T> 的包装器,所以两者都将包装一个指向分配的非空指针:

pub struct Arc<T> {
    ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Sync + Send> Send for Arc<T> {}
unsafe impl<T: Sync + Send> Sync for Arc<T> {}

pub struct Weak<T> {
    ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Sync + Send> Send for Weak<T> {}
unsafe impl<T: Sync + Send> Sync for Weak<T> {}

由于我们正在优化我们的实现,我们不妨通过使用 std::mem::ManuallyDrop<T> 而不是 Option<T> 来使 ArcData<T> 稍微小一些。我们使用 Option<T> 来在删除数据时用 None 替换 Some(T) ,但我们实际上不需要单独的 None 状态来告诉我们数据已经消失,因为存在或不存在 Arc<T> 已经告诉我们了。 ManuallyDrop<T> 占用的空间量与 T 完全相同,但允许我们通过使用对 ManuallyDrop::drop() 的不安全调用在任何时候手动删除它:

use std::mem::ManuallyDrop;

struct ArcData<T> {
    /// Number of `Arc`s.
    data_ref_count: AtomicUsize,
    /// Number of `Weak`s, plus one if there are any `Arc`s.
    alloc_ref_count: AtomicUsize,
    /// The data. Dropped if there are only weak pointers left.
    data: UnsafeCell<ManuallyDrop<T>>,
}

Arc::new 函数几乎保持不变,像以前一样同时初始化两个计数器,但现在使用 ManuallyDrop::new() 而不是 Some()

impl<T> Arc<T> {
    pub fn new(data: T) -> Arc<T> {
        Arc {
            ptr: NonNull::from(Box::leak(Box::new(ArcData {
                alloc_ref_count: AtomicUsize::new(1),
                data_ref_count: AtomicUsize::new(1),
                data: UnsafeCell::new(ManuallyDrop::new(data)),
            }))),
        }
    }}

Deref 实现不能再使用 Weak 类型的私有 data 方法,因此我们将在 Arc<T> 上添加相同的私有辅助函数:

impl<T> Arc<T> {fn data(&self) -> &ArcData<T> {
        unsafe { self.ptr.as_ref() }
    }}

impl<T> Deref for Arc<T> {
    type Target = T;

    fn deref(&self) -> &T {
        // Safety: Since there's an Arc to the data,
        // the data exists and may be shared.
        unsafe { &*self.data().data.get() }
    }
}

Weak<T>CloneDrop 实现与我们上次的实现完全相同。在这里它们是为了完整性,包括私有的 Weak::data 辅助函数:

impl<T> Weak<T> {
    fn data(&self) -> &ArcData<T> {
        unsafe { self.ptr.as_ref() }
    }}

impl<T> Clone for Weak<T> {
    fn clone(&self) -> Self {
        if self.data().alloc_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
            std::process::abort();
        }
        Weak { ptr: self.ptr }
    }
}

impl<T> Drop for Weak<T> {
    fn drop(&mut self) {
        if self.data().alloc_ref_count.fetch_sub(1, Release) == 1 {
            fence(Acquire);
            unsafe {
                drop(Box::from_raw(self.ptr.as_ptr()));
            }
        }
    }
}

现在我们开始讨论这个新的优化实现的全部内容——克隆一个 Arc<T> 现在只需要接触一个计数器:

impl<T> Clone for Arc<T> {
    fn clone(&self) -> Self {
        if self.data().data_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
            std::process::abort();
        }
        Arc { ptr: self.ptr }
    }
}

类似地,删除 Arc<T> 现在只需要递减一个计数器,除了看到该计数器从 1 变为0的最后一次删除。在这种情况下,弱指针计数器也需要递减,这样一旦没有弱指针,它就可以达到0。我们通过简单地凭空创建一个 Weak<T> 并立即删除它来做到这一点:

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        if self.data().data_ref_count.fetch_sub(1, Release) == 1 {
            fence(Acquire);
            // Safety: The data reference counter is zero,
            // so nothing will access the data anymore.
            unsafe {
                ManuallyDrop::drop(&mut *self.data().data.get());
            }
            // Now that there's no `Arc<T>`s left,
            // drop the implicit weak pointer that represented all `Arc<T>`s.
            drop(Weak { ptr: self.ptr });
        }
    }
}

Weak<T> 上的 upgrade 方法基本保持不变,除了它不再克隆弱指针,因为它不再需要增加弱指针计数器。只有在分配中已经有至少一个 Arc<T> ,升级才会成功,这意味着弱指针计数器中已经考虑了 Arc

impl<T> Weak<T> {pub fn upgrade(&self) -> Option<Arc<T>> {
        let mut n = self.data().data_ref_count.load(Relaxed);
        loop {
            if n == 0 {
                return None;
            }
            assert!(n <= usize::MAX / 2);
            if let Err(e) =
                self.data()
                    .data_ref_count
                    .compare_exchange_weak(n, n + 1, Relaxed, Relaxed)
            {
                n = e;
                continue;
            }
            return Some(Arc { ptr: self.ptr });
        }
    }
}

到目前为止,这与我们之前的实现之间的差异非常小。然而,事情变得棘手的是我们仍然需要实现的最后两个方法: downgradeget_mut

与以前不同, get_mut 方法现在需要检查两个计数器是否都设置为 1,以便能够确定是否只剩下一个 Arc<T> 而没有 Weak<T> ,因为 1 的弱指针计数器现在可以表示多个 Arc<T> 指针。读取计数器是两个发生在(稍微)不同时间的独立操作,因此我们必须非常小心,不要错过任何并发降级,例如我们在“优化”开始时看到的示例。

如果我们首先检查 data_ref_count 是一个,那么在检查另一个计数器之前我们可能会错过后续的 upgrade() 。但是,如果我们首先检查 alloc_ref_count 是一个,那么在检查另一个计数器之前我们可能会错过后续的 downgrade()

摆脱这种困境的一种方法是通过“锁定”弱指针计数器来暂时阻止 downgrade() 操作。为此,我们不需要互斥体之类的东西。我们可以使用一个特殊的值,比如 usize::MAX ,来表示弱指针计数器的一个特殊的“锁定”状态。它只会被非常短暂地锁定,只是为了加载另一个计数器,所以 downgrade 方法可以一直旋转直到它被解锁,在不太可能的情况下,它与 get_mut 在同一时刻运行。

因此,在 get_mut 中,我们首先必须检查 alloc_ref_count 是否为1,同时如果确实为1,则用 usize::MAX 替换它。这是 compare_exchange 的工作。

然后我们必须检查另一个计数器是否也是1,之后我们可以立即解锁弱指针计数器。如果第二个计数器也是 1,我们就知道我们可以独占访问分配和数据,这样我们就可以返回 &mut T

 pub fn get_mut(arc: &mut Self) -> Option<&mut T> {
        // Acquire matches Weak::drop's Release decrement, to make sure any
        // upgraded pointers are visible in the next data_ref_count.load.
        if arc.data().alloc_ref_count.compare_exchange(
            1, usize::MAX, Acquire, Relaxed
        ).is_err() {
            return None;
        }
        let is_unique = arc.data().data_ref_count.load(Relaxed) == 1;
        // Release matches Acquire increment in `downgrade`, to make sure any
        // changes to the data_ref_count that come after `downgrade` don't
        // change the is_unique result above.
        arc.data().alloc_ref_count.store(1, Release);
        if !is_unique {
            return None;
        }
        // Acquire to match Arc::drop's Release decrement, to make sure nothing
        // else is accessing the data.
        fence(Acquire);
        unsafe { Some(&mut *arc.data().data.get()) }
    }

如您所料,锁定操作( compare_exchange )必须使用 Acquire 内存顺序,解锁操作( store )必须使用 Release 内存顺序。

如果我们使用 Relaxed 代替 compare_exchangedata_ref_count 的后续 load 可能看不到新升级的 Weak 指针的新值,即使 compare_exchange 已经确认每个 Weak 指针都已被丢弃。

如果我们对 store 使用了 Relaxed ,则前面的 load 可能会观察到未来 Arc::drop 对仍可降级的 Arc 的结果。

获取栅栏与之前相同:它与 Arc::Drop 中的释放递减操作同步,以确保通过前 Arc 克隆的每次访问都在新的独占访问之前发生。

最后一块拼图是 downgrade 方法,它必须检查特殊的 usize::MAX 值,以查看弱指针计数器是否被锁定,并自旋直到它被解锁。就像在 upgrade 实现中一样,我们将使用比较和交换循环来检查特殊值和溢出,然后再递增计数器:

pub fn downgrade(arc: &Self) -> Weak<T> {
        let mut n = arc.data().alloc_ref_count.load(Relaxed);
        loop {
            if n == usize::MAX {
                std::hint::spin_loop();
                n = arc.data().alloc_ref_count.load(Relaxed);
                continue;
            }
            assert!(n <= usize::MAX / 2);
            // Acquire synchronises with get_mut's release-store.
            if let Err(e) =
                arc.data()
                    .alloc_ref_count
                    .compare_exchange_weak(n, n + 1, Acquire, Relaxed)
            {
                n = e;
                continue;
            }
            return Weak { ptr: arc.ptr };
        }
    }

我们为 compare_exchange_weak 使用获取内存顺序,它与 get_mut 函数中的释放存储同步。否则,后续 Arc::drop 的效果可能在解锁计数器之前对运行 get_mut 的线程可见。

换句话说,这里的获取比较和交换操作有效地“锁定”了 get_mut ,阻止它成功。它可以被稍后的 Weak::drop 再次“解锁”,使用释放内存顺序将计数器递减回1。

提示

我们刚刚制作的 Arc<T>Weak<T> 的优化实现与 Rust 标准库中包含的几乎相同。

如果我们运行与之前完全相同的测试(“测试它”),我们会看到这个优化的实现也编译并通过了我们的测试。

提示

如果您觉得为这个优化的实现做出正确的内存顺序决策很困难,请不要担心。许多并发数据结构比这个更容易正确实现。这个 Arc 实现之所以包含在本章中,是因为它在内存顺序方面有一些微妙之处。

总结

  • Arc<T> 提供引用计数分配的共享所有权。

  • 通过检查引用计数器是否恰好为 1, Arc<T> 可以有条件地提供独占访问 ( &mut T )。

  • 可以使用宽松的操作来增加原子引用计数器,但最终的减量必须与所有先前的减量同步。

  • 可以使用弱指针 ( Weak<T> ) 来避免循环。

  • NonNull<T> 类型表示指向永远不为空的 T 的指针。

  • ManuallyDrop<T> 类型可用于使用 unsafe 代码手动决定何时删除 T

  • 一旦涉及多个原子变量,事情就会变得更加复杂。

  • 实现临时(自旋)锁有时可以成为一次对多个原子变量进行操作的有效策略。

上次更新: