Skip to content

Commit 4103020

Browse files
committed
enable parking_lot feature in tokio
1 parent cbe680f commit 4103020

File tree

37 files changed

+151
-148
lines changed

37 files changed

+151
-148
lines changed

ballista-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ datafusion = { path = "../datafusion" }
3333
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
3434
prost = "0.9"
3535
tonic = "0.6"
36-
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
36+
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
3737
futures = "0.3"
3838
num_cpus = "1.13.0"

ballista/rust/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ log = "0.4"
3535
tokio = "1.0"
3636
tempfile = "3"
3737
sqlparser = "0.13"
38+
parking_lot = "0.11"
3839

3940
datafusion = { path = "../../../datafusion", version = "6.0.0" }
4041

ballista/rust/client/src/context.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
//! Distributed execution context.
1919
20+
use parking_lot::Mutex;
2021
use sqlparser::ast::Statement;
2122
use std::collections::HashMap;
2223
use std::fs;
2324
use std::path::PathBuf;
24-
use std::sync::{Arc, Mutex};
25+
use std::sync::Arc;
2526

2627
use ballista_core::config::BallistaConfig;
2728
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
@@ -142,7 +143,7 @@ impl BallistaContext {
142143

143144
// use local DataFusion context for now but later this might call the scheduler
144145
let mut ctx = {
145-
let guard = self.state.lock().unwrap();
146+
let guard = self.state.lock();
146147
create_df_ctx_with_ballista_query_planner(
147148
&guard.scheduler_host,
148149
guard.scheduler_port,
@@ -162,7 +163,7 @@ impl BallistaContext {
162163

163164
// use local DataFusion context for now but later this might call the scheduler
164165
let mut ctx = {
165-
let guard = self.state.lock().unwrap();
166+
let guard = self.state.lock();
166167
create_df_ctx_with_ballista_query_planner(
167168
&guard.scheduler_host,
168169
guard.scheduler_port,
@@ -186,7 +187,7 @@ impl BallistaContext {
186187

187188
// use local DataFusion context for now but later this might call the scheduler
188189
let mut ctx = {
189-
let guard = self.state.lock().unwrap();
190+
let guard = self.state.lock();
190191
create_df_ctx_with_ballista_query_planner(
191192
&guard.scheduler_host,
192193
guard.scheduler_port,
@@ -203,7 +204,7 @@ impl BallistaContext {
203204
name: &str,
204205
table: Arc<dyn TableProvider>,
205206
) -> Result<()> {
206-
let mut state = self.state.lock().unwrap();
207+
let mut state = self.state.lock();
207208
state.tables.insert(name.to_owned(), table);
208209
Ok(())
209210
}
@@ -280,7 +281,7 @@ impl BallistaContext {
280281
/// might require the schema to be inferred.
281282
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
282283
let mut ctx = {
283-
let state = self.state.lock().unwrap();
284+
let state = self.state.lock();
284285
create_df_ctx_with_ballista_query_planner(
285286
&state.scheduler_host,
286287
state.scheduler_port,
@@ -291,7 +292,7 @@ impl BallistaContext {
291292
let is_show = self.is_show_statement(sql).await?;
292293
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
293294
if is_show {
294-
let state = self.state.lock().unwrap();
295+
let state = self.state.lock();
295296
ctx = ExecutionContext::with_config(
296297
ExecutionConfig::new().with_information_schema(
297298
state.config.default_with_information_schema(),
@@ -301,7 +302,7 @@ impl BallistaContext {
301302

302303
// register tables with DataFusion context
303304
{
304-
let state = self.state.lock().unwrap();
305+
let state = self.state.lock();
305306
for (name, prov) in &state.tables {
306307
ctx.register_table(
307308
TableReference::Bare { table: name },
@@ -483,7 +484,7 @@ mod tests {
483484
.unwrap();
484485

485486
{
486-
let mut guard = context.state.lock().unwrap();
487+
let mut guard = context.state.lock();
487488
let csv_table = guard.tables.get("single_nan");
488489

489490
if let Some(table_provide) = csv_table {

ballista/rust/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ parse_arg = "0.1.3"
4848
arrow-flight = { version = "8.0.0" }
4949
datafusion = { path = "../../../datafusion", version = "6.0.0" }
5050

51+
parking_lot = "0.11"
52+
5153
[dev-dependencies]
5254
tempfile = "3"
5355

ballista/rust/core/src/client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
//! Client API for sending requests to executors.
1919
20-
use std::sync::{Arc, Mutex};
20+
use parking_lot::Mutex;
21+
use std::sync::Arc;
2122
use std::{collections::HashMap, pin::Pin};
2223
use std::{
2324
convert::{TryFrom, TryInto},
@@ -154,7 +155,7 @@ impl Stream for FlightDataStream {
154155
self: std::pin::Pin<&mut Self>,
155156
cx: &mut Context<'_>,
156157
) -> Poll<Option<Self::Item>> {
157-
let mut stream = self.stream.lock().expect("mutex is bad");
158+
let mut stream = self.stream.lock();
158159
stream.poll_next_unpin(cx).map(|x| match x {
159160
Some(flight_data_chunk_result) => {
160161
let converted_chunk = flight_data_chunk_result

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
2121
//! will use the ShuffleReaderExec to read these results.
2222
23+
use parking_lot::Mutex;
2324
use std::fs::File;
2425
use std::iter::Iterator;
2526
use std::path::PathBuf;
26-
use std::sync::{Arc, Mutex};
27+
use std::sync::Arc;
2728
use std::time::Instant;
2829
use std::{any::Any, pin::Pin};
2930

ballista/rust/executor/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ futures = "0.3"
4141
log = "0.4"
4242
snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
4343
tempfile = "3"
44-
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
44+
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
4545
tokio-stream = { version = "0.1", features = ["net"] }
4646
tonic = "0.6"
4747
uuid = { version = "0.8", features = ["v4"] }
4848
hyper = "0.14.4"
49+
parking_lot = "0.11"
4950

5051
[dev-dependencies]
5152

ballista/rust/scheduler/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
5353
tonic = "0.6"
5454
tower = { version = "0.4" }
5555
warp = "0.3"
56+
parking_lot = "0.11"
5657

5758
[dev-dependencies]
5859
ballista-core = { path = "../core", version = "0.6.0" }

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ snmalloc = ["snmalloc-rs"]
3535
datafusion = { path = "../datafusion" }
3636
ballista = { path = "../ballista/rust/client" }
3737
structopt = { version = "0.3", default-features = false }
38-
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
38+
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
3939
futures = "0.3"
4040
env_logger = "0.9"
4141
mimalloc = { version = "0.1", optional = true, default-features = false }

datafusion-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ rust-version = "1.58"
2929
[dependencies]
3030
clap = { version = "3", features = ["derive", "cargo"] }
3131
rustyline = "9.0"
32-
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
32+
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
3333
datafusion = { path = "../datafusion", version = "6.0.0" }
3434
arrow = { version = "8.0.0" }
3535
ballista = { path = "../ballista/rust/client", version = "0.6.0" }

0 commit comments

Comments
 (0)