valence_server/
keepalive.rs

1use std::time::{Duration, Instant};
2
3use bevy_app::prelude::*;
4use bevy_ecs::prelude::*;
5use derive_more::Deref;
6use tracing::warn;
7use valence_protocol::packets::play::{KeepAliveC2s, KeepAliveS2c};
8use valence_protocol::WritePacket;
9
10use crate::client::{Client, UpdateClientsSet};
11use crate::event_loop::{EventLoopPreUpdate, PacketEvent};
12
13pub struct KeepalivePlugin;
14
15impl Plugin for KeepalivePlugin {
16    fn build(&self, app: &mut App) {
17        app.init_resource::<KeepaliveSettings>()
18            .add_systems(PostUpdate, send_keepalive.in_set(UpdateClientsSet))
19            .add_systems(EventLoopPreUpdate, handle_keepalive_response);
20    }
21}
22
23#[derive(Resource, Debug)]
24pub struct KeepaliveSettings {
25    // How long to wait before sending keepalives and how long to wait for a response.
26    pub period: Duration,
27}
28
29impl Default for KeepaliveSettings {
30    fn default() -> Self {
31        Self {
32            period: Duration::from_secs(8),
33        }
34    }
35}
36
37#[derive(Component, Debug)]
38pub struct KeepaliveState {
39    got_keepalive: bool,
40    last_keepalive_id: u64,
41    last_send: Instant,
42}
43
44/// Delay measured in milliseconds. Negative values indicate absence.
45#[derive(Component, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deref)]
46pub struct Ping(pub i32);
47
48impl Default for Ping {
49    fn default() -> Self {
50        Self(-1)
51    }
52}
53
54impl KeepaliveState {
55    pub(super) fn new() -> Self {
56        Self {
57            got_keepalive: true,
58            last_keepalive_id: 0,
59            last_send: Instant::now(),
60        }
61    }
62
63    /// When the last keepalive was sent for this client.
64    pub fn last_send(&self) -> Instant {
65        self.last_send
66    }
67}
68
69fn send_keepalive(
70    mut clients: Query<(Entity, &mut Client, &mut KeepaliveState)>,
71    settings: Res<KeepaliveSettings>,
72    mut commands: Commands,
73) {
74    let now = Instant::now();
75
76    for (entity, mut client, mut state) in &mut clients {
77        if now.duration_since(state.last_send) >= settings.period {
78            if state.got_keepalive {
79                let id = rand::random();
80                client.write_packet(&KeepAliveS2c { id });
81
82                state.got_keepalive = false;
83                state.last_keepalive_id = id;
84                state.last_send = now;
85            } else {
86                let millis = settings.period.as_millis();
87                warn!("Client {entity:?} timed out: no keepalive response after {millis}ms");
88                commands.entity(entity).remove::<Client>();
89            }
90        }
91    }
92}
93
94fn handle_keepalive_response(
95    mut packets: EventReader<PacketEvent>,
96    mut clients: Query<(Entity, &mut KeepaliveState, &mut Ping)>,
97    mut commands: Commands,
98) {
99    for packet in packets.read() {
100        if let Some(pkt) = packet.decode::<KeepAliveC2s>() {
101            if let Ok((entity, mut state, mut ping)) = clients.get_mut(packet.client) {
102                if state.got_keepalive {
103                    warn!("unexpected keepalive from client {entity:?}");
104                    commands.entity(entity).remove::<Client>();
105                } else if pkt.id != state.last_keepalive_id {
106                    warn!(
107                        "keepalive IDs don't match for client {entity:?} (expected {}, got {})",
108                        state.last_keepalive_id, pkt.id,
109                    );
110                    commands.entity(entity).remove::<Client>();
111                } else {
112                    state.got_keepalive = true;
113                    ping.0 = state.last_send.elapsed().as_millis() as i32;
114                }
115            }
116        }
117    }
118}