Skip to content

Commit 210d6c8

Browse files
remove upload
1 parent b62098d commit 210d6c8

File tree

3 files changed

+185
-367
lines changed

3 files changed

+185
-367
lines changed

src/gridfs.rs

Lines changed: 4 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// TODO(RUST-1395) Remove these allows.
22
#![allow(dead_code, unused_variables)]
33

4+
mod download;
45
pub mod options;
5-
mod upload;
66

77
use core::task::{Context, Poll};
88
use std::{
@@ -11,15 +11,14 @@ use std::{
1111
};
1212

1313
use serde::{Deserialize, Serialize};
14-
use tokio::io::{AsyncWriteExt, ReadBuf};
15-
use tokio_util::compat::FuturesAsyncWriteCompatExt;
14+
use tokio::io::ReadBuf;
1615

1716
use crate::{
1817
bson::{doc, oid::ObjectId, Binary, Bson, DateTime, Document},
1918
concern::{ReadConcern, WriteConcern},
2019
cursor::Cursor,
21-
error::{ErrorKind, Result},
22-
options::{FindOneOptions, FindOptions, SelectionCriteria},
20+
error::Result,
21+
options::SelectionCriteria,
2322
Collection,
2423
Database,
2524
};
@@ -260,176 +259,6 @@ impl GridFsBucket {
260259
todo!()
261260
}
262261

263-
/// Downloads the contents of the stored file specified by `id` and writes
264-
/// the contents to the `destination`.
265-
pub async fn download_to_tokio_writer<T>(&self, id: Bson, destination: &mut T) -> Result<()>
266-
where
267-
T: tokio::io::AsyncWrite + std::marker::Unpin,
268-
{
269-
let options = FindOneOptions::builder()
270-
.read_concern(self.read_concern().cloned())
271-
.selection_criteria(self.selection_criteria().cloned())
272-
.build();
273-
274-
let file = match self
275-
.inner
276-
.files
277-
.find_one(doc! { "_id": &id }, options)
278-
.await?
279-
{
280-
Some(fcd) => fcd,
281-
None => {
282-
return Err(ErrorKind::InvalidArgument {
283-
message: format!("couldn't find file with id {}", &id),
284-
}
285-
.into());
286-
}
287-
};
288-
self.download_to_tokio_writer_common(file, destination)
289-
.await
290-
}
291-
292-
/// Downloads the contents of the stored file specified by `id` and writes
293-
/// the contents to the `destination`.
294-
pub async fn download_to_futures_0_3_writer<T>(
295-
&self,
296-
id: Bson,
297-
destination: &mut T,
298-
) -> Result<()>
299-
where
300-
T: futures_util::io::AsyncWrite + std::marker::Unpin,
301-
{
302-
self.download_to_tokio_writer(id, &mut destination.compat_write())
303-
.await
304-
}
305-
306-
/// Downloads the contents of the stored file specified by `filename` and writes the contents to
307-
/// the `destination`. If there are multiple files with the same filename, the `revision` in the
308-
/// options provided is used to determine which one to download. If no `revision` is specified,
309-
/// the most recent file with the given filename is chosen.
310-
pub async fn download_to_tokio_writer_by_name<T>(
311-
&self,
312-
filename: impl AsRef<str>,
313-
destination: &mut T,
314-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
315-
) -> Result<()>
316-
where
317-
T: tokio::io::AsyncWrite + std::marker::Unpin,
318-
{
319-
let revision = options.into().and_then(|opts| opts.revision).unwrap_or(-1);
320-
let (sort, skip) = if revision >= 0 {
321-
(1, revision)
322-
} else {
323-
(-1, -revision - 1)
324-
};
325-
let options = FindOneOptions::builder()
326-
.sort(doc! { "uploadDate": sort })
327-
.skip(skip as u64)
328-
.read_concern(self.read_concern().cloned())
329-
.selection_criteria(self.selection_criteria().cloned())
330-
.build();
331-
332-
let file = match self
333-
.inner
334-
.files
335-
.find_one(doc! { "filename": filename.as_ref() }, options)
336-
.await?
337-
{
338-
Some(fcd) => fcd,
339-
None => {
340-
return Err(ErrorKind::InvalidArgument {
341-
message: format!("couldn't find file with name {}", &filename.as_ref()),
342-
}
343-
.into());
344-
}
345-
};
346-
347-
self.download_to_tokio_writer_common(file, destination)
348-
.await
349-
}
350-
351-
/// Downloads the contents of the stored file specified by `filename` and writes the contents to
352-
/// the `destination`. If there are multiple files with the same filename, the `revision` in the
353-
/// options provided is used to determine which one to download. If no `revision` is specified,
354-
/// the most recent file with the given filename is chosen.
355-
pub async fn download_to_futures_0_3_writer_by_name<T>(
356-
&self,
357-
filename: impl AsRef<str>,
358-
destination: &mut T,
359-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
360-
) -> Result<()>
361-
where
362-
T: futures_util::io::AsyncWrite + std::marker::Unpin,
363-
{
364-
self.download_to_tokio_writer_by_name(filename, &mut destination.compat_write(), options)
365-
.await
366-
}
367-
368-
async fn download_to_tokio_writer_common<T>(
369-
&self,
370-
file: FilesCollectionDocument,
371-
destination: &mut T,
372-
) -> Result<()>
373-
where
374-
T: tokio::io::AsyncWrite + std::marker::Unpin,
375-
{
376-
let total_bytes_expected = file.length;
377-
let chunk_size = file.chunk_size as u64;
378-
379-
if total_bytes_expected == 0 {
380-
return Ok(());
381-
}
382-
383-
let options = FindOptions::builder()
384-
.sort(doc! { "n": 1 })
385-
.read_concern(self.read_concern().cloned())
386-
.selection_criteria(self.selection_criteria().cloned())
387-
.build();
388-
let mut cursor = self
389-
.inner
390-
.chunks
391-
.find(doc! { "files_id": &file.id }, options)
392-
.await?;
393-
394-
let mut n = 0;
395-
while cursor.advance().await? {
396-
let chunk = cursor.deserialize_current()?;
397-
if chunk.n != n {
398-
return Err(ErrorKind::GridFS {
399-
message: format!("missing chunk {} in file", n),
400-
}
401-
.into());
402-
}
403-
404-
let chunk_length = chunk.data.bytes.len();
405-
let expected_length =
406-
std::cmp::min(total_bytes_expected - chunk_size * n as u64, chunk_size);
407-
if chunk_length as u64 != expected_length {
408-
return Err(ErrorKind::GridFS {
409-
message: format!(
410-
"chunk {} was expected to be {} bytes but is {} bytes",
411-
n, chunk_length, expected_length
412-
),
413-
}
414-
.into());
415-
}
416-
417-
destination.write_all(&chunk.data.bytes).await?;
418-
n += 1;
419-
}
420-
421-
let expected_n =
422-
total_bytes_expected / chunk_size + u64::from(total_bytes_expected % chunk_size != 0);
423-
if (n as u64) < expected_n {
424-
return Err(ErrorKind::GridFS {
425-
message: "missing last chunk in file".into(),
426-
}
427-
.into());
428-
}
429-
430-
Ok(())
431-
}
432-
433262
/// Given an `id`, deletes the stored file's files collection document and
434263
/// associated chunks from a [`GridFsBucket`].
435264
pub async fn delete(&self, id: Bson) {

src/gridfs/download.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
use tokio::io::AsyncWriteExt;
2+
use tokio_util::compat::FuturesAsyncWriteCompatExt;
3+
4+
use super::{options::GridFsDownloadByNameOptions, FilesCollectionDocument, GridFsBucket};
5+
use crate::{
6+
bson::{doc, Bson},
7+
error::{ErrorKind, Result},
8+
options::{FindOneOptions, FindOptions},
9+
};
10+
11+
impl GridFsBucket {
12+
/// Downloads the contents of the stored file specified by `id` and writes
13+
/// the contents to the `destination`.
14+
pub async fn download_to_tokio_writer<T>(&self, id: Bson, destination: &mut T) -> Result<()>
15+
where
16+
T: tokio::io::AsyncWrite + std::marker::Unpin,
17+
{
18+
let options = FindOneOptions::builder()
19+
.read_concern(self.read_concern().cloned())
20+
.selection_criteria(self.selection_criteria().cloned())
21+
.build();
22+
23+
let file = match self
24+
.inner
25+
.files
26+
.find_one(doc! { "_id": &id }, options)
27+
.await?
28+
{
29+
Some(fcd) => fcd,
30+
None => {
31+
return Err(ErrorKind::InvalidArgument {
32+
message: format!("couldn't find file with id {}", &id),
33+
}
34+
.into());
35+
}
36+
};
37+
self.download_to_tokio_writer_common(file, destination)
38+
.await
39+
}
40+
41+
/// Downloads the contents of the stored file specified by `id` and writes
42+
/// the contents to the `destination`.
43+
pub async fn download_to_futures_0_3_writer<T>(
44+
&self,
45+
id: Bson,
46+
destination: &mut T,
47+
) -> Result<()>
48+
where
49+
T: futures_util::io::AsyncWrite + std::marker::Unpin,
50+
{
51+
self.download_to_tokio_writer(id, &mut destination.compat_write())
52+
.await
53+
}
54+
55+
/// Downloads the contents of the stored file specified by `filename` and writes the contents to
56+
/// the `destination`. If there are multiple files with the same filename, the `revision` in the
57+
/// options provided is used to determine which one to download. If no `revision` is specified,
58+
/// the most recent file with the given filename is chosen.
59+
pub async fn download_to_tokio_writer_by_name<T>(
60+
&self,
61+
filename: impl AsRef<str>,
62+
destination: &mut T,
63+
options: impl Into<Option<GridFsDownloadByNameOptions>>,
64+
) -> Result<()>
65+
where
66+
T: tokio::io::AsyncWrite + std::marker::Unpin,
67+
{
68+
let revision = options.into().and_then(|opts| opts.revision).unwrap_or(-1);
69+
let (sort, skip) = if revision >= 0 {
70+
(1, revision)
71+
} else {
72+
(-1, -revision - 1)
73+
};
74+
let options = FindOneOptions::builder()
75+
.sort(doc! { "uploadDate": sort })
76+
.skip(skip as u64)
77+
.read_concern(self.read_concern().cloned())
78+
.selection_criteria(self.selection_criteria().cloned())
79+
.build();
80+
81+
let file = match self
82+
.inner
83+
.files
84+
.find_one(doc! { "filename": filename.as_ref() }, options)
85+
.await?
86+
{
87+
Some(fcd) => fcd,
88+
None => {
89+
return Err(ErrorKind::InvalidArgument {
90+
message: format!("couldn't find file with name {}", &filename.as_ref()),
91+
}
92+
.into());
93+
}
94+
};
95+
96+
self.download_to_tokio_writer_common(file, destination)
97+
.await
98+
}
99+
100+
/// Downloads the contents of the stored file specified by `filename` and writes the contents to
101+
/// the `destination`. If there are multiple files with the same filename, the `revision` in the
102+
/// options provided is used to determine which one to download. If no `revision` is specified,
103+
/// the most recent file with the given filename is chosen.
104+
pub async fn download_to_futures_0_3_writer_by_name<T>(
105+
&self,
106+
filename: impl AsRef<str>,
107+
destination: &mut T,
108+
options: impl Into<Option<GridFsDownloadByNameOptions>>,
109+
) -> Result<()>
110+
where
111+
T: futures_util::io::AsyncWrite + std::marker::Unpin,
112+
{
113+
self.download_to_tokio_writer_by_name(filename, &mut destination.compat_write(), options)
114+
.await
115+
}
116+
117+
async fn download_to_tokio_writer_common<T>(
118+
&self,
119+
file: FilesCollectionDocument,
120+
destination: &mut T,
121+
) -> Result<()>
122+
where
123+
T: tokio::io::AsyncWrite + std::marker::Unpin,
124+
{
125+
let total_bytes_expected = file.length;
126+
let chunk_size = file.chunk_size as u64;
127+
128+
if total_bytes_expected == 0 {
129+
return Ok(());
130+
}
131+
132+
let options = FindOptions::builder()
133+
.sort(doc! { "n": 1 })
134+
.read_concern(self.read_concern().cloned())
135+
.selection_criteria(self.selection_criteria().cloned())
136+
.build();
137+
let mut cursor = self
138+
.inner
139+
.chunks
140+
.find(doc! { "files_id": &file.id }, options)
141+
.await?;
142+
143+
let mut n = 0;
144+
while cursor.advance().await? {
145+
let chunk = cursor.deserialize_current()?;
146+
if chunk.n != n {
147+
return Err(ErrorKind::GridFS {
148+
message: format!("missing chunk {} in file", n),
149+
}
150+
.into());
151+
}
152+
153+
let chunk_length = chunk.data.bytes.len();
154+
let expected_length =
155+
std::cmp::min(total_bytes_expected - chunk_size * n as u64, chunk_size);
156+
if chunk_length as u64 != expected_length {
157+
return Err(ErrorKind::GridFS {
158+
message: format!(
159+
"chunk {} was expected to be {} bytes but is {} bytes",
160+
n, chunk_length, expected_length
161+
),
162+
}
163+
.into());
164+
}
165+
166+
destination.write_all(&chunk.data.bytes).await?;
167+
n += 1;
168+
}
169+
170+
let expected_n =
171+
total_bytes_expected / chunk_size + u64::from(total_bytes_expected % chunk_size != 0);
172+
if (n as u64) < expected_n {
173+
return Err(ErrorKind::GridFS {
174+
message: "missing last chunk in file".into(),
175+
}
176+
.into());
177+
}
178+
179+
Ok(())
180+
}
181+
}

0 commit comments

Comments
 (0)