packet_inspector/
lib.rs
1mod packet_io;
2mod packet_registry;
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::bail;
9use bytes::{BufMut, BytesMut};
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13use valence_protocol::decode::PacketFrame;
14use valence_protocol::packets::handshaking::handshake_c2s::HandshakeNextState;
15use valence_protocol::packets::handshaking::HandshakeC2s;
16use valence_protocol::packets::login::{
17 LoginCompressionS2c, LoginDisconnectS2c, LoginHelloS2c, LoginSuccessS2c,
18};
19use valence_protocol::text::color::NamedColor;
20use valence_protocol::text::{Color, IntoText};
21use valence_protocol::{
22 CompressionThreshold, Decode, Encode, Packet as ValencePacket, PacketSide, PacketState,
23};
24
25use crate::packet_io::PacketIo;
26pub use crate::packet_registry::Packet;
27use crate::packet_registry::PacketRegistry;
28
29include!(concat!(env!("OUT_DIR"), "/packets.rs"));
30
/// Control messages that can be sent to the running proxy main task.
#[derive(Clone, Debug)]
pub enum ProxyMessage {
    /// Ask the main task to abort its per-connection tasks and return.
    Stop,
}
38
39#[derive(Debug)]
40pub enum DisconnectionReason {
41 OnlineModeRequired,
42 Error(anyhow::Error),
43}
44
45#[derive(Debug)]
47pub enum ProxyLog {
48 ClientConnected(SocketAddr),
50 ClientDisconnected(SocketAddr, DisconnectionReason),
52}
53
54pub struct Proxy {
55 pub main_task: JoinHandle<anyhow::Result<()>>,
56 pub message_tx: flume::Sender<ProxyMessage>,
57 pub logs_rx: flume::Receiver<ProxyLog>,
58 pub packet_registry: Arc<RwLock<PacketRegistry>>,
59}
60
61impl Proxy {
62 pub async fn start(listener_addr: SocketAddr, server_addr: SocketAddr) -> anyhow::Result<Self> {
64 let (message_tx, message_rx) = flume::unbounded();
65 let (logs_tx, logs_rx) = flume::unbounded();
66
67 let packet_registry = Arc::new(RwLock::new({
68 let registry = PacketRegistry::new();
69 registry.register_all(&STD_PACKETS);
70 registry
71 }));
72
73 let main_task = tokio::spawn(Self::run_main_task(
74 packet_registry.clone(),
75 TcpListener::bind(listener_addr).await?,
76 server_addr,
77 message_rx,
78 logs_tx,
79 ));
80
81 Ok(Self {
82 main_task,
83 message_tx,
84 logs_rx,
85 packet_registry,
86 })
87 }
88
89 pub async fn subscribe(&self) -> flume::Receiver<Packet> {
91 self.packet_registry.read().await.subscribe()
92 }
93
94 pub async fn stop(self) {
98 let _ = self.message_tx.send_async(ProxyMessage::Stop).await;
100
101 let abort_handle = self.main_task.abort_handle();
102 tokio::select! {
103 _ = self.main_task => {},
104
105 () = tokio::time::sleep(Duration::from_secs(5)) => {
107 abort_handle.abort();
108 },
109 }
110 }
111
112 async fn run_main_task(
115 packet_registry: Arc<RwLock<PacketRegistry>>,
116 listener: TcpListener,
117 server_addr: SocketAddr,
118 message_rx: flume::Receiver<ProxyMessage>,
119 logs_tx: flume::Sender<ProxyLog>,
120 ) -> anyhow::Result<()> {
121 let mut individual_tasks = vec![];
122 loop {
123 tokio::select! {
124 r = listener.accept() => {
125 let (stream, addr) = r?;
126
127 logs_tx.send_async(ProxyLog::ClientConnected(addr)).await?;
128 individual_tasks.push(tokio::spawn(Self::run_individual_proxy(
129 stream,
130 TcpStream::connect(server_addr).await?,
131 logs_tx.clone(),
132 packet_registry.clone(),
133 )));
134 }
135 m = message_rx.recv_async() => match m {
136 Ok(ProxyMessage::Stop) | Err(_) => {
137 tracing::trace!("Stopping the proxy task...");
138
139 for task in individual_tasks.drain(..) {
141 task.abort();
142 }
143
144 return Ok(());
145 }
146 }
147 }
148 }
149 }
150
151 async fn run_individual_proxy(
154 client: TcpStream,
155 server: TcpStream,
156 a_logs_tx: flume::Sender<ProxyLog>,
157 packet_registry: Arc<RwLock<PacketRegistry>>,
158 ) -> anyhow::Result<()> {
159 let client_addr = client.peer_addr()?;
160
161 let client = PacketIo::new(client);
162 let server = PacketIo::new(server);
163
164 let (mut client_reader, mut client_writer) = client.split();
165 let (mut server_reader, mut server_writer) = server.split();
166
167 let a_state = Arc::new(RwLock::new(PacketState::Handshaking));
168 let a_threshold = Arc::new(RwLock::new(CompressionThreshold::DEFAULT));
169
170 let registry = packet_registry.clone();
171 let state_lock = a_state.clone();
172 let threshold_lock = a_threshold.clone();
173 let logs_tx = a_logs_tx.clone();
174 let c2s = tokio::spawn(async move {
175 loop {
176 let threshold = *threshold_lock.read().await;
177 client_reader.set_compression(threshold);
178 server_writer.set_compression(threshold);
179
180 let packet = match client_reader.recv_packet_raw().await {
182 Ok(packet) => packet,
183 Err(e) => {
184 server_writer.shutdown().await?;
185 logs_tx
186 .send_async(ProxyLog::ClientDisconnected(
187 client_addr,
188 DisconnectionReason::Error(e),
189 ))
190 .await?;
191
192 bail!("connection error");
193 }
194 };
195
196 let state = *state_lock.read().await;
197
198 registry
199 .write()
200 .await
201 .process(PacketSide::Serverbound, state, threshold, &packet)
202 .await?;
203
204 if state == PacketState::Handshaking {
205 if let Some(handshake) = extrapolate_packet::<HandshakeC2s>(&packet) {
206 *state_lock.write().await = match handshake.next_state {
207 HandshakeNextState::Status => PacketState::Status,
208 HandshakeNextState::Login => PacketState::Login,
209 };
210 }
211 }
212
213 server_writer.send_packet_raw(&packet).await?;
214 }
215 });
216
217 let registry = packet_registry.clone();
218 let state_lock = a_state.clone();
219 let threshold_lock = a_threshold.clone();
220 let logs_tx = a_logs_tx.clone();
221 let s2c = tokio::spawn(async move {
222 loop {
223 let threshold = *threshold_lock.read().await;
224 server_reader.set_compression(threshold);
225 client_writer.set_compression(threshold);
226
227 let packet = match server_reader.recv_packet_raw().await {
229 Ok(packet) => packet,
230 Err(e) => {
231 client_writer.shutdown().await?;
232 return Err(e);
233 }
234 };
235
236 let state = *state_lock.read().await;
237
238 if state == PacketState::Login {
239 if let Some(LoginCompressionS2c { threshold }) = extrapolate_packet(&packet) {
240 *threshold_lock.write().await = CompressionThreshold(threshold.0);
241 }
242
243 if extrapolate_packet::<LoginSuccessS2c>(&packet).is_some() {
244 *state_lock.write().await = PacketState::Play;
245 }
246 }
247
248 registry
249 .write()
250 .await
251 .process(PacketSide::Clientbound, state, threshold, &packet)
252 .await?;
253
254 if state == PacketState::Login
257 && extrapolate_packet::<LoginHelloS2c>(&packet).is_some()
258 {
259 let disconnect_packet = LoginDisconnectS2c {
262 reason: "This server is running in online mode, which is unsupported by \
263 the Packet Inspector."
264 .into_text()
265 .color(Color::Named(NamedColor::Red))
266 .into_cow_text(),
267 };
268
269 client_writer
270 .send_packet_raw(&PacketFrame {
271 id: LoginDisconnectS2c::ID,
272 body: {
273 let mut writer = BytesMut::new().writer();
274 disconnect_packet.encode(&mut writer)?;
275 writer.into_inner()
276 },
277 })
278 .await?;
279
280 client_writer.shutdown().await?;
281
282 logs_tx
283 .send_async(ProxyLog::ClientDisconnected(
284 client_addr,
285 DisconnectionReason::OnlineModeRequired,
286 ))
287 .await?;
288
289 bail!("server is running in online mode");
290 }
291
292 client_writer.send_packet_raw(&packet).await?;
293 }
294 });
295
296 tokio::select! {
298 res = c2s => res?,
299 res = s2c => res?,
300 }
301 }
302}
303
304fn extrapolate_packet<'a, P>(packet: &'a PacketFrame) -> Option<P>
305where
306 P: ValencePacket + Decode<'a> + Clone,
307{
308 if packet.id != P::ID {
309 return None;
310 }
311
312 let mut r = &packet.body[..];
313 let packet = P::decode(&mut r).ok()?;
314 Some(packet)
315}