Skip to content

Commit dd5602e

Browse files
committed
Deduplicate generic::bufread::Decoder impl
Ref #384
1 parent b79f66d commit dd5602e

File tree

6 files changed

+171
-171
lines changed

6 files changed

+171
-171
lines changed

crates/async-compression/src/futures/bufread/generic/decoder.rs

Lines changed: 19 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,24 @@
11
use crate::codecs::Decode;
22
use crate::core::util::PartialBuffer;
3+
use crate::generic::bufread::{AsyncBufRead as GenericAsyncBufRead, Decoder as GenericDecoder};
34

45
use core::{
56
pin::Pin,
67
task::{Context, Poll},
78
};
9+
use std::io::{IoSlice, Result};
10+
811
use futures_core::ready;
912
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
1013
use pin_project_lite::pin_project;
11-
use std::io::{IoSlice, Result};
12-
13-
#[derive(Debug)]
14-
enum State {
15-
Decoding,
16-
Flushing,
17-
Done,
18-
Next,
19-
}
2014

2115
pin_project! {
2216
#[derive(Debug)]
2317
pub struct Decoder<R, D> {
2418
#[pin]
2519
reader: R,
2620
decoder: D,
27-
state: State,
28-
multiple_members: bool,
21+
inner: GenericDecoder,
2922
}
3023
}
3124

@@ -34,8 +27,7 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
3427
Self {
3528
reader,
3629
decoder,
37-
state: State::Decoding,
38-
multiple_members: false,
30+
inner: GenericDecoder::default(),
3931
}
4032
}
4133
}
@@ -58,94 +50,31 @@ impl<R, D> Decoder<R, D> {
5850
}
5951

6052
pub fn multiple_members(&mut self, enabled: bool) {
61-
self.multiple_members = enabled;
53+
self.inner.multiple_members(enabled);
6254
}
6355
}
6456

