classicube_relay/events/
mod.rs1pub mod store;
2
3use std::{
4 cell::RefCell,
5 io::{Cursor, Write},
6 rc::Rc,
7};
8
9use classicube_helpers::{events::net::PluginMessageReceivedEventHandler, tick};
10use tracing::error;
11
12use self::store::Store;
13use crate::{packet::Packet, RELAY_CHANNEL_START_INDEX};
14
15pub type CallbackFn = Box<dyn Fn(u8, &[u8])>;
16
17#[derive(Debug, thiserror::Error)]
18pub enum PartialStreamError {
19 #[error("{0} < RELAY_CHANNEL_START_INDEX")]
20 StartIndex(u8),
21
22 #[error(transparent)]
23 Io(#[from] std::io::Error),
24}
25type Result<T> = std::result::Result<T, PartialStreamError>;
26
27#[derive(Debug)]
28struct PartialStream {
29 player_id: u8,
30 data_length: u16,
31 data_buffer: Vec<u8>,
32}
33
34impl PartialStream {
35 pub fn is_finished(&self) -> bool {
36 self.data_buffer.len() == self.data_length as usize
37 }
38
39 pub fn write_part(&mut self, data_part: Vec<u8>) -> Result<()> {
40 let len = data_part
41 .len()
42 .min(self.data_length as usize - self.data_buffer.len());
43 self.data_buffer.write_all(&data_part[..len])?;
44
45 Ok(())
46 }
47}
48
49pub struct RelayListener {
50 pub channel: u8,
51 store: Rc<RefCell<Store>>,
52 _plugin_message_handler: PluginMessageReceivedEventHandler,
53 _tick_handler: tick::TickEventHandler,
54}
55
56impl RelayListener {
57 pub fn new(channel: u8) -> Result<Self> {
58 if channel < RELAY_CHANNEL_START_INDEX {
59 return Err(PartialStreamError::StartIndex(channel));
60 }
61
62 let store: Rc<RefCell<Store>> = Default::default();
63
64 let mut plugin_message_handler = PluginMessageReceivedEventHandler::new();
65 {
66 let store = Rc::downgrade(&store);
67 plugin_message_handler.on(move |event| {
68 if channel != event.channel {
69 return;
70 }
71
72 if let Some(store) = store.upgrade() {
73 let mut store = store.borrow_mut();
74 let store = &mut *store;
75 match Packet::decode(&mut Cursor::new(&event.data)) {
76 Ok(packet) => {
77 if let Err(e) = store.process_packet(packet) {
78 error!("processing packet: {:#?}", e);
79 }
80 }
81
82 Err(e) => {
83 error!("decoding packet: {:#?}", e);
84 }
85 }
86 }
87 });
88 }
89 let mut tick_handler = tick::TickEventHandler::new();
90 {
91 let store = Rc::downgrade(&store);
92 tick_handler.on(move |_event| {
93 if let Some(store) = store.upgrade() {
94 let mut store = store.borrow_mut();
95 let store = &mut *store;
96 store.tick();
97 }
98 });
99 }
100
101 Ok(Self {
102 channel,
103 store,
104 _plugin_message_handler: plugin_message_handler,
105 _tick_handler: tick_handler,
106 })
107 }
108
109 pub fn on<F>(&mut self, callback: F)
110 where
111 F: Fn(u8, &[u8]),
112 F: 'static,
113 {
114 let mut store = self.store.borrow_mut();
115 store.event_handlers.push(Box::new(callback));
116 }
117}