Skip to content

Commit f53bc69

Browse files
committed
feat: implement IMAP COMPRESS
1 parent 1954ce4 commit f53bc69

File tree

5 files changed

+194
-2
lines changed

5 files changed

+194
-2
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ jobs:
3131
- name: check tokio
3232
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio
3333

34+
- name: check compress feature with tokio
35+
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress
36+
37+
- name: check compress feature with async-std
38+
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress
39+
3440
- name: check async-std examples
3541
working-directory: examples
3642
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }
2020

2121
[features]
2222
default = ["runtime-async-std"]
23+
compress = ["async-compression"]
2324

24-
runtime-async-std = ["async-std"]
25-
runtime-tokio = ["tokio"]
25+
runtime-async-std = ["async-std", "async-compression?/futures-io"]
26+
runtime-tokio = ["tokio", "async-compression?/tokio"]
2627

2728
[dependencies]
2829
async-channel = "2.0.0"
30+
async-compression = { version = "0.4.15", default-features = false, features = ["deflate"], optional = true }
2931
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
3032
base64 = "0.21"
3133
bytes = "1"
@@ -35,6 +37,7 @@ imap-proto = "0.16.4"
3537
log = "0.4.8"
3638
nom = "7.0"
3739
once_cell = "1.8.0"
40+
pin-project = "1"
3841
pin-utils = "0.1.0-alpha.4"
3942
self_cell = "1.0.1"
4043
stop-token = "0.7"

