Skip to content

Commit d275017

Browse files
committed
asset: use bevy_tasks in AssetServer
1 parent dd6f0b5 commit d275017

File tree

16 files changed

+141
-174
lines changed

16 files changed

+141
-174
lines changed

crates/bevy_app/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ dynamic_plugins = ["libloading"]
1919
# bevy
2020
bevy_derive = { path = "../bevy_derive", version = "0.2.1" }
2121
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
22-
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
2322
bevy_math = { path = "../bevy_math", version = "0.2.1" }
2423

2524
# other

crates/bevy_app/src/app.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
1+
use crate::app_builder::AppBuilder;
22
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};
33

44
#[allow(clippy::needless_doctest_main)]
@@ -64,20 +64,16 @@ impl App {
6464
}
6565

6666
pub fn run(mut self) {
67-
// Setup the default bevy task pools
68-
self.resources
69-
.get_cloned::<DefaultTaskPoolOptions>()
70-
.unwrap_or_else(DefaultTaskPoolOptions::default)
71-
.create_default_pools(&mut self.resources);
72-
7367
self.startup_schedule
7468
.initialize(&mut self.world, &mut self.resources);
69+
self.startup_executor.initialize(&mut self.resources);
7570
self.startup_executor.run(
7671
&mut self.startup_schedule,
7772
&mut self.world,
7873
&mut self.resources,
7974
);
8075

76+
self.executor.initialize(&mut self.resources);
8177
let runner = std::mem::replace(&mut self.runner, Box::new(run_once));
8278
(runner)(self);
8379
}

crates/bevy_app/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@ mod app_builder;
88
mod event;
99
mod plugin;
1010
mod schedule_runner;
11-
mod task_pool_options;
1211

1312
pub use app::*;
1413
pub use app_builder::*;
1514
pub use bevy_derive::DynamicPlugin;
1615
pub use event::*;
1716
pub use plugin::*;
1817
pub use schedule_runner::*;
19-
pub use task_pool_options::*;
2018

