1use std::{
2    cell::RefCell,
3    fmt,
4    future::Future,
5    pin::Pin,
6    sync::atomic::{AtomicUsize, Ordering},
7    task::{Context, Poll},
8    thread,
9};
10
11use futures_core::ready;
12use tokio::sync::mpsc;
13
14use crate::system::{System, SystemCommand};
15
16pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
18thread_local!(
19    static HANDLE: RefCell<Option<ArbiterHandle>> = const { RefCell::new(None) };
20);
21
22pub(crate) enum ArbiterCommand {
23    Stop,
24    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
25}
26
27impl fmt::Debug for ArbiterCommand {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        match self {
30            ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
31            ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
32        }
33    }
34}
35
36#[derive(Debug, Clone)]
38pub struct ArbiterHandle {
39    tx: mpsc::UnboundedSender<ArbiterCommand>,
40}
41
42impl ArbiterHandle {
43    pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
44        Self { tx }
45    }
46
47    pub fn spawn<Fut>(&self, future: Fut) -> bool
53    where
54        Fut: Future<Output = ()> + Send + 'static,
55    {
56        self.tx
57            .send(ArbiterCommand::Execute(Box::pin(future)))
58            .is_ok()
59    }
60
61    pub fn spawn_fn<F>(&self, f: F) -> bool
68    where
69        F: FnOnce() + Send + 'static,
70    {
71        self.spawn(async { f() })
72    }
73
74    pub fn stop(&self) -> bool {
79        self.tx.send(ArbiterCommand::Stop).is_ok()
80    }
81}
82
83#[derive(Debug)]
88pub struct Arbiter {
89    tx: mpsc::UnboundedSender<ArbiterCommand>,
90    thread_handle: thread::JoinHandle<()>,
91}
92
93impl Arbiter {
94    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
99    #[allow(clippy::new_without_default)]
100    pub fn new() -> Arbiter {
101        Self::with_tokio_rt(|| {
102            crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
103        })
104    }
105
106    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
110    pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
111    where
112        F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
113    {
114        let sys = System::current();
115        let system_id = sys.id();
116        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
117
118        let name = format!("actix-rt|system:{system_id}|arbiter:{arb_id}");
119        let (tx, rx) = mpsc::unbounded_channel();
120
121        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
122
123        let thread_handle = thread::Builder::new()
124            .name(name.clone())
125            .spawn({
126                let tx = tx.clone();
127                move || {
128                    let rt = crate::runtime::Runtime::from(runtime_factory());
129                    let hnd = ArbiterHandle::new(tx);
130
131                    System::set_current(sys);
132
133                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
134
135                    let _ = System::current()
137                        .tx()
138                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));
139
140                    ready_tx.send(()).unwrap();
141
142                    rt.block_on(ArbiterRunner { rx });
144
145                    let _ = System::current()
147                        .tx()
148                        .send(SystemCommand::DeregisterArbiter(arb_id));
149                }
150            })
151            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
152
153        ready_rx.recv().unwrap();
154
155        Arbiter { tx, thread_handle }
156    }
157
158    #[cfg(all(target_os = "linux", feature = "io-uring"))]
163    #[allow(clippy::new_without_default)]
164    pub fn new() -> Arbiter {
165        let sys = System::current();
166        let system_id = sys.id();
167        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
168
169        let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
170        let (tx, rx) = mpsc::unbounded_channel();
171
172        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
173
174        let thread_handle = thread::Builder::new()
175            .name(name.clone())
176            .spawn({
177                let tx = tx.clone();
178                move || {
179                    let hnd = ArbiterHandle::new(tx);
180
181                    System::set_current(sys);
182
183                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
184
185                    let _ = System::current()
187                        .tx()
188                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));
189
190                    ready_tx.send(()).unwrap();
191
192                    tokio_uring::start(ArbiterRunner { rx });
194
195                    let _ = System::current()
197                        .tx()
198                        .send(SystemCommand::DeregisterArbiter(arb_id));
199                }
200            })
201            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
202
203        ready_rx.recv().unwrap();
204
205        Arbiter { tx, thread_handle }
206    }
207
208    pub(crate) fn in_new_system() -> ArbiterHandle {
210        let (tx, rx) = mpsc::unbounded_channel();
211
212        let hnd = ArbiterHandle::new(tx);
213
214        HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
215
216        crate::spawn(ArbiterRunner { rx });
217
218        hnd
219    }
220
221    pub fn handle(&self) -> ArbiterHandle {
223        ArbiterHandle::new(self.tx.clone())
224    }
225
226    pub fn current() -> ArbiterHandle {
231        HANDLE.with(|cell| match *cell.borrow() {
232            Some(ref hnd) => hnd.clone(),
233            None => panic!("Arbiter is not running."),
234        })
235    }
236
237    pub fn try_current() -> Option<ArbiterHandle> {
243        HANDLE.with(|cell| cell.borrow().clone())
244    }
245
246    pub fn stop(&self) -> bool {
250        self.tx.send(ArbiterCommand::Stop).is_ok()
251    }
252
253    #[track_caller]
259    pub fn spawn<Fut>(&self, future: Fut) -> bool
260    where
261        Fut: Future<Output = ()> + Send + 'static,
262    {
263        self.tx
264            .send(ArbiterCommand::Execute(Box::pin(future)))
265            .is_ok()
266    }
267
268    #[track_caller]
275    pub fn spawn_fn<F>(&self, f: F) -> bool
276    where
277        F: FnOnce() + Send + 'static,
278    {
279        self.spawn(async { f() })
280    }
281
282    pub fn join(self) -> thread::Result<()> {
286        self.thread_handle.join()
287    }
288}
289
290struct ArbiterRunner {
292    rx: mpsc::UnboundedReceiver<ArbiterCommand>,
293}
294
295impl Future for ArbiterRunner {
296    type Output = ();
297
298    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
299        loop {
301            match ready!(self.rx.poll_recv(cx)) {
302                None => return Poll::Ready(()),
304
305                Some(item) => match item {
307                    ArbiterCommand::Stop => {
308                        return Poll::Ready(());
309                    }
310                    ArbiterCommand::Execute(task_fut) => {
311                        tokio::task::spawn_local(task_fut);
312                    }
313                },
314            }
315        }
316    }
317}