// Manages a single participant in the epoch scheme. This is where all
// of the actual epoch management logic happens!
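//
// Each participant records a local view of the global epoch, plus a nesting
// count of critical sections. Garbage is buffered into per-epoch bags and
// only freed once the global epoch has advanced far enough that no other
// thread can still hold a reference to it.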

use std::mem;
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::{self, AtomicUsize, AtomicBool};
use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, SeqCst};

use mem::epoch::{Atomic, Guard, garbage, global};
use mem::epoch::participants::ParticipantNode;

/// Thread-local data for epoch participation.
pub struct Participant {
    /// The local epoch.
    epoch: AtomicUsize,

    /// Number of pending uses of `epoch::pin()`; keeping a count allows for
    /// reentrant use of epoch management.
    in_critical: AtomicUsize,

    /// Thread-local garbage tracking
    garbage: UnsafeCell<garbage::Local>,

    /// Is the thread still active? Becomes `false` when the thread exits. This
    /// is ultimately used to free `Participant` records.
    pub active: AtomicBool,

    /// The participant list is coded intrusively; here's the `next` pointer.
    pub next: Atomic<ParticipantNode>,
}

impl fmt::Debug for Participant {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Participant {{ ... }}")
    }
}

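// `Participant` records are shared between threads through the participant
// list, but the `UnsafeCell` of local garbage is only ever touched by the
// owning thread, so the type is safe to share.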
unsafe impl Sync for Participant {}

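/// Number of buffered garbage items past which a participant should try to
/// collect.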
const GC_THRESH: usize = 32;

impl Participant {
    pub fn new() -> Participant {
        Participant {
            epoch: AtomicUsize::new(0),
            in_critical: AtomicUsize::new(0),
            active: AtomicBool::new(true),
            garbage: UnsafeCell::new(garbage::Local::new()),
            next: Atomic::null(),
        }
    }

    /// Enter a critical section.
    ///
    /// This method is reentrant, allowing for nested critical sections.
    ///
    /// Returns `true` if this is the first entry on the stack (as opposed to a
    /// re-entrant call).
    pub fn enter(&self) -> bool {
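        // `in_critical` is only ever written by the owning thread, so a
        // relaxed load/store pair suffices here; no atomic RMW is needed.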
        let new_count = self.in_critical.load(Relaxed) + 1;
        self.in_critical.store(new_count, Relaxed);
        if new_count > 1 { return false }

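        // Make the `in_critical` update visible before reading the global
        // epoch, so that a concurrent `try_collect` cannot miss this
        // critical section.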
        atomic::fence(SeqCst);

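        // Catch up with the global epoch, collecting any local garbage that
        // the epoch change has made safe to free.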
        let global_epoch = global::get().epoch.load(Relaxed);
        if global_epoch != self.epoch.load(Relaxed) {
            self.epoch.store(global_epoch, Relaxed);
            unsafe { (*self.garbage.get()).collect(); }
        }

        true
    }

    /// Exit the current (nested) critical section.
    pub fn exit(&self) {
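        // Leaving the outermost critical section must be a `Release` store,
        // so that everything done inside the section is visible before this
        // participant appears quiescent to collectors.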
        let new_count = self.in_critical.load(Relaxed) - 1;
        self.in_critical.store(
            new_count,
            if new_count > 0 { Relaxed } else { Release });
    }

    /// Begin the reclamation process for a piece of data.
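    ///
    /// This is unsafe because the caller must guarantee that `data` has
    /// already been unlinked, so that no new readers can reach it.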
    pub unsafe fn reclaim<T>(&self, data: *mut T) {
        (*self.garbage.get()).insert(data);
    }

    /// Attempt to collect garbage by moving the global epoch forward.
    ///
    /// Returns `true` on success.
    pub fn try_collect(&self, guard: &Guard) -> bool {
        let cur_epoch = global::get().epoch.load(SeqCst);

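        // The epoch can only be advanced if every participant currently in
        // a critical section has already observed the current epoch.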
        for p in global::get().participants.iter(guard) {
            if p.in_critical.load(Relaxed) > 0 && p.epoch.load(Relaxed) != cur_epoch {
                return false
            }
        }

        let new_epoch = cur_epoch.wrapping_add(1);
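        // Ensure the participant checks above are not reordered past the
        // epoch update below.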
        atomic::fence(Acquire);
        if global::get().epoch.compare_and_swap(cur_epoch, new_epoch, SeqCst) != cur_epoch {
            return false
        }

        unsafe {
            (*self.garbage.get()).collect();
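            // Bags cycle modulo 3: `(new_epoch + 1) % 3` indexes the bag
            // filled two epochs before `new_epoch`, which no participant
            // can still be reading.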
            global::get().garbage[new_epoch.wrapping_add(1) % 3].collect();
        }
        self.epoch.store(new_epoch, Release);

        true
    }

    /// Move the current thread-local garbage into the global garbage bags.
    pub fn migrate_garbage(&self) {
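        // Local garbage is kept in three generations (`old`, `cur`, `new`);
        // hand each off to the global bag for the matching epoch so other
        // threads can eventually free it.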
        let cur_epoch = self.epoch.load(Relaxed);
        let local = unsafe { mem::replace(&mut *self.garbage.get(), garbage::Local::new()) };
        global::get().garbage[cur_epoch.wrapping_sub(1) % 3].insert(local.old);
        global::get().garbage[cur_epoch % 3].insert(local.cur);
        global::get().garbage[global::get().epoch.load(Relaxed) % 3].insert(local.new);
    }

    /// How much garbage is this participant currently storing?
    pub fn garbage_size(&self) -> usize {
        unsafe { (*self.garbage.get()).size() }
    }

    /// Is this participant past its local GC threshold?
    pub fn should_gc(&self) -> bool {
        self.garbage_size() >= GC_THRESH
    }
}