Skip to content

Commit 1004c8c

Browse files
committed
Refactor: Encapsulate read and write of the download (ipfs & aleph)
1 parent 5a066dc commit 1004c8c

File tree

2 files changed

+111
-29
lines changed

2 files changed

+111
-29
lines changed

src/aleph/sdk/client.py

Lines changed: 97 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
Type,
2323
TypeVar,
2424
Union,
25+
Protocol,
2526
)
2627
from io import BytesIO
27-
from typing import BinaryIO
2828

2929
import aiohttp
3030
from aleph_message.models import (
@@ -69,6 +69,28 @@
6969
magic = None # type:ignore
7070

7171
T = TypeVar("T")
72+
C = TypeVar("C", str, bytes, covariant=True)
73+
U = TypeVar("U", str, bytes, contravariant=True)
74+
75+
76+
class AsyncReadable(Protocol[C]):
77+
async def read(self, n: int = -1) -> C:
78+
...
79+
80+
81+
class Writable(Protocol[U]):
82+
def write(self, buffer: U) -> int:
83+
...
84+
85+
86+
async def copy_async_readable_to_buffer(
87+
readable: AsyncReadable[C], buffer: Writable[C], chunk_size: int
88+
):
89+
while True:
90+
chunk = await readable.read(chunk_size)
91+
if not chunk:
92+
break
93+
buffer.write(chunk)
7294

7395

7496
def async_wrapper(f):
@@ -232,6 +254,12 @@ def get_posts(
232254
def download_file(self, file_hash: str) -> bytes:
233255
return self._wrap(self.async_session.download_file, file_hash=file_hash)
234256

257+
def download_file_ipfs(self, file_hash: str) -> bytes:
258+
return self._wrap(
259+
self.async_session.download_file_ipfs,
260+
file_hash=file_hash,
261+
)
262+
235263
def watch_messages(
236264
self,
237265
message_type: Optional[MessageType] = None,
@@ -443,33 +471,6 @@ def submit(
443471
)
444472

445473

446-
async def download_file_to_buffer(
447-
file_hash: str,
448-
output_buffer: BinaryIO,
449-
) -> None:
450-
"""
451-
Download a file from the storage engine and write it to the specified output buffer.
452-
453-
:param file_hash: The hash of the file to retrieve.
454-
:param output_buffer: The binary output buffer to write the file data to.
455-
"""
456-
url: str = f"{settings.API_HOST}/api/v0/storage/raw/{file_hash}"
457-
458-
ipfs_hash = ItemHash(file_hash)
459-
if ItemType.from_hash(ipfs_hash) == ItemType.ipfs:
460-
url = f"https://ipfs.aleph.im/ipfs/{file_hash}"
461-
async with aiohttp.ClientSession() as session:
462-
async with session.get(url) as response:
463-
if response.status == 200:
464-
while True:
465-
chunk = await response.content.read(16384)
466-
if not chunk:
467-
break
468-
output_buffer.write(chunk)
469-
elif response.status == 413:
470-
raise FileTooLarge(f"The file from {file_hash} is too large")
471-
472-
473474
class AlephClient:
474475
api_server: str
475476
http_session: aiohttp.ClientSession
@@ -639,6 +640,58 @@ async def get_posts(
639640
resp.raise_for_status()
640641
return await resp.json()
641642

643+
async def download_file_to_buffer(
644+
self,
645+
file_hash: str,
646+
output_buffer: Writable[bytes],
647+
) -> None:
648+
"""
649+
Download a file from the storage engine and write it to the specified output buffer.
650+
:param file_hash: The hash of the file to retrieve.
651+
:param output_buffer: Writable binary buffer. The file will be written to this buffer.
652+
"""
653+
654+
async with aiohttp.ClientSession() as session:
655+
async with self.http_session.get(
656+
f"/api/v0/storage/raw/{file_hash}"
657+
) as response:
658+
if response.status == 200:
659+
await copy_async_readable_to_buffer(
660+
response.content, output_buffer, chunk_size=16 * 1024
661+
)
662+
if response.status == 413:
663+
ipfs_hash = ItemHash(file_hash)
664+
if ItemType.from_hash(ipfs_hash) == ItemType.ipfs:
665+
return await self.download_file_ipfs_to_buffer(
666+
file_hash, output_buffer
667+
)
668+
else:
669+
raise FileTooLarge(f"The file from {file_hash} is too large")
670+
671+
async def download_file_ipfs_to_buffer(
672+
self,
673+
file_hash: str,
674+
output_buffer: Writable[bytes],
675+
) -> None:
676+
"""
677+
Download a file from the storage engine and write it to the specified output buffer.
678+
679+
:param file_hash: The hash of the file to retrieve.
680+
:param output_buffer: The binary output buffer to write the file data to.
681+
"""
682+
async with aiohttp.ClientSession() as session:
683+
async with session.get(
684+
f"https://ipfs.aleph.im/ipfs/{file_hash}"
685+
) as response:
686+
if response.status == 200:
687+
await copy_async_readable_to_buffer(
688+
response.content, output_buffer, chunk_size=16 * 1024
689+
)
690+
elif response.status == 413:
691+
raise FileTooLarge()
692+
else:
693+
response.raise_for_status()
694+
642695
async def download_file(
643696
self,
644697
file_hash: str,
@@ -651,7 +704,22 @@ async def download_file(
651704
:param file_hash: The hash of the file to retrieve.
652705
"""
653706
buffer = BytesIO()
654-
await download_file_to_buffer(file_hash, output_buffer=buffer)
707+
await self.download_file_to_buffer(file_hash, output_buffer=buffer)
708+
return buffer.getvalue()
709+
710+
async def download_file_ipfs(
711+
self,
712+
file_hash: str,
713+
) -> bytes:
714+
"""
715+
Get a file from the ipfs storage engine as raw bytes.
716+
717+
Warning: Downloading large files can be slow.
718+
719+
:param file_hash: The hash of the file to retrieve.
720+
"""
721+
buffer = BytesIO()
722+
await self.download_file_ipfs_to_buffer(file_hash, output_buffer=buffer)
655723
return buffer.getvalue()
656724

657725
async def get_messages(

tests/unit/test_download.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,17 @@ async def test_download(file_hash: str, expected_size: int):
1717
file_size = len(file_content)
1818
assert file_size == expected_size
1919

20+
21+
@pytest.mark.parametrize(
22+
"file_hash,expected_size",
23+
[
24+
("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5),
25+
("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703),
26+
],
27+
)
28+
@pytest.mark.asyncio
29+
async def test_download_ipfs(file_hash: str, expected_size: int):
30+
async with AlephClient(api_server=sdk_settings.API_HOST) as client:
31+
file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE
32+
file_size = len(file_content)
33+
assert file_size == expected_size

0 commit comments

Comments
 (0)