diff --git a/Cargo.toml b/Cargo.toml index 962e4a904..35f81af66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,12 +10,13 @@ crate-type = ["cdylib"] napi = { version = "2.16.16", default-features = false, features = ["napi4", "napi6", "async"] } napi-derive = "2.16.6" -scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.2.0" } +scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.2.0" , features = ["chrono-04"]} tokio = { version = "1.34", features = ["full"] } futures = "0.3" uuid = "1" regex = "1.11.1" thiserror = "2.0.12" +chrono = "0.4" [build-dependencies] @@ -47,3 +48,19 @@ path = "benchmark/logic_rust/concurrent_select.rs" [[bin]] name = "batch_benchmark" path = "benchmark/logic_rust/batch.rs" + +[[bin]] +name = "deser_benchmark" +path = "benchmark/logic_rust/deser.rs" + +[[bin]] +name = "concurrent_deser_benchmark" +path = "benchmark/logic_rust/concurrent_deser.rs" + +[[bin]] +name = "ser_benchmark" +path = "benchmark/logic_rust/ser.rs" + +[[bin]] +name = "concurrent_ser_benchmark" +path = "benchmark/logic_rust/concurrent_ser.rs" diff --git a/benchmark/README.md b/benchmark/README.md index 2f2596310..4f6c9b0a4 100644 --- a/benchmark/README.md +++ b/benchmark/README.md @@ -89,6 +89,60 @@ Rust: CNT= cargo run --bin select_benchmark -r ``` +- **concurrent deserialization** + +This benchmark uses `executeConcurrent` endpoint to insert `n` rows containing `uuid`, `int`, `timeuuid`, `inet`, `date`, `time` into the database. Afterwards it uses `executeConcurrent` endpoint to select all (`n`) of the inserted rows from the database `n` times. + +JS: +``` +node concurrent_deser.js +``` +Rust: +``` +CNT= cargo run --bin concurrent_deser_benchmark -r +``` + +- **deserialization** + +This benchmark executes `n` `client.execute` queries, that insert a single row containing `uuid`, `int`, `timeuuid`, `inet`, `date`, `time` waiting for the result of the previous query before executing the next one. Afterwards it executes `n` `client.execute` queries, that select all (`n`) of the inserted rows, waiting for the result of the previous query before executing the next one. + +JS: +``` +node deser.js +``` +Rust: +``` +CNT= cargo run --bin deser_benchmark -r +``` + + + +- **concurrent serialization** + +This benchmark uses `executeConcurrent` endpoint to insert `n*n` rows containing `uuid`, `int`, `timeuuid`, `inet`, `date`, `time` into the database. + +JS: +``` +node concurrent_ser.js +``` +Rust: +``` +CNT= cargo run --bin concurrent_ser_benchmark -r +``` + +- **serialization** + +This benchmark executes `n*n` `client.execute` queries, that insert a single row containing `uuid`, `int`, `timeuuid`, `inet`, `date`, `time` waiting for the result of the previous query before executing the next one. + +JS: +``` +node ser.js +``` +Rust: +``` +CNT= cargo run --bin ser_benchmark -r +``` + - **batch** This benchmark uses `client.batch` endpoint to insert `n` rows containing `uuid` and `int` into the database. Afterwards, it checks that the number of rows inserted is correct. diff --git a/benchmark/logic/concurrent_deser.js b/benchmark/logic/concurrent_deser.js new file mode 100644 index 000000000..062ab8016 --- /dev/null +++ b/benchmark/logic/concurrent_deser.js @@ -0,0 +1,49 @@ +"use strict"; +const async = require("async"); +// Possible values of argv[2] (driver) are scylladb-javascript-driver and cassandra-driver. +const cassandra = require(process.argv[2]); +const utils = require("./utils"); +const { exit } = require("process"); + +const client = new cassandra.Client(utils.getClientArgs()); +const iterCnt = parseInt(process.argv[3]); + +async.series( + [ + function initialize(next) { + utils.prepareDatabase(client, utils.tableSchemaDesSer, next); + }, + async function insert(next) { + let allParameters = utils.insertConcurrentDeSer(cassandra, iterCnt); + try { + const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, { prepare: true }); + } catch (err) { + return next(err); + } + next(); + }, + async function select(next) { + + let allParameters = []; + for (let i = 0; i < iterCnt; i++) { + allParameters.push({ + query: 'SELECT * FROM benchmarks.basic', + }); + } + try { + const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, { prepare: true, collectResults: true }); + } catch (err) { + return next(err); + } + next(); + }, + function r() { + exit(0); + } + ], function (err) { + if (err) { + console.error("Error: ", err.message, err.stack); + exit(1); + } + },); + diff --git a/benchmark/logic/concurrent_ser.js b/benchmark/logic/concurrent_ser.js new file mode 100644 index 000000000..57b78e4a4 --- /dev/null +++ b/benchmark/logic/concurrent_ser.js @@ -0,0 +1,34 @@ +"use strict"; +const async = require("async"); +// Possible values of argv[2] (driver) are scylladb-javascript-driver and cassandra-driver. +const cassandra = require(process.argv[2]); +const utils = require("./utils"); +const { exit } = require("process"); + +const client = new cassandra.Client(utils.getClientArgs()); +const iterCnt = parseInt(process.argv[3]); + +async.series( + [ + function initialize(next) { + utils.prepareDatabase(client, utils.tableSchemaDesSer, next); + }, + async function insert(next) { + let allParameters = utils.insertConcurrentDeSer(cassandra, iterCnt * iterCnt); + try { + const _result = await cassandra.concurrent.executeConcurrent(client, allParameters, { prepare: true }); + } catch (err) { + return next(err); + } + next(); + }, + function r() { + exit(0); + } + ], function (err) { + if (err) { + console.error("Error: ", err.message, err.stack); + exit(1); + } + },); + diff --git a/benchmark/logic/deser.js b/benchmark/logic/deser.js new file mode 100644 index 000000000..01b3e845b --- /dev/null +++ b/benchmark/logic/deser.js @@ -0,0 +1,49 @@ +"use strict"; +const async = require("async"); +// Possible values of argv[2] (driver) are scylladb-javascript-driver and cassandra-driver. +const cassandra = require(process.argv[2]); +const utils = require("./utils"); +const { exit } = require("process"); + +const client = new cassandra.Client(utils.getClientArgs()); +const iterCount = parseInt(process.argv[3]); + +async.series( + [ + function initialize(next) { + utils.prepareDatabase(client, utils.tableSchemaDesSer, next); + }, + async function insert(next) { + for (let i = 0; i < iterCount; i++) { + try { + await client.execute(utils.DesSerInsertStatement, utils.insertDeSer(cassandra), { prepare: true }); + } catch (err) { + return next(err); + } + } + next(); + }, + async function select(next) { + const query = "SELECT * FROM benchmarks.basic"; + for (let i = 0; i < iterCount; i++) { + try { + await client.execute(query); + } catch (err) { + return next(err); + } + } + next(); + }, + function r() { + exit(0); + } + ], + function (err) { + if (err) { + console.error("There was an error", err.message, err.stack); + exit(1); + } + + }, +); + diff --git a/benchmark/logic/ser.js b/benchmark/logic/ser.js new file mode 100644 index 000000000..274f28f1a --- /dev/null +++ b/benchmark/logic/ser.js @@ -0,0 +1,38 @@ +"use strict"; +const async = require("async"); +// Possible values of argv[2] (driver) are scylladb-javascript-driver and cassandra-driver. +const cassandra = require(process.argv[2]); +const utils = require("./utils"); +const { exit } = require("process"); + +const client = new cassandra.Client(utils.getClientArgs()); +const iterCount = parseInt(process.argv[3]); + +async.series( + [ + function initialize(next) { + utils.prepareDatabase(client, utils.tableSchemaDesSer, next); + }, + async function insert(next) { + for (let i = 0; i < iterCount * iterCount; i++) { + try { + await client.execute(utils.DesSerInsertStatement, utils.insertDeSer(cassandra), { prepare: true }); + } catch (err) { + return next(err); + } + } + next(); + }, + function r() { + exit(0); + } + ], + function (err) { + if (err) { + console.error("There was an error", err.message, err.stack); + exit(1); + } + + }, +); + diff --git a/benchmark/logic/utils.js b/benchmark/logic/utils.js index 37d89ee82..8663aa50a 100644 --- a/benchmark/logic/utils.js +++ b/benchmark/logic/utils.js @@ -1,8 +1,12 @@ "use strict"; +const { randomBytes } = require("crypto"); +const utils = require("../../lib/utils"); const { _Client } = require("../../main"); const tableSchemaBasic = "CREATE TABLE benchmarks.basic (id uuid, val int, PRIMARY KEY(id))"; +const tableSchemaDesSer = "CREATE TABLE benchmarks.basic (id uuid, val int, tuuid timeuuid, ip inet, date date, time time, PRIMARY KEY(id))"; +const DesSerInsertStatement = "INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)"; const singleStepCount = 1000000; function getClientArgs() { @@ -52,7 +56,36 @@ async function repeatCapped(callback, n) { } + + + +function insertDeSer(cassandra) { + const id = cassandra.types.Uuid.random(); + const tuid = cassandra.types.TimeUuid.fromString("8e14e760-7fa8-11eb-bc66-000000000001"); + const ip = new cassandra.types.InetAddress(utils.allocBufferFromArray(randomBytes(4))); + const date = cassandra.types.LocalDate.now(); + const time = cassandra.types.LocalTime.now(); + + return [id, 100, tuid, ip, date, time]; +} + +function insertConcurrentDeSer(cassandra, n) { + let allParameters = []; + for (let i = 0; i < n; i++) { + allParameters.push({ + query: "INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)", + params: insertDeSer(cassandra) + }); + } + return allParameters; +} + +exports.getClientArgs = getClientArgs; +exports.insertDeSer = insertDeSer; exports.tableSchemaBasic = tableSchemaBasic; +exports.tableSchemaDesSer = tableSchemaDesSer; +exports.DesSerInsertStatement = DesSerInsertStatement; exports.getClientArgs = getClientArgs; exports.prepareDatabase = prepareDatabase; +exports.insertConcurrentDeSer = insertConcurrentDeSer; exports.repeatCapped = repeatCapped; diff --git a/benchmark/logic_rust/concurrent_deser.rs b/benchmark/logic_rust/concurrent_deser.rs new file mode 100644 index 000000000..d7e84a2fb --- /dev/null +++ b/benchmark/logic_rust/concurrent_deser.rs @@ -0,0 +1,129 @@ +use chrono::Local; +use futures::future::join_all; +use scylla::client::session::Session; +use scylla::client::session_builder::SessionBuilder; +use scylla::statement::prepared::PreparedStatement; +use scylla::value::CqlTimeuuid; +use std::env; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use std::sync::Arc; +use uuid::Uuid; + +const CONCURRENCY: usize = 2000; + +async fn insert_data( + session: Arc, + start_index: usize, + n: i32, + insert_query: &PreparedStatement, +) -> Result<(), Box> { + let mut index = start_index; + + while index < n as usize { + let id = Uuid::new_v4(); + let tuuid = CqlTimeuuid::from_str("8e14e760-7fa8-11eb-bc66-000000000001")?; + let ip: IpAddr = IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)); + let now = Local::now(); + let date = now.date_naive(); + let time = now.time(); + + session + .execute_unpaged(insert_query, (id, 100, tuuid, ip, date, time)) + .await?; + index += CONCURRENCY; + } + + Ok(()) +} + +async fn select_data( + session: Arc, + start_index: usize, + n: i32, + select_query: &PreparedStatement, +) -> Result<(), Box> { + let mut index = start_index; + + while index < n as usize { + session.execute_unpaged(select_query, &[]).await?; + index += CONCURRENCY; + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let n: i32 = env::var("CNT") + .ok() + .and_then(|s: String| s.parse::().ok()) + .expect("CNT parameter is required."); + + let uri: String = env::var("SCYLLA_URI").unwrap_or_else(|_| "172.42.0.2:9042".to_string()); + + let session = SessionBuilder::new().known_node(uri).build().await?; + + session + .query_unpaged( + "CREATE KEYSPACE IF NOT EXISTS benchmarks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }", + &[], + ) + .await?; + + session + .query_unpaged("DROP TABLE IF EXISTS benchmarks.basic", &[]) + .await?; + + session + .query_unpaged( + "CREATE TABLE benchmarks.basic (id uuid, val int, tuuid timeuuid, ip inet, date date, time time, PRIMARY KEY(id))", + &[], + ) + .await?; + + let insert_query = session + .prepare("INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)") + .await?; + + let mut handles = vec![]; + let session = Arc::new(session); + + for i in 0..CONCURRENCY { + let session_clone = Arc::clone(&session); + let insert_query_clone = insert_query.clone(); + handles.push(tokio::spawn(async move { + insert_data(session_clone, i, n, &insert_query_clone) + .await + .unwrap(); + })); + } + + let results = join_all(handles).await; + + for result in results { + result.unwrap(); + } + + let select_query = session.prepare("SELECT * FROM benchmarks.basic").await?; + + let mut handles = vec![]; + + for i in 0..CONCURRENCY { + let session_clone = Arc::clone(&session); + let select_query_clone = select_query.clone(); + handles.push(tokio::spawn(async move { + select_data(session_clone, i, n, &select_query_clone) + .await + .unwrap(); + })); + } + + let results = join_all(handles).await; + + for result in results { + result.unwrap(); + } + + Ok(()) +} diff --git a/benchmark/logic_rust/concurrent_ser.rs b/benchmark/logic_rust/concurrent_ser.rs new file mode 100644 index 000000000..4216174c3 --- /dev/null +++ b/benchmark/logic_rust/concurrent_ser.rs @@ -0,0 +1,93 @@ +use chrono::Local; +use futures::future::join_all; +use scylla::client::session::Session; +use scylla::client::session_builder::SessionBuilder; +use scylla::statement::prepared::PreparedStatement; +use scylla::value::CqlTimeuuid; +use std::env; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use std::sync::Arc; +use uuid::Uuid; + +const CONCURRENCY: usize = 2000; + +async fn insert_data( + session: Arc, + start_index: usize, + n: i32, + insert_query: &PreparedStatement, +) -> Result<(), Box> { + let mut index = start_index; + + while index < n as usize { + let id = Uuid::new_v4(); + let tuuid = CqlTimeuuid::from_str("8e14e760-7fa8-11eb-bc66-000000000001")?; + let ip: IpAddr = IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)); + let now = Local::now(); + let date = now.date_naive(); + let time = now.time(); + + session + .execute_unpaged(insert_query, (id, 100, tuuid, ip, date, time)) + .await?; + index += CONCURRENCY; + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let n: i32 = env::var("CNT") + .ok() + .and_then(|s: String| s.parse::().ok()) + .expect("CNT parameter is required."); + + let uri: String = env::var("SCYLLA_URI").unwrap_or_else(|_| "172.42.0.2:9042".to_string()); + + let session = SessionBuilder::new().known_node(uri).build().await?; + + session + .query_unpaged( + "CREATE KEYSPACE IF NOT EXISTS benchmarks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }", + &[], + ) + .await?; + + session + .query_unpaged("DROP TABLE IF EXISTS benchmarks.basic", &[]) + .await?; + + session + .query_unpaged( + "CREATE TABLE benchmarks.basic (id uuid, val int, tuuid timeuuid, ip inet, date date, time time, PRIMARY KEY(id))", + &[], + ) + .await?; + + let insert_query = session + .prepare("INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)") + .await?; + + let mut handles = vec![]; + let session = Arc::new(session); + + for i in 0..CONCURRENCY { + let session_clone = Arc::clone(&session); + let insert_query_clone = insert_query.clone(); + handles.push(tokio::spawn(async move { + insert_data(session_clone, i, n * n, &insert_query_clone) + .await + .unwrap(); + })); + } + + let results = join_all(handles).await; + + for result in results { + result.unwrap(); + } + + Ok(()) +} diff --git a/benchmark/logic_rust/deser.rs b/benchmark/logic_rust/deser.rs new file mode 100644 index 000000000..a44d883dc --- /dev/null +++ b/benchmark/logic_rust/deser.rs @@ -0,0 +1,72 @@ +use chrono::Local; +use scylla::value::CqlTimeuuid; +use scylla::{ + client::{caching_session::CachingSession, session_builder::SessionBuilder}, + statement::Statement, +}; +use std::env; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use uuid::Uuid; + +const DEFAULT_CACHE_SIZE: u32 = 512; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let n: i32 = env::var("CNT") + .ok() + .and_then(|s: String| s.parse::().ok()) + .expect("CNT parameter is required."); + + let uri: String = env::var("SCYLLA_URI").unwrap_or_else(|_| "172.42.0.2:9042".to_string()); + + let session = SessionBuilder::new().known_node(uri).build().await?; + + let session: CachingSession = CachingSession::from(session, DEFAULT_CACHE_SIZE as usize); + + session + .execute_unpaged( + "CREATE KEYSPACE IF NOT EXISTS benchmarks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }", + &[], + ) + .await?; + + session + .execute_unpaged("DROP TABLE IF EXISTS benchmarks.basic", &[]) + .await?; + + session + .execute_unpaged( + "CREATE TABLE benchmarks.basic (id uuid, val int, tuuid timeuuid, ip inet, date date, time time, PRIMARY KEY(id))", + &[], + ) + .await?; + + let insert_query = + "INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)"; + + for _ in 0..n { + // use CachingSession as it is used in the scylla-javascript-driver + let statement: Statement = insert_query.into(); + let prepared = session.add_prepared_statement(&statement).await?; + + let id = Uuid::new_v4(); + let tuuid = CqlTimeuuid::from_str("8e14e760-7fa8-11eb-bc66-000000000001")?; + let ip: IpAddr = IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)); + let now = Local::now(); + let date = now.date_naive(); + let time = now.time(); + + session + .get_session() + .execute_unpaged(&prepared, (id, 100, tuuid, ip, date, time)) + .await?; + } + + let select_query = "SELECT * FROM benchmarks.basic"; + for _ in 0..n { + let _ = session.execute_unpaged(select_query, &[]).await?; + } + + Ok(()) +} diff --git a/benchmark/logic_rust/ser.rs b/benchmark/logic_rust/ser.rs new file mode 100644 index 000000000..60b1debf0 --- /dev/null +++ b/benchmark/logic_rust/ser.rs @@ -0,0 +1,67 @@ +use chrono::Local; +use scylla::value::CqlTimeuuid; +use scylla::{ + client::{caching_session::CachingSession, session_builder::SessionBuilder}, + statement::Statement, +}; +use std::env; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use uuid::Uuid; + +const DEFAULT_CACHE_SIZE: u32 = 512; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let n: i32 = env::var("CNT") + .ok() + .and_then(|s: String| s.parse::().ok()) + .expect("CNT parameter is required."); + + let uri: String = env::var("SCYLLA_URI").unwrap_or_else(|_| "172.42.0.2:9042".to_string()); + + let session = SessionBuilder::new().known_node(uri).build().await?; + + let session: CachingSession = CachingSession::from(session, DEFAULT_CACHE_SIZE as usize); + + session + .execute_unpaged( + "CREATE KEYSPACE IF NOT EXISTS benchmarks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }", + &[], + ) + .await?; + + session + .execute_unpaged("DROP TABLE IF EXISTS benchmarks.basic", &[]) + .await?; + + session + .execute_unpaged( + "CREATE TABLE benchmarks.basic (id uuid, val int, tuuid timeuuid, ip inet, date date, time time, PRIMARY KEY(id))", + &[], + ) + .await?; + + let insert_query = + "INSERT INTO benchmarks.basic (id, val, tuuid, ip, date, time) VALUES (?, ?, ?, ?, ?, ?)"; + + for _ in 0..n * n { + // use CachingSession as it is used in the scylla-javascript-driver + let statement: Statement = insert_query.into(); + let prepared = session.add_prepared_statement(&statement).await?; + + let id = Uuid::new_v4(); + let tuuid = CqlTimeuuid::from_str("8e14e760-7fa8-11eb-bc66-000000000001")?; + let ip: IpAddr = IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)); + let now = Local::now(); + let date = now.date_naive(); + let time = now.time(); + + session + .get_session() + .execute_unpaged(&prepared, (id, 100, tuuid, ip, date, time)) + .await?; + } + + Ok(()) +} diff --git a/benchmark/runner.py b/benchmark/runner.py index 691d05803..b01e9a981 100644 --- a/benchmark/runner.py +++ b/benchmark/runner.py @@ -59,6 +59,10 @@ def run_process(command): n_min["concurrent_select.js"] = 400_000 / 64 n_min["insert.js"] = 400_000 / 64 n_min["select.js"] = 100_000 / 64 +n_min["deser.js"] = 2_000 / 64 +n_min["concurrent_deser.js"] = 2_000 / 64 +n_min["ser.js"] = 900 / 64 +n_min["concurrent_ser.js"] = 1_200 / 64 n_min["batch.js"] = 3_000_000 / 64 steps = {} @@ -67,14 +71,20 @@ def run_process(command): # --------- libs and rust benchmark names ---------- libs = ["scylladb-javascript-driver", "cassandra-driver"] -benchmarks = ["concurrent_insert.js", "insert.js", "select.js", - "concurrent_select.js", "batch.js"] +benchmarks = [ "insert.js", "concurrent_insert.js", "select.js", + "concurrent_select.js", "deser.js", + "concurrent_deser.js", "ser.js", + "concurrent_ser.js", "batch.js"] name_rust = {} name_rust["concurrent_insert.js"] = "concurrent_insert_benchmark" name_rust["insert.js"] = "insert_benchmark" name_rust["select.js"] = "select_benchmark" name_rust["concurrent_select.js"] = "concurrent_select_benchmark" +name_rust["deser.js"] = "deser_benchmark" +name_rust["concurrent_deser.js"] = "concurrent_deser_benchmark" +name_rust["ser.js"] = "ser_benchmark" +name_rust["concurrent_ser.js"] = "concurrent_ser_benchmark" name_rust["batch.js"] = "batch_benchmark" @@ -147,12 +157,12 @@ def run_process(command): libs.append("rust-driver") -cols = 3 +cols = 4 rows_time = (len(df) + cols - 1) // cols rows_mem = (len(df_mem) + cols - 1) // cols total_rows = rows_time + rows_mem -fig, axes = plt.subplots(total_rows, cols, figsize=(15, 5 * total_rows), +fig, axes = plt.subplots(total_rows, cols, figsize=(20, 5 * total_rows), facecolor="white") axes = axes.flatten()