1use std::collections::hash_map::Entry;
2use std::collections::{HashMap, HashSet};
3use std::path::PathBuf;
4use std::thread;
5
6use bevy_app::prelude::*;
7use bevy_ecs::prelude::*;
8use flume::{Receiver, Sender};
9use valence_server::client::{Client, OldView, View};
10use valence_server::entity::{EntityLayerId, OldEntityLayerId};
11use valence_server::layer::UpdateLayersPreClientSet;
12use valence_server::protocol::anyhow;
13use valence_server::registry::BiomeRegistry;
14use valence_server::{ChunkLayer, ChunkPos};
15
16use crate::parsing::{DimensionFolder, ParsedChunk};
17
/// Outcome of one chunk request handled by the worker thread.
///
/// `Ok(Some(_))` carries a parsed chunk, `Ok(None)` is reported to users as
/// [`ChunkLoadStatus::Empty`], and `Err(_)` as [`ChunkLoadStatus::Failed`].
type WorkerResult = anyhow::Result<Option<ParsedChunk>>;
19
/// Load priority of a pending chunk. Pending chunks are sorted in ascending
/// order before being sent to the worker, so lower values are loaded first;
/// `0` is used for forced loads and squared view distance for viewed chunks.
type Priority = u64;
23
/// Component holding the chunk-loading state of one chunk layer.
///
/// The systems in this module only act on entities that have both an
/// `AnvilLevel` and a [`ChunkLayer`].
#[derive(Component, Debug)]
pub struct AnvilLevel {
    /// State for the worker thread. Taken (left as `None`) by `init_anvil`
    /// when it moves the state into the spawned thread.
    worker_state: Option<ChunkWorkerState>,
    /// Chunk positions the anvil systems leave alone: they are never queued
    /// for loading by view updates and never removed by
    /// `remove_unviewed_chunks`.
    pub ignored_chunks: HashSet<ChunkPos>,
    /// Chunks waiting to be loaded. `Some(priority)` means the position has
    /// not been sent to the worker yet; `None` means it has been sent and the
    /// result is still outstanding. Entries are removed when a result arrives.
    pending: HashMap<ChunkPos, Option<Priority>>,
    /// Sends chunk positions to the worker thread.
    sender: Sender<ChunkPos>,
    /// Receives finished chunk results from the worker thread.
    receiver: Receiver<(ChunkPos, WorkerResult)>,
}
41
42impl AnvilLevel {
43 pub fn new<R: Into<PathBuf>>(world_root: R, biomes: &BiomeRegistry) -> Self {
44 let (pending_sender, pending_receiver) = flume::unbounded();
45 let (finished_sender, finished_receiver) = flume::bounded(4096);
46
47 Self {
48 worker_state: Some(ChunkWorkerState {
49 dimension_folder: DimensionFolder::new(world_root, biomes),
50 sender: finished_sender,
51 receiver: pending_receiver,
52 }),
53 ignored_chunks: HashSet::new(),
54 pending: HashMap::new(),
55 sender: pending_sender,
56 receiver: finished_receiver,
57 }
58 }
59
60 pub fn force_chunk_load(&mut self, pos: ChunkPos) {
67 match self.pending.entry(pos) {
68 Entry::Occupied(oe) => {
69 if let Some(priority) = oe.into_mut() {
72 *priority = 0;
73 }
74 }
75 Entry::Vacant(ve) => {
76 ve.insert(Some(0));
77 }
78 }
79 }
80}
81
/// Everything the worker thread needs; created in `AnvilLevel::new` and moved
/// into the thread by `init_anvil`.
#[derive(Debug)]
struct ChunkWorkerState {
    /// Source the worker reads chunks from via `get_chunk`.
    dimension_folder: DimensionFolder,
    /// Sends finished chunk results back to the owning `AnvilLevel`.
    sender: Sender<(ChunkPos, WorkerResult)>,
    /// Receives the chunk positions that should be loaded.
    receiver: Receiver<ChunkPos>,
}
92
/// Plugin that registers the chunk load/unload events and the systems that
/// load and unload chunks for layers with an [`AnvilLevel`].
pub struct AnvilPlugin;
94
95impl Plugin for AnvilPlugin {
96 fn build(&self, app: &mut App) {
97 app.add_event::<ChunkLoadEvent>()
98 .add_event::<ChunkUnloadEvent>()
99 .add_systems(PreUpdate, remove_unviewed_chunks)
100 .add_systems(
101 PostUpdate,
102 (init_anvil, update_client_views, send_recv_chunks)
103 .chain()
104 .before(UpdateLayersPreClientSet),
105 );
106 }
107}
108
109fn init_anvil(mut query: Query<&mut AnvilLevel, (Added<AnvilLevel>, With<ChunkLayer>)>) {
110 for mut level in &mut query {
111 if let Some(state) = level.worker_state.take() {
112 thread::spawn(move || anvil_worker(state));
113 }
114 }
115}
116
117fn remove_unviewed_chunks(
122 mut chunk_layers: Query<(Entity, &mut ChunkLayer, &AnvilLevel)>,
123 mut unload_events: EventWriter<ChunkUnloadEvent>,
124) {
125 for (entity, mut layer, anvil) in &mut chunk_layers {
126 layer.retain_chunks(|pos, chunk| {
127 if chunk.viewer_count_mut() > 0 || anvil.ignored_chunks.contains(&pos) {
128 true
129 } else {
130 unload_events.send(ChunkUnloadEvent {
131 chunk_layer: entity,
132 pos,
133 });
134 false
135 }
136 });
137 }
138}
139
140fn update_client_views(
141 clients: Query<(&EntityLayerId, Ref<OldEntityLayerId>, View, OldView), With<Client>>,
142 mut chunk_layers: Query<(&ChunkLayer, &mut AnvilLevel)>,
143) {
144 for (loc, old_loc, view, old_view) in &clients {
145 let view = view.get();
146 let old_view = old_view.get();
147
148 if loc != &*old_loc || view != old_view || old_loc.is_added() {
149 let Ok((layer, mut anvil)) = chunk_layers.get_mut(loc.0) else {
150 continue;
151 };
152
153 let queue_pos = |pos| {
154 if !anvil.ignored_chunks.contains(&pos) && layer.chunk(pos).is_none() {
155 match anvil.pending.entry(pos) {
157 Entry::Occupied(mut oe) => {
158 if let Some(priority) = oe.get_mut() {
159 let dist = view.pos.distance_squared(pos);
160 *priority = (*priority).min(dist);
161 }
162 }
163 Entry::Vacant(ve) => {
164 let dist = view.pos.distance_squared(pos);
165 ve.insert(Some(dist));
166 }
167 }
168 }
169 };
170
171 if old_loc.is_added() {
173 view.iter().for_each(queue_pos);
174 } else {
175 view.diff(old_view).for_each(queue_pos);
176 }
177 }
178 }
179}
180
181fn send_recv_chunks(
182 mut layers: Query<(Entity, &mut ChunkLayer, &mut AnvilLevel)>,
183 mut to_send: Local<Vec<(Priority, ChunkPos)>>,
184 mut load_events: EventWriter<ChunkLoadEvent>,
185) {
186 for (entity, mut layer, anvil) in &mut layers {
187 let anvil = anvil.into_inner();
188
189 for (pos, res) in anvil.receiver.drain() {
192 anvil.pending.remove(&pos);
193
194 let status = match res {
195 Ok(Some(ParsedChunk { chunk, timestamp })) => {
196 layer.insert_chunk(pos, chunk);
197 ChunkLoadStatus::Success { timestamp }
198 }
199 Ok(None) => ChunkLoadStatus::Empty,
200 Err(e) => ChunkLoadStatus::Failed(e),
201 };
202
203 load_events.send(ChunkLoadEvent {
204 chunk_layer: entity,
205 pos,
206 status,
207 });
208 }
209
210 for (pos, priority) in &mut anvil.pending {
212 if let Some(pri) = priority.take() {
213 to_send.push((pri, *pos));
214 }
215 }
216
217 to_send.sort_unstable_by_key(|(pri, _)| *pri);
219
220 for (_, pos) in to_send.drain(..) {
222 let _ = anvil.sender.try_send(pos);
223 }
224 }
225}
226
227fn anvil_worker(mut state: ChunkWorkerState) {
228 while let Ok(pos) = state.receiver.recv() {
229 let res = state
230 .dimension_folder
231 .get_chunk(pos)
232 .map_err(anyhow::Error::from);
233
234 let _ = state.sender.send((pos, res));
235 }
236}
237
/// Sent by `send_recv_chunks` for every chunk result received from the worker
/// thread, whether or not a chunk was actually loaded.
#[derive(Event, Debug)]
pub struct ChunkLoadEvent {
    /// The entity of the chunk layer the request belonged to.
    pub chunk_layer: Entity,
    /// The position of the requested chunk.
    pub pos: ChunkPos,
    /// What happened to the request.
    pub status: ChunkLoadStatus,
}
247
/// Outcome of a chunk load request, as reported in [`ChunkLoadEvent`].
#[derive(Debug)]
pub enum ChunkLoadStatus {
    /// A chunk was parsed and inserted into the chunk layer.
    Success {
        /// The `timestamp` of the parsed chunk.
        // NOTE(review): presumably the chunk's last-modified time; confirm the
        // meaning and unit against the parsing module.
        timestamp: u32,
    },
    /// The worker returned no chunk for this position and no error.
    Empty,
    /// The worker returned an error while loading the chunk.
    Failed(anyhow::Error),
}
262
/// Sent by `remove_unviewed_chunks` when a chunk is removed from a chunk layer
/// because it has no viewers and is not in `ignored_chunks`.
#[derive(Event, Debug)]
pub struct ChunkUnloadEvent {
    /// The entity of the chunk layer the chunk was removed from.
    pub chunk_layer: Entity,
    /// The position of the removed chunk.
    pub pos: ChunkPos,
}