classicube_relay/events/
store.rs

1use super::{CallbackFn, PartialStream, PartialStreamError};
2use crate::packet::{ContinuePacket, Packet, PlayerScope, Scope, StartPacket};
3use std::{
4    collections::HashMap,
5    time::{Duration, Instant},
6};
7use tracing::{debug, error, warn};
8
9#[derive(Debug, thiserror::Error)]
10pub enum StoreError {
11    #[error("got non-Player scope")]
12    Thing,
13
14    #[error("found continue packet before start")]
15    Thing2,
16
17    #[error(transparent)]
18    PartialStream(#[from] PartialStreamError),
19
20    #[error(transparent)]
21    Io(#[from] std::io::Error),
22}
23type Result<T> = std::result::Result<T, StoreError>;
24
25#[derive(Default)]
26pub(crate) struct Store {
27    pub(crate) event_handlers: Vec<CallbackFn>,
28    streams: HashMap<u8, PartialStream>,
29    cleanup_times: HashMap<u8, Instant>,
30}
31
32impl Store {
33    const STREAM_TIMEOUT: Duration = Duration::from_secs(10);
34
35    pub(crate) fn process_packet(&mut self, packet: Packet) -> Result<()> {
36        debug!("process_packet {:?}", packet);
37
38        let finished_stream = match packet {
39            Packet::Start(StartPacket {
40                stream_id,
41                scope,
42                data_length,
43                data_part,
44            }) => {
45                if let Scope::Player(PlayerScope { player_id }) = scope {
46                    debug!(stream_id, player_id, data_length, "new stream");
47
48                    let mut stream = PartialStream {
49                        player_id,
50                        data_length,
51                        data_buffer: Vec::with_capacity(data_length as usize),
52                    };
53                    stream.write_part(data_part)?;
54
55                    if let Some(old_stream) = self.streams.remove(&stream_id) {
56                        warn!("restarting stream {:?}", old_stream);
57                    }
58                    if stream.is_finished() {
59                        Some(stream)
60                    } else {
61                        self.streams.insert(stream_id, stream);
62                        self.cleanup_times
63                            .insert(stream_id, Instant::now() + Self::STREAM_TIMEOUT);
64                        None
65                    }
66                } else {
67                    return Err(StoreError::Thing);
68                }
69            }
70
71            Packet::Continue(ContinuePacket {
72                stream_id,
73                data_part,
74            }) => {
75                let is_finished = if let Some(stream) = self.streams.get_mut(&stream_id) {
76                    stream.write_part(data_part)?;
77                    debug!(
78                        stream_id,
79                        player_id = stream.player_id,
80                        current_length = stream.data_buffer.len(),
81                        data_length = stream.data_length,
82                        "continue stream"
83                    );
84
85                    stream.is_finished()
86                } else {
87                    return Err(StoreError::Thing2);
88                };
89
90                if is_finished {
91                    self.cleanup_times.remove(&stream_id);
92                    Some(self.streams.remove(&stream_id).unwrap())
93                } else {
94                    None
95                }
96            }
97        };
98
99        if let Some(stream) = finished_stream {
100            debug!("finished_stream");
101            for f in &self.event_handlers {
102                f(stream.player_id, &stream.data_buffer);
103            }
104        }
105
106        Ok(())
107    }
108
109    pub(crate) fn tick(&mut self) {
110        let now = Instant::now();
111        let mut stream_ids_to_removes = self
112            .cleanup_times
113            .iter()
114            .filter_map(|(stream_id, cleanup_time)| {
115                if &now > cleanup_time {
116                    Some(*stream_id)
117                } else {
118                    None
119                }
120            })
121            .collect::<Vec<_>>();
122
123        for stream_id in stream_ids_to_removes.drain(..) {
124            debug!(stream_id, "timed out, removing");
125            self.cleanup_times.remove(&stream_id);
126            self.streams.remove(&stream_id);
127        }
128    }
129}