src/extensions/compress.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html).
2+
3+
use std::fmt;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
use pin_project::pin_project;
8+
9+
use crate::client::Session;
10+
use crate::error::Result;
11+
use crate::imap_stream::ImapStream;
12+
use crate::types::IdGenerator;
13+
use crate::Connection;
14+
15+
#[cfg(feature = "runtime-async-std")]
16+
use async_std::io::{Read, Write};
17+
#[cfg(feature = "runtime-async-std")]
18+
use futures::io::BufReader;
19+
#[cfg(feature = "runtime-tokio")]
20+
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, BufReader, ReadBuf};
21+
22+
#[cfg(feature = "runtime-tokio")]
23+
use async_compression::tokio::bufread::DeflateDecoder;
24+
#[cfg(feature = "runtime-tokio")]
25+
use async_compression::tokio::write::DeflateEncoder;
26+
27+
#[cfg(feature = "runtime-async-std")]
28+
use async_compression::futures::bufread::DeflateDecoder;
29+
#[cfg(feature = "runtime-async-std")]
30+
use async_compression::futures::write::DeflateEncoder;
31+
32+
/// IMAP stream
33+
#[derive(Debug)]
34+
#[pin_project]
35+
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
36+
#[pin]
37+
inner: DeflateDecoder<BufReader<DeflateEncoder<T>>>,
38+
}
39+
40+
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
41+
pub(crate) fn new(stream: T) -> Self {
42+
let stream = DeflateEncoder::new(stream);
43+
let stream = BufReader::new(stream);
44+
let stream = DeflateDecoder::new(stream);
45+
Self { inner: stream }
46+
}
47+
48+
/// Gets a reference to the underlying stream.
49+
pub fn get_ref(&self) -> &T {
50+
self.inner.get_ref().get_ref().get_ref()
51+
}
52+
53+
/// Gets a mutable reference to the underlying stream.
54+
pub fn get_mut(&mut self) -> &mut T {
55+
self.inner.get_mut().get_mut().get_mut()
56+
}
57+
58+
/// Consumes `DeflateStream` and returns underlying stream.
59+
pub fn into_inner(self) -> T {
60+
self.inner.into_inner().into_inner().into_inner()
61+
}
62+
}
63+
64+
#[cfg(feature = "runtime-tokio")]
65+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
66+
fn poll_read(
67+
self: Pin<&mut Self>,
68+
cx: &mut Context<'_>,
69+
buf: &mut ReadBuf<'_>,
70+
) -> Poll<std::io::Result<()>> {
71+
self.project().inner.poll_read(cx, buf)
72+
}
73+
}
74+
75+
#[cfg(feature = "runtime-async-std")]
76+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
77+
fn poll_read(
78+
self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
buf: &mut [u8],
81+
) -> Poll<async_std::io::Result<usize>> {
82+
self.project().inner.poll_read(cx, buf)
83+
}
84+
}
85+
86+
#[cfg(feature = "runtime-tokio")]
87+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
88+
fn poll_write(
89+
self: Pin<&mut Self>,
90+
cx: &mut std::task::Context<'_>,
91+
buf: &[u8],
92+
) -> Poll<std::io::Result<usize>> {
93+
self.project().inner.get_pin_mut().poll_write(cx, buf)
94+
}
95+
96+
fn poll_write_vectored(
97+
self: Pin<&mut Self>,
98+
cx: &mut Context<'_>,
99+
bufs: &[std::io::IoSlice<'_>],
100+
) -> Poll<std::io::Result<usize>> {
101+
self.project().inner.poll_write_vectored(cx, bufs)
102+
}
103+
104+
fn is_write_vectored(&self) -> bool {
105+
self.inner.is_write_vectored()
106+
}
107+
108+
fn poll_flush(
109+
self: Pin<&mut Self>,
110+
cx: &mut std::task::Context<'_>,
111+
) -> Poll<std::io::Result<()>> {
112+
self.project().inner.poll_flush(cx)
113+
}
114+
115+
fn poll_shutdown(
116+
self: Pin<&mut Self>,
117+
cx: &mut std::task::Context<'_>,
118+
) -> Poll<std::io::Result<()>> {
119+
self.project().inner.poll_shutdown(cx)
120+
}
121+
}
122+
123+
#[cfg(feature = "runtime-async-std")]
124+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
125+
fn poll_write(
126+
self: Pin<&mut Self>,
127+
cx: &mut std::task::Context<'_>,
128+
buf: &[u8],
129+
) -> Poll<async_std::io::Result<usize>> {
130+
self.project().inner.as_mut().poll_write(cx, buf)
131+
}
132+
133+
fn poll_flush(
134+
self: Pin<&mut Self>,
135+
cx: &mut std::task::Context<'_>,
136+
) -> Poll<async_std::io::Result<()>> {
137+
self.project().inner.poll_flush(cx)
138+
}
139+
140+
fn poll_close(
141+
self: Pin<&mut Self>,
142+
cx: &mut std::task::Context<'_>,
143+
) -> Poll<async_std::io::Result<()>> {
144+
self.project().inner.poll_close(cx)
145+
}
146+
}
147+
148+
impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
149+
/// Runs `COMPRESS DEFLATE` command.
150+
pub async fn compress<F, S>(self, f: F) -> Result<Session<S>>
151+
where
152+
S: Read + Write + Unpin + fmt::Debug,
153+
F: FnOnce(DeflateStream<T>) -> S,
154+
{
155+
let Self {
156+
mut conn,
157+
unsolicited_responses_tx,
158+
unsolicited_responses,
159+
} = self;
160+
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
161+
.await?;
162+
163+
let stream = conn.into_inner();
164+
let deflate_stream = DeflateStream::new(stream);
165+
let stream = ImapStream::new(f(deflate_stream));
166+
let conn = Connection {
167+
stream,
168+
request_ids: IdGenerator::new(),
169+
};
170+
let session = Session {
171+
conn,
172+
unsolicited_responses_tx,
173+
unsolicited_responses,
174+
};
175+
Ok(session)
176+
}
177+
}

src/extensions/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
//! Implementations of various IMAP extensions.
2+
#[cfg(feature = "compress")]
3+
pub mod compress;
4+
25
pub mod idle;
36

47
pub mod quota;

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ mod imap_stream;
9595
mod parse;
9696
pub mod types;
9797

98+
#[cfg(feature = "compress")]
99+
pub use crate::extensions::compress::DeflateStream;
100+
98101
pub use crate::authenticator::Authenticator;
99102
pub use crate::client::*;
100103

0 commit comments

Comments
 (0)