2119
pub mod prelude {
2220
pub use crate::{

crates/bevy_asset/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ filesystem_watcher = ["notify"]
2020
# bevy
2121
bevy_app = { path = "../bevy_app", version = "0.2.1" }
2222
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
23+
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
2324
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
2425
bevy_property = { path = "../bevy_property", version = "0.2.1" }
2526
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }

crates/bevy_asset/src/asset_server.rs

Lines changed: 58 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use crate::{
44
};
55
use anyhow::Result;
66
use bevy_ecs::{Res, Resource, Resources};
7+
use bevy_tasks::TaskPool;
78
use bevy_utils::{HashMap, HashSet};
89
use crossbeam_channel::TryRecvError;
910
use parking_lot::RwLock;
1011
use std::{
1112
env, fs, io,
1213
path::{Path, PathBuf},
1314
sync::Arc,
14-
thread,
1515
};
1616

1717
use thiserror::Error;
@@ -38,12 +38,6 @@ pub enum AssetServerError {
3838
AssetWatchError { path: PathBuf },
3939
}
4040

41-
struct LoaderThread {
42-
// NOTE: these must remain private. the LoaderThread Arc counters are used to determine thread liveness
43-
// if there is one reference, the loader thread is dead. if there are two references, the loader thread is active
44-
requests: Arc<RwLock<Vec<LoadRequest>>>,
45-
}
46-
4741
/// Info about a specific asset, such as its path and its current load state
4842
#[derive(Clone, Debug)]
4943
pub struct AssetInfo {
@@ -73,11 +67,10 @@ impl LoadState {
7367
/// Loads assets from the filesystem on background threads
7468
pub struct AssetServer {
7569
asset_folders: RwLock<Vec<PathBuf>>,
76-
loader_threads: RwLock<Vec<LoaderThread>>,
77-
max_loader_threads: usize,
7870
asset_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
7971
// TODO: this is a hack to enable retrieving generic AssetLoader<T>s. there must be a better way!
8072
loaders: Vec<Resources>,
73+
task_pool: TaskPool,
8174
extension_to_handler_index: HashMap<String, usize>,
8275
extension_to_loader_index: HashMap<String, usize>,
8376
asset_info: RwLock<HashMap<HandleId, AssetInfo>>,
@@ -86,25 +79,22 @@ pub struct AssetServer {
8679
filesystem_watcher: Arc<RwLock<Option<FilesystemWatcher>>>,
8780
}
8881

89-
impl Default for AssetServer {
90-
fn default() -> Self {
82+
impl AssetServer {
83+
pub fn new(task_pool: TaskPool) -> Self {
9184
AssetServer {
92-
#[cfg(feature = "filesystem_watcher")]
93-
filesystem_watcher: Arc::new(RwLock::new(None)),
94-
max_loader_threads: 4,
9585
asset_folders: Default::default(),
96-
loader_threads: Default::default(),
9786
asset_handlers: Default::default(),
9887
loaders: Default::default(),
9988
extension_to_handler_index: Default::default(),
10089
extension_to_loader_index: Default::default(),
10190
asset_info_paths: Default::default(),
10291
asset_info: Default::default(),
92+
task_pool,
93+
#[cfg(feature = "filesystem_watcher")]
94+
filesystem_watcher: Arc::new(RwLock::new(None)),
10395
}
10496
}
105-
}
10697

107-
impl AssetServer {
10898
pub fn add_handler<T>(&mut self, asset_handler: T)
10999
where
110100
T: AssetLoadRequestHandler,
@@ -183,46 +173,6 @@ impl AssetServer {
183173
Ok(())
184174
}
185175

186-
#[cfg(feature = "filesystem_watcher")]
187-
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
188-
let mut changed = HashSet::default();
189-
190-
loop {
191-
let result = {
192-
let rwlock_guard = asset_server.filesystem_watcher.read();
193-
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
194-
filesystem_watcher.receiver.try_recv()
195-
} else {
196-
break;
197-
}
198-
};
199-
let event = match result {
200-
Ok(result) => result.unwrap(),
201-
Err(TryRecvError::Empty) => break,
202-
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
203-
};
204-
if let notify::event::Event {
205-
kind: notify::event::EventKind::Modify(_),
206-
paths,
207-
..
208-
} = event
209-
{
210-
for path in paths.iter() {
211-
if !changed.contains(path) {
212-
let root_path = asset_server.get_root_path().unwrap();
213-
let relative_path = path.strip_prefix(root_path).unwrap();
214-
match asset_server.load_untyped(relative_path) {
215-
Ok(_) => {}
216-
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
217-
Err(_) => {}
218-
}
219-
}
220-
}
221-
changed.extend(paths);
222-
}
223-
}
224-
}
225-
226176
fn get_root_path(&self) -> Result<PathBuf, AssetServerError> {
227177
if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
228178
Ok(PathBuf::from(manifest_dir))
@@ -315,12 +265,21 @@ impl AssetServer {
315265
}
316266
};
317267

318-
self.send_request_to_loader_thread(LoadRequest {
268+
let load_request = LoadRequest {
319269
handle_id,
320270
path: path.to_owned(),
321271
handler_index: *index,
322272
version: new_version,
323-
});
273+
};
274+
275+
let asset_handlers = self.asset_handlers.clone();
276+
self.task_pool
277+
.spawn(async move {
278+
let handlers = asset_handlers.read();
279+
let request_handler = &handlers[load_request.handler_index];
280+
request_handler.handle_request(&load_request);
281+
})
282+
.detach();
324283

325284
// TODO: watching each asset explicitly is a simpler implementation, its possible it would be more efficient to watch
326285
// folders instead (when possible)
@@ -370,56 +329,6 @@ impl AssetServer {
370329
Some(load_state)
371330
}
372331

373-
fn send_request_to_loader_thread(&self, load_request: LoadRequest) {
374-
// NOTE: This lock makes the call to Arc::strong_count safe. Removing (or reordering) it could result in undefined behavior
375-
let mut loader_threads = self.loader_threads.write();
376-
if loader_threads.len() < self.max_loader_threads {
377-
let loader_thread = LoaderThread {
378-
requests: Arc::new(RwLock::new(vec![load_request])),
379-
};
380-
let requests = loader_thread.requests.clone();
381-
loader_threads.push(loader_thread);
382-
Self::start_thread(self.asset_handlers.clone(), requests);
383-
} else {
384-
let most_free_thread = loader_threads
385-
.iter()
386-
.min_by_key(|l| l.requests.read().len())
387-
.unwrap();
388-
let mut requests = most_free_thread.requests.write();
389-
requests.push(load_request);
390-
// if most free thread only has one reference, the thread as spun down. if so, we need to spin it back up!
391-
if Arc::strong_count(&most_free_thread.requests) == 1 {
392-
Self::start_thread(
393-
self.asset_handlers.clone(),
394-
most_free_thread.requests.clone(),
395-
);
396-
}
397-
}
398-
}
399-
400-
fn start_thread(
401-
request_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
402-
requests: Arc<RwLock<Vec<LoadRequest>>>,
403-
) {
404-
thread::spawn(move || {
405-
loop {
406-
let request = {
407-
let mut current_requests = requests.write();
408-
if current_requests.len() == 0 {
409-
// if there are no requests, spin down the thread
410-
break;
411-
}
412-
413-
current_requests.pop().unwrap()
414-
};
415-
416-
let handlers = request_handlers.read();
417-
let request_handler = &handlers[request.handler_index];
418-
request_handler.handle_request(&request);
419-
}
420-
});
421-
}
422-
423332
fn load_assets_in_folder_recursive(
424333
&self,
425334
path: &Path,
@@ -456,3 +365,43 @@ impl AssetServer {
456365
Ok(handle_ids)
457366
}
458367
}
368+
369+
#[cfg(feature = "filesystem_watcher")]
370+
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
371+
let mut changed = HashSet::default();
372+
373+
loop {
374+
let result = {
375+
let rwlock_guard = asset_server.filesystem_watcher.read();
376+
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
377+
filesystem_watcher.receiver.try_recv()
378+
} else {
379+
break;
380+
}
381+
};
382+
let event = match result {
383+
Ok(result) => result.unwrap(),
384+
Err(TryRecvError::Empty) => break,
385+
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
386+
};
387+
if let notify::event::Event {
388+
kind: notify::event::EventKind::Modify(_),
389+
paths,
390+
..
391+
} = event
392+
{
393+
for path in paths.iter() {
394+
if !changed.contains(path) {
395+
let root_path = asset_server.get_root_path().unwrap();
396+
let relative_path = path.strip_prefix(root_path).unwrap();
397+
match asset_server.load_untyped(relative_path) {
398+
Ok(_) => {}
399+
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
400+
Err(_) => {}
401+
}
402+
}
403+
}
404+
changed.extend(paths);
405+
}
406+
}
407+
}

crates/bevy_asset/src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod loader;
88

99
pub use asset_server::*;
1010
pub use assets::*;
11+
use bevy_tasks::IoTaskPool;
1112
pub use handle::*;
1213
pub use load_request::*;
1314
pub use loader::*;
@@ -33,15 +34,21 @@ pub struct AssetPlugin;
3334

3435
impl Plugin for AssetPlugin {
3536
fn build(&self, app: &mut AppBuilder) {
37+
let task_pool = app
38+
.resources()
39+
.get::<IoTaskPool>()
40+
.expect("IoTaskPool resource not found")
41+
.0
42+
.clone();
3643
app.add_stage_before(bevy_app::stage::PRE_UPDATE, stage::LOAD_ASSETS)
3744
.add_stage_after(bevy_app::stage::POST_UPDATE, stage::ASSET_EVENTS)
38-
.init_resource::<AssetServer>()
45+
.add_resource(AssetServer::new(task_pool))
3946
.register_property::<HandleId>();
4047

4148
#[cfg(feature = "filesystem_watcher")]
4249
app.add_system_to_stage(
4350
stage::LOAD_ASSETS,
44-
AssetServer::filesystem_watcher_system.system(),
51+
asset_server::filesystem_watcher_system.system(),
4552
);
4653
}
4754
}

crates/bevy_core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ bevy_property = { path = "../bevy_property", version = "0.2.1" }
2626
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
2727
bevy_math = { path = "../bevy_math", version = "0.2.1" }
2828
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }
29+
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
30+
31+
log = { version = "0.4", features = ["release_max_level_info"] }
2932

3033
[target.'cfg(target_arch = "wasm32")'.dependencies]
3134
instant = "0.1.6"

crates/bevy_core/src/lib.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
mod bytes;
22
mod float_ord;
33
mod label;
4+
mod task_pool_options;
45
mod time;
56

67
pub use bytes::*;
78
pub use float_ord::*;
89
pub use label::*;
10+
pub use task_pool_options::DefaultTaskPoolOptions;
911
pub use time::*;
1012

1113
pub mod prelude {
12-
pub use crate::{EntityLabels, Labels, Time, Timer};
14+
pub use crate::{DefaultTaskPoolOptions, EntityLabels, Labels, Time, Timer};
1315
}
1416

1517
use bevy_app::prelude::*;
@@ -23,6 +25,12 @@ pub struct CorePlugin;
2325

2426
impl Plugin for CorePlugin {
2527
fn build(&self, app: &mut AppBuilder) {
28+
// Setup the default bevy task pools
29+
app.resources_mut()
30+
.get_cloned::<DefaultTaskPoolOptions>()
31+
.unwrap_or_else(DefaultTaskPoolOptions::default)
32+
.create_default_pools(app.resources_mut());
33+
2634
app.init_resource::<Time>()
2735
.init_resource::<EntityLabels>()
2836
.register_component::<Timer>()

0 commit comments

Comments
 (0)