classicube_relay/events/
mod.rs

1pub mod store;
2
3use std::{
4    cell::RefCell,
5    io::{Cursor, Write},
6    rc::Rc,
7};
8
9use classicube_helpers::{events::net::PluginMessageReceivedEventHandler, tick};
10use tracing::error;
11
12use self::store::Store;
13use crate::{packet::Packet, RELAY_CHANNEL_START_INDEX};
14
/// Boxed callback of the shape accepted by `RelayListener::on`: it receives a
/// `u8` and a byte slice.
///
/// NOTE(review): the `u8` is presumably the id of the sending player and the
/// slice the reassembled payload — confirm against `Store`.
pub type CallbackFn = Box<dyn Fn(u8, &[u8])>;
16
/// Error type used throughout this module.
///
/// Despite its name it is also the error returned by `RelayListener::new`.
#[derive(Debug, thiserror::Error)]
pub enum PartialStreamError {
    /// The requested channel (the wrapped value) is lower than
    /// `RELAY_CHANNEL_START_INDEX`.
    #[error("{0} < RELAY_CHANNEL_START_INDEX")]
    StartIndex(u8),

    /// An underlying I/O failure; `#[from]` lets `?` convert a
    /// `std::io::Error` into this variant.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
/// Module-local shorthand for results whose error is `PartialStreamError`.
type Result<T> = std::result::Result<T, PartialStreamError>;
26
27#[derive(Debug)]
28struct PartialStream {
29    player_id: u8,
30    data_length: u16,
31    data_buffer: Vec<u8>,
32}
33
34impl PartialStream {
35    pub fn is_finished(&self) -> bool {
36        self.data_buffer.len() == self.data_length as usize
37    }
38
39    pub fn write_part(&mut self, data_part: Vec<u8>) -> Result<()> {
40        let len = data_part
41            .len()
42            .min(self.data_length as usize - self.data_buffer.len());
43        self.data_buffer.write_all(&data_part[..len])?;
44
45        Ok(())
46    }
47}
48
49pub struct RelayListener {
50    pub channel: u8,
51    store: Rc<RefCell<Store>>,
52    _plugin_message_handler: PluginMessageReceivedEventHandler,
53    _tick_handler: tick::TickEventHandler,
54}
55
56impl RelayListener {
57    pub fn new(channel: u8) -> Result<Self> {
58        if channel < RELAY_CHANNEL_START_INDEX {
59            return Err(PartialStreamError::StartIndex(channel));
60        }
61
62        let store: Rc<RefCell<Store>> = Default::default();
63
64        let mut plugin_message_handler = PluginMessageReceivedEventHandler::new();
65        {
66            let store = Rc::downgrade(&store);
67            plugin_message_handler.on(move |event| {
68                if channel != event.channel {
69                    return;
70                }
71
72                if let Some(store) = store.upgrade() {
73                    let mut store = store.borrow_mut();
74                    let store = &mut *store;
75                    match Packet::decode(&mut Cursor::new(&event.data)) {
76                        Ok(packet) => {
77                            if let Err(e) = store.process_packet(packet) {
78                                error!("processing packet: {:#?}", e);
79                            }
80                        }
81
82                        Err(e) => {
83                            error!("decoding packet: {:#?}", e);
84                        }
85                    }
86                }
87            });
88        }
89        let mut tick_handler = tick::TickEventHandler::new();
90        {
91            let store = Rc::downgrade(&store);
92            tick_handler.on(move |_event| {
93                if let Some(store) = store.upgrade() {
94                    let mut store = store.borrow_mut();
95                    let store = &mut *store;
96                    store.tick();
97                }
98            });
99        }
100
101        Ok(Self {
102            channel,
103            store,
104            _plugin_message_handler: plugin_message_handler,
105            _tick_handler: tick_handler,
106        })
107    }
108
109    pub fn on<F>(&mut self, callback: F)
110    where
111        F: Fn(u8, &[u8]),
112        F: 'static,
113    {
114        let mut store = self.store.borrow_mut();
115        store.event_handlers.push(Box::new(callback));
116    }
117}