6557
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
66-
fn do_poll_read(
58+
pub(crate) fn do_poll_read(
6759
self: Pin<&mut Self>,
6860
cx: &mut Context<'_>,
6961
output: &mut PartialBuffer<&mut [u8]>,
7062
) -> Poll<Result<()>> {
71-
let mut this = self.project();
72-
73-
let mut first = true;
74-
75-
loop {
76-
*this.state = match this.state {
77-
State::Decoding => {
78-
let input = if first {
79-
&[][..]
80-
} else {
81-
ready!(this.reader.as_mut().poll_fill_buf(cx))?
82-
};
83-
84-
if input.is_empty() && !first {
85-
// Avoid attempting to reinitialise the decoder if the
86-
// reader has returned EOF.
87-
*this.multiple_members = false;
88-
89-
State::Flushing
90-
} else {
91-
let mut input = PartialBuffer::new(input);
92-
let res = this.decoder.decode(&mut input, output).or_else(|err| {
93-
// ignore the first error, occurs when input is empty
94-
// but we need to run decode to flush
95-
if first {
96-
Ok(false)
97-
} else {
98-
Err(err)
99-
}
100-
});
101-
102-
if !first {
103-
let len = input.written().len();
104-
this.reader.as_mut().consume(len);
105-
}
106-
107-
first = false;
108-
109-
if res? {
110-
State::Flushing
111-
} else {
112-
State::Decoding
113-
}
114-
}
115-
}
116-
117-
State::Flushing => {
118-
if this.decoder.finish(output)? {
119-
if *this.multiple_members {
120-
this.decoder.reinit()?;
121-
State::Next
122-
} else {
123-
State::Done
124-
}
125-
} else {
126-
State::Flushing
127-
}
128-
}
129-
130-
State::Done => State::Done,
131-
132-
State::Next => {
133-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
134-
if input.is_empty() {
135-
State::Done
136-
} else {
137-
State::Decoding
138-
}
139-
}
140-
};
141-
142-
if let State::Done = *this.state {
143-
return Poll::Ready(Ok(()));
63+
let this = self.project();
64+
65+
struct Reader<'a, R>(Pin<&'a mut R>);
66+
67+
impl<R: AsyncBufRead> GenericAsyncBufRead for Reader<'_, R> {
68+
fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
69+
self.0.as_mut().poll_fill_buf(cx)
14470
}
145-
if output.unwritten().is_empty() {
146-
return Poll::Ready(Ok(()));
71+
fn consume(&mut self, bytes: usize) {
72+
self.0.as_mut().consume(bytes)
14773
}
14874
}
75+
76+
this.inner
77+
.do_poll_read(cx, output, &mut Reader(this.reader), this.decoder)
14978
}
15079
}
15180

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use crate::codecs::Decode;
2+
use crate::core::util::PartialBuffer;
3+
4+
use core::task::{Context, Poll};
5+
use std::io::Result;
6+
7+
use futures_core::ready;
8+
9+
#[derive(Debug)]
10+
enum State {
11+
Decoding,
12+
Flushing,
13+
Done,
14+
Next,
15+
}
16+
17+
pub(crate) trait AsyncBufRead {
18+
fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
19+
fn consume(&mut self, bytes: usize);
20+
}
21+
22+
#[derive(Debug)]
23+
pub struct Decoder {
24+
state: State,
25+
multiple_members: bool,
26+
}
27+
28+
impl Default for Decoder {
29+
fn default() -> Self {
30+
Self {
31+
state: State::Decoding,
32+
multiple_members: false,
33+
}
34+
}
35+
}
36+
37+
impl Decoder {
38+
pub fn multiple_members(&mut self, enabled: bool) {
39+
self.multiple_members = enabled;
40+
}
41+
42+
pub fn do_poll_read<D: Decode>(
43+
&mut self,
44+
cx: &mut Context<'_>,
45+
output: &mut PartialBuffer<&mut [u8]>,
46+
reader: &mut dyn AsyncBufRead,
47+
decoder: &mut D,
48+
) -> Poll<Result<()>> {
49+
let mut first = true;
50+
51+
loop {
52+
self.state = match self.state {
53+
State::Decoding => {
54+
let input = if first {
55+
&[][..]
56+
} else {
57+
ready!(reader.poll_fill_buf(cx))?
58+
};
59+
60+
if input.is_empty() && !first {
61+
// Avoid attempting to reinitialise the decoder if the
62+
// reader has returned EOF.
63+
self.multiple_members = false;
64+
65+
State::Flushing
66+
} else {
67+
let mut input = PartialBuffer::new(input);
68+
let res = decoder.decode(&mut input, output).or_else(|err| {
69+
// ignore the first error, occurs when input is empty
70+
// but we need to run decode to flush
71+
if first {
72+
Ok(false)
73+
} else {
74+
Err(err)
75+
}
76+
});
77+
78+
if !first {
79+
let len = input.written().len();
80+
reader.consume(len);
81+
}
82+
83+
first = false;
84+
85+
if res? {
86+
State::Flushing
87+
} else {
88+
State::Decoding
89+
}
90+
}
91+
}
92+
93+
State::Flushing => {
94+
if decoder.finish(output)? {
95+
if self.multiple_members {
96+
decoder.reinit()?;
97+
State::Next
98+
} else {
99+
State::Done
100+
}
101+
} else {
102+
State::Flushing
103+
}
104+
}
105+
106+
State::Done => State::Done,
107+
108+
State::Next => {
109+
let input = ready!(reader.poll_fill_buf(cx))?;
110+
if input.is_empty() {
111+
State::Done
112+
} else {
113+
State::Decoding
114+
}
115+
}
116+
};
117+
118+
if let State::Done = self.state {
119+
return Poll::Ready(Ok(()));
120+
}
121+
if output.unwritten().is_empty() {
122+
return Poll::Ready(Ok(()));
123+
}
124+
}
125+
}
126+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod decoder;
2+
3+
pub(crate) use decoder::*;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(crate) mod bufread;

crates/async-compression/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@
149149
#[macro_use]
150150
mod macros;
151151

152+
/// Generic, async runtime agonistc implementation of en/decoders
153+
mod generic;
154+
152155
#[cfg(feature = "futures-io")]
153156
pub mod futures;
154157
#[cfg(feature = "tokio")]

0 commit comments

Comments
 (0)