use std::mem;
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::{self, AtomicUsize, AtomicBool};
use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, SeqCst};

use mem::epoch::{Atomic, Guard, garbage, global};
use mem::epoch::participants::ParticipantNode;
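
/// Thread-local data for epoch participation.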
pub struct Participant {
    /// The local epoch.
    epoch: AtomicUsize,

    /// Number of pending entries into the critical section; tracking a count
    /// allows critical sections to be entered reentrantly.
    in_critical: AtomicUsize,

    /// Thread-local garbage tracking.
    garbage: UnsafeCell<garbage::Local>,

    /// Is the owning thread still active? Used to decide when this
    /// `Participant` record can be freed.
    pub active: AtomicBool,

    /// The participant list is intrusive; this is the `next` pointer.
    pub next: Atomic<ParticipantNode>,
}

impl fmt::Debug for Participant {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Participant {{ ... }}")
    }
}
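
// `Participant` holds an `UnsafeCell`, so it is not automatically `Sync`. The
// cell itself is only touched by the owning thread; other threads read only
// the atomic fields, which is safe to share.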
unsafe impl Sync for Participant {}
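
/// Number of deferred deallocations a thread may buffer locally before it
/// should attempt a collection.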
const GC_THRESH: usize = 32;

impl Participant {
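    /// Create a fresh participant, with no local garbage and not yet linked
    /// into the global participant list.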
    pub fn new() -> Participant {
        Participant {
            epoch: AtomicUsize::new(0),
            in_critical: AtomicUsize::new(0),
            active: AtomicBool::new(true),
            garbage: UnsafeCell::new(garbage::Local::new()),
            next: Atomic::null(),
        }
    }
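
    /// Enter a critical section.
    ///
    /// This method is reentrant, allowing for nested critical sections.
    ///
    /// Returns `true` if this is the outermost entry, as opposed to a
    /// reentrant call.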
    pub fn enter(&self) -> bool {
        // Only the owning thread writes `in_critical`, so a plain load/store
        // pair (rather than `fetch_add`) suffices.
        let new_count = self.in_critical.load(Relaxed) + 1;
        self.in_critical.store(new_count, Relaxed);
        if new_count > 1 { return false }

        // Make the critical-section entry globally visible before reading the
        // global epoch.
        atomic::fence(SeqCst);

        // Catch up to the global epoch, collecting any local garbage that has
        // become safe to free in the meantime.
        let global_epoch = global::get().epoch.load(Relaxed);
        if global_epoch != self.epoch.load(Relaxed) {
            self.epoch.store(global_epoch, Relaxed);
            unsafe { (*self.garbage.get()).collect(); }
        }

        true
    }
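
    /// Exit the current (nested) critical section.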
    pub fn exit(&self) {
        let new_count = self.in_critical.load(Relaxed) - 1;
        // The outermost exit uses `Release` to publish all accesses made
        // inside the critical section.
        self.in_critical.store(
            new_count,
            if new_count > 0 { Relaxed } else { Release });
    }
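
    /// Begin the reclamation process for a piece of data.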
    pub unsafe fn reclaim<T>(&self, data: *mut T) {
        (*self.garbage.get()).insert(data);
    }
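
    /// Attempt to collect garbage by moving the global epoch forward.
    ///
    /// Returns `true` on success.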
    pub fn try_collect(&self, guard: &Guard) -> bool {
        let cur_epoch = global::get().epoch.load(SeqCst);

        // The epoch can only be advanced once every participant currently in
        // a critical section has caught up to it.
        for p in global::get().participants.iter(guard) {
            if p.in_critical.load(Relaxed) > 0 && p.epoch.load(Relaxed) != cur_epoch {
                return false
            }
        }

        let new_epoch = cur_epoch.wrapping_add(1);
        atomic::fence(Acquire);
        if global::get().epoch.compare_and_swap(cur_epoch, new_epoch, SeqCst) != cur_epoch {
            return false
        }

        unsafe {
            // Collect our local garbage, plus the global bag from two epochs
            // back (`new_epoch + 1 == new_epoch - 2` modulo 3), which no
            // participant can still be observing.
            (*self.garbage.get()).collect();
            global::get().garbage[new_epoch.wrapping_add(1) % 3].collect();
        }
        self.epoch.store(new_epoch, Release);

        true
    }
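
    /// Move the current thread-local garbage into the global garbage bags.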
    pub fn migrate_garbage(&self) {
        let cur_epoch = self.epoch.load(Relaxed);
        let local = unsafe { mem::replace(&mut *self.garbage.get(), garbage::Local::new()) };
        // Distribute the three local bags into the global bags for the epochs
        // they belong to.
        global::get().garbage[cur_epoch.wrapping_sub(1) % 3].insert(local.old);
        global::get().garbage[cur_epoch % 3].insert(local.cur);
        global::get().garbage[global::get().epoch.load(Relaxed) % 3].insert(local.new);
    }
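
    /// How much garbage is this participant currently storing?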
    pub fn garbage_size(&self) -> usize {
        unsafe { (*self.garbage.get()).size() }
    }
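
    /// Is this participant past its local GC threshold?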
    pub fn should_gc(&self) -> bool {
        self.garbage_size() >= GC_THRESH
    }
}