actix_web/types/
readlines.rs1use std::{
4    borrow::Cow,
5    pin::Pin,
6    str,
7    task::{Context, Poll},
8};
9
10use bytes::{Bytes, BytesMut};
11use encoding_rs::{Encoding, UTF_8};
12use futures_core::{ready, stream::Stream};
13
14use crate::{
15    dev::Payload,
16    error::{PayloadError, ReadlinesError},
17    HttpMessage,
18};
19
20pub struct Readlines<T: HttpMessage> {
22    stream: Payload<T::Stream>,
23    buf: BytesMut,
24    limit: usize,
25    checked_buff: bool,
26    encoding: &'static Encoding,
27    err: Option<ReadlinesError>,
28}
29
30impl<T> Readlines<T>
31where
32    T: HttpMessage,
33    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
34{
35    pub fn new(req: &mut T) -> Self {
37        let encoding = match req.encoding() {
38            Ok(enc) => enc,
39            Err(err) => return Self::err(err.into()),
40        };
41
42        Readlines {
43            stream: req.take_payload(),
44            buf: BytesMut::with_capacity(262_144),
45            limit: 262_144,
46            checked_buff: true,
47            err: None,
48            encoding,
49        }
50    }
51
52    pub fn limit(mut self, limit: usize) -> Self {
54        self.limit = limit;
55        self
56    }
57
58    fn err(err: ReadlinesError) -> Self {
59        Readlines {
60            stream: Payload::None,
61            buf: BytesMut::new(),
62            limit: 262_144,
63            checked_buff: true,
64            encoding: UTF_8,
65            err: Some(err),
66        }
67    }
68}
69
70impl<T> Stream for Readlines<T>
71where
72    T: HttpMessage,
73    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
74{
75    type Item = Result<String, ReadlinesError>;
76
77    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        let this = self.get_mut();
79
80        if let Some(err) = this.err.take() {
81            return Poll::Ready(Some(Err(err)));
82        }
83
84        if !this.checked_buff {
86            let mut found: Option<usize> = None;
87            for (ind, b) in this.buf.iter().enumerate() {
88                if *b == b'\n' {
89                    found = Some(ind);
90                    break;
91                }
92            }
93            if let Some(ind) = found {
94                if ind + 1 > this.limit {
96                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
97                }
98                let line = if this.encoding == UTF_8 {
99                    str::from_utf8(&this.buf.split_to(ind + 1))
100                        .map_err(|_| ReadlinesError::EncodingError)?
101                        .to_owned()
102                } else {
103                    this.encoding
104                        .decode_without_bom_handling_and_without_replacement(
105                            &this.buf.split_to(ind + 1),
106                        )
107                        .map(Cow::into_owned)
108                        .ok_or(ReadlinesError::EncodingError)?
109                };
110                return Poll::Ready(Some(Ok(line)));
111            }
112            this.checked_buff = true;
113        }
114
115        match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
117            Some(Ok(mut bytes)) => {
118                let mut found: Option<usize> = None;
120                for (ind, b) in bytes.iter().enumerate() {
121                    if *b == b'\n' {
122                        found = Some(ind);
123                        break;
124                    }
125                }
126                if let Some(ind) = found {
127                    if ind + 1 > this.limit {
129                        return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
130                    }
131                    let line = if this.encoding == UTF_8 {
132                        str::from_utf8(&bytes.split_to(ind + 1))
133                            .map_err(|_| ReadlinesError::EncodingError)?
134                            .to_owned()
135                    } else {
136                        this.encoding
137                            .decode_without_bom_handling_and_without_replacement(
138                                &bytes.split_to(ind + 1),
139                            )
140                            .map(Cow::into_owned)
141                            .ok_or(ReadlinesError::EncodingError)?
142                    };
143                    this.buf.extend_from_slice(&bytes);
145                    this.checked_buff = false;
146                    return Poll::Ready(Some(Ok(line)));
147                }
148                this.buf.extend_from_slice(&bytes);
149                Poll::Pending
150            }
151
152            None => {
153                if this.buf.is_empty() {
154                    return Poll::Ready(None);
155                }
156                if this.buf.len() > this.limit {
157                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
158                }
159                let line = if this.encoding == UTF_8 {
160                    str::from_utf8(&this.buf)
161                        .map_err(|_| ReadlinesError::EncodingError)?
162                        .to_owned()
163                } else {
164                    this.encoding
165                        .decode_without_bom_handling_and_without_replacement(&this.buf)
166                        .map(Cow::into_owned)
167                        .ok_or(ReadlinesError::EncodingError)?
168                };
169                this.buf.clear();
170                Poll::Ready(Some(Ok(line)))
171            }
172
173            Some(Err(err)) => Poll::Ready(Some(Err(ReadlinesError::from(err)))),
174        }
175    }
176}
177
#[cfg(test)]
mod tests {
    use futures_util::StreamExt as _;

    use super::*;
    use crate::test::TestRequest;

    /// Reads a three-line payload and checks each yielded line, including the last one,
    /// which has no trailing newline.
    #[actix_rt::test]
    async fn test_readlines() {
        let body = Bytes::from_static(
            b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
              industry. Lorem Ipsum has been the industry's standard dummy\n\
              Contrary to popular belief, Lorem Ipsum is not simply random text.",
        );
        let mut req = TestRequest::default().set_payload(body).to_request();
        let mut lines = Readlines::new(&mut req);

        let expected = [
            "Lorem Ipsum is simply dummy text of the printing and typesetting\n",
            "industry. Lorem Ipsum has been the industry's standard dummy\n",
            "Contrary to popular belief, Lorem Ipsum is not simply random text.",
        ];

        for want in expected.iter() {
            let got = lines.next().await.unwrap().unwrap();
            assert_eq!(got, *want);
        }
    }
}