classicube_relay/events/
store.rs1use super::{CallbackFn, PartialStream, PartialStreamError};
2use crate::packet::{ContinuePacket, Packet, PlayerScope, Scope, StartPacket};
3use std::{
4 collections::HashMap,
5 time::{Duration, Instant},
6};
7use tracing::{debug, error, warn};
8
9#[derive(Debug, thiserror::Error)]
10pub enum StoreError {
11 #[error("got non-Player scope")]
12 Thing,
13
14 #[error("found continue packet before start")]
15 Thing2,
16
17 #[error(transparent)]
18 PartialStream(#[from] PartialStreamError),
19
20 #[error(transparent)]
21 Io(#[from] std::io::Error),
22}
23type Result<T> = std::result::Result<T, StoreError>;
24
25#[derive(Default)]
26pub(crate) struct Store {
27 pub(crate) event_handlers: Vec<CallbackFn>,
28 streams: HashMap<u8, PartialStream>,
29 cleanup_times: HashMap<u8, Instant>,
30}
31
32impl Store {
33 const STREAM_TIMEOUT: Duration = Duration::from_secs(10);
34
35 pub(crate) fn process_packet(&mut self, packet: Packet) -> Result<()> {
36 debug!("process_packet {:?}", packet);
37
38 let finished_stream = match packet {
39 Packet::Start(StartPacket {
40 stream_id,
41 scope,
42 data_length,
43 data_part,
44 }) => {
45 if let Scope::Player(PlayerScope { player_id }) = scope {
46 debug!(stream_id, player_id, data_length, "new stream");
47
48 let mut stream = PartialStream {
49 player_id,
50 data_length,
51 data_buffer: Vec::with_capacity(data_length as usize),
52 };
53 stream.write_part(data_part)?;
54
55 if let Some(old_stream) = self.streams.remove(&stream_id) {
56 warn!("restarting stream {:?}", old_stream);
57 }
58 if stream.is_finished() {
59 Some(stream)
60 } else {
61 self.streams.insert(stream_id, stream);
62 self.cleanup_times
63 .insert(stream_id, Instant::now() + Self::STREAM_TIMEOUT);
64 None
65 }
66 } else {
67 return Err(StoreError::Thing);
68 }
69 }
70
71 Packet::Continue(ContinuePacket {
72 stream_id,
73 data_part,
74 }) => {
75 let is_finished = if let Some(stream) = self.streams.get_mut(&stream_id) {
76 stream.write_part(data_part)?;
77 debug!(
78 stream_id,
79 player_id = stream.player_id,
80 current_length = stream.data_buffer.len(),
81 data_length = stream.data_length,
82 "continue stream"
83 );
84
85 stream.is_finished()
86 } else {
87 return Err(StoreError::Thing2);
88 };
89
90 if is_finished {
91 self.cleanup_times.remove(&stream_id);
92 Some(self.streams.remove(&stream_id).unwrap())
93 } else {
94 None
95 }
96 }
97 };
98
99 if let Some(stream) = finished_stream {
100 debug!("finished_stream");
101 for f in &self.event_handlers {
102 f(stream.player_id, &stream.data_buffer);
103 }
104 }
105
106 Ok(())
107 }
108
109 pub(crate) fn tick(&mut self) {
110 let now = Instant::now();
111 let mut stream_ids_to_removes = self
112 .cleanup_times
113 .iter()
114 .filter_map(|(stream_id, cleanup_time)| {
115 if &now > cleanup_time {
116 Some(*stream_id)
117 } else {
118 None
119 }
120 })
121 .collect::<Vec<_>>();
122
123 for stream_id in stream_ids_to_removes.drain(..) {
124 debug!(stream_id, "timed out, removing");
125 self.cleanup_times.remove(&stream_id);
126 self.streams.remove(&stream_id);
127 }
128 }
129}