1use core::fmt;
2use std::convert::Infallible;
3use std::ops::Range;
4
5use valence_protocol::ChunkPos;
6
7use crate::layer::bvh::{ChunkBvh, GetChunkPos};
8use crate::ChunkView;
9
10pub struct Messages<G, L> {
31 global: Vec<(G, Range<u32>)>,
32 local: Vec<(L, Range<u32>)>,
33 bvh: ChunkBvh<MessagePair<L>>,
34 staging: Vec<u8>,
35 ready: Vec<u8>,
36 is_ready: bool,
37}
38
39impl<G, L> Messages<G, L>
40where
41 G: Clone + Ord,
42 L: Clone + Ord + GetChunkPos,
43{
44 pub(crate) fn new() -> Self {
45 Self::default()
46 }
47
48 pub(crate) fn send_global<E>(
50 &mut self,
51 msg: G,
52 f: impl FnOnce(&mut Vec<u8>) -> Result<(), E>,
53 ) -> Result<(), E> {
54 debug_assert!(!self.is_ready);
55
56 let start = self.staging.len();
57 f(&mut self.staging)?;
58 let end = self.staging.len();
59
60 if let Some((m, range)) = self.global.last_mut() {
61 if msg == *m {
62 range.end = end as u32;
64 return Ok(());
65 }
66 }
67
68 self.global.push((msg, start as u32..end as u32));
69
70 Ok(())
71 }
72
73 pub(crate) fn send_local<E>(
75 &mut self,
76 msg: L,
77 f: impl FnOnce(&mut Vec<u8>) -> Result<(), E>,
78 ) -> Result<(), E> {
79 debug_assert!(!self.is_ready);
80
81 let start = self.staging.len();
82 f(&mut self.staging)?;
83 let end = self.staging.len();
84
85 if let Some((m, range)) = self.local.last_mut() {
86 if msg == *m {
87 range.end = end as u32;
89 return Ok(());
90 }
91 }
92
93 self.local.push((msg, start as u32..end as u32));
94
95 Ok(())
96 }
97
98 pub(crate) fn send_global_infallible(&mut self, msg: G, f: impl FnOnce(&mut Vec<u8>)) {
100 let _ = self.send_global::<Infallible>(msg, |b| {
101 f(b);
102 Ok(())
103 });
104 }
105
106 pub(crate) fn send_local_infallible(&mut self, msg: L, f: impl FnOnce(&mut Vec<u8>)) {
108 let _ = self.send_local::<Infallible>(msg, |b| {
109 f(b);
110 Ok(())
111 });
112 }
113
114 pub(crate) fn ready(&mut self) {
116 debug_assert!(!self.is_ready);
117 self.is_ready = true;
118
119 debug_assert!(self.ready.is_empty());
120
121 self.ready.reserve_exact(self.staging.len());
122
123 fn sort_and_merge<M: Clone + Ord>(
124 msgs: &mut Vec<(M, Range<u32>)>,
125 staging: &[u8],
126 ready: &mut Vec<u8>,
127 ) {
128 msgs.sort_by_key(|(msg, _)| msg.clone());
130
131 if let Some((_, range)) = msgs.first_mut() {
133 let start = ready.len();
134 ready.extend_from_slice(&staging[range.start as usize..range.end as usize]);
135 let end = ready.len();
136
137 *range = start as u32..end as u32;
138 }
139
140 msgs.dedup_by(|(right_msg, right_range), (left_msg, left_range)| {
141 if *left_msg == *right_msg {
142 let right_bytes =
146 &staging[right_range.start as usize..right_range.end as usize];
147
148 ready.extend_from_slice(right_bytes);
149
150 left_range.end += right_bytes.len() as u32;
151
152 true
153 } else {
154 let right_bytes =
157 &staging[right_range.start as usize..right_range.end as usize];
158
159 let start = ready.len();
160 ready.extend_from_slice(right_bytes);
161 let end = ready.len();
162
163 *right_range = start as u32..end as u32;
164
165 false
166 }
167 });
168 }
169
170 sort_and_merge(&mut self.global, &self.staging, &mut self.ready);
171 sort_and_merge(&mut self.local, &self.staging, &mut self.ready);
172
173 self.bvh.build(
174 self.local
175 .iter()
176 .cloned()
177 .map(|(msg, range)| MessagePair { msg, range }),
178 );
179 }
180
181 pub(crate) fn unready(&mut self) {
182 assert!(self.is_ready);
183 self.is_ready = false;
184
185 self.local.clear();
186 self.global.clear();
187 self.staging.clear();
188 self.ready.clear();
189 }
190
191 pub(crate) fn shrink_to_fit(&mut self) {
192 self.global.shrink_to_fit();
193 self.local.shrink_to_fit();
194 self.bvh.shrink_to_fit();
195 self.staging.shrink_to_fit();
196 self.ready.shrink_to_fit();
197 }
198
199 pub fn bytes(&self) -> &[u8] {
202 debug_assert!(self.is_ready);
203
204 &self.ready
205 }
206
207 pub fn iter_global(&self) -> impl Iterator<Item = (G, Range<usize>)> + '_ {
210 debug_assert!(self.is_ready);
211
212 self.global
213 .iter()
214 .map(|(m, r)| (m.clone(), r.start as usize..r.end as usize))
215 }
216
217 pub fn query_local<F: FnMut(L, Range<usize>)>(&self, view: ChunkView, mut f: F) {
221 debug_assert!(self.is_ready);
222
223 self.bvh.query(view, |pair| {
224 f(
225 pair.msg.clone(),
226 pair.range.start as usize..pair.range.end as usize,
227 )
228 });
229 }
230}
231
232impl<G, L> Default for Messages<G, L> {
233 fn default() -> Self {
234 Self {
235 global: Default::default(),
236 local: Default::default(),
237 bvh: Default::default(),
238 staging: Default::default(),
239 ready: Default::default(),
240 is_ready: Default::default(),
241 }
242 }
243}
244
245impl<G, L> fmt::Debug for Messages<G, L>
246where
247 G: fmt::Debug,
248 L: fmt::Debug,
249{
250 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251 f.debug_struct("Messages")
252 .field("global", &self.global)
253 .field("local", &self.local)
254 .field("is_ready", &self.is_ready)
255 .finish_non_exhaustive()
256 }
257}
258
259#[derive(Debug)]
260struct MessagePair<M> {
261 msg: M,
262 range: Range<u32>,
263}
264
265impl<M: GetChunkPos> GetChunkPos for MessagePair<M> {
266 fn chunk_pos(&self) -> ChunkPos {
267 self.msg.chunk_pos()
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274
275 #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
276 struct DummyLocal;
277
278 impl GetChunkPos for DummyLocal {
279 fn chunk_pos(&self) -> ChunkPos {
280 unimplemented!()
281 }
282 }
283
284 #[test]
285 fn send_global_message() {
286 #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
287 enum TestMsg {
288 Foo,
289 Bar,
290 }
291
292 let mut messages = Messages::<TestMsg, DummyLocal>::new();
293
294 messages.send_global_infallible(TestMsg::Foo, |b| b.extend_from_slice(&[1, 2, 3]));
295 messages.send_global_infallible(TestMsg::Bar, |b| b.extend_from_slice(&[4, 5, 6]));
296 messages.send_global_infallible(TestMsg::Foo, |b| b.extend_from_slice(&[7, 8, 9]));
297
298 messages.ready();
299
300 let bytes = messages.bytes();
301
302 for (msg, range) in messages.iter_global() {
303 match msg {
304 TestMsg::Foo => assert_eq!(&bytes[range.clone()], &[1, 2, 3, 7, 8, 9]),
305 TestMsg::Bar => assert_eq!(&bytes[range.clone()], &[4, 5, 6]),
306 }
307 }
308
309 messages.unready();
310 }
311}