async_oneshot/
sender.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use crate::*;
use alloc::sync::Arc;
use core::{future::Future, task::Poll};
use futures_micro::poll_fn;

/// The sending half of a oneshot channel.
#[derive(Debug)]
pub struct Sender<T> {
    inner: Arc<Inner<T>>,
    done: bool,
}

impl<T> Sender<T> {
    #[inline(always)]
    pub(crate) fn new(inner: Arc<Inner<T>>) -> Self {
        Sender { inner, done: false }
    }

    /// Closes the channel by causing an immediate drop
    #[inline(always)]
    pub fn close(self) { }

    /// true if the channel is closed
    #[inline(always)]
    pub fn is_closed(&self) -> bool { self.inner.state().closed() }

    /// Waits for a Receiver to be waiting for us to send something
    /// (i.e. allows you to produce a value to send on demand).
    /// Fails if the Receiver is dropped.
    #[inline]
    pub fn wait(self) -> impl Future<Output = Result<Self, Closed>> {
        let mut this = Some(self);
        poll_fn(move |ctx| {
            let mut that = this.take().unwrap();
            let state = that.inner.state();
            if state.closed() {
                that.done = true;
                Poll::Ready(Err(Closed()))
            } else if state.recv() {
                Poll::Ready(Ok(that))
            } else {
                that.inner.set_send(ctx.waker().clone());
                this = Some(that);
                Poll::Pending
            }
        })
    }

    /// Sends a message on the channel. Fails if the Receiver is dropped.
    #[inline]
    pub fn send(&mut self, value: T) -> Result<(), Closed> {
        if self.done {
            Err(Closed())
        } else {
            self.done = true;
            let inner = &mut self.inner;
            let state = inner.set_value(value);
            if !state.closed() {
                if state.recv() {
                    inner.recv().wake_by_ref();
                    Ok(())
                } else {
                    Ok(())
                }
            } else {
                inner.take_value(); // force drop.
                Err(Closed())
            }
        }
    }

}

impl<T> Drop for Sender<T> {
    #[inline(always)]
    fn drop(&mut self) {
        if !self.done {
            let state = self.inner.state();
            if !state.closed() {
                let old = self.inner.close();
                if old.recv() { self.inner.recv().wake_by_ref(); }
            }
        }
    }
}