valence_server/
event_loop.rs

1use std::time::Instant;
2
3use bevy_app::prelude::*;
4use bevy_app::MainScheduleOrder;
5use bevy_ecs::prelude::*;
6use bevy_ecs::schedule::ScheduleLabel;
7use bevy_ecs::system::SystemState;
8use bytes::Bytes;
9use tracing::{debug, warn};
10use valence_protocol::{Decode, Packet};
11
12use crate::client::Client;
13
14pub struct EventLoopPlugin;
15
16impl Plugin for EventLoopPlugin {
17    fn build(&self, app: &mut App) {
18        app.add_event::<PacketEvent>()
19            .add_schedule(Schedule::new(RunEventLoop))
20            .add_schedule(Schedule::new(EventLoopPreUpdate))
21            .add_schedule(Schedule::new(EventLoopUpdate))
22            .add_schedule(Schedule::new(EventLoopPostUpdate))
23            .add_systems(RunEventLoop, run_event_loop);
24
25        app.world_mut()
26            .resource_mut::<MainScheduleOrder>()
27            .insert_after(PreUpdate, RunEventLoop);
28    }
29}
30
31/// The schedule responsible for running [`EventLoopPreUpdate`],
32/// [`EventLoopUpdate`], and [`EventLoopPostUpdate`].
33///
34/// This schedule is situated between [`PreUpdate`] and [`Update`].
35#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
36pub struct RunEventLoop;
37
38#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
39pub struct EventLoopPreUpdate;
40
41#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
42pub struct EventLoopUpdate;
43
44#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
45pub struct EventLoopPostUpdate;
46
47#[derive(Event, Clone, Debug)]
48pub struct PacketEvent {
49    /// The client this packet originated from.
50    pub client: Entity,
51    /// The moment in time this packet arrived.
52    pub timestamp: Instant,
53    /// This packet's ID.
54    pub id: i32,
55    /// The content of the packet, excluding the leading varint packet ID.
56    pub data: Bytes,
57}
58
59impl PacketEvent {
60    /// Attempts to decode this packet as the packet `P`.
61    ///
62    /// If the packet ID is mismatched or an error occurs, `None` is returned.
63    /// Otherwise, `Some` is returned containing the decoded packet.
64    #[inline]
65    pub fn decode<'a, P>(&'a self) -> Option<P>
66    where
67        P: Packet + Decode<'a>,
68    {
69        if self.id == P::ID {
70            let mut r = &self.data[..];
71
72            match P::decode(&mut r) {
73                Ok(pkt) => {
74                    if r.is_empty() {
75                        return Some(pkt);
76                    }
77
78                    warn!(
79                        "missed {} bytes while decoding packet {} (ID = {})",
80                        r.len(),
81                        P::NAME,
82                        P::ID
83                    );
84                    debug!("complete packet after partial decode: {pkt:?}");
85                }
86                Err(e) => {
87                    warn!("failed to decode packet with ID of {}: {e:#}", P::ID);
88                }
89            }
90        }
91
92        None
93    }
94}
95
96fn run_event_loop_schedules(world: &mut World) {
97    world.run_schedule(EventLoopPreUpdate);
98    world.run_schedule(EventLoopUpdate);
99    world.run_schedule(EventLoopPostUpdate);
100}
101
102/// An exclusive system for running the event loop schedule.
103#[allow(clippy::type_complexity)]
104fn run_event_loop(
105    world: &mut World,
106    state: &mut SystemState<(
107        Query<(Entity, &mut Client)>,
108        EventWriter<PacketEvent>,
109        Commands,
110    )>,
111    mut check_again: Local<Vec<(Entity, usize)>>,
112) {
113    debug_assert!(check_again.is_empty());
114
115    let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
116
117    for (entity, mut client) in &mut clients {
118        match client.connection_mut().try_recv() {
119            Ok(Some(pkt)) => {
120                event_writer.send(PacketEvent {
121                    client: entity,
122                    timestamp: pkt.timestamp,
123                    id: pkt.id,
124                    data: pkt.body,
125                });
126
127                let remaining = client.connection().len();
128
129                if remaining > 0 {
130                    check_again.push((entity, remaining));
131                }
132            }
133            Ok(None) => {}
134            Err(e) => {
135                // Client is disconnected.
136                debug!("disconnecting client: {e:#}");
137                commands.entity(entity).remove::<Client>();
138            }
139        }
140    }
141
142    state.apply(world);
143    run_event_loop_schedules(world);
144
145    while !check_again.is_empty() {
146        let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
147
148        check_again.retain_mut(|(entity, remaining)| {
149            debug_assert!(*remaining > 0);
150
151            if let Ok((_, mut client)) = clients.get_mut(*entity) {
152                match client.connection_mut().try_recv() {
153                    Ok(Some(pkt)) => {
154                        event_writer.send(PacketEvent {
155                            client: *entity,
156                            timestamp: pkt.timestamp,
157                            id: pkt.id,
158                            data: pkt.body,
159                        });
160                        *remaining -= 1;
161                        // Keep looping as long as there are packets to process this tick.
162                        *remaining > 0
163                    }
164                    Ok(None) => false,
165                    Err(e) => {
166                        // Client is disconnected.
167                        debug!("disconnecting client: {e:#}");
168                        commands.entity(*entity).remove::<Client>();
169                        false
170                    }
171                }
172            } else {
173                // Client must have been deleted in the last run of the schedule.
174                false
175            }
176        });
177
178        state.apply(world);
179        run_event_loop_schedules(world);
180    }
181}