valence_server/
event_loop.rs
1use std::time::Instant;
2
3use bevy_app::prelude::*;
4use bevy_app::MainScheduleOrder;
5use bevy_ecs::prelude::*;
6use bevy_ecs::schedule::ScheduleLabel;
7use bevy_ecs::system::SystemState;
8use bytes::Bytes;
9use tracing::{debug, warn};
10use valence_protocol::{Decode, Packet};
11
12use crate::client::Client;
13
/// Plugin that sets up the packet event loop.
///
/// Its [`Plugin::build`] implementation registers [`PacketEvent`], creates the
/// [`RunEventLoop`], [`EventLoopPreUpdate`], [`EventLoopUpdate`] and
/// [`EventLoopPostUpdate`] schedules, and inserts [`RunEventLoop`] into the main
/// schedule order directly after [`PreUpdate`].
pub struct EventLoopPlugin;
15
16impl Plugin for EventLoopPlugin {
17 fn build(&self, app: &mut App) {
18 app.add_event::<PacketEvent>()
19 .add_schedule(Schedule::new(RunEventLoop))
20 .add_schedule(Schedule::new(EventLoopPreUpdate))
21 .add_schedule(Schedule::new(EventLoopUpdate))
22 .add_schedule(Schedule::new(EventLoopPostUpdate))
23 .add_systems(RunEventLoop, run_event_loop);
24
25 app.world_mut()
26 .resource_mut::<MainScheduleOrder>()
27 .insert_after(PreUpdate, RunEventLoop);
28 }
29}
30
/// Label of the schedule that hosts the `run_event_loop` system.
///
/// [`EventLoopPlugin`] inserts this schedule into the main schedule order
/// immediately after [`PreUpdate`].
#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
pub struct RunEventLoop;
37
/// Label of the first schedule run on every pass of the event loop, before
/// [`EventLoopUpdate`].
#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventLoopPreUpdate;
40
/// Label of the second schedule run on every pass of the event loop, after
/// [`EventLoopPreUpdate`] and before [`EventLoopPostUpdate`].
#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventLoopUpdate;
43
/// Label of the last schedule run on every pass of the event loop, after
/// [`EventLoopUpdate`].
#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventLoopPostUpdate;
46
/// Event carrying a single packet received from a client.
///
/// The `run_event_loop` system sends one of these for every packet it takes
/// from a client's connection.
#[derive(Event, Clone, Debug)]
pub struct PacketEvent {
    /// The entity of the client this packet was received from.
    pub client: Entity,
    /// Timestamp copied from the received packet.
    // NOTE(review): presumably the moment the packet arrived — confirm against the
    // connection code that produces it.
    pub timestamp: Instant,
    /// The packet ID. [`PacketEvent::decode`] compares it against `P::ID`.
    pub id: i32,
    /// The packet body that [`PacketEvent::decode`] decodes from.
    // NOTE(review): presumably excludes the leading packet ID — confirm.
    pub data: Bytes,
}
58
59impl PacketEvent {
60 #[inline]
65 pub fn decode<'a, P>(&'a self) -> Option<P>
66 where
67 P: Packet + Decode<'a>,
68 {
69 if self.id == P::ID {
70 let mut r = &self.data[..];
71
72 match P::decode(&mut r) {
73 Ok(pkt) => {
74 if r.is_empty() {
75 return Some(pkt);
76 }
77
78 warn!(
79 "missed {} bytes while decoding packet {} (ID = {})",
80 r.len(),
81 P::NAME,
82 P::ID
83 );
84 debug!("complete packet after partial decode: {pkt:?}");
85 }
86 Err(e) => {
87 warn!("failed to decode packet with ID of {}: {e:#}", P::ID);
88 }
89 }
90 }
91
92 None
93 }
94}
95
/// Runs the three event loop schedules once, in order: [`EventLoopPreUpdate`],
/// [`EventLoopUpdate`], then [`EventLoopPostUpdate`].
fn run_event_loop_schedules(world: &mut World) {
    world.run_schedule(EventLoopPreUpdate);
    world.run_schedule(EventLoopUpdate);
    world.run_schedule(EventLoopPostUpdate);
}
101
102#[allow(clippy::type_complexity)]
104fn run_event_loop(
105 world: &mut World,
106 state: &mut SystemState<(
107 Query<(Entity, &mut Client)>,
108 EventWriter<PacketEvent>,
109 Commands,
110 )>,
111 mut check_again: Local<Vec<(Entity, usize)>>,
112) {
113 debug_assert!(check_again.is_empty());
114
115 let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
116
117 for (entity, mut client) in &mut clients {
118 match client.connection_mut().try_recv() {
119 Ok(Some(pkt)) => {
120 event_writer.send(PacketEvent {
121 client: entity,
122 timestamp: pkt.timestamp,
123 id: pkt.id,
124 data: pkt.body,
125 });
126
127 let remaining = client.connection().len();
128
129 if remaining > 0 {
130 check_again.push((entity, remaining));
131 }
132 }
133 Ok(None) => {}
134 Err(e) => {
135 debug!("disconnecting client: {e:#}");
137 commands.entity(entity).remove::<Client>();
138 }
139 }
140 }
141
142 state.apply(world);
143 run_event_loop_schedules(world);
144
145 while !check_again.is_empty() {
146 let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
147
148 check_again.retain_mut(|(entity, remaining)| {
149 debug_assert!(*remaining > 0);
150
151 if let Ok((_, mut client)) = clients.get_mut(*entity) {
152 match client.connection_mut().try_recv() {
153 Ok(Some(pkt)) => {
154 event_writer.send(PacketEvent {
155 client: *entity,
156 timestamp: pkt.timestamp,
157 id: pkt.id,
158 data: pkt.body,
159 });
160 *remaining -= 1;
161 *remaining > 0
163 }
164 Ok(None) => false,
165 Err(e) => {
166 debug!("disconnecting client: {e:#}");
168 commands.entity(*entity).remove::<Client>();
169 false
170 }
171 }
172 } else {
173 false
175 }
176 });
177
178 state.apply(world);
179 run_event_loop_schedules(world);
180 }
181}