Skip to content

Commit 203f81e

Browse files
committed
feat: Streaming read and writes (readstream and writestream)
Added method `OdbBackend::open_read_stream`, corresponding to the `readstream` function of `git_odb_backend`. Added trait `OdbReadStream` as a subset of `git_odb_stream`'s interface; in particular, the `read` function of `git_odb_stream`. Added associated type `OdbBackend::ReadStream: OdbReadStream`. Added `OdbBackend::open_write_stream`, corresponding to the `writestream` function of `git_odb_backend`. Added trait `OdbWriteStream` as a subset of `git_odb_stream`'s interface; in particular, the `write` and `finalize_write` functions of `git_odb_stream`. Added associated type `OdbBackend::WriteStream: OdbWriteStream`. Added `allocate` and `free` functions to reduce code repetition.
1 parent 5e4fb91 commit 203f81e

File tree

1 file changed

+321
-4
lines changed

1 file changed

+321
-4
lines changed

src/odb_backend.rs

Lines changed: 321 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ pub trait OdbBackend: Sized {
3232
///
3333
/// [`Infallible`]: std::convert::Infallible
3434
type Writepack: OdbWritepack<Self>;
35+
/// Backend-specific readable stream.
36+
///
37+
/// If the backend doesn't support reading through streams, this type should be [`Infallible`].
38+
type ReadStream: OdbReadStream<Self>;
39+
/// Backend-specific writable stream.
40+
///
41+
/// If the backend doesn't support writing through streams, this type should be [`Infallible`].
42+
type WriteStream: OdbWriteStream<Self>;
3543

3644
/// Returns the supported operations of this backend.
3745
/// The return value is used to determine what functions to provide to libgit2.
@@ -328,8 +336,62 @@ pub trait OdbBackend: Sized {
328336
unimplemented!("OdbBackend::write_multipack_index")
329337
}
330338

331-
// TODO: fn writestream()
332-
// TODO: fn readstream()
339+
/// Opens a stream to read an object.
340+
///
341+
/// Corresponds to the `readstream` function of [`git_odb_backend`].
342+
/// Requires that [`SupportedOperations::READSTREAM`] is present in the value returned from
343+
/// [`supported_operations`] to expose it to libgit2.
344+
///
345+
/// The default implementation of this method panics.
346+
///
347+
/// # Implementation notes
348+
///
349+
/// If an implementation returns `Ok(stream)`, `length` and `object_type` MUST be set to the
350+
/// length of the object's contents and the object type respectively; see
351+
/// [`OdbBackend::read_header`].
352+
///
353+
/// # Errors
354+
///
355+
/// See [`OdbBackend::read`].
356+
///
357+
/// [`git_odb_backend`]: raw::git_odb_backend
358+
/// [`supported_operations`]: Self::supported_operations
359+
fn open_read_stream(
360+
&mut self,
361+
ctx: &OdbBackendContext,
362+
oid: Oid,
363+
length: &mut usize,
364+
object_type: &mut ObjectType,
365+
) -> Result<Self::ReadStream, Error> {
366+
unimplemented!("OdbBackend::open_read_stream")
367+
}
368+
/// Opens a stream to write an object.
369+
///
370+
/// Corresponds to the `writestream` function of [`git_odb_backend`].
371+
/// Requires that [`SupportedOperations::WRITESTREAM`] is present in the value returned from
372+
/// [`supported_operations`] to expose it to libgit2.
373+
///
374+
/// The default implementation of this method panics.
375+
///
376+
/// # Implementation notes
377+
///
378+
/// The Oid of the object is calculated by libgit2 *after* all the data has been written.
379+
///
380+
/// # Errors
381+
///
382+
/// See [`OdbBackend::write`].
383+
///
384+
/// [`git_odb_backend`]: raw::git_odb_backend
385+
/// [`supported_operations`]: Self::supported_operations
386+
fn open_write_stream(
387+
&mut self,
388+
ctx: &OdbBackendContext,
389+
length: usize,
390+
object_type: ObjectType,
391+
) -> Result<Self::WriteStream, Error> {
392+
unimplemented!("OdbBackend::open_write_stream")
393+
}
394+
333395
// TODO: fn foreach()
334396
}
335397

