valence_server/layer/
message.rs

1use core::fmt;
2use std::convert::Infallible;
3use std::ops::Range;
4
5use valence_protocol::ChunkPos;
6
7use crate::layer::bvh::{ChunkBvh, GetChunkPos};
8use crate::ChunkView;
9
10/// A message buffer of global messages (`G`) and local messages (`L`) meant for
11/// consumption by clients. Local messages are those that have some spatial
12/// component to them and implement the [`GetChunkPos`] trait. Local messages
13/// are placed in a bounding volume hierarchy for fast queries via
14/// [`Self::query_local`]. Global messages do not necessarily have a spatial
15/// component and all globals will be visited when using [`Self::iter_global`].
16///
17/// Every message is associated with an arbitrary span of bytes. The meaning of
18/// the bytes is whatever the message needs it to be.
19///
20/// At the end of the tick and before clients have access to the buffer, all
21/// messages are sorted and then deduplicated by concatenating byte spans
22/// together. This is done for a couple of reasons:
23/// - Messages may rely on sorted message order for correctness, like in the
24///   case of entity spawn & despawn messages. Sorting also makes deduplication
25///   easy.
26/// - Deduplication reduces the total number of messages that all clients must
27///   examine. Consider the case of a message such as "send all clients in view
28///   of this chunk position these packet bytes". If two of these messages have
29///   the same chunk position, then they can just be combined together.
30pub struct Messages<G, L> {
31    global: Vec<(G, Range<u32>)>,
32    local: Vec<(L, Range<u32>)>,
33    bvh: ChunkBvh<MessagePair<L>>,
34    staging: Vec<u8>,
35    ready: Vec<u8>,
36    is_ready: bool,
37}
38
39impl<G, L> Messages<G, L>
40where
41    G: Clone + Ord,
42    L: Clone + Ord + GetChunkPos,
43{
44    pub(crate) fn new() -> Self {
45        Self::default()
46    }
47
48    /// Adds a global message to this message buffer.
49    pub(crate) fn send_global<E>(
50        &mut self,
51        msg: G,
52        f: impl FnOnce(&mut Vec<u8>) -> Result<(), E>,
53    ) -> Result<(), E> {
54        debug_assert!(!self.is_ready);
55
56        let start = self.staging.len();
57        f(&mut self.staging)?;
58        let end = self.staging.len();
59
60        if let Some((m, range)) = self.global.last_mut() {
61            if msg == *m {
62                // Extend the existing message.
63                range.end = end as u32;
64                return Ok(());
65            }
66        }
67
68        self.global.push((msg, start as u32..end as u32));
69
70        Ok(())
71    }
72
73    /// Adds a local message to this message buffer.
74    pub(crate) fn send_local<E>(
75        &mut self,
76        msg: L,
77        f: impl FnOnce(&mut Vec<u8>) -> Result<(), E>,
78    ) -> Result<(), E> {
79        debug_assert!(!self.is_ready);
80
81        let start = self.staging.len();
82        f(&mut self.staging)?;
83        let end = self.staging.len();
84
85        if let Some((m, range)) = self.local.last_mut() {
86            if msg == *m {
87                // Extend the existing message.
88                range.end = end as u32;
89                return Ok(());
90            }
91        }
92
93        self.local.push((msg, start as u32..end as u32));
94
95        Ok(())
96    }
97
98    /// Like [`Self::send_global`] but writing bytes cannot fail.
99    pub(crate) fn send_global_infallible(&mut self, msg: G, f: impl FnOnce(&mut Vec<u8>)) {
100        let _ = self.send_global::<Infallible>(msg, |b| {
101            f(b);
102            Ok(())
103        });
104    }
105
106    /// Like [`Self::send_local`] but writing bytes cannot fail.
107    pub(crate) fn send_local_infallible(&mut self, msg: L, f: impl FnOnce(&mut Vec<u8>)) {
108        let _ = self.send_local::<Infallible>(msg, |b| {
109            f(b);
110            Ok(())
111        });
112    }
113
114    /// Readies messages to be read by clients.
115    pub(crate) fn ready(&mut self) {
116        debug_assert!(!self.is_ready);
117        self.is_ready = true;
118
119        debug_assert!(self.ready.is_empty());
120
121        self.ready.reserve_exact(self.staging.len());
122
123        fn sort_and_merge<M: Clone + Ord>(
124            msgs: &mut Vec<(M, Range<u32>)>,
125            staging: &[u8],
126            ready: &mut Vec<u8>,
127        ) {
128            // Sort must be stable.
129            msgs.sort_by_key(|(msg, _)| msg.clone());
130
131            // Make sure the first element is already copied to "ready".
132            if let Some((_, range)) = msgs.first_mut() {
133                let start = ready.len();
134                ready.extend_from_slice(&staging[range.start as usize..range.end as usize]);
135                let end = ready.len();
136
137                *range = start as u32..end as u32;
138            }
139
140            msgs.dedup_by(|(right_msg, right_range), (left_msg, left_range)| {
141                if *left_msg == *right_msg {
142                    // Extend the left element with the right element. Then delete the right
143                    // element.
144
145                    let right_bytes =
146                        &staging[right_range.start as usize..right_range.end as usize];
147
148                    ready.extend_from_slice(right_bytes);
149
150                    left_range.end += right_bytes.len() as u32;
151
152                    true
153                } else {
154                    // Copy right element to "ready".
155
156                    let right_bytes =
157                        &staging[right_range.start as usize..right_range.end as usize];
158
159                    let start = ready.len();
160                    ready.extend_from_slice(right_bytes);
161                    let end = ready.len();
162
163                    *right_range = start as u32..end as u32;
164
165                    false
166                }
167            });
168        }
169
170        sort_and_merge(&mut self.global, &self.staging, &mut self.ready);
171        sort_and_merge(&mut self.local, &self.staging, &mut self.ready);
172
173        self.bvh.build(
174            self.local
175                .iter()
176                .cloned()
177                .map(|(msg, range)| MessagePair { msg, range }),
178        );
179    }
180
181    pub(crate) fn unready(&mut self) {
182        assert!(self.is_ready);
183        self.is_ready = false;
184
185        self.local.clear();
186        self.global.clear();
187        self.staging.clear();
188        self.ready.clear();
189    }
190
191    pub(crate) fn shrink_to_fit(&mut self) {
192        self.global.shrink_to_fit();
193        self.local.shrink_to_fit();
194        self.bvh.shrink_to_fit();
195        self.staging.shrink_to_fit();
196        self.ready.shrink_to_fit();
197    }
198
199    /// All message bytes. Use this in conjunction with [`Self::iter_global`]
200    /// and [`Self::query_local`].
201    pub fn bytes(&self) -> &[u8] {
202        debug_assert!(self.is_ready);
203
204        &self.ready
205    }
206
207    /// Returns an iterator over all global messages and their span of bytes in
208    /// [`Self::bytes`].
209    pub fn iter_global(&self) -> impl Iterator<Item = (G, Range<usize>)> + '_ {
210        debug_assert!(self.is_ready);
211
212        self.global
213            .iter()
214            .map(|(m, r)| (m.clone(), r.start as usize..r.end as usize))
215    }
216
217    /// Takes a visitor function `f` and visits all local messages contained
218    /// within the chunk view `view`. `f` is called with the local
219    /// message and its span of bytes in [`Self::bytes`].
220    pub fn query_local<F: FnMut(L, Range<usize>)>(&self, view: ChunkView, mut f: F) {
221        debug_assert!(self.is_ready);
222
223        self.bvh.query(view, |pair| {
224            f(
225                pair.msg.clone(),
226                pair.range.start as usize..pair.range.end as usize,
227            )
228        });
229    }
230}
231
232impl<G, L> Default for Messages<G, L> {
233    fn default() -> Self {
234        Self {
235            global: Default::default(),
236            local: Default::default(),
237            bvh: Default::default(),
238            staging: Default::default(),
239            ready: Default::default(),
240            is_ready: Default::default(),
241        }
242    }
243}
244
245impl<G, L> fmt::Debug for Messages<G, L>
246where
247    G: fmt::Debug,
248    L: fmt::Debug,
249{
250    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251        f.debug_struct("Messages")
252            .field("global", &self.global)
253            .field("local", &self.local)
254            .field("is_ready", &self.is_ready)
255            .finish_non_exhaustive()
256    }
257}
258
259#[derive(Debug)]
260struct MessagePair<M> {
261    msg: M,
262    range: Range<u32>,
263}
264
265impl<M: GetChunkPos> GetChunkPos for MessagePair<M> {
266    fn chunk_pos(&self) -> ChunkPos {
267        self.msg.chunk_pos()
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274
275    #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
276    struct DummyLocal;
277
278    impl GetChunkPos for DummyLocal {
279        fn chunk_pos(&self) -> ChunkPos {
280            unimplemented!()
281        }
282    }
283
284    #[test]
285    fn send_global_message() {
286        #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
287        enum TestMsg {
288            Foo,
289            Bar,
290        }
291
292        let mut messages = Messages::<TestMsg, DummyLocal>::new();
293
294        messages.send_global_infallible(TestMsg::Foo, |b| b.extend_from_slice(&[1, 2, 3]));
295        messages.send_global_infallible(TestMsg::Bar, |b| b.extend_from_slice(&[4, 5, 6]));
296        messages.send_global_infallible(TestMsg::Foo, |b| b.extend_from_slice(&[7, 8, 9]));
297
298        messages.ready();
299
300        let bytes = messages.bytes();
301
302        for (msg, range) in messages.iter_global() {
303            match msg {
304                TestMsg::Foo => assert_eq!(&bytes[range.clone()], &[1, 2, 3, 7, 8, 9]),
305                TestMsg::Bar => assert_eq!(&bytes[range.clone()], &[4, 5, 6]),
306            }
307        }
308
309        messages.unready();
310    }
311}