packet_inspector/
lib.rs

1mod packet_io;
2mod packet_registry;
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::bail;
9use bytes::{BufMut, BytesMut};
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13use valence_protocol::decode::PacketFrame;
14use valence_protocol::packets::handshaking::handshake_c2s::HandshakeNextState;
15use valence_protocol::packets::handshaking::HandshakeC2s;
16use valence_protocol::packets::login::{
17    LoginCompressionS2c, LoginDisconnectS2c, LoginHelloS2c, LoginSuccessS2c,
18};
19use valence_protocol::text::color::NamedColor;
20use valence_protocol::text::{Color, IntoText};
21use valence_protocol::{
22    CompressionThreshold, Decode, Encode, Packet as ValencePacket, PacketSide, PacketState,
23};
24
25use crate::packet_io::PacketIo;
26pub use crate::packet_registry::Packet;
27use crate::packet_registry::PacketRegistry;
28
29include!(concat!(env!("OUT_DIR"), "/packets.rs"));
30
/// Control messages that can be sent to the running proxy task.
#[derive(Clone, Debug)]
pub enum ProxyMessage {
    /// Requests that the proxy task shuts down.
    Stop,
    // Possible future addition: a message along the lines of
    // `InjectPacket(SocketAddr, PacketFrame)`.
}
38
39#[derive(Debug)]
40pub enum DisconnectionReason {
41    OnlineModeRequired,
42    Error(anyhow::Error),
43}
44
45/// Messages sent by the proxy for the controlling GUI/CLI.
46#[derive(Debug)]
47pub enum ProxyLog {
48    /// A new client has connected to the listener.
49    ClientConnected(SocketAddr),
50    /// A client has disconnected from the listener.
51    ClientDisconnected(SocketAddr, DisconnectionReason),
52}
53
54pub struct Proxy {
55    pub main_task: JoinHandle<anyhow::Result<()>>,
56    pub message_tx: flume::Sender<ProxyMessage>,
57    pub logs_rx: flume::Receiver<ProxyLog>,
58    pub packet_registry: Arc<RwLock<PacketRegistry>>,
59}
60
61impl Proxy {
62    /// Creates a new proxy, and starts its listener task.
63    pub async fn start(listener_addr: SocketAddr, server_addr: SocketAddr) -> anyhow::Result<Self> {
64        let (message_tx, message_rx) = flume::unbounded();
65        let (logs_tx, logs_rx) = flume::unbounded();
66
67        let packet_registry = Arc::new(RwLock::new({
68            let registry = PacketRegistry::new();
69            registry.register_all(&STD_PACKETS);
70            registry
71        }));
72
73        let main_task = tokio::spawn(Self::run_main_task(
74            packet_registry.clone(),
75            TcpListener::bind(listener_addr).await?,
76            server_addr,
77            message_rx,
78            logs_tx,
79        ));
80
81        Ok(Self {
82            main_task,
83            message_tx,
84            logs_rx,
85            packet_registry,
86        })
87    }
88
89    /// Subscribes to the proxy's packet registry.
90    pub async fn subscribe(&self) -> flume::Receiver<Packet> {
91        self.packet_registry.read().await.subscribe()
92    }
93
94    /// Sends a request to stop the proxy and awaits its task's termination.
95    /// There's a hardcoded 5 second long timeout after which the task is
96    /// considered unresponsive and automatically aborted.
97    pub async fn stop(self) {
98        // The task may have already stopped, so we can ignore a Disconnected error
99        let _ = self.message_tx.send_async(ProxyMessage::Stop).await;
100
101        let abort_handle = self.main_task.abort_handle();
102        tokio::select! {
103            _ = self.main_task => {},
104
105            // If the main task doesn't stop after 5 seconds, we force terminate it
106            () = tokio::time::sleep(Duration::from_secs(5)) => {
107                abort_handle.abort();
108            },
109        }
110    }
111
112    /// The main listener task is responsible for handling the TCP listener and
113    /// managing child tasks for each client connected to the inspector.
114    async fn run_main_task(
115        packet_registry: Arc<RwLock<PacketRegistry>>,
116        listener: TcpListener,
117        server_addr: SocketAddr,
118        message_rx: flume::Receiver<ProxyMessage>,
119        logs_tx: flume::Sender<ProxyLog>,
120    ) -> anyhow::Result<()> {
121        let mut individual_tasks = vec![];
122        loop {
123            tokio::select! {
124                r = listener.accept() => {
125                    let (stream, addr) = r?;
126
127                    logs_tx.send_async(ProxyLog::ClientConnected(addr)).await?;
128                    individual_tasks.push(tokio::spawn(Self::run_individual_proxy(
129                        stream,
130                        TcpStream::connect(server_addr).await?,
131                        logs_tx.clone(),
132                        packet_registry.clone(),
133                    )));
134                }
135                m = message_rx.recv_async() => match m {
136                    Ok(ProxyMessage::Stop) | Err(_) => {
137                        tracing::trace!("Stopping the proxy task...");
138
139                        // TODO: stop these tasks properly instead of just leaving the TCP connections for timeout
140                        for task in individual_tasks.drain(..) {
141                            task.abort();
142                        }
143
144                        return Ok(());
145                    }
146                }
147            }
148        }
149    }
150
151    /// Each client connected to the inspector is handled in its own individual
152    /// task, defined here.
153    async fn run_individual_proxy(
154        client: TcpStream,
155        server: TcpStream,
156        a_logs_tx: flume::Sender<ProxyLog>,
157        packet_registry: Arc<RwLock<PacketRegistry>>,
158    ) -> anyhow::Result<()> {
159        let client_addr = client.peer_addr()?;
160
161        let client = PacketIo::new(client);
162        let server = PacketIo::new(server);
163
164        let (mut client_reader, mut client_writer) = client.split();
165        let (mut server_reader, mut server_writer) = server.split();
166
167        let a_state = Arc::new(RwLock::new(PacketState::Handshaking));
168        let a_threshold = Arc::new(RwLock::new(CompressionThreshold::DEFAULT));
169
170        let registry = packet_registry.clone();
171        let state_lock = a_state.clone();
172        let threshold_lock = a_threshold.clone();
173        let logs_tx = a_logs_tx.clone();
174        let c2s = tokio::spawn(async move {
175            loop {
176                let threshold = *threshold_lock.read().await;
177                client_reader.set_compression(threshold);
178                server_writer.set_compression(threshold);
179
180                // client to server handling
181                let packet = match client_reader.recv_packet_raw().await {
182                    Ok(packet) => packet,
183                    Err(e) => {
184                        server_writer.shutdown().await?;
185                        logs_tx
186                            .send_async(ProxyLog::ClientDisconnected(
187                                client_addr,
188                                DisconnectionReason::Error(e),
189                            ))
190                            .await?;
191
192                        bail!("connection error");
193                    }
194                };
195
196                let state = *state_lock.read().await;
197
198                registry
199                    .write()
200                    .await
201                    .process(PacketSide::Serverbound, state, threshold, &packet)
202                    .await?;
203
204                if state == PacketState::Handshaking {
205                    if let Some(handshake) = extrapolate_packet::<HandshakeC2s>(&packet) {
206                        *state_lock.write().await = match handshake.next_state {
207                            HandshakeNextState::Status => PacketState::Status,
208                            HandshakeNextState::Login => PacketState::Login,
209                        };
210                    }
211                }
212
213                server_writer.send_packet_raw(&packet).await?;
214            }
215        });
216
217        let registry = packet_registry.clone();
218        let state_lock = a_state.clone();
219        let threshold_lock = a_threshold.clone();
220        let logs_tx = a_logs_tx.clone();
221        let s2c = tokio::spawn(async move {
222            loop {
223                let threshold = *threshold_lock.read().await;
224                server_reader.set_compression(threshold);
225                client_writer.set_compression(threshold);
226
227                // server to client handling
228                let packet = match server_reader.recv_packet_raw().await {
229                    Ok(packet) => packet,
230                    Err(e) => {
231                        client_writer.shutdown().await?;
232                        return Err(e);
233                    }
234                };
235
236                let state = *state_lock.read().await;
237
238                if state == PacketState::Login {
239                    if let Some(LoginCompressionS2c { threshold }) = extrapolate_packet(&packet) {
240                        *threshold_lock.write().await = CompressionThreshold(threshold.0);
241                    }
242
243                    if extrapolate_packet::<LoginSuccessS2c>(&packet).is_some() {
244                        *state_lock.write().await = PacketState::Play;
245                    }
246                }
247
248                registry
249                    .write()
250                    .await
251                    .process(PacketSide::Clientbound, state, threshold, &packet)
252                    .await?;
253
254                // (The check is done in this if rather than the one above, to still send the
255                // encryption request packet to the inspector)
256                if state == PacketState::Login
257                    && extrapolate_packet::<LoginHelloS2c>(&packet).is_some()
258                {
259                    // The server is requesting encryption, we can't support that
260
261                    let disconnect_packet = LoginDisconnectS2c {
262                        reason: "This server is running in online mode, which is unsupported by \
263                                 the Packet Inspector."
264                            .into_text()
265                            .color(Color::Named(NamedColor::Red))
266                            .into_cow_text(),
267                    };
268
269                    client_writer
270                        .send_packet_raw(&PacketFrame {
271                            id: LoginDisconnectS2c::ID,
272                            body: {
273                                let mut writer = BytesMut::new().writer();
274                                disconnect_packet.encode(&mut writer)?;
275                                writer.into_inner()
276                            },
277                        })
278                        .await?;
279
280                    client_writer.shutdown().await?;
281
282                    logs_tx
283                        .send_async(ProxyLog::ClientDisconnected(
284                            client_addr,
285                            DisconnectionReason::OnlineModeRequired,
286                        ))
287                        .await?;
288
289                    bail!("server is running in online mode");
290                }
291
292                client_writer.send_packet_raw(&packet).await?;
293            }
294        });
295
296        // wait for either to finish
297        tokio::select! {
298            res = c2s => res?,
299            res = s2c => res?,
300        }
301    }
302}
303
304fn extrapolate_packet<'a, P>(packet: &'a PacketFrame) -> Option<P>
305where
306    P: ValencePacket + Decode<'a> + Clone,
307{
308    if packet.id != P::ID {
309        return None;
310    }
311
312    let mut r = &packet.body[..];
313    let packet = P::decode(&mut r).ok()?;
314    Some(packet)
315}