@@ -347,9 +409,9 @@ bitflags! {
347409
const READ_HEADER = 1 << 2;
348410
/// The backend supports the [`OdbBackend::write`] method.
349411
const WRITE = 1 << 3;
350-
/// The backend supports the [`OdbBackend::writestream`] method.
412+
/// The backend supports the [`OdbBackend::open_write_stream`] method.
351413
const WRITESTREAM = 1 << 4;
352-
/// The backend supports the [`OdbBackend::readstream`] method.
414+
/// The backend supports the [`OdbBackend::open_read_stream`] method.
353415
const READSTREAM = 1 << 5;
354416
/// The backend supports the [`OdbBackend::exists`] method.
355417
const EXISTS = 1 << 6;
@@ -692,6 +754,94 @@ impl Binding for IndexerProgress {
692754
}
693755
}
694756

757+
/// A stream that can be read from.
758+
pub trait OdbReadStream<B: OdbBackend> {
759+
/// Read as many bytes as possible from this stream, returning how many bytes were read.
760+
///
761+
/// Corresponds to the `read` function of [`git_odb_stream`].
762+
///
763+
/// # Implementation notes
764+
///
765+
/// If `Ok(read_bytes)` is returned, `read_bytes` should be how many bytes were read from this
766+
/// stream. This number must not exceed the length of `out`.
767+
///
768+
/// `out` will never have a length greater than [`libc::c_int::MAX`].
769+
///
770+
/// > Whilst a caller may be able to pass buffers longer than that, `read_bytes` (from the `Ok`
771+
/// > return value) must be convertible to a [`libc::c_int`] for git2 to be able to return the
772+
/// > value back to libgit2.
773+
/// > For that reason, git2 will automatically limit the buffer length to [`libc::c_int::MAX`].
774+
///
775+
/// # Errors
776+
///
777+
/// See [`OdbBackend`].
778+
///
779+
/// [`git_odb_stream`]: raw::git_odb_stream
780+
fn read(&mut self, ctx: &mut OdbStreamContext<B>, out: &mut [u8]) -> Result<usize, Error>;
781+
}
782+
783+
impl<B: OdbBackend> OdbReadStream<B> for Infallible {
784+
fn read(&mut self, _ctx: &mut OdbStreamContext<B>, _out: &mut [u8]) -> Result<usize, Error> {
785+
unreachable!()
786+
}
787+
}
788+
789+
/// A stream that can be written to.
790+
pub trait OdbWriteStream<B: OdbBackend> {
791+
/// Write bytes to this stream.
792+
///
793+
/// Corresponds to the `write` function of [`git_odb_stream`].
794+
///
795+
/// # Implementation notes
796+
///
797+
/// All calls to `write` will be "finalized" by a single call to [`finalize_write`], after which
798+
/// no more calls to this stream will occur.
799+
///
800+
/// [`git_odb_stream`]: raw::git_odb_stream
801+
/// [`finalize_write`]: OdbWriteStream::finalize_write
802+
fn write(&mut self, ctx: &mut OdbStreamContext<B>, data: &[u8]) -> Result<(), Error>;
803+
/// Store the contents of the stream as an object with the specified [`Oid`].
804+
///
805+
/// Corresponds to the `finalize_write` function of [`git_odb_stream`].
806+
///
807+
/// # Implementation notes
808+
///
809+
/// This method might not be invoked if:
810+
/// - an error occurs in the [`write`] implementation,
811+
/// - `oid` refers to an already existing object in another backend, or
812+
/// - the final number of received bytes differs from the size declared when the stream was opened.
813+
///
814+
///
815+
/// [`git_odb_stream`]: raw::git_odb_stream
816+
/// [`write`]: OdbWriteStream::write
817+
fn finalize_write(&mut self, ctx: &mut OdbStreamContext<B>, oid: Oid) -> Result<(), Error>;
818+
}
819+
820+
impl<B: OdbBackend> OdbWriteStream<B> for Infallible {
821+
fn write(&mut self, _ctx: &mut OdbStreamContext<B>, _data: &[u8]) -> Result<(), Error> {
822+
unreachable!()
823+
}
824+
825+
fn finalize_write(&mut self, _ctx: &mut OdbStreamContext<B>, _oid: Oid) -> Result<(), Error> {
826+
unreachable!()
827+
}
828+
}
829+
830+
/// Context struct passed to [`OdbReadStream`] and [`OdbWriteStream`]'s methods.
831+
pub struct OdbStreamContext<B: OdbBackend> {
832+
backend_ptr: ptr::NonNull<Backend<B>>,
833+
}
834+
impl<B: OdbBackend> OdbStreamContext<B> {
835+
/// Get a reference to the associated [`OdbBackend`].
836+
pub fn backend(&self) -> &B {
837+
unsafe { &self.backend_ptr.as_ref().inner }
838+
}
839+
/// Get a mutable reference to the associated [`OdbBackend`].
840+
pub fn backend_mut(&mut self) -> &mut B {
841+
unsafe { &mut self.backend_ptr.as_mut().inner }
842+
}
843+
}
844+
695845
/// A handle to an [`OdbBackend`] that has been added to an [`Odb`].
696846
pub struct CustomOdbBackend<'a, B: OdbBackend> {
697847
// NOTE: Any pointer in this field must be both non-null and properly aligned.
@@ -767,6 +917,8 @@ impl<'a, B: OdbBackend> CustomOdbBackend<'a, B> {
767917
op_if!(read_prefix if READ_PREFIX);
768918
op_if!(read_header if READ_HEADER);
769919
op_if!(write if WRITE);
920+
op_if!(writestream if WRITESTREAM);
921+
op_if!(readstream if READSTREAM);
770922
op_if!(exists if EXISTS);
771923
op_if!(exists_prefix if EXISTS_PREFIX);
772924
op_if!(refresh if REFRESH);
@@ -902,6 +1054,90 @@ impl<B: OdbBackend> Backend<B> {
9021054
}
9031055
raw::GIT_OK
9041056
}
1057+
extern "C" fn writestream(
1058+
stream_out: *mut *mut raw::git_odb_stream,
1059+
backend_ptr: *mut raw::git_odb_backend,
1060+
length: raw::git_object_size_t,
1061+
object_type: raw::git_object_t,
1062+
) -> libc::c_int {
1063+
let backend = unsafe { backend_ptr.cast::<Backend<B>>().as_mut().unwrap() };
1064+
let object_type = ObjectType::from_raw(object_type).unwrap();
1065+
let context = OdbBackendContext { backend_ptr };
1066+
let stream_out = unsafe { stream_out.as_mut().unwrap() };
1067+
let stream = match backend
1068+
.inner
1069+
.open_write_stream(&context, length as usize, object_type)
1070+
{
1071+
Err(e) => return unsafe { e.raw_set_git_error() },
1072+
Ok(x) => x,
1073+
};
1074+
1075+
let stream = WriteStream::<B> {
1076+
parent: raw::git_odb_stream {
1077+
backend: backend_ptr,
1078+
mode: raw::GIT_STREAM_WRONLY,
1079+
hash_ctx: ptr::null_mut(),
1080+
declared_size: 0,
1081+
received_bytes: 0,
1082+
read: None,
1083+
write: Some(WriteStream::<B>::write),
1084+
finalize_write: Some(WriteStream::<B>::finalize_write),
1085+
free: Some(WriteStream::<B>::free),
1086+
},
1087+
_marker: marker::PhantomData,
1088+
inner: stream,
1089+
};
1090+
1091+
*stream_out = unsafe { allocate(stream).cast() };
1092+
1093+
raw::GIT_OK
1094+
}
1095+
extern "C" fn readstream(
1096+
stream_out: *mut *mut raw::git_odb_stream,
1097+
length_ptr: *mut libc::size_t,
1098+
otype_ptr: *mut raw::git_object_t,
1099+
backend_ptr: *mut raw::git_odb_backend,
1100+
oid_ptr: *const raw::git_oid,
1101+
) -> libc::c_int {
1102+
let size = unsafe { length_ptr.as_mut().unwrap() };
1103+
let otype = unsafe { otype_ptr.as_mut().unwrap() };
1104+
let backend = unsafe { backend_ptr.cast::<Backend<B>>().as_mut().unwrap() };
1105+
let oid = unsafe { Oid::from_raw(oid_ptr) };
1106+
let stream_out = unsafe { stream_out.as_mut().unwrap() };
1107+
1108+
let context = OdbBackendContext { backend_ptr };
1109+
1110+
let mut object_type = ObjectType::Any;
1111+
let stream = match backend
1112+
.inner
1113+
.open_read_stream(&context, oid, size, &mut object_type)
1114+
{
1115+
Err(e) => return unsafe { e.raw_set_git_error() },
1116+
Ok(x) => x,
1117+
};
1118+
1119+
*otype = object_type.raw();
1120+
1121+
let stream = ReadStream::<B> {
1122+
parent: raw::git_odb_stream {
1123+
backend: backend_ptr,
1124+
mode: raw::GIT_STREAM_RDONLY,
1125+
hash_ctx: ptr::null_mut(),
1126+
declared_size: 0,
1127+
received_bytes: 0,
1128+
read: Some(ReadStream::<B>::read),
1129+
write: None,
1130+
finalize_write: None,
1131+
free: Some(ReadStream::<B>::free),
1132+
},
1133+
_marker: marker::PhantomData,
1134+
inner: stream,
1135+
};
1136+
1137+
*stream_out = unsafe { allocate(stream).cast() };
1138+
1139+
raw::GIT_OK
1140+
}
9051141

