futures_util/sink/
unfold.rs1use super::assert_sink;
2use crate::unfold_state::UnfoldState;
3use core::{future::Future, pin::Pin};
4use futures_core::ready;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10    #[derive(Debug)]
12    #[must_use = "sinks do nothing unless polled"]
13    pub struct Unfold<T, F, R> {
14        function: F,
15        #[pin]
16        state: UnfoldState<T, R>,
17    }
18}
19
20pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
40where
41    F: FnMut(T, Item) -> R,
42    R: Future<Output = Result<T, E>>,
43{
44    assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
45}
46
47impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
48where
49    F: FnMut(T, Item) -> R,
50    R: Future<Output = Result<T, E>>,
51{
52    type Error = E;
53
54    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55        self.poll_flush(cx)
56    }
57
58    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
59        let mut this = self.project();
60        let future = match this.state.as_mut().take_value() {
61            Some(value) => (this.function)(value, item),
62            None => panic!("start_send called without poll_ready being called first"),
63        };
64        this.state.set(UnfoldState::Future { future });
65        Ok(())
66    }
67
68    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69        let mut this = self.project();
70        Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
71            match ready!(future.poll(cx)) {
72                Ok(state) => {
73                    this.state.set(UnfoldState::Value { value: state });
74                    Ok(())
75                }
76                Err(err) => {
77                    this.state.set(UnfoldState::Empty);
78                    Err(err)
79                }
80            }
81        } else {
82            Ok(())
83        })
84    }
85
86    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        self.poll_flush(cx)
88    }
89}