Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ edition = "2018"
[dependencies]
anyhow = "1.0.26"
mkdirp = "1.0.0"
random-access-storage = "3.0.0"
random-access-storage = "4.0.0"
async-std = "1.5.0"
async-trait = "0.1.24"

[dev-dependencies]
quickcheck = "0.9.2"
rand = "0.7.3"
tempfile = "3.1.0"
async-std = { version = "1.5.0", features = ["attributes"] }
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,15 @@ Continuously read,write to disk, using random offsets and lengths. Adapted from

## Usage
```rust
extern crate tempdir;
extern crate random_access_disk;

use std::path::PathBuf;
use tempdir::TempDir;

let dir = TempDir::new("random-access-disk").unwrap();
let mut file = random_access_disk::RandomAccessDisk::new(dir.path().join("README.db"));

file.write(0, b"hello").unwrap();
file.write(5, b" world").unwrap();
let _text = file.read(0, 11).unwrap();
file.write(0, b"hello").await.unwrap();
file.write(5, b" world").await.unwrap();
let _text = file.read(0, 11).await.unwrap();
```

## Installation
Expand Down
81 changes: 47 additions & 34 deletions benches/sync.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,68 @@
#![feature(test)]

mod sync {
extern crate random_access_disk as rad;
extern crate random_access_storage;
extern crate tempfile;
extern crate test;
use random_access_disk as rad;
use test::Bencher;

use self::random_access_storage::RandomAccess;
use self::test::Bencher;
use random_access_storage::RandomAccess;

#[bench]
fn write_hello_world(b: &mut Bencher) {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file =
rad::RandomAccessDisk::open(dir.path().join("1.db")).unwrap();
b.iter(|| {
file.write(0, b"hello").unwrap();
file.write(5, b" world").unwrap();
async_std::task::block_on(async {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file = rad::RandomAccessDisk::open(dir.path().join("1.db"))
.await
.unwrap();
b.iter(|| {
async_std::task::block_on(async {
file.write(0, b"hello").await.unwrap();
file.write(5, b" world").await.unwrap();
})
});
});
}

#[bench]
fn read_hello_world(b: &mut Bencher) {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file =
rad::RandomAccessDisk::open(dir.path().join("2.db")).unwrap();
file.write(0, b"hello").unwrap();
file.write(5, b" world").unwrap();
b.iter(|| {
let _text = file.read(0, 11).unwrap();
async_std::task::block_on(async {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file = rad::RandomAccessDisk::open(dir.path().join("2.db"))
.await
.unwrap();
file.write(0, b"hello").await.unwrap();
file.write(5, b" world").await.unwrap();
b.iter(|| {
async_std::task::block_on(async {
let _text = file.read(0, 11).await.unwrap();
})
});
});
}

#[bench]
fn read_write_hello_world(b: &mut Bencher) {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file =
rad::RandomAccessDisk::open(dir.path().join("3.db")).unwrap();
b.iter(|| {
file.write(0, b"hello").unwrap();
file.write(5, b" world").unwrap();
let _text = file.read(0, 11).unwrap();
async_std::task::block_on(async {
let dir = tempfile::Builder::new()
.prefix("random-access-disk")
.tempdir()
.unwrap();
let mut file = rad::RandomAccessDisk::open(dir.path().join("3.db"))
.await
.unwrap();
b.iter(|| {
async_std::task::block_on(async {
file.write(0, b"hello").await.unwrap();
file.write(5, b" world").await.unwrap();
let _text = file.read(0, 11).await.unwrap();
})
});
});
}
}
74 changes: 49 additions & 25 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
#![cfg_attr(test, deny(warnings))]

use anyhow::{anyhow, Error};
use async_std::fs::{self, OpenOptions};
use async_std::io::prelude::{SeekExt, WriteExt};
use async_std::io::{ReadExt, SeekFrom};
use random_access_storage::RandomAccess;
use std::fs::{self, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::Drop;
use std::path;

Expand All @@ -22,23 +23,31 @@ pub struct RandomAccessDisk {
impl RandomAccessDisk {
/// Create a new instance.
#[allow(clippy::new_ret_no_self)]
pub fn open(filename: path::PathBuf) -> Result<RandomAccessDisk, Error> {
Self::builder(filename).build()
pub async fn open(
filename: path::PathBuf,
) -> Result<RandomAccessDisk, Error> {
Self::builder(filename).build().await
}

pub fn builder(filename: path::PathBuf) -> Builder {
Builder::new(filename)
}
}

#[async_trait::async_trait]
impl RandomAccess for RandomAccessDisk {
type Error = Box<dyn std::error::Error + Sync + Send>;

fn write(&mut self, offset: u64, data: &[u8]) -> Result<(), Self::Error> {
async fn write(
&mut self,
offset: u64,
data: &[u8],
) -> Result<(), Self::Error> {
let mut file = self.file.as_ref().expect("self.file was None.");
file.seek(SeekFrom::Start(offset))?;
file.write_all(&data)?;
file.seek(SeekFrom::Start(offset)).await?;
file.write_all(&data).await?;
if self.auto_sync {
file.sync_all()?;
file.sync_all().await?;
}

// We've changed the length of our file.
Expand All @@ -58,7 +67,11 @@ impl RandomAccess for RandomAccessDisk {
// because we're replacing empty data with actual zeroes - which does not
// reflect the state of the world.
// #[cfg_attr(test, allow(unused_io_amount))]
fn read(&mut self, offset: u64, length: u64) -> Result<Vec<u8>, Self::Error> {
async fn read(
&mut self,
offset: u64,
length: u64,
) -> Result<Vec<u8>, Self::Error> {
if (offset + length) as u64 > self.length {
return Err(
anyhow!(
Expand All @@ -73,46 +86,50 @@ impl RandomAccess for RandomAccessDisk {

let mut file = self.file.as_ref().expect("self.file was None.");
let mut buffer = vec![0; length as usize];
file.seek(SeekFrom::Start(offset))?;
let _bytes_read = file.read(&mut buffer[..])?;
file.seek(SeekFrom::Start(offset)).await?;
let _bytes_read = file.read(&mut buffer[..]).await?;
Ok(buffer)
}

fn read_to_writer(
async fn read_to_writer(
&mut self,
_offset: u64,
_length: u64,
_buf: &mut impl Write,
_buf: &mut (impl async_std::io::Write + Send),
) -> Result<(), Self::Error> {
unimplemented!()
}

fn del(&mut self, _offset: u64, _length: u64) -> Result<(), Self::Error> {
async fn del(
&mut self,
_offset: u64,
_length: u64,
) -> Result<(), Self::Error> {
panic!("Not implemented yet");
}

fn truncate(&mut self, length: u64) -> Result<(), Self::Error> {
async fn truncate(&mut self, length: u64) -> Result<(), Self::Error> {
let file = self.file.as_ref().expect("self.file was None.");
self.length = length as u64;
file.set_len(self.length)?;
file.set_len(self.length).await?;
if self.auto_sync {
file.sync_all()?;
file.sync_all().await?;
}
Ok(())
}

fn len(&self) -> Result<u64, Self::Error> {
async fn len(&self) -> Result<u64, Self::Error> {
Ok(self.length)
}

fn is_empty(&mut self) -> Result<bool, Self::Error> {
async fn is_empty(&mut self) -> Result<bool, Self::Error> {
Ok(self.length == 0)
}

fn sync_all(&mut self) -> Result<(), Self::Error> {
async fn sync_all(&mut self) -> Result<(), Self::Error> {
if !self.auto_sync {
let file = self.file.as_ref().expect("self.file was None.");
file.sync_all()?;
file.sync_all().await?;
}
Ok(())
}
Expand All @@ -121,7 +138,12 @@ impl RandomAccess for RandomAccessDisk {
impl Drop for RandomAccessDisk {
fn drop(&mut self) {
if let Some(file) = &self.file {
file.sync_all().unwrap();
// We need to flush the file on drop. Unfortunately, that is not possible to do in a
// non-blocking fashion, but our only other option here is losing data remaining in the
// write cache. Good task schedulers should be resilient to occasional blocking hiccups in
// file destructors so we don't expect this to be a common problem in practice.
// (from async_std::fs::File::drop)
let _ = async_std::task::block_on(file.sync_all());
}
}
}
Expand All @@ -142,16 +164,18 @@ impl Builder {
self.auto_sync = auto_sync;
self
}
pub fn build(self) -> Result<RandomAccessDisk, Error> {

pub async fn build(self) -> Result<RandomAccessDisk, Error> {
if let Some(dirname) = self.filename.parent() {
mkdirp::mkdirp(&dirname)?;
}
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&self.filename)?;
file.sync_all()?;
.open(&self.filename)
.await?;
file.sync_all().await?;

let metadata = self.filename.metadata()?;
Ok(RandomAccessDisk {
Expand Down
54 changes: 28 additions & 26 deletions tests/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,36 @@ impl Arbitrary for Op {

quickcheck! {
fn implementation_matches_model(ops: Vec<Op>) -> bool {
let dir = Builder::new().prefix("random-access-disk").tempdir().unwrap();
async_std::task::block_on(async {
let dir = Builder::new().prefix("random-access-disk").tempdir().unwrap();

let mut implementation = rad::RandomAccessDisk::open(dir.path().join("1.db")).unwrap();
let mut model = vec![];
let mut implementation = rad::RandomAccessDisk::open(dir.path().join("1.db")).await.unwrap();
let mut model = vec![];

for op in ops {
match op {
Read { offset, length } => {
let end = offset + length;
if model.len() as u64 >= end {
assert_eq!(
implementation.read(offset, length).expect("Reads should be successful."),
&model[offset as usize..end as usize]
);
} else {
assert!(implementation.read(offset, length).is_err());
}
},
Write { offset, ref data } => {
let end = offset + (data.len() as u64);
if (model.len() as u64) < end {
model.resize(end as usize, 0);
}
implementation.write(offset, data).expect("Writes should be successful.");
model[offset as usize..end as usize].copy_from_slice(data);
},
for op in ops {
match op {
Read { offset, length } => {
let end = offset + length;
if model.len() as u64 >= end {
assert_eq!(
implementation.read(offset, length).await.expect("Reads should be successful."),
&model[offset as usize..end as usize]
);
} else {
assert!(implementation.read(offset, length).await.is_err());
}
},
Write { offset, ref data } => {
let end = offset + (data.len() as u64);
if (model.len() as u64) < end {
model.resize(end as usize, 0);
}
implementation.write(offset, data).await.expect("Writes should be successful.");
model[offset as usize..end as usize].copy_from_slice(data);
},
}
}
}
true
true
})
}
}
Loading