9061142
extern "C" fn exists(
9071143
backend_ptr: *mut raw::git_odb_backend,
@@ -1074,3 +1310,84 @@ where
10741310
drop(inner);
10751311
}
10761312
}
1313+
1314+
struct Stream<B, T> {
1315+
parent: raw::git_odb_stream,
1316+
_marker: marker::PhantomData<B>,
1317+
inner: T,
1318+
}
1319+
impl<B, T> Stream<B, T> {
1320+
extern "C" fn read(
1321+
stream_ptr: *mut raw::git_odb_stream,
1322+
out_ptr: *mut libc::c_char,
1323+
out_len: libc::size_t,
1324+
) -> libc::c_int
1325+
where
1326+
B: OdbBackend<ReadStream = T>,
1327+
T: OdbReadStream<B>,
1328+
{
1329+
let stream = unsafe { stream_ptr.cast::<Self>().as_mut().unwrap() };
1330+
let buf_len = (out_len as usize).max(libc::c_int::MAX as usize);
1331+
let buf = unsafe { slice::from_raw_parts_mut(out_ptr.cast::<u8>(), buf_len) };
1332+
let mut context = OdbStreamContext {
1333+
backend_ptr: ptr::NonNull::new(stream.parent.backend).unwrap().cast(),
1334+
};
1335+
let read_bytes = match stream.inner.read(&mut context, buf) {
1336+
Err(e) => return unsafe { e.raw_set_git_error() },
1337+
Ok(x) => x,
1338+
};
1339+
read_bytes as libc::c_int
1340+
}
1341+
1342+
extern "C" fn write(
1343+
stream_ptr: *mut raw::git_odb_stream,
1344+
data: *const libc::c_char,
1345+
len: libc::size_t,
1346+
) -> libc::c_int
1347+
where
1348+
B: OdbBackend<WriteStream = T>,
1349+
T: OdbWriteStream<B>,
1350+
{
1351+
let stream = unsafe { stream_ptr.cast::<Self>().as_mut().unwrap() };
1352+
let data = unsafe { slice::from_raw_parts(data.cast::<u8>(), len) };
1353+
let mut context = OdbStreamContext {
1354+
backend_ptr: ptr::NonNull::new(stream.parent.backend).unwrap().cast(),
1355+
};
1356+
if let Err(e) = stream.inner.write(&mut context, data) {
1357+
return unsafe { e.raw_set_git_error() };
1358+
}
1359+
1360+
raw::GIT_OK
1361+
}
1362+
extern "C" fn finalize_write(
1363+
stream_ptr: *mut raw::git_odb_stream,
1364+
oid_ptr: *const raw::git_oid,
1365+
) -> libc::c_int
1366+
where
1367+
B: OdbBackend<WriteStream = T>,
1368+
T: OdbWriteStream<B>,
1369+
{
1370+
let stream = unsafe { stream_ptr.cast::<Self>().as_mut().unwrap() };
1371+
let oid = unsafe { Oid::from_raw(oid_ptr) };
1372+
let mut context = OdbStreamContext {
1373+
backend_ptr: ptr::NonNull::new(stream.parent.backend).unwrap().cast(),
1374+
};
1375+
if let Err(e) = stream.inner.finalize_write(&mut context, oid) {
1376+
return unsafe { e.raw_set_git_error() };
1377+
}
1378+
raw::GIT_OK
1379+
}
1380+
1381+
extern "C" fn free(stream_ptr: *mut raw::git_odb_stream) {
1382+
unsafe { free(stream_ptr.cast::<Self>()) }
1383+
}
1384+
}
1385+
type WriteStream<B> = Stream<B, <B as OdbBackend>::WriteStream>;
1386+
type ReadStream<B> = Stream<B, <B as OdbBackend>::ReadStream>;
1387+
1388+
unsafe fn allocate<T>(value: T) -> *mut T {
1389+
Box::into_raw(Box::new(value))
1390+
}
1391+
unsafe fn free<T>(ptr: *mut T) {
1392+
drop(Box::from_raw(ptr))
1393+
}

0 commit comments

Comments
 (0)