1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::thread;

use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use flume::{Receiver, Sender};
use valence_server::client::{Client, OldView, View};
use valence_server::entity::{EntityLayerId, OldEntityLayerId};
use valence_server::layer::UpdateLayersPreClientSet;
use valence_server::protocol::anyhow;
use valence_server::registry::BiomeRegistry;
use valence_server::{ChunkLayer, ChunkPos};

use crate::parsing::{DimensionFolder, ParsedChunk};

type WorkerResult = anyhow::Result<Option<ParsedChunk>>;

/// The order in which chunks should be processed by the anvil worker. Smaller
/// values are sent first.
type Priority = u64;

#[derive(Component, Debug)]
pub struct AnvilLevel {
    /// Chunk worker state to be moved to another thread.
    worker_state: Option<ChunkWorkerState>,
    /// The set of chunk positions that should not be loaded or unloaded by
    /// the anvil system.
    ///
    /// This set is empty by default, but you can modify it at any time.
    pub ignored_chunks: HashSet<ChunkPos>,
    /// Chunks that need to be loaded. Chunks with `None` priority have already
    /// been sent to the anvil thread.
    pending: HashMap<ChunkPos, Option<Priority>>,
    /// Sender for the chunk worker thread.
    sender: Sender<ChunkPos>,
    /// Receiver for the chunk worker thread.
    receiver: Receiver<(ChunkPos, WorkerResult)>,
}

impl AnvilLevel {
    pub fn new<R: Into<PathBuf>>(world_root: R, biomes: &BiomeRegistry) -> Self {
        let (pending_sender, pending_receiver) = flume::unbounded();
        let (finished_sender, finished_receiver) = flume::bounded(4096);

        Self {
            worker_state: Some(ChunkWorkerState {
                dimension_folder: DimensionFolder::new(world_root, biomes),
                sender: finished_sender,
                receiver: pending_receiver,
            }),
            ignored_chunks: HashSet::new(),
            pending: HashMap::new(),
            sender: pending_sender,
            receiver: finished_receiver,
        }
    }

    /// Forces a chunk to be loaded at a specific position in this world. This
    /// will bypass [`AnvilLevel::ignored_chunks`].
    /// Note that the chunk will be unloaded next tick unless it has been added
    /// to [`AnvilLevel::ignored_chunks`] or it is in view of a client.
    ///
    /// This has no effect if a chunk at the position is already present.
    pub fn force_chunk_load(&mut self, pos: ChunkPos) {
        match self.pending.entry(pos) {
            Entry::Occupied(oe) => {
                // If the chunk is already scheduled to load but hasn't been sent to the chunk
                // worker yet, then give it the highest priority.
                if let Some(priority) = oe.into_mut() {
                    *priority = 0;
                }
            }
            Entry::Vacant(ve) => {
                ve.insert(Some(0));
            }
        }
    }
}

#[derive(Debug)]
struct ChunkWorkerState {
    /// The world folder containing the region folder where chunks are loaded
    /// from.
    dimension_folder: DimensionFolder,
    /// Sender of finished chunks.
    sender: Sender<(ChunkPos, WorkerResult)>,
    /// Receiver of pending chunks.
    receiver: Receiver<ChunkPos>,
}

pub struct AnvilPlugin;

impl Plugin for AnvilPlugin {
    fn build(&self, app: &mut App) {
        app.add_event::<ChunkLoadEvent>()
            .add_event::<ChunkUnloadEvent>()
            .add_systems(PreUpdate, remove_unviewed_chunks)
            .add_systems(
                PostUpdate,
                (init_anvil, update_client_views, send_recv_chunks)
                    .chain()
                    .before(UpdateLayersPreClientSet),
            );
    }
}

fn init_anvil(mut query: Query<&mut AnvilLevel, (Added<AnvilLevel>, With<ChunkLayer>)>) {
    for mut level in &mut query {
        if let Some(state) = level.worker_state.take() {
            thread::spawn(move || anvil_worker(state));
        }
    }
}

/// Removes all chunks no longer viewed by clients.
///
/// This needs to run in `PreUpdate` where the chunk viewer counts have been
/// updated from the previous tick.
fn remove_unviewed_chunks(
    mut chunk_layers: Query<(Entity, &mut ChunkLayer, &AnvilLevel)>,
    mut unload_events: EventWriter<ChunkUnloadEvent>,
) {
    for (entity, mut layer, anvil) in &mut chunk_layers {
        layer.retain_chunks(|pos, chunk| {
            if chunk.viewer_count_mut() > 0 || anvil.ignored_chunks.contains(&pos) {
                true
            } else {
                unload_events.send(ChunkUnloadEvent {
                    chunk_layer: entity,
                    pos,
                });
                false
            }
        });
    }
}

fn update_client_views(
    clients: Query<(&EntityLayerId, Ref<OldEntityLayerId>, View, OldView), With<Client>>,
    mut chunk_layers: Query<(&ChunkLayer, &mut AnvilLevel)>,
) {
    for (loc, old_loc, view, old_view) in &clients {
        let view = view.get();
        let old_view = old_view.get();

        if loc != &*old_loc || view != old_view || old_loc.is_added() {
            let Ok((layer, mut anvil)) = chunk_layers.get_mut(loc.0) else {
                continue;
            };

            let queue_pos = |pos| {
                if !anvil.ignored_chunks.contains(&pos) && layer.chunk(pos).is_none() {
                    // Chunks closer to clients are prioritized.
                    match anvil.pending.entry(pos) {
                        Entry::Occupied(mut oe) => {
                            if let Some(priority) = oe.get_mut() {
                                let dist = view.pos.distance_squared(pos);
                                *priority = (*priority).min(dist);
                            }
                        }
                        Entry::Vacant(ve) => {
                            let dist = view.pos.distance_squared(pos);
                            ve.insert(Some(dist));
                        }
                    }
                }
            };

            // Queue all the new chunks in the view to be sent to the anvil worker.
            if old_loc.is_added() {
                view.iter().for_each(queue_pos);
            } else {
                view.diff(old_view).for_each(queue_pos);
            }
        }
    }
}

fn send_recv_chunks(
    mut layers: Query<(Entity, &mut ChunkLayer, &mut AnvilLevel)>,
    mut to_send: Local<Vec<(Priority, ChunkPos)>>,
    mut load_events: EventWriter<ChunkLoadEvent>,
) {
    for (entity, mut layer, anvil) in &mut layers {
        let anvil = anvil.into_inner();

        // Insert the chunks that are finished loading into the chunk layer and send
        // load events.
        for (pos, res) in anvil.receiver.drain() {
            anvil.pending.remove(&pos);

            let status = match res {
                Ok(Some(ParsedChunk { chunk, timestamp })) => {
                    layer.insert_chunk(pos, chunk);
                    ChunkLoadStatus::Success { timestamp }
                }
                Ok(None) => ChunkLoadStatus::Empty,
                Err(e) => ChunkLoadStatus::Failed(e),
            };

            load_events.send(ChunkLoadEvent {
                chunk_layer: entity,
                pos,
                status,
            });
        }

        // Collect all the new chunks that need to be loaded this tick.
        for (pos, priority) in &mut anvil.pending {
            if let Some(pri) = priority.take() {
                to_send.push((pri, *pos));
            }
        }

        // Sort chunks by ascending priority.
        to_send.sort_unstable_by_key(|(pri, _)| *pri);

        // Send the sorted chunks to be loaded.
        for (_, pos) in to_send.drain(..) {
            let _ = anvil.sender.try_send(pos);
        }
    }
}

fn anvil_worker(mut state: ChunkWorkerState) {
    while let Ok(pos) = state.receiver.recv() {
        let res = state
            .dimension_folder
            .get_chunk(pos)
            .map_err(anyhow::Error::from);

        let _ = state.sender.send((pos, res));
    }
}

/// An event sent by `valence_anvil` after an attempt to load a chunk is made.
#[derive(Event, Debug)]
pub struct ChunkLoadEvent {
    /// The [`ChunkLayer`] where the chunk is located.
    pub chunk_layer: Entity,
    /// The position of the chunk in the layer.
    pub pos: ChunkPos,
    pub status: ChunkLoadStatus,
}

#[derive(Debug)]
pub enum ChunkLoadStatus {
    /// A new chunk was successfully loaded and inserted into the layer.
    Success {
        /// The time this chunk was last modified, measured in seconds since the
        /// epoch.
        timestamp: u32,
    },
    /// The Anvil level does not have a chunk at the position. No chunk was
    /// loaded.
    Empty,
    /// An attempt was made to load the chunk, but something went wrong.
    Failed(anyhow::Error),
}

/// An event sent by `valence_anvil` when a chunk is unloaded from an layer.
#[derive(Event, Debug)]
pub struct ChunkUnloadEvent {
    /// The [`ChunkLayer`] where the chunk was unloaded.
    pub chunk_layer: Entity,
    /// The position of the chunk that was unloaded.
    pub pos: ChunkPos,
}