valence_anvil/
bevy.rs

1use std::collections::hash_map::Entry;
2use std::collections::{HashMap, HashSet};
3use std::path::PathBuf;
4use std::thread;
5
6use bevy_app::prelude::*;
7use bevy_ecs::prelude::*;
8use flume::{Receiver, Sender};
9use valence_server::client::{Client, OldView, View};
10use valence_server::entity::{EntityLayerId, OldEntityLayerId};
11use valence_server::layer::UpdateLayersPreClientSet;
12use valence_server::protocol::anyhow;
13use valence_server::registry::BiomeRegistry;
14use valence_server::{ChunkLayer, ChunkPos};
15
16use crate::parsing::{DimensionFolder, ParsedChunk};
17
18type WorkerResult = anyhow::Result<Option<ParsedChunk>>;
19
20/// The order in which chunks should be processed by the anvil worker. Smaller
21/// values are sent first.
22type Priority = u64;
23
24#[derive(Component, Debug)]
25pub struct AnvilLevel {
26    /// Chunk worker state to be moved to another thread.
27    worker_state: Option<ChunkWorkerState>,
28    /// The set of chunk positions that should not be loaded or unloaded by
29    /// the anvil system.
30    ///
31    /// This set is empty by default, but you can modify it at any time.
32    pub ignored_chunks: HashSet<ChunkPos>,
33    /// Chunks that need to be loaded. Chunks with `None` priority have already
34    /// been sent to the anvil thread.
35    pending: HashMap<ChunkPos, Option<Priority>>,
36    /// Sender for the chunk worker thread.
37    sender: Sender<ChunkPos>,
38    /// Receiver for the chunk worker thread.
39    receiver: Receiver<(ChunkPos, WorkerResult)>,
40}
41
42impl AnvilLevel {
43    pub fn new<R: Into<PathBuf>>(world_root: R, biomes: &BiomeRegistry) -> Self {
44        let (pending_sender, pending_receiver) = flume::unbounded();
45        let (finished_sender, finished_receiver) = flume::bounded(4096);
46
47        Self {
48            worker_state: Some(ChunkWorkerState {
49                dimension_folder: DimensionFolder::new(world_root, biomes),
50                sender: finished_sender,
51                receiver: pending_receiver,
52            }),
53            ignored_chunks: HashSet::new(),
54            pending: HashMap::new(),
55            sender: pending_sender,
56            receiver: finished_receiver,
57        }
58    }
59
60    /// Forces a chunk to be loaded at a specific position in this world. This
61    /// will bypass [`AnvilLevel::ignored_chunks`].
62    /// Note that the chunk will be unloaded next tick unless it has been added
63    /// to [`AnvilLevel::ignored_chunks`] or it is in view of a client.
64    ///
65    /// This has no effect if a chunk at the position is already present.
66    pub fn force_chunk_load(&mut self, pos: ChunkPos) {
67        match self.pending.entry(pos) {
68            Entry::Occupied(oe) => {
69                // If the chunk is already scheduled to load but hasn't been sent to the chunk
70                // worker yet, then give it the highest priority.
71                if let Some(priority) = oe.into_mut() {
72                    *priority = 0;
73                }
74            }
75            Entry::Vacant(ve) => {
76                ve.insert(Some(0));
77            }
78        }
79    }
80}
81
82#[derive(Debug)]
83struct ChunkWorkerState {
84    /// The world folder containing the region folder where chunks are loaded
85    /// from.
86    dimension_folder: DimensionFolder,
87    /// Sender of finished chunks.
88    sender: Sender<(ChunkPos, WorkerResult)>,
89    /// Receiver of pending chunks.
90    receiver: Receiver<ChunkPos>,
91}
92
93pub struct AnvilPlugin;
94
95impl Plugin for AnvilPlugin {
96    fn build(&self, app: &mut App) {
97        app.add_event::<ChunkLoadEvent>()
98            .add_event::<ChunkUnloadEvent>()
99            .add_systems(PreUpdate, remove_unviewed_chunks)
100            .add_systems(
101                PostUpdate,
102                (init_anvil, update_client_views, send_recv_chunks)
103                    .chain()
104                    .before(UpdateLayersPreClientSet),
105            );
106    }
107}
108
109fn init_anvil(mut query: Query<&mut AnvilLevel, (Added<AnvilLevel>, With<ChunkLayer>)>) {
110    for mut level in &mut query {
111        if let Some(state) = level.worker_state.take() {
112            thread::spawn(move || anvil_worker(state));
113        }
114    }
115}
116
117/// Removes all chunks no longer viewed by clients.
118///
119/// This needs to run in `PreUpdate` where the chunk viewer counts have been
120/// updated from the previous tick.
121fn remove_unviewed_chunks(
122    mut chunk_layers: Query<(Entity, &mut ChunkLayer, &AnvilLevel)>,
123    mut unload_events: EventWriter<ChunkUnloadEvent>,
124) {
125    for (entity, mut layer, anvil) in &mut chunk_layers {
126        layer.retain_chunks(|pos, chunk| {
127            if chunk.viewer_count_mut() > 0 || anvil.ignored_chunks.contains(&pos) {
128                true
129            } else {
130                unload_events.send(ChunkUnloadEvent {
131                    chunk_layer: entity,
132                    pos,
133                });
134                false
135            }
136        });
137    }
138}
139
140fn update_client_views(
141    clients: Query<(&EntityLayerId, Ref<OldEntityLayerId>, View, OldView), With<Client>>,
142    mut chunk_layers: Query<(&ChunkLayer, &mut AnvilLevel)>,
143) {
144    for (loc, old_loc, view, old_view) in &clients {
145        let view = view.get();
146        let old_view = old_view.get();
147
148        if loc != &*old_loc || view != old_view || old_loc.is_added() {
149            let Ok((layer, mut anvil)) = chunk_layers.get_mut(loc.0) else {
150                continue;
151            };
152
153            let queue_pos = |pos| {
154                if !anvil.ignored_chunks.contains(&pos) && layer.chunk(pos).is_none() {
155                    // Chunks closer to clients are prioritized.
156                    match anvil.pending.entry(pos) {
157                        Entry::Occupied(mut oe) => {
158                            if let Some(priority) = oe.get_mut() {
159                                let dist = view.pos.distance_squared(pos);
160                                *priority = (*priority).min(dist);
161                            }
162                        }
163                        Entry::Vacant(ve) => {
164                            let dist = view.pos.distance_squared(pos);
165                            ve.insert(Some(dist));
166                        }
167                    }
168                }
169            };
170
171            // Queue all the new chunks in the view to be sent to the anvil worker.
172            if old_loc.is_added() {
173                view.iter().for_each(queue_pos);
174            } else {
175                view.diff(old_view).for_each(queue_pos);
176            }
177        }
178    }
179}
180
181fn send_recv_chunks(
182    mut layers: Query<(Entity, &mut ChunkLayer, &mut AnvilLevel)>,
183    mut to_send: Local<Vec<(Priority, ChunkPos)>>,
184    mut load_events: EventWriter<ChunkLoadEvent>,
185) {
186    for (entity, mut layer, anvil) in &mut layers {
187        let anvil = anvil.into_inner();
188
189        // Insert the chunks that are finished loading into the chunk layer and send
190        // load events.
191        for (pos, res) in anvil.receiver.drain() {
192            anvil.pending.remove(&pos);
193
194            let status = match res {
195                Ok(Some(ParsedChunk { chunk, timestamp })) => {
196                    layer.insert_chunk(pos, chunk);
197                    ChunkLoadStatus::Success { timestamp }
198                }
199                Ok(None) => ChunkLoadStatus::Empty,
200                Err(e) => ChunkLoadStatus::Failed(e),
201            };
202
203            load_events.send(ChunkLoadEvent {
204                chunk_layer: entity,
205                pos,
206                status,
207            });
208        }
209
210        // Collect all the new chunks that need to be loaded this tick.
211        for (pos, priority) in &mut anvil.pending {
212            if let Some(pri) = priority.take() {
213                to_send.push((pri, *pos));
214            }
215        }
216
217        // Sort chunks by ascending priority.
218        to_send.sort_unstable_by_key(|(pri, _)| *pri);
219
220        // Send the sorted chunks to be loaded.
221        for (_, pos) in to_send.drain(..) {
222            let _ = anvil.sender.try_send(pos);
223        }
224    }
225}
226
227fn anvil_worker(mut state: ChunkWorkerState) {
228    while let Ok(pos) = state.receiver.recv() {
229        let res = state
230            .dimension_folder
231            .get_chunk(pos)
232            .map_err(anyhow::Error::from);
233
234        let _ = state.sender.send((pos, res));
235    }
236}
237
238/// An event sent by `valence_anvil` after an attempt to load a chunk is made.
239#[derive(Event, Debug)]
240pub struct ChunkLoadEvent {
241    /// The [`ChunkLayer`] where the chunk is located.
242    pub chunk_layer: Entity,
243    /// The position of the chunk in the layer.
244    pub pos: ChunkPos,
245    pub status: ChunkLoadStatus,
246}
247
248#[derive(Debug)]
249pub enum ChunkLoadStatus {
250    /// A new chunk was successfully loaded and inserted into the layer.
251    Success {
252        /// The time this chunk was last modified, measured in seconds since the
253        /// epoch.
254        timestamp: u32,
255    },
256    /// The Anvil level does not have a chunk at the position. No chunk was
257    /// loaded.
258    Empty,
259    /// An attempt was made to load the chunk, but something went wrong.
260    Failed(anyhow::Error),
261}
262
263/// An event sent by `valence_anvil` when a chunk is unloaded from an layer.
264#[derive(Event, Debug)]
265pub struct ChunkUnloadEvent {
266    /// The [`ChunkLayer`] where the chunk was unloaded.
267    pub chunk_layer: Entity,
268    /// The position of the chunk that was unloaded.
269    pub pos: ChunkPos,
270}