1use std::io;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use memchr::memchr;
5
6use super::{Decoder, Encoder};
7
/// Codec for newline-delimited text.
///
/// As an encoder it writes each item followed by a single `\n`. As a decoder it
/// splits the input on `\n`, drops one trailing `\r` from each line, and yields
/// the line as a `String`.
///
/// The type carries no state, so it is free to copy and can be built with
/// `LinesCodec::default()`.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct LinesCodec;
15
16impl<T: AsRef<str>> Encoder<T> for LinesCodec {
17 type Error = io::Error;
18
19 #[inline]
20 fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
21 let item = item.as_ref();
22 dst.reserve(item.len() + 1);
23 dst.put_slice(item.as_bytes());
24 dst.put_u8(b'\n');
25 Ok(())
26 }
27}
28
29impl Decoder for LinesCodec {
30 type Item = String;
31 type Error = io::Error;
32
33 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
34 if src.is_empty() {
35 return Ok(None);
36 }
37
38 let len = match memchr(b'\n', src) {
39 Some(n) => n,
40 None => {
41 return Ok(None);
42 }
43 };
44
45 let mut buf = src.split_to(len);
47 debug_assert_eq!(len, buf.len());
48
49 src.advance(1);
51
52 match buf.last() {
53 Some(b'\r') => buf.truncate(len - 1),
55
56 None => return Ok(Some(String::new())),
58
59 _ => {}
60 }
61
62 try_into_utf8(buf.freeze())
63 }
64
65 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66 match self.decode(src)? {
67 Some(frame) => Ok(Some(frame)),
68 None if src.is_empty() => Ok(None),
69 None => {
70 let buf = match src.last() {
71 Some(b'\r') => src.split_to(src.len() - 1),
73
74 _ => src.split(),
76 };
77
78 if buf.is_empty() {
79 return Ok(None);
80 }
81
82 try_into_utf8(buf.freeze())
83 }
84 }
85 }
86}
87
88fn try_into_utf8(buf: Bytes) -> io::Result<Option<String>> {
90 String::from_utf8(buf.to_vec())
91 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
92 .map(Some)
93}
94
#[cfg(test)]
mod tests {
    use bytes::BufMut as _;

    use super::*;

    /// Walks a mixed LF / CRLF buffer through `decode` and `decode_eof`.
    #[test]
    fn lines_decoder() {
        let mut codec = LinesCodec::default();
        let mut buf = BytesMut::from("\nline 1\nline 2\r\nline 3\n\r\n\r");

        // Every terminated line comes out in order, without its LF or CRLF.
        for &expected in &["", "line 1", "line 2", "line 3", ""] {
            assert_eq!(expected, codec.decode(&mut buf).unwrap().unwrap());
        }

        // Only a lone CR is left: no line, neither mid-stream nor at EOF.
        assert!(codec.decode(&mut buf).unwrap().is_none());
        assert!(codec.decode_eof(&mut buf).unwrap().is_none());

        // The CR stayed buffered, so it now prefixes the unterminated tail.
        buf.put_slice(b"k");
        assert!(codec.decode(&mut buf).unwrap().is_none());
        assert_eq!("\rk", codec.decode_eof(&mut buf).unwrap().unwrap());

        // Buffer fully drained.
        assert!(codec.decode(&mut buf).unwrap().is_none());
        assert!(codec.decode_eof(&mut buf).unwrap().is_none());
    }

    /// Encodes several items into one buffer and checks the accumulated bytes.
    #[test]
    fn lines_encoder() {
        let mut codec = LinesCodec::default();
        let mut buf = BytesMut::new();

        // (item to encode, full buffer contents afterwards)
        let steps: [(&str, &[u8]); 3] = [
            ("", b"\n"),
            ("test", b"\ntest\n"),
            ("a\nb", b"\ntest\na\nb\n"),
        ];

        for &(line, written_so_far) in &steps {
            codec.encode(line, &mut buf).unwrap();
            assert_eq!(&buf[..], written_so_far);
        }
    }

    /// Encodes items of several lengths, each into a fresh buffer.
    #[test]
    fn lines_encoder_no_overflow() {
        let mut codec = LinesCodec::default();

        let cases: [(&str, &[u8]); 4] = [
            ("1234567", b"1234567\n"),
            ("12345678", b"12345678\n"),
            ("123456789111213", b"123456789111213\n"),
            ("1234567891112131", b"1234567891112131\n"),
        ];

        for &(line, expected) in &cases {
            let mut buf = BytesMut::new();
            codec.encode(line, &mut buf).unwrap();
            assert_eq!(&buf[..], expected);
        }
    }
}