diff --git a/Cargo.toml b/Cargo.toml index 89de831..d02231a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,18 +28,20 @@ memory-pager = "0.9.0" merkle-tree-stream = "0.12.0" pretty-hash = "0.4.1" rand = "0.7.3" -random-access-disk = "1.1.0" -random-access-memory = "1.2.0" -random-access-storage = "3.0.0" +random-access-disk = "2.0.0" +random-access-memory = "2.0.0" +random-access-storage = "4.0.0" sha2 = "0.8.1" sleep-parser = "0.8.0" sparse-bitfield = "0.11.0" tree-index = "0.6.0" bitfield-rle = "0.1.1" +futures = "0.3.4" +async-std = "1.5.0" [dev-dependencies] quickcheck = "0.9.2" data-encoding = "2.2.0" remove_dir_all = "0.5.2" tempfile = "3.1.0" -async-std = "1.5.0" +async-std = { version = "1.5.0", features = ["attributes"] } diff --git a/README.md b/README.md index 40a6c69..013b9af 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,13 @@ WIP. Secure, distributed, append-only log structure. Adapted from ## Usage ```rust -let mut feed = hypercore::open("./feed.db")?; +let mut feed = hypercore::open("./feed.db").await?; -feed.append(b"hello")?; -feed.append(b"world")?; +feed.append(b"hello").await?; +feed.append(b"world").await?; -assert_eq!(feed.get(0)?, Some(b"hello".to_vec())); -assert_eq!(feed.get(1)?, Some(b"world".to_vec())); +assert_eq!(feed.get(0).await?, Some(b"hello".to_vec())); +assert_eq!(feed.get(1).await?, Some(b"world".to_vec())); ``` ## Installation diff --git a/benches/bench.rs b/benches/bench.rs index 74d396b..cde6db2 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -5,41 +5,51 @@ use anyhow::Error; use random_access_memory::RandomAccessMemory; use test::Bencher; -use hypercore::{Feed, Storage, Store}; +use hypercore::{Feed, Storage}; -fn create_feed(page_size: usize) -> Result, Error> { - let create = |_store: Store| Ok(RandomAccessMemory::new(page_size)); - let storage = Storage::new(create)?; - Ok(Feed::with_storage(storage)?) 
+async fn create_feed(page_size: usize) -> Result, Error> { + let storage = + Storage::new(|_| Box::pin(async move { Ok(RandomAccessMemory::new(page_size)) })).await?; + Feed::with_storage(storage).await } #[bench] fn create(b: &mut Bencher) { b.iter(|| { - create_feed(1024).unwrap(); + async_std::task::block_on(async { + create_feed(1024).await.unwrap(); + }); }); } #[bench] fn write(b: &mut Bencher) { - let mut feed = create_feed(1024).unwrap(); - let data = Vec::from("hello"); - b.iter(|| { - feed.append(&data).unwrap(); + async_std::task::block_on(async { + let mut feed = create_feed(1024).await.unwrap(); + let data = Vec::from("hello"); + b.iter(|| { + async_std::task::block_on(async { + feed.append(&data).await.unwrap(); + }); + }); }); } #[bench] fn read(b: &mut Bencher) { - let mut feed = create_feed(1024).unwrap(); - let data = Vec::from("hello"); - for _ in 0..1000 { - feed.append(&data).unwrap(); - } + async_std::task::block_on(async { + let mut feed = create_feed(1024).await.unwrap(); + let data = Vec::from("hello"); + for _ in 0..1000 { + feed.append(&data).await.unwrap(); + } - let mut i = 0; - b.iter(|| { - feed.get(i).unwrap(); - i += 1; + let mut i = 0; + b.iter(|| { + async_std::task::block_on(async { + feed.get(i).await.unwrap(); + i += 1; + }); + }); }); } diff --git a/examples/async.rs b/examples/async.rs index 0b854fe..a052bfe 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -5,17 +5,17 @@ use std::fmt::Debug; async fn append(feed: &mut Feed, content: &[u8]) where - T: RandomAccess> + Debug, + T: RandomAccess> + Debug + Send, { - feed.append(content).unwrap(); + feed.append(content).await.unwrap(); } async fn print(feed: &mut Feed) where - T: RandomAccess> + Debug, + T: RandomAccess> + Debug + Send, { - println!("{:?}", feed.get(0)); - println!("{:?}", feed.get(1)); + println!("{:?}", feed.get(0).await); + println!("{:?}", feed.get(1).await); } fn main() { diff --git a/examples/main.rs b/examples/main.rs index 5073d14..48b1659 100644 
--- a/examples/main.rs +++ b/examples/main.rs @@ -1,11 +1,12 @@ use hypercore::Feed; -fn main() { +#[async_std::main] +async fn main() { let mut feed = Feed::default(); - feed.append(b"hello").unwrap(); - feed.append(b"world").unwrap(); + feed.append(b"hello").await.unwrap(); + feed.append(b"world").await.unwrap(); - println!("{:?}", feed.get(0)); // prints "hello" - println!("{:?}", feed.get(1)); // prints "world" + println!("{:?}", feed.get(0).await); // prints "hello" + println!("{:?}", feed.get(1).await); // prints "world" } diff --git a/src/feed.rs b/src/feed.rs index 7f62857..c14b13a 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -47,11 +47,11 @@ where impl Feed where - T: RandomAccess> + Debug, + T: RandomAccess> + Debug + Send, { /// Create a new instance with a custom storage backend. - pub fn with_storage(mut storage: crate::storage::Storage) -> Result { - match storage.read_partial_keypair() { + pub async fn with_storage(mut storage: crate::storage::Storage) -> Result { + match storage.read_partial_keypair().await { Some(partial_keypair) => { let builder = FeedBuilder::new(partial_keypair.public, storage); @@ -60,19 +60,17 @@ where return Ok(builder.build()?); } - Ok(builder - .secret_key(partial_keypair.secret.unwrap()) - .build()?) + builder.secret_key(partial_keypair.secret.unwrap()).build() } None => { // we have no keys, generate a pair and save them to the storage let keypair = generate_keypair(); - storage.write_public_key(&keypair.public)?; - storage.write_secret_key(&keypair.secret)?; + storage.write_public_key(&keypair.public).await?; + storage.write_secret_key(&keypair.secret).await?; - Ok(FeedBuilder::new(keypair.public, storage) + FeedBuilder::new(keypair.public, storage) .secret_key(keypair.secret) - .build()?) + .build() } } } @@ -102,7 +100,7 @@ where /// Append data into the log. 
#[inline] - pub fn append(&mut self, data: &[u8]) -> Result<()> { + pub async fn append(&mut self, data: &[u8]) -> Result<()> { let key = match &self.secret_key { Some(key) => key, None => bail!("no secret key, cannot append."), @@ -110,16 +108,18 @@ where self.merkle.next(data); let mut offset = 0; - self.storage.write_data(self.byte_length + offset, &data)?; + self.storage + .write_data(self.byte_length + offset, &data) + .await?; offset += data.len() as u64; let hash = Hash::from_roots(self.merkle.roots()); let index = self.length; let signature = sign(&self.public_key, key, hash.as_bytes()); - self.storage.put_signature(index, signature)?; + self.storage.put_signature(index, signature).await?; for node in self.merkle.nodes() { - self.storage.put_node(node)?; + self.storage.put_node(node).await?; } self.byte_length += offset; @@ -134,10 +134,10 @@ where /// Get the block of data at the tip of the feed. This will be the most /// recently appended block. #[inline] - pub fn head(&mut self) -> Result>> { + pub async fn head(&mut self) -> Result>> { match self.len() { 0 => Ok(None), - len => self.get(len - 1), + len => self.get(len - 1).await, } } @@ -162,23 +162,23 @@ where /// Retrieve data from the log. #[inline] - pub fn get(&mut self, index: u64) -> Result>> { + pub async fn get(&mut self, index: u64) -> Result>> { if !self.bitfield.get(index) { // NOTE: Do (network) lookup here once we have network code. return Ok(None); } - Ok(Some(self.storage.get_data(index)?)) + Ok(Some(self.storage.get_data(index).await?)) } /// Return the Nodes which prove the correctness for the Node at index. #[inline] - pub fn proof(&mut self, index: u64, include_hash: bool) -> Result { - self.proof_with_digest(index, 0, include_hash) + pub async fn proof(&mut self, index: u64, include_hash: bool) -> Result { + self.proof_with_digest(index, 0, include_hash).await } /// Return the Nodes which prove the correctness for the Node at index with a /// digest. 
- pub fn proof_with_digest( + pub async fn proof_with_digest( &mut self, index: u64, digest: u64, @@ -205,7 +205,7 @@ where let signature = if has_underflow { None } else { - match self.storage.get_signature(sig_index) { + match self.storage.get_signature(sig_index).await { Ok(sig) => Some(sig), Err(_) => None, } @@ -213,7 +213,7 @@ where let mut nodes = Vec::with_capacity(proof.nodes().len()); for index in proof.nodes() { - let node = self.storage.get_node(*index)?; + let node = self.storage.get_node(*index).await?; nodes.push(node); } @@ -232,7 +232,7 @@ where /// Insert data into the tree at `index`. Verifies the `proof` when inserting /// to make sure data is correct. Useful when replicating data from a remote /// host. - pub fn put(&mut self, index: u64, data: Option<&[u8]>, mut proof: Proof) -> Result<()> { + pub async fn put(&mut self, index: u64, data: Option<&[u8]>, mut proof: Proof) -> Result<()> { let mut next = tree_index(index); let mut trusted: Option = None; let mut missing = vec![]; @@ -265,13 +265,13 @@ where let mut missing_nodes = vec![]; for index in missing { - let node = self.storage.get_node(index)?; + let node = self.storage.get_node(index).await?; missing_nodes.push(node); } let mut trusted_node = None; if let Some(index) = trusted { - let node = self.storage.get_node(index)?; + let node = self.storage.get_node(index).await?; trusted_node = Some(node); } @@ -287,7 +287,7 @@ where // check if we already have the hash for this node if verify_node(&trusted_node, &top) { - self.write(index, data, &visited, None)?; + self.write(index, data, &visited, None).await?; return Ok(()); } @@ -303,9 +303,9 @@ where node = missing_nodes.remove(0); } else { // TODO: panics here - let nodes = self.verify_roots(&top, &mut proof)?; + let nodes = self.verify_roots(&top, &mut proof).await?; visited.extend_from_slice(&nodes); - self.write(index, data, &visited, proof.signature)?; + self.write(index, data, &visited, proof.signature).await?; return Ok(()); } @@ 
-315,7 +315,7 @@ where top = Node::new(flat::parent(top.index), hash.as_bytes().into(), len); if verify_node(&trusted_node, &top) { - self.write(index, data, &visited, None)?; + self.write(index, data, &visited, None).await?; return Ok(()); } } @@ -336,7 +336,7 @@ where // - ._writeDone() // // Arguments are: (index, data, node, sig, from, cb) - fn write( + async fn write( &mut self, index: u64, data: Option<&[u8]>, @@ -344,16 +344,16 @@ where sig: Option, ) -> Result<()> { for node in nodes { - self.storage.put_node(node)?; + self.storage.put_node(node).await?; } if let Some(data) = data { - self.storage.put_data(index, data, &nodes)?; + self.storage.put_data(index, data, &nodes).await?; } if let Some(sig) = sig { let sig = sig.borrow(); - self.storage.put_signature(index, sig)?; + self.storage.put_signature(index, sig).await?; } for node in nodes { @@ -385,18 +385,18 @@ where } /// Get a signature from the store. - pub fn signature(&mut self, index: u64) -> Result { + pub async fn signature(&mut self, index: u64) -> Result { ensure!( index < self.length, format!("No signature found for index {}", index) ); - Ok(self.storage.next_signature(index)?) + self.storage.next_signature(index).await } /// Verify the entire feed. Checks a signature against the signature of all /// root nodes combined. - pub fn verify(&mut self, index: u64, signature: &Signature) -> Result<()> { - let roots = self.root_hashes(index)?; + pub async fn verify(&mut self, index: u64, signature: &Signature) -> Result<()> { + let roots = self.root_hashes(index).await?; let roots: Vec<_> = roots.into_iter().map(Arc::new).collect(); let message = Hash::from_roots(&roots); @@ -427,7 +427,7 @@ where /// Get all root hashes from the feed. // In the JavaScript implementation this calls to `._getRootsToVerify()` // internally. In Rust it seems better to just inline the code. 
- pub fn root_hashes(&mut self, index: u64) -> Result> { + pub async fn root_hashes(&mut self, index: u64) -> Result> { ensure!( index <= self.length, format!("Root index bounds exceeded {} > {}", index, self.length) @@ -438,7 +438,7 @@ where let mut roots = Vec::with_capacity(indexes.len()); for index in indexes { - let node = self.storage.get_node(index)?; + let node = self.storage.get_node(index).await?; roots.push(node); } @@ -455,7 +455,7 @@ where &self.secret_key } - fn verify_roots(&mut self, top: &Node, proof: &mut Proof) -> Result> { + async fn verify_roots(&mut self, top: &Node, proof: &mut Proof) -> Result> { let last_node = if !proof.nodes.is_empty() { proof.nodes[proof.nodes.len() - 1].index } else { @@ -477,7 +477,7 @@ where extra_nodes.push(proof.nodes[0].clone()); roots.push(proof.nodes.remove(0)); // TODO: verify this is the right index to push to. } else if self.tree.get(index) { - let node = self.storage.get_node(index)?; + let node = self.storage.get_node(index).await?; roots.push(node); } else { bail!(": Missing tree roots needed for verify"); @@ -501,13 +501,13 @@ where /// Audit all data in the feed. Checks that all current data matches /// the hashes in the merkle tree, and clears the bitfield if not. /// The tuple returns is (valid_blocks, invalid_blocks) - pub fn audit(&mut self) -> Result { + pub async fn audit(&mut self) -> Result { let mut valid_blocks = 0; let mut invalid_blocks = 0; for index in 0..self.length { if self.bitfield.get(index) { - let node = self.storage.get_node(2 * index)?; - let data = self.storage.get_data(index)?; + let node = self.storage.get_node(2 * index).await?; + let data = self.storage.get_data(index).await?; let data_hash = Hash::from_leaf(&data); if node.hash == data_hash.as_bytes() { valid_blocks += 1; @@ -561,10 +561,10 @@ impl Feed { // TODO: Ensure that dir is always a directory. // NOTE: Should we `mkdirp` here? // NOTE: Should we call these `data.bitfield` / `data.tree`? 
- pub fn open>(path: P) -> Result { + pub async fn open>(path: P) -> Result { let dir = path.as_ref().to_owned(); - let storage = Storage::new_disk(&dir)?; - Ok(Self::with_storage(storage)?) + let storage = Storage::new_disk(&dir).await?; + Self::with_storage(storage).await } } @@ -575,12 +575,14 @@ impl Feed { /// unlikely. impl Default for Feed { fn default() -> Self { - let storage = Storage::new_memory().unwrap(); - Self::with_storage(storage).unwrap() + async_std::task::block_on(async { + let storage = Storage::new_memory().await.unwrap(); + Self::with_storage(storage).await.unwrap() + }) } } -impl> + Debug> Display +impl> + Debug + Send> Display for Feed { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/src/lib.rs b/src/lib.rs index d5d9a35..0c99939 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,14 +13,17 @@ //! ## Example //! ```rust //! # fn main() -> Result<(), Box> { -//! let mut feed = hypercore::open("./feed.db")?; +//! # async_std::task::block_on(async { +//! let mut feed = hypercore::open("./feed.db").await?; //! -//! feed.append(b"hello")?; -//! feed.append(b"world")?; +//! feed.append(b"hello").await?; +//! feed.append(b"world").await?; //! -//! assert_eq!(feed.get(0)?, Some(b"hello".to_vec())); -//! assert_eq!(feed.get(1)?, Some(b"world".to_vec())); -//! # Ok(())} +//! assert_eq!(feed.get(0).await?, Some(b"hello".to_vec())); +//! assert_eq!(feed.get(1).await?, Some(b"world".to_vec())); +//! # Ok(()) +//! # }) +//! # } //! ``` //! //! [dat-node]: https://github.com/mafintosh/hypercore @@ -51,6 +54,8 @@ pub use ed25519_dalek::{PublicKey, SecretKey}; use std::path::Path; /// Create a new Hypercore `Feed`. -pub fn open>(path: P) -> anyhow::Result> { - Feed::open(path) +pub async fn open>( + path: P, +) -> anyhow::Result> { + Feed::open(path).await } diff --git a/src/prelude.rs b/src/prelude.rs index e8fe5a3..4f304c7 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,7 +1,6 @@ //! 
Convenience wrapper to import all of Hypercore's core. //! //! ```rust -//! extern crate hypercore; //! //! use hypercore::prelude::*; //! diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 736b7ab..713d6f6 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -10,6 +10,7 @@ pub use merkle_tree_stream::Node as NodeTrait; use anyhow::{anyhow, ensure, Result}; use ed25519_dalek::{PublicKey, SecretKey, Signature, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH}; use flat_tree as flat; +use futures::future::FutureExt; use random_access_disk::RandomAccessDisk; use random_access_memory::RandomAccessMemory; use random_access_storage::RandomAccess; @@ -57,40 +58,43 @@ where impl Storage where - T: RandomAccess> + Debug, + T: RandomAccess> + Debug + Send, { /// Create a new instance. Takes a keypair and a callback to create new /// storage instances. // Named `.open()` in the JS version. Replaces the `.openKey()` method too by // requiring a key pair to be initialized before creating a new instance. - pub fn new(create: Cb) -> Result + pub async fn new(create: Cb) -> Result where - Cb: Fn(Store) -> Result, + Cb: Fn(Store) -> std::pin::Pin> + Send>>, { let mut instance = Self { - tree: create(Store::Tree)?, - data: create(Store::Data)?, - bitfield: create(Store::Bitfield)?, - signatures: create(Store::Signatures)?, - keypair: create(Store::Keypair)?, + tree: create(Store::Tree).await?, + data: create(Store::Data).await?, + bitfield: create(Store::Bitfield).await?, + signatures: create(Store::Signatures).await?, + keypair: create(Store::Keypair).await?, }; let header = create_bitfield(); instance .bitfield .write(0, &header.to_vec()) + .await .map_err(|e| anyhow!(e))?; let header = create_signatures(); instance .signatures .write(0, &header.to_vec()) + .await .map_err(|e| anyhow!(e))?; let header = create_tree(); instance .tree .write(0, &header.to_vec()) + .await .map_err(|e| anyhow!(e))?; Ok(instance) @@ -98,8 +102,8 @@ where /// Write data to the feed. 
#[inline] - pub fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { - self.data.write(offset, &data).map_err(|e| anyhow!(e)) + pub async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { + self.data.write(offset, &data).await.map_err(|e| anyhow!(e)) } /// Write a byte vector to a data storage (random-access instance) at the @@ -110,53 +114,66 @@ where /// with mafintosh). /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Data` entry type? - pub fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { + pub async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { if data.is_empty() { return Ok(()); } - let mut range = self.data_offset(index, nodes)?; + let mut range = self.data_offset(index, nodes).await?; ensure!( range.by_ref().count() == data.len(), format!("length `{:?} != {:?}`", range.count(), data.len()) ); - self.data.write(range.start, data).map_err(|e| anyhow!(e)) + self.data + .write(range.start, data) + .await + .map_err(|e| anyhow!(e)) } /// Get data from disk that the user has written to it. This is stored /// unencrypted, so there's no decryption needed. // FIXME: data_offset always reads out index 0, length 0 #[inline] - pub fn get_data(&mut self, index: u64) -> Result> { + pub async fn get_data(&mut self, index: u64) -> Result> { let cached_nodes = Vec::new(); // TODO: reuse allocation. - let range = self.data_offset(index, &cached_nodes)?; + let range = self.data_offset(index, &cached_nodes).await?; self.data .read(range.start, range.count() as u64) + .await .map_err(|e| anyhow!(e)) } /// Search the signature stores for a `Signature`, starting at `index`. - pub fn next_signature(&mut self, index: u64) -> Result { - let bytes = self - .signatures - .read(HEADER_OFFSET + 64 * index, 64) - .map_err(|e| anyhow!(e))?; - - if not_zeroes(&bytes) { - Ok(Signature::from_bytes(&bytes)?) - } else { - Ok(self.next_signature(index + 1)?) 
+ pub fn next_signature<'a>( + &'a mut self, + index: u64, + ) -> futures::future::BoxFuture<'a, Result> { + let bytes = async_std::task::block_on(async { + self.signatures + .read(HEADER_OFFSET + 64 * index, 64) + .await + .map_err(|e| anyhow!(e)) + }); + async move { + let bytes = bytes?; + if not_zeroes(&bytes) { + Ok(Signature::from_bytes(&bytes)?) + } else { + Ok(self.next_signature(index + 1).await?) + } } + .boxed() } /// Get a `Signature` from the store. #[inline] - pub fn get_signature(&mut self, index: u64) -> Result { + pub async fn get_signature(&mut self, index: u64) -> Result { let bytes = self .signatures .read(HEADER_OFFSET + 64 * index, 64) + .await .map_err(|e| anyhow!(e))?; ensure!(not_zeroes(&bytes), "No signature found"); Ok(Signature::from_bytes(&bytes)?) @@ -166,10 +183,15 @@ where /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Signature` entry type? #[inline] - pub fn put_signature(&mut self, index: u64, signature: impl Borrow) -> Result<()> { + pub async fn put_signature( + &mut self, + index: u64, + signature: impl Borrow, + ) -> Result<()> { let signature = signature.borrow(); self.signatures .write(HEADER_OFFSET + 64 * index, &signature.to_bytes()) + .await .map_err(|e| anyhow!(e)) } @@ -178,7 +200,7 @@ where /// /// ## Panics /// A panic can occur if no maximum value is found. 
- pub fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { + pub async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { let mut roots = Vec::new(); // TODO: reuse alloc flat::full_roots(tree_index(index), &mut roots); @@ -189,7 +211,7 @@ where if pending == 0 { let len = match find_node(&cached_nodes, block_index) { Some(node) => node.len(), - None => (self.get_node(block_index)?).len(), + None => (self.get_node(block_index).await?).len(), }; return Ok(offset..offset + len); } @@ -204,7 +226,7 @@ where // None => self.get_node(root), // }; // ``` - let node = self.get_node(root)?; + let node = self.get_node(root).await?; offset += node.len(); pending -= 1; @@ -214,7 +236,7 @@ where let len = match find_node(&cached_nodes, block_index) { Some(node) => node.len(), - None => (self.get_node(block_index)?).len(), + None => (self.get_node(block_index).await?).len(), }; return Ok(offset..offset + len); @@ -225,10 +247,11 @@ where /// Get a `Node` from the `tree` storage. #[inline] - pub fn get_node(&mut self, index: u64) -> Result { + pub async fn get_node(&mut self, index: u64) -> Result { let buf = self .tree .read(HEADER_OFFSET + 40 * index, 40) + .await .map_err(|e| anyhow!(e))?; let node = Node::from_bytes(index, &buf)?; Ok(node) @@ -238,11 +261,12 @@ where /// TODO: prevent extra allocs here. Implement a method on node that can reuse /// a buffer. #[inline] - pub fn put_node(&mut self, node: &Node) -> Result<()> { + pub async fn put_node(&mut self, node: &Node) -> Result<()> { let index = node.index(); let buf = node.to_bytes()?; self.tree .write(HEADER_OFFSET + 40 * index, &buf) + .await .map_err(|e| anyhow!(e)) } @@ -250,50 +274,54 @@ where /// TODO: Ensure the chunk size is correct. /// NOTE: Should we create a bitfield entry type? 
#[inline] - pub fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { + pub async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { self.bitfield .write(HEADER_OFFSET + offset, data) + .await .map_err(|e| anyhow!(e)) } /// Read a public key from storage - pub fn read_public_key(&mut self) -> Result { + pub async fn read_public_key(&mut self) -> Result { let buf = self .keypair .read(0, PUBLIC_KEY_LENGTH as u64) + .await .map_err(|e| anyhow!(e))?; let public_key = PublicKey::from_bytes(&buf)?; Ok(public_key) } /// Read a secret key from storage - pub fn read_secret_key(&mut self) -> Result { + pub async fn read_secret_key(&mut self) -> Result { let buf = self .keypair .read(PUBLIC_KEY_LENGTH as u64, SECRET_KEY_LENGTH as u64) + .await .map_err(|e| anyhow!(e))?; let secret_key = SecretKey::from_bytes(&buf)?; Ok(secret_key) } /// Write a public key to the storage - pub fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { + pub async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { let buf: [u8; PUBLIC_KEY_LENGTH] = public_key.to_bytes(); - self.keypair.write(0, &buf).map_err(|e| anyhow!(e)) + self.keypair.write(0, &buf).await.map_err(|e| anyhow!(e)) } /// Write a secret key to the storage - pub fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { + pub async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { let buf: [u8; SECRET_KEY_LENGTH] = secret_key.to_bytes(); self.keypair .write(PUBLIC_KEY_LENGTH as u64, &buf) + .await .map_err(|e| anyhow!(e)) } /// Tries to read a partial keypair (ie: with an optional secret_key) from the storage - pub fn read_partial_keypair(&mut self) -> Option { - match self.read_public_key() { - Ok(public) => match self.read_secret_key() { + pub async fn read_partial_keypair(&mut self) -> Option { + match self.read_public_key().await { + Ok(public) => match self.read_secret_key().await { Ok(secret) => Some(PartialKeypair { public, 
secret: Some(secret), @@ -310,15 +338,15 @@ where impl Storage { /// Create a new instance backed by a `RandomAccessMemory` instance. - pub fn new_memory() -> Result { - let create = |_| Ok(RandomAccessMemory::default()); - Ok(Self::new(create)?) + pub async fn new_memory() -> Result { + let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); + Ok(Self::new(create).await?) } } impl Storage { /// Create a new instance backed by a `RandomAccessDisk` instance. - pub fn new_disk(dir: &PathBuf) -> Result { + pub async fn new_disk(dir: &PathBuf) -> Result { let storage = |storage: Store| { let name = match storage { Store::Tree => "tree", @@ -327,9 +355,9 @@ impl Storage { Store::Signatures => "signatures", Store::Keypair => "key", }; - RandomAccessDisk::open(dir.as_path().join(name)) + RandomAccessDisk::open(dir.as_path().join(name)).boxed() }; - Ok(Self::new(storage)?) + Ok(Self::new(storage).await?) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index cffb176..8e5d830 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,11 +1,12 @@ use hypercore; -extern crate random_access_memory as ram; use anyhow::Error; +use futures::future::FutureExt; use hypercore::{Feed, Storage, Store}; +use random_access_memory as ram; -pub fn create_feed(page_size: usize) -> Result, Error> { - let create = |_store: Store| Ok(ram::RandomAccessMemory::new(page_size)); - let storage = Storage::new(create)?; - Ok(Feed::with_storage(storage)?) 
+pub async fn create_feed(page_size: usize) -> Result, Error> { + let create = |_store: Store| async move { Ok(ram::RandomAccessMemory::new(page_size)) }.boxed(); + let storage = Storage::new(create).await?; + Feed::with_storage(storage).await } diff --git a/tests/compat.rs b/tests/compat.rs index 3766d4e..f9bf47e 100644 --- a/tests/compat.rs +++ b/tests/compat.rs @@ -15,8 +15,8 @@ use hypercore::{Storage, Store}; use random_access_disk::RandomAccessDisk; use remove_dir_all::remove_dir_all; -#[test] -fn deterministic_data_and_tree() { +#[async_std::test] +async fn deterministic_data_and_tree() { let expected_tree = hex_bytes(concat!( "0502570200002807424c414b4532620000000000000000000000000000000000ab27d45f509274", "ce0d08f4f09ba2d0e0d8df61a0c2a78932e81b5ef26ef398df0000000000000001064321a8413b", @@ -34,12 +34,12 @@ fn deterministic_data_and_tree() { )); for _ in 0..5 { - let (dir, storage) = mk_storage(); - let mut feed = Feed::with_storage(storage).unwrap(); + let (dir, storage) = mk_storage().await; + let mut feed = Feed::with_storage(storage).await.unwrap(); let data = b"abcdef"; for &b in data { - feed.append(&[b]).unwrap(); + feed.append(&[b]).await.unwrap(); } assert_eq!(read_bytes(&dir, Store::Data), data); assert_eq!(read_bytes(&dir, Store::Tree), expected_tree); @@ -55,8 +55,8 @@ fn deterministic_data_and_tree_after_replication() { unimplemented!(); } -#[test] -fn deterministic_signatures() { +#[async_std::test] +async fn deterministic_signatures() { let key = hex_bytes("9718a1ff1c4ca79feac551c0c7212a65e4091278ec886b88be01ee4039682238"); let keypair_bytes = hex_bytes(concat!( "53729c0311846cca9cc0eded07aaf9e6689705b6a0b1bb8c3a2a839b72fda383", @@ -73,7 +73,7 @@ fn deterministic_signatures() { )); for _ in 0..5 { - let (dir, storage) = mk_storage(); + let (dir, storage) = mk_storage().await; let keypair = mk_keypair(&keypair_bytes, &key); let mut feed = Feed::builder(keypair.public, storage) .secret_key(keypair.secret) @@ -82,7 +82,7 @@ fn 
deterministic_signatures() { let data = b"abc"; for &b in data { - feed.append(&[b]).unwrap(); + feed.append(&[b]).await.unwrap(); } assert_eq!(read_bytes(&dir, Store::Data), data); @@ -114,10 +114,15 @@ fn storage_path>(dir: P, s: Store) -> PathBuf { dir.as_ref().join(filename) } -fn mk_storage() -> (PathBuf, Storage) { +async fn mk_storage() -> (PathBuf, Storage) { let temp_dir = tempfile::tempdir().unwrap(); let dir = temp_dir.into_path(); - let storage = Storage::new(|s| RandomAccessDisk::open(storage_path(dir.clone(), s))).unwrap(); + let storage = Storage::new(|s| { + let dir = dir.clone(); + Box::pin(async move { RandomAccessDisk::open(storage_path(dir, s)).await }) + }) + .await + .unwrap(); (dir, storage) } diff --git a/tests/feed.rs b/tests/feed.rs index 54e2375..6488ab2 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -1,92 +1,102 @@ extern crate random_access_memory as ram; -use random_access_storage; mod common; -use self::random_access_storage::RandomAccess; use common::create_feed; use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use random_access_storage::RandomAccess; use std::env::temp_dir; use std::fmt::Debug; use std::fs; use std::io::Write; -#[test] -fn create_with_key() { +#[async_std::test] +async fn create_with_key() { let keypair = generate_keypair(); - let storage = Storage::new_memory().unwrap(); + let storage = Storage::new_memory().await.unwrap(); let _feed = Feed::builder(keypair.public, storage) .secret_key(keypair.secret) .build() .unwrap(); } -#[test] -fn display() { - let feed = create_feed(50).unwrap(); +#[async_std::test] +async fn display() { + let feed = create_feed(50).await.unwrap(); let output = format!("{}", feed); assert_eq!(output.len(), 61); } -#[test] +#[async_std::test] /// Verify `.append()` and `.get()` work. 
-fn set_get() { - let mut feed = create_feed(50).unwrap(); - feed.append(b"hello").unwrap(); - feed.append(b"world").unwrap(); +async fn set_get() { + let mut feed = create_feed(50).await.unwrap(); + feed.append(b"hello").await.unwrap(); + feed.append(b"world").await.unwrap(); - assert_eq!(feed.get(0).unwrap(), Some(b"hello".to_vec())); - assert_eq!(feed.get(1).unwrap(), Some(b"world".to_vec())); + assert_eq!(feed.get(0).await.unwrap(), Some(b"hello".to_vec())); + assert_eq!(feed.get(1).await.unwrap(), Some(b"world".to_vec())); } -#[test] -fn append() { - let mut feed = create_feed(50).unwrap(); - feed.append(br#"{"hello":"world"}"#).unwrap(); - feed.append(br#"{"hello":"mundo"}"#).unwrap(); - feed.append(br#"{"hello":"welt"}"#).unwrap(); +#[async_std::test] +async fn append() { + let mut feed = create_feed(50).await.unwrap(); + feed.append(br#"{"hello":"world"}"#).await.unwrap(); + feed.append(br#"{"hello":"mundo"}"#).await.unwrap(); + feed.append(br#"{"hello":"welt"}"#).await.unwrap(); assert_eq!(feed.len(), 3); assert_eq!(feed.byte_len(), 50); - assert_eq!(feed.get(0).unwrap(), Some(br#"{"hello":"world"}"#.to_vec())); - assert_eq!(feed.get(1).unwrap(), Some(br#"{"hello":"mundo"}"#.to_vec())); - assert_eq!(feed.get(2).unwrap(), Some(br#"{"hello":"welt"}"#.to_vec())); + assert_eq!( + feed.get(0).await.unwrap(), + Some(br#"{"hello":"world"}"#.to_vec()) + ); + assert_eq!( + feed.get(1).await.unwrap(), + Some(br#"{"hello":"mundo"}"#.to_vec()) + ); + assert_eq!( + feed.get(2).await.unwrap(), + Some(br#"{"hello":"welt"}"#.to_vec()) + ); } -#[test] +#[async_std::test] /// Verify the `.root_hashes()` method returns the right nodes. -fn root_hashes() { +async fn root_hashes() { // If no roots exist we should get an error. - let mut feed = create_feed(50).unwrap(); - let res = feed.root_hashes(0); + let mut feed = create_feed(50).await.unwrap(); + let res = feed.root_hashes(0).await; assert!(res.is_err()); // If 1 entry exists, [0] should be the root. 
- feed.append(b"data").unwrap(); - let roots = feed.root_hashes(0).unwrap(); + feed.append(b"data").await.unwrap(); + let roots = feed.root_hashes(0).await.unwrap(); assert_eq!(roots.len(), 1); assert_eq!(roots[0].index(), 0); // If we query out of bounds, we should get an error. - let res = feed.root_hashes(6); + let res = feed.root_hashes(6).await; assert!(res.is_err()); // If 3 entries exist, [2,4] should be the roots. - feed.append(b"data").unwrap(); - feed.append(b"data").unwrap(); - let roots = feed.root_hashes(2).unwrap(); + feed.append(b"data").await.unwrap(); + feed.append(b"data").await.unwrap(); + let roots = feed.root_hashes(2).await.unwrap(); assert_eq!(roots.len(), 2); assert_eq!(roots[0].index(), 1); assert_eq!(roots[1].index(), 4); } -#[test] -fn verify() { - let mut feed = create_feed(50).unwrap(); +#[async_std::test] +async fn verify() { + let mut feed = create_feed(50).await.unwrap(); let (public, secret) = copy_keys(&feed); let feed_bytes = secret.to_bytes().to_vec(); - let storage = Storage::new(|_| Ok(ram::RandomAccessMemory::new(50))).unwrap(); + let storage = Storage::new(|_| Box::pin(async { Ok(ram::RandomAccessMemory::new(50)) })) + .await + .unwrap(); let mut evil_feed = Feed::builder(public, storage) .secret_key(secret) .build() @@ -101,72 +111,75 @@ fn verify() { assert_eq!(&feed_bytes, &evil_bytes.to_vec()); // Verify that the signature on a single feed is correct. - feed.append(b"test").unwrap(); - let sig = feed.signature(0).unwrap(); - feed.verify(0, &sig).unwrap(); + feed.append(b"test").await.unwrap(); + let sig = feed.signature(0).await.unwrap(); + feed.verify(0, &sig).await.unwrap(); // Verify that the signature between two different feeds is different. 
- evil_feed.append(b"t0st").unwrap(); - let res = evil_feed.verify(0, &sig); + evil_feed.append(b"t0st").await.unwrap(); + let res = evil_feed.verify(0, &sig).await; assert!(res.is_err()); } -#[test] -fn put() { - let mut a = create_feed(50).unwrap(); +#[async_std::test] +async fn put() { + let mut a = create_feed(50).await.unwrap(); let (public, secret) = copy_keys(&a); - let storage = Storage::new(|_| Ok(ram::RandomAccessMemory::new(50))).unwrap(); + let storage = Storage::new(|_| Box::pin(async { Ok(ram::RandomAccessMemory::new(50)) })) + .await + .unwrap(); let mut b = Feed::builder(public, storage) .secret_key(secret) .build() .unwrap(); for _ in 0..10 { - a.append(b"foo").unwrap(); + a.append(b"foo").await.unwrap(); } - let proof = a.proof(0, true).unwrap(); - b.put(0, None, proof).expect("no error"); + let proof = a.proof(0, true).await.unwrap(); + b.put(0, None, proof).await.expect("no error"); let proof = a .proof_with_digest(4, b.digest(4), true) + .await .expect(".proof() index 4, digest 4"); - b.put(4, None, proof).unwrap(); + b.put(4, None, proof).await.unwrap(); } -#[test] -fn create_with_storage() { - let storage = Storage::new_memory().unwrap(); +#[async_std::test] +async fn create_with_storage() { + let storage = Storage::new_memory().await.unwrap(); assert!( - Feed::with_storage(storage).is_ok(), + Feed::with_storage(storage).await.is_ok(), "Could not create a feed with a storage." ); } -#[test] -fn create_with_stored_public_key() { - let mut storage = Storage::new_memory().unwrap(); +#[async_std::test] +async fn create_with_stored_public_key() { + let mut storage = Storage::new_memory().await.unwrap(); let keypair = generate_keypair(); - storage.write_public_key(&keypair.public).unwrap(); + storage.write_public_key(&keypair.public).await.unwrap(); assert!( - Feed::with_storage(storage).is_ok(), + Feed::with_storage(storage).await.is_ok(), "Could not create a feed with a stored public key." 
); } -#[test] -fn create_with_stored_keys() { - let mut storage = Storage::new_memory().unwrap(); +#[async_std::test] +async fn create_with_stored_keys() { + let mut storage = Storage::new_memory().await.unwrap(); let keypair = generate_keypair(); - storage.write_public_key(&keypair.public).unwrap(); - storage.write_secret_key(&keypair.secret).unwrap(); + storage.write_public_key(&keypair.public).await.unwrap(); + storage.write_secret_key(&keypair.secret).await.unwrap(); assert!( - Feed::with_storage(storage).is_ok(), + Feed::with_storage(storage).await.is_ok(), "Could not create a feed with a stored keypair." ); } fn copy_keys( - feed: &Feed> + Debug>, + feed: &Feed> + Debug + Send>, ) -> (PublicKey, SecretKey) { match &feed.secret_key() { Some(secret) => { @@ -182,12 +195,12 @@ fn copy_keys( } } -#[test] -fn audit() { - let mut feed = create_feed(50).unwrap(); - feed.append(b"hello").unwrap(); - feed.append(b"world").unwrap(); - match feed.audit() { +#[async_std::test] +async fn audit() { + let mut feed = create_feed(50).await.unwrap(); + feed.append(b"hello").await.unwrap(); + feed.append(b"world").await.unwrap(); + match feed.audit().await { Ok(audit_report) => { assert_eq!(audit_report.valid_blocks, 2); assert_eq!(audit_report.invalid_blocks, 0); @@ -198,14 +211,14 @@ fn audit() { } } -#[test] -fn audit_bad_data() { +#[async_std::test] +async fn audit_bad_data() { let mut dir = temp_dir(); dir.push("audit_bad_data"); - let storage = Storage::new_disk(&dir).unwrap(); - let mut feed = Feed::with_storage(storage).unwrap(); - feed.append(b"hello").unwrap(); - feed.append(b"world").unwrap(); + let storage = Storage::new_disk(&dir).await.unwrap(); + let mut feed = Feed::with_storage(storage).await.unwrap(); + feed.append(b"hello").await.unwrap(); + feed.append(b"world").await.unwrap(); let datapath = dir.join("data"); let mut hypercore_data = fs::OpenOptions::new() .write(true) @@ -215,12 +228,12 @@ fn audit_bad_data() { .write_all(b"yello") .expect("Unable to 
corrupt the hypercore data file!");
-    match feed.audit() {
+    match feed.audit().await {
         Ok(audit_report) => {
             assert_eq!(audit_report.valid_blocks, 1);
             assert_eq!(audit_report.invalid_blocks, 1);
             // Ensure that audit has cleared up the invalid block
-            match feed.audit() {
+            match feed.audit().await {
                 Ok(audit_report) => {
                     assert_eq!(
                         audit_report.valid_blocks, 1,
diff --git a/tests/model.rs b/tests/model.rs
index 5e71999..967f58b 100644
--- a/tests/model.rs
+++ b/tests/model.rs
@@ -39,39 +39,42 @@ impl Arbitrary for Op {
 quickcheck! {
     fn implementation_matches_model(ops: Vec<Op>) -> bool {
-        let page_size = 50;
+        async_std::task::block_on(async {
+            let page_size = 50;

-        let mut insta = create_feed(page_size)
-            .expect("Instance creation should be successful");
-        let mut model = vec![];
+            let mut insta = create_feed(page_size)
+                .await
+                .expect("Instance creation should be successful");
+            let mut model = vec![];

-        for op in ops {
-            match op {
-                Op::Append { data } => {
-                    insta.append(&data).expect("Append should be successful");
-                    model.push(data);
-                },
-                Op::Get { index } => {
-                    let data = insta.get(index).expect("Get should be successful");
-                    if index >= insta.len() {
-                        assert_eq!(data, None);
-                    } else {
-                        assert_eq!(data, Some(model[index as usize].clone()));
-                    }
-                },
-                Op::Verify => {
-                    let len = insta.len();
-                    if len == 0 {
-                        insta.signature(len).unwrap_err();
-                    } else {
-                        // Always test index of last entry, which is `len - 1`.
- let len = len - 1; - let sig = insta.signature(len).expect("Signature should exist"); - insta.verify(len, &sig).expect("Signature should match"); - } - }, + for op in ops { + match op { + Op::Append { data } => { + insta.append(&data).await.expect("Append should be successful"); + model.push(data); + }, + Op::Get { index } => { + let data = insta.get(index).await.expect("Get should be successful"); + if index >= insta.len() { + assert_eq!(data, None); + } else { + assert_eq!(data, Some(model[index as usize].clone())); + } + }, + Op::Verify => { + let len = insta.len(); + if len == 0 { + insta.signature(len).await.unwrap_err(); + } else { + // Always test index of last entry, which is `len - 1`. + let len = len - 1; + let sig = insta.signature(len).await.expect("Signature should exist"); + insta.verify(len, &sig).await.expect("Signature should match"); + } + }, + } } - } - true + true + }) } } diff --git a/tests/regression.rs b/tests/regression.rs index 4969320..8b6a00e 100644 --- a/tests/regression.rs +++ b/tests/regression.rs @@ -6,13 +6,13 @@ use common::create_feed; // `.signature()` was off. Instead of checking for a range (`<`), we were // checking inclusively `<=`. All we had to do was fix the check, and we all // good. 
-#[test] -fn regression_01() { - let mut feed = create_feed(50).unwrap(); +#[async_std::test] +async fn regression_01() { + let mut feed = create_feed(50).await.unwrap(); assert_eq!(feed.len(), 0); - feed.signature(0).unwrap_err(); + feed.signature(0).await.unwrap_err(); let data = b"some_data"; - feed.append(data).unwrap(); - feed.signature(0).unwrap(); + feed.append(data).await.unwrap(); + feed.signature(0).await.unwrap(); } diff --git a/tests/storage.rs b/tests/storage.rs index e0258c5..d540b31 100644 --- a/tests/storage.rs +++ b/tests/storage.rs @@ -1,51 +1,51 @@ use ed25519_dalek::PublicKey; use hypercore::{generate_keypair, sign, verify, Signature, Storage}; -#[test] -fn should_write_and_read_keypair() { +#[async_std::test] +async fn should_write_and_read_keypair() { let keypair = generate_keypair(); let msg = b"hello"; // prepare a signature let sig: Signature = sign(&keypair.public, &keypair.secret, msg); - let mut storage = Storage::new_memory().unwrap(); + let mut storage = Storage::new_memory().await.unwrap(); assert!( - storage.write_secret_key(&keypair.secret).is_ok(), + storage.write_secret_key(&keypair.secret).await.is_ok(), "Can not store secret key." ); assert!( - storage.write_public_key(&keypair.public).is_ok(), + storage.write_public_key(&keypair.public).await.is_ok(), "Can not store public key." ); - let read = storage.read_public_key(); + let read = storage.read_public_key().await; assert!(read.is_ok(), "Can not read public key"); let public_key: PublicKey = read.unwrap(); assert!(verify(&public_key, msg, Some(&sig)).is_ok()); } -#[test] -fn should_read_partial_keypair() { +#[async_std::test] +async fn should_read_partial_keypair() { let keypair = generate_keypair(); - let mut storage = Storage::new_memory().unwrap(); + let mut storage = Storage::new_memory().await.unwrap(); assert!( - storage.write_public_key(&keypair.public).is_ok(), + storage.write_public_key(&keypair.public).await.is_ok(), "Can not store public key." 
); - let partial = storage.read_partial_keypair().unwrap(); + let partial = storage.read_partial_keypair().await.unwrap(); assert!(partial.secret.is_none(), "A secret key is present"); } -#[test] -fn should_read_no_keypair() { - let mut storage = Storage::new_memory().unwrap(); - let partial = storage.read_partial_keypair(); +#[async_std::test] +async fn should_read_no_keypair() { + let mut storage = Storage::new_memory().await.unwrap(); + let partial = storage.read_partial_keypair().await; assert!(partial.is_none(), "A key is present"); } -#[test] -fn should_read_empty_public_key() { - let mut storage = Storage::new_memory().unwrap(); - assert!(storage.read_public_key().is_err()); +#[async_std::test] +async fn should_read_empty_public_key() { + let mut storage = Storage::new_memory().await.unwrap(); + assert!(storage.read_public_key().await.is_err()); }