From 197899aa71377c7b942d5d2203cce6a33b0eeef6 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 10 Jul 2023 11:18:41 +0200 Subject: [PATCH 01/15] Feature: download on ipfs client --- src/aleph/sdk/client.py | 82 +++++++++++++++++++++++++++++++++---- tests/unit/test_download.py | 32 +++++++++++++++ 2 files changed, 107 insertions(+), 7 deletions(-) create mode 100644 tests/unit/test_download.py diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 9877d716..85d4b83e 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -23,6 +23,8 @@ TypeVar, Union, ) +from io import BytesIO +from typing import BinaryIO import aiohttp from aleph_message.models import ( @@ -225,8 +227,11 @@ def get_posts( end_date=end_date, ) - def download_file(self, file_hash: str) -> bytes: - return self._wrap(self.async_session.download_file, file_hash=file_hash) + def download_file(self, file_hash: str, chunk_size :int = 16 * 1024) -> bytes: + return self._wrap(self.async_session.download_file, file_hash=file_hash, chunk_size=chunk_size) + + def download_file_ipfs(self, file_hash: str, chunk_size : int = 16 * 1024) -> bytes: + return self._wrap(self.async_session.download_file_ipfs, file_hash=file_hash, chunk_size=chunk_size) def watch_messages( self, @@ -608,9 +613,55 @@ async def get_posts( resp.raise_for_status() return await resp.json() + async def download_file_to_buffer( + self, + file_hash: str, + output_buffer: BinaryIO, + chunk_size: int, + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + :param self: The current instance of the class. + :param file_hash: The hash of the file to retrieve. + :param output_buffer: The binary output buffer to write the file data to. + :param chunk_size: Size of the chunk to download. + """ + async with aiohttp.ClientSession() as session: + async with session.get(f"https://api1.aleph.im/api/v0/storage/raw/{file_hash}") as response: + response.raise_for_status() + + while True: + chunk = await response.content.read(chunk_size) + if not chunk: + break + output_buffer.write(chunk) + + async def download_file_ipfs_to_buffer( + self, + file_hash: str, + output_buffer: BinaryIO, + chunk_size : int, + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + + :param file_hash: The hash of the file to retrieve. + :param output_buffer: The binary output buffer to write the file data to. + :param chunk_size: Size of chunk we download. + """ + async with aiohttp.ClientSession() as session: + async with session.get(f"https://ipfs.io/ipfs/{file_hash}") as response: + response.raise_for_status() + while True: + chunk = await response.content.read(chunk_size) + if not chunk: + break + output_buffer.write(chunk) + async def download_file( self, file_hash: str, + chunk_size: int = 16 * 1024, ) -> bytes: """ Get a file from the storage engine as raw bytes. @@ -618,12 +669,29 @@ async def download_file( Warning: Downloading large files can be slow and memory intensive. :param file_hash: The hash of the file to retrieve. + :param chunk_size: The size of each chunk to read from the response. """ - async with self.http_session.get( - f"/api/v0/storage/raw/{file_hash}" - ) as response: - response.raise_for_status() - return await response.read() + buffer = BytesIO() + await self.download_file_to_buffer(file_hash, output_buffer=buffer, chunk_size=chunk_size) + return buffer.getvalue() + + + async def download_file_ipfs( + self, + file_hash: str, + chunk_size: int = 16 * 1024, + ) -> bytes: + """ + Get a file from the ipfs storage engine as raw bytes. + + Warning: Downloading large files can be slow. + + :param file_hash: The hash of the file to retrieve. + :param chunk_size: The size of each chunk to read from the response. + """ + buffer = BytesIO() + await self.download_file_ipfs_to_buffer(file_hash, output_buffer=buffer, chunk_size=chunk_size) + return buffer.getvalue() async def get_messages( self, diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py new file mode 100644 index 00000000..455e0fde --- /dev/null +++ b/tests/unit/test_download.py @@ -0,0 +1,32 @@ +import pytest +from aleph.sdk import AlephClient +from aleph.sdk.conf import settings as sdk_settings + + +@pytest.mark.asyncio +def test_download(): + with AlephClient( + api_server=sdk_settings.API_HOST + ) as client: + file_size: int = 0 + try: + file_content = client.download_file("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH") # File is 5B + file_size = len(file_content) + except Exception as e: + pass + assert file_size == 5 + + +@pytest.mark.asyncio +def test_download_ipfs(): + with AlephClient( + api_server=sdk_settings.API_HOST + ) as client: + file_size: int = 0 + try: + file_content = client.download_file_ipfs( + "Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq") ## 5817703 B FILE + file_size = len(file_content) + except Exception as e: + pass + assert file_size == 5817703 From 6ab843b67d33fe834f844da7008b231c21376d02 Mon Sep 17 00:00:00 2001 From: 1yam <40899431+1yam@users.noreply.github.com> Date: Mon, 10 Jul 2023 17:58:15 +0200 Subject: [PATCH 02/15] Update src/aleph/sdk/client.py Co-authored-by: Hugo Herter --- src/aleph/sdk/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 85d4b83e..238ec4f3 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -627,7 +627,7 @@ async def download_file_to_buffer( :param chunk_size: Size of the chunk to download. """ async with aiohttp.ClientSession() as session: - async with session.get(f"https://api1.aleph.im/api/v0/storage/raw/{file_hash}") as response: + async with session.get(f"https://official.aleph.im/api/v0/storage/raw/{file_hash}") as response: response.raise_for_status() while True: From 993b12a44c3a950a3cbfdf76278b1cf8aab1c407 Mon Sep 17 00:00:00 2001 From: 1yam <40899431+1yam@users.noreply.github.com> Date: Mon, 10 Jul 2023 17:58:29 +0200 Subject: [PATCH 03/15] Update src/aleph/sdk/client.py Co-authored-by: Hugo Herter --- src/aleph/sdk/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 238ec4f3..39bb0946 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -650,7 +650,7 @@ async def download_file_ipfs_to_buffer( :param chunk_size: Size of chunk we download. """ async with aiohttp.ClientSession() as session: - async with session.get(f"https://ipfs.io/ipfs/{file_hash}") as response: + async with session.get(f"https://ipfs.aleph.im/ipfs/{file_hash}") as response: response.raise_for_status() while True: chunk = await response.content.read(chunk_size) From ac5d869bff5c66a0e7044819a304bc17ddf4bc19 Mon Sep 17 00:00:00 2001 From: 1yam <40899431+1yam@users.noreply.github.com> Date: Mon, 10 Jul 2023 18:06:07 +0200 Subject: [PATCH 04/15] Update src/aleph/sdk/client.py Co-authored-by: Olivier Desenfans --- src/aleph/sdk/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 39bb0946..174f2938 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -623,7 +623,7 @@ async def download_file_to_buffer( Download a file from the storage engine and write it to the specified output buffer. :param self: The current instance of the class. :param file_hash: The hash of the file to retrieve. - :param output_buffer: The binary output buffer to write the file data to. + :param output_buffer: Writable binary buffer. The file will be written to this buffer. :param chunk_size: Size of the chunk to download. """ async with aiohttp.ClientSession() as session: From 2898e5222d6471faf888d8e12a5da6480865b3c3 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 10 Jul 2023 18:28:39 +0200 Subject: [PATCH 05/15] Fix: Pr change --- src/aleph/sdk/client.py | 48 +++++++++++++++++++++++-------------- tests/unit/test_download.py | 47 ++++++++++++++++++------------------ 2 files changed, 54 insertions(+), 41 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 39bb0946..475f00a3 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -227,11 +227,17 @@ def get_posts( end_date=end_date, ) - def download_file(self, file_hash: str, chunk_size :int = 16 * 1024) -> bytes: - return self._wrap(self.async_session.download_file, file_hash=file_hash, chunk_size=chunk_size) + def download_file(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes: + return self._wrap( + self.async_session.download_file, file_hash=file_hash, chunk_size=chunk_size + ) - def download_file_ipfs(self, file_hash: str, chunk_size : int = 16 * 1024) -> bytes: - return self._wrap(self.async_session.download_file_ipfs, file_hash=file_hash, chunk_size=chunk_size) + def download_file_ipfs(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes: + return self._wrap( + self.async_session.download_file_ipfs, + file_hash=file_hash, + chunk_size=chunk_size, + ) def watch_messages( self, @@ -614,20 +620,21 @@ async def get_posts( return await resp.json() async def download_file_to_buffer( - self, - file_hash: str, - output_buffer: BinaryIO, - chunk_size: int, + self, + file_hash: str, + output_buffer: BinaryIO, + chunk_size: int, ) -> None: """ Download a file from the storage engine and write it to the specified output buffer. - :param self: The current instance of the class. :param file_hash: The hash of the file to retrieve. :param output_buffer: The binary output buffer to write the file data to. :param chunk_size: Size of the chunk to download. """ async with aiohttp.ClientSession() as session: - async with session.get(f"https://official.aleph.im/api/v0/storage/raw/{file_hash}") as response: + async with self.http_session.get( + f"/api/v0/storage/raw/{file_hash}" + ) as response: response.raise_for_status() while True: @@ -637,10 +644,10 @@ async def download_file_to_buffer( output_buffer.write(chunk) async def download_file_ipfs_to_buffer( - self, - file_hash: str, - output_buffer: BinaryIO, - chunk_size : int, + self, + file_hash: str, + output_buffer: BinaryIO, + chunk_size: int, ) -> None: """ Download a file from the storage engine and write it to the specified output buffer. @@ -650,7 +657,9 @@ async def download_file_ipfs_to_buffer( :param chunk_size: Size of chunk we download. """ async with aiohttp.ClientSession() as session: - async with session.get(f"https://ipfs.aleph.im/ipfs/{file_hash}") as response: + async with session.get( + f"https://ipfs.aleph.im/ipfs/{file_hash}" + ) as response: response.raise_for_status() while True: chunk = await response.content.read(chunk_size) @@ -672,10 +681,11 @@ async def download_file( :param chunk_size: The size of each chunk to read from the response. """ buffer = BytesIO() - await self.download_file_to_buffer(file_hash, output_buffer=buffer, chunk_size=chunk_size) + await self.download_file_to_buffer( + file_hash, output_buffer=buffer, chunk_size=chunk_size + ) return buffer.getvalue() - async def download_file_ipfs( self, file_hash: str, @@ -690,7 +700,9 @@ async def download_file_ipfs( :param chunk_size: The size of each chunk to read from the response. """ buffer = BytesIO() - await self.download_file_ipfs_to_buffer(file_hash, output_buffer=buffer, chunk_size=chunk_size) + await self.download_file_ipfs_to_buffer( + file_hash, output_buffer=buffer, chunk_size=chunk_size + ) return buffer.getvalue() async def get_messages( diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index 455e0fde..219ccde5 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -3,30 +3,31 @@ from aleph.sdk.conf import settings as sdk_settings +@pytest.mark.parametrize( + "file_hash,expected_size", + [ + ("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5), + ("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703), + ], +) @pytest.mark.asyncio -def test_download(): - with AlephClient( - api_server=sdk_settings.API_HOST - ) as client: - file_size: int = 0 - try: - file_content = client.download_file("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH") # File is 5B - file_size = len(file_content) - except Exception as e: - pass - assert file_size == 5 +async def test_download(file_hash: str, expected_size: int): + async with AlephClient(api_server=sdk_settings.API_HOST) as client: + file_content = await client.download_file(file_hash) # File is 5B + file_size = len(file_content) + assert file_size == expected_size +@pytest.mark.parametrize( + "file_hash,expected_size", + [ + ("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5), + ("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703), + ], +) @pytest.mark.asyncio -def test_download_ipfs(): - with AlephClient( - api_server=sdk_settings.API_HOST - ) as client: - file_size: int = 0 - try: - file_content = client.download_file_ipfs( - "Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq") ## 5817703 B FILE - file_size = len(file_content) - except Exception as e: - pass - assert file_size == 5817703 +async def test_download_ipfs(file_hash: str, expected_size: int): + async with AlephClient(api_server=sdk_settings.API_HOST) as client: + file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE + file_size = len(file_content) + assert file_size == expected_size From f9b76bcb48dbfe5290ee88539719c9ef7427411d Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 11 Jul 2023 12:35:53 +0200 Subject: [PATCH 06/15] Fix: Handle IPFS downloads in download_file_to_buffer. --- src/aleph/sdk/client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 87dcf5d4..08c5e0c5 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -33,6 +33,7 @@ AlephMessage, ForgetContent, ForgetMessage, + ItemHash, ItemType, Message, MessageType, @@ -44,7 +45,7 @@ ) from aleph_message.models.program import Encoding, ProgramContent from aleph_message.status import MessageStatus -from pydantic import ValidationError +from pydantic import ValidationError, BaseModel from aleph.sdk.types import Account, GenericMessage, StorageEnum @@ -69,6 +70,10 @@ T = TypeVar("T") +class ModelWithItemHash(BaseModel): + hash: ItemHash + + def async_wrapper(f): """ Copies the docstring of wrapped functions. @@ -631,6 +636,11 @@ async def download_file_to_buffer( :param output_buffer: Writable binary buffer. The file will be written to this buffer. :param chunk_size: Size of the chunk to download. """ + IPFS_HASH = ItemHash(file_hash) + if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs: + return await self.download_file_ipfs_to_buffer( + file_hash, output_buffer, chunk_size + ) async with aiohttp.ClientSession() as session: async with self.http_session.get( f"/api/v0/storage/raw/{file_hash}" From b30cf002164fea7cf2131bedf9efc01e935b10e8 Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 11 Jul 2023 12:37:33 +0200 Subject: [PATCH 07/15] Fix: Useless code --- src/aleph/sdk/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 08c5e0c5..bf1a36d6 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -70,10 +70,6 @@ T = TypeVar("T") -class ModelWithItemHash(BaseModel): - hash: ItemHash - - def async_wrapper(f): """ Copies the docstring of wrapped functions. From 067c63b1ed98fc2ba1e0b00029a8fc65bc5f7cf0 Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 11 Jul 2023 12:57:18 +0200 Subject: [PATCH 08/15] Fix: Handle IPFS downloads in download_file_to_buffer when return 413. --- src/aleph/sdk/client.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index bf1a36d6..12209997 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -633,21 +633,25 @@ async def download_file_to_buffer( :param chunk_size: Size of the chunk to download. """ IPFS_HASH = ItemHash(file_hash) - if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs: - return await self.download_file_ipfs_to_buffer( - file_hash, output_buffer, chunk_size - ) + async with aiohttp.ClientSession() as session: async with self.http_session.get( f"/api/v0/storage/raw/{file_hash}" ) as response: - response.raise_for_status() - - while True: - chunk = await response.content.read(chunk_size) - if not chunk: - break - output_buffer.write(chunk) + if response.status == 200: + response.raise_for_status() + while True: + chunk = await response.content.read(chunk_size) + if not chunk: + break + output_buffer.write(chunk) + if response.status == 413: + if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs: + return await self.download_file_ipfs_to_buffer( + file_hash, output_buffer, chunk_size + ) + else: + raise Exception("Unsupported file hash") async def download_file_ipfs_to_buffer( self, From d180dba0e05201e97cfc3c6f88e6888850d193d3 Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 11 Jul 2023 16:23:03 +0200 Subject: [PATCH 09/15] Fix: Remove chunk_size from paramater --- src/aleph/sdk/client.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 12209997..b4fcd3b6 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -228,16 +228,15 @@ def get_posts( end_date=end_date, ) - def download_file(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes: + def download_file(self, file_hash: str) -> bytes: return self._wrap( - self.async_session.download_file, file_hash=file_hash, chunk_size=chunk_size + self.async_session.download_file, file_hash=file_hash ) - def download_file_ipfs(self, file_hash: str, chunk_size: int = 16 * 1024) -> bytes: + def download_file_ipfs(self, file_hash: str) -> bytes: return self._wrap( self.async_session.download_file_ipfs, file_hash=file_hash, - chunk_size=chunk_size, ) def watch_messages( @@ -624,13 +623,11 @@ async def download_file_to_buffer( self, file_hash: str, output_buffer: BinaryIO, - chunk_size: int, ) -> None: """ Download a file from the storage engine and write it to the specified output buffer. :param file_hash: The hash of the file to retrieve. :param output_buffer: Writable binary buffer. The file will be written to this buffer. - :param chunk_size: Size of the chunk to download. """ IPFS_HASH = ItemHash(file_hash) @@ -641,15 +638,14 @@ async def download_file_to_buffer( if response.status == 200: response.raise_for_status() while True: - chunk = await response.content.read(chunk_size) + chunk = await response.content.read(16384) if not chunk: break output_buffer.write(chunk) if response.status == 413: if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs: return await self.download_file_ipfs_to_buffer( - file_hash, output_buffer, chunk_size - ) + file_hash, output_buffer) else: raise Exception("Unsupported file hash") @@ -657,14 +653,12 @@ async def download_file_ipfs_to_buffer( self, file_hash: str, output_buffer: BinaryIO, - chunk_size: int, ) -> None: """ Download a file from the storage engine and write it to the specified output buffer. :param file_hash: The hash of the file to retrieve. :param output_buffer: The binary output buffer to write the file data to. - :param chunk_size: Size of chunk we download. """ async with aiohttp.ClientSession() as session: async with session.get( @@ -672,7 +666,7 @@ async def download_file_ipfs_to_buffer( ) as response: response.raise_for_status() while True: - chunk = await response.content.read(chunk_size) + chunk = await response.content.read(16384) if not chunk: break output_buffer.write(chunk) @@ -680,7 +674,6 @@ async def download_file_ipfs_to_buffer( async def download_file( self, file_hash: str, - chunk_size: int = 16 * 1024, ) -> bytes: """ Get a file from the storage engine as raw bytes. @@ -688,18 +681,15 @@ async def download_file( Warning: Downloading large files can be slow and memory intensive. :param file_hash: The hash of the file to retrieve. - :param chunk_size: The size of each chunk to read from the response. """ buffer = BytesIO() await self.download_file_to_buffer( - file_hash, output_buffer=buffer, chunk_size=chunk_size - ) + file_hash, output_buffer=buffer) return buffer.getvalue() async def download_file_ipfs( self, file_hash: str, - chunk_size: int = 16 * 1024, ) -> bytes: """ Get a file from the ipfs storage engine as raw bytes. @@ -707,12 +697,10 @@ async def download_file_ipfs( Warning: Downloading large files can be slow. :param file_hash: The hash of the file to retrieve. - :param chunk_size: The size of each chunk to read from the response. """ buffer = BytesIO() await self.download_file_ipfs_to_buffer( - file_hash, output_buffer=buffer, chunk_size=chunk_size - ) + file_hash, output_buffer=buffer) return buffer.getvalue() async def get_messages( From cb8c27168d3901eebfe566609476357e6c95b8d3 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 12 Jul 2023 10:08:13 +0200 Subject: [PATCH 10/15] Refactor: download ipfs & aleph on same function --- src/aleph/sdk/client.py | 109 ++++++++++-------------------------- tests/unit/test_download.py | 14 ----- 2 files changed, 30 insertions(+), 93 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index b4fcd3b6..ebc56597 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -55,6 +55,7 @@ InvalidMessageError, MessageNotFoundError, MultipleMessagesError, + FileTooLarge, ) from .models import MessagesResponse from .utils import check_unix_socket_valid, get_message_type_value @@ -229,15 +230,7 @@ def get_posts( ) def download_file(self, file_hash: str) -> bytes: - return self._wrap( - self.async_session.download_file, file_hash=file_hash - ) - - def download_file_ipfs(self, file_hash: str) -> bytes: - return self._wrap( - self.async_session.download_file_ipfs, - file_hash=file_hash, - ) + return self._wrap(self.async_session.download_file, file_hash=file_hash) def watch_messages( self, @@ -450,6 +443,33 @@ def submit( ) +async def download_file_to_buffer( + file_hash: str, + output_buffer: BinaryIO, +) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + + :param file_hash: The hash of the file to retrieve. + :param output_buffer: The binary output buffer to write the file data to. + """ + url: str = f"{settings.API_HOST}/api/v0/storage/raw/{file_hash}" + + ipfs_hash = ItemHash(file_hash) + if ItemType.from_hash(ipfs_hash) == ItemType.ipfs: + url = f"https://ipfs.aleph.im/ipfs/{file_hash}" + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + while True: + chunk = await response.content.read(16384) + if not chunk: + break + output_buffer.write(chunk) + elif response.status == 413: + raise FileTooLarge(f"The file from {file_hash} is too large") + + class AlephClient: api_server: str http_session: aiohttp.ClientSession @@ -619,58 +639,6 @@ async def get_posts( resp.raise_for_status() return await resp.json() - async def download_file_to_buffer( - self, - file_hash: str, - output_buffer: BinaryIO, - ) -> None: - """ - Download a file from the storage engine and write it to the specified output buffer. - :param file_hash: The hash of the file to retrieve. - :param output_buffer: Writable binary buffer. The file will be written to this buffer. - """ - IPFS_HASH = ItemHash(file_hash) - - async with aiohttp.ClientSession() as session: - async with self.http_session.get( - f"/api/v0/storage/raw/{file_hash}" - ) as response: - if response.status == 200: - response.raise_for_status() - while True: - chunk = await response.content.read(16384) - if not chunk: - break - output_buffer.write(chunk) - if response.status == 413: - if ItemType.from_hash(IPFS_HASH) == ItemType.ipfs: - return await self.download_file_ipfs_to_buffer( - file_hash, output_buffer) - else: - raise Exception("Unsupported file hash") - - async def download_file_ipfs_to_buffer( - self, - file_hash: str, - output_buffer: BinaryIO, - ) -> None: - """ - Download a file from the storage engine and write it to the specified output buffer. - - :param file_hash: The hash of the file to retrieve. - :param output_buffer: The binary output buffer to write the file data to. - """ - async with aiohttp.ClientSession() as session: - async with session.get( - f"https://ipfs.aleph.im/ipfs/{file_hash}" - ) as response: - response.raise_for_status() - while True: - chunk = await response.content.read(16384) - if not chunk: - break - output_buffer.write(chunk) - async def download_file( self, file_hash: str, @@ -683,24 +651,7 @@ async def download_file( :param file_hash: The hash of the file to retrieve. """ buffer = BytesIO() - await self.download_file_to_buffer( - file_hash, output_buffer=buffer) - return buffer.getvalue() - - async def download_file_ipfs( - self, - file_hash: str, - ) -> bytes: - """ - Get a file from the ipfs storage engine as raw bytes. - - Warning: Downloading large files can be slow. - - :param file_hash: The hash of the file to retrieve. - """ - buffer = BytesIO() - await self.download_file_ipfs_to_buffer( - file_hash, output_buffer=buffer) + await download_file_to_buffer(file_hash, output_buffer=buffer) return buffer.getvalue() async def get_messages( diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index 219ccde5..b1e8afe7 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -17,17 +17,3 @@ async def test_download(file_hash: str, expected_size: int): file_size = len(file_content) assert file_size == expected_size - -@pytest.mark.parametrize( - "file_hash,expected_size", - [ - ("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5), - ("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703), - ], -) -@pytest.mark.asyncio -async def test_download_ipfs(file_hash: str, expected_size: int): - async with AlephClient(api_server=sdk_settings.API_HOST) as client: - file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE - file_size = len(file_content) - assert file_size == expected_size From 5a066dc4961710f82e7c515213498cc4bb09fa56 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 12 Jul 2023 10:08:59 +0200 Subject: [PATCH 11/15] Fix: Error Handling --- src/aleph/sdk/exceptions.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/aleph/sdk/exceptions.py b/src/aleph/sdk/exceptions.py index 0c626548..51762925 100644 --- a/src/aleph/sdk/exceptions.py +++ b/src/aleph/sdk/exceptions.py @@ -42,3 +42,11 @@ class BadSignatureError(Exception): """ pass + + +class FileTooLarge(Exception): + """ + A file is too large + """ + + pass From 1004c8cadafa9dd82aeb64d52be36f5bca80591d Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 12 Jul 2023 12:28:05 +0200 Subject: [PATCH 12/15] Refactor: Encapsulate read and write of the download (ipfs & aleph) --- src/aleph/sdk/client.py | 126 +++++++++++++++++++++++++++--------- tests/unit/test_download.py | 14 ++++ 2 files changed, 111 insertions(+), 29 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index ebc56597..3bb7d051 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -22,9 +22,9 @@ Type, TypeVar, Union, + Protocol, ) from io import BytesIO -from typing import BinaryIO import aiohttp from aleph_message.models import ( @@ -69,6 +69,28 @@ magic = None # type:ignore T = TypeVar("T") +C = TypeVar("C", str, bytes, covariant=True) +U = TypeVar("U", str, bytes, contravariant=True) + + +class AsyncReadable(Protocol[C]): + async def read(self, n: int = -1) -> C: + ... + + +class Writable(Protocol[U]): + def write(self, buffer: U) -> int: + ... + + +async def copy_async_readable_to_buffer( + readable: AsyncReadable[C], buffer: Writable[C], chunk_size: int +): + while True: + chunk = await readable.read(chunk_size) + if not chunk: + break + buffer.write(chunk) def async_wrapper(f): @@ -232,6 +254,12 @@ def get_posts( def download_file(self, file_hash: str) -> bytes: return self._wrap(self.async_session.download_file, file_hash=file_hash) + def download_file_ipfs(self, file_hash: str) -> bytes: + return self._wrap( + self.async_session.download_file_ipfs, + file_hash=file_hash, + ) + def watch_messages( self, message_type: Optional[MessageType] = None, @@ -443,33 +471,6 @@ def submit( ) -async def download_file_to_buffer( - file_hash: str, - output_buffer: BinaryIO, -) -> None: - """ - Download a file from the storage engine and write it to the specified output buffer. - - :param file_hash: The hash of the file to retrieve. - :param output_buffer: The binary output buffer to write the file data to. - """ - url: str = f"{settings.API_HOST}/api/v0/storage/raw/{file_hash}" - - ipfs_hash = ItemHash(file_hash) - if ItemType.from_hash(ipfs_hash) == ItemType.ipfs: - url = f"https://ipfs.aleph.im/ipfs/{file_hash}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - while True: - chunk = await response.content.read(16384) - if not chunk: - break - output_buffer.write(chunk) - elif response.status == 413: - raise FileTooLarge(f"The file from {file_hash} is too large") - - class AlephClient: api_server: str http_session: aiohttp.ClientSession @@ -639,6 +640,58 @@ async def get_posts( resp.raise_for_status() return await resp.json() + async def download_file_to_buffer( + self, + file_hash: str, + output_buffer: Writable[bytes], + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + :param file_hash: The hash of the file to retrieve. + :param output_buffer: Writable binary buffer. The file will be written to this buffer. + """ + + async with aiohttp.ClientSession() as session: + async with self.http_session.get( + f"/api/v0/storage/raw/{file_hash}" + ) as response: + if response.status == 200: + await copy_async_readable_to_buffer( + response.content, output_buffer, chunk_size=16 * 1024 + ) + if response.status == 413: + ipfs_hash = ItemHash(file_hash) + if ItemType.from_hash(ipfs_hash) == ItemType.ipfs: + return await self.download_file_ipfs_to_buffer( + file_hash, output_buffer + ) + else: + raise FileTooLarge(f"The file from {file_hash} is too large") + + async def download_file_ipfs_to_buffer( + self, + file_hash: str, + output_buffer: Writable[bytes], + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + + :param file_hash: The hash of the file to retrieve. + :param output_buffer: The binary output buffer to write the file data to. + """ + async with aiohttp.ClientSession() as session: + async with session.get( + f"https://ipfs.aleph.im/ipfs/{file_hash}" + ) as response: + if response.status == 200: + await copy_async_readable_to_buffer( + response.content, output_buffer, chunk_size=16 * 1024 + ) + elif response.status == 413: + raise FileTooLarge() + else: + response.raise_for_status() + async def download_file( self, file_hash: str, @@ -651,7 +704,22 @@ async def download_file( :param file_hash: The hash of the file to retrieve. """ buffer = BytesIO() - await download_file_to_buffer(file_hash, output_buffer=buffer) + await self.download_file_to_buffer(file_hash, output_buffer=buffer) + return buffer.getvalue() + + async def download_file_ipfs( + self, + file_hash: str, + ) -> bytes: + """ + Get a file from the ipfs storage engine as raw bytes. + + Warning: Downloading large files can be slow. + + :param file_hash: The hash of the file to retrieve. + """ + buffer = BytesIO() + await self.download_file_ipfs_to_buffer(file_hash, output_buffer=buffer) return buffer.getvalue() async def get_messages( diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index b1e8afe7..219ccde5 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -17,3 +17,17 @@ async def test_download(file_hash: str, expected_size: int): file_size = len(file_content) assert file_size == expected_size + +@pytest.mark.parametrize( + "file_hash,expected_size", + [ + ("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5), + ("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703), + ], +) +@pytest.mark.asyncio +async def test_download_ipfs(file_hash: str, expected_size: int): + async with AlephClient(api_server=sdk_settings.API_HOST) as client: + file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE + file_size = len(file_content) + assert file_size == expected_size From 254379a999350b0bcf6ed1b86c089a9284f44b90 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 12 Jul 2023 14:27:27 +0200 Subject: [PATCH 13/15] Fix: cleaning the code --- src/aleph/sdk/client.py | 56 +++++++++++------------------------------ src/aleph/sdk/utils.py | 31 +++++++++++++++++++++++ 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 3bb7d051..643bfed6 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -22,7 +22,6 @@ Type, TypeVar, Union, - Protocol, ) from io import BytesIO @@ -48,7 +47,7 @@ from pydantic import ValidationError, BaseModel from aleph.sdk.types import Account, GenericMessage, StorageEnum - +from aleph.sdk.utils import copy_async_readable_to_buffer, Writable, AsyncReadable from .conf import settings from .exceptions import ( BroadcastError, @@ -69,28 +68,6 @@ magic = None # type:ignore T = TypeVar("T") -C = TypeVar("C", str, bytes, covariant=True) -U = TypeVar("U", str, bytes, contravariant=True) - - -class AsyncReadable(Protocol[C]): - async def read(self, n: int = -1) -> C: - ... - - -class Writable(Protocol[U]): - def write(self, buffer: U) -> int: - ... - - -async def copy_async_readable_to_buffer( - readable: AsyncReadable[C], buffer: Writable[C], chunk_size: int -): - while True: - chunk = await readable.read(chunk_size) - if not chunk: - break - buffer.write(chunk) def async_wrapper(f): @@ -651,22 +628,21 @@ async def download_file_to_buffer( :param output_buffer: Writable binary buffer. The file will be written to this buffer. """ - async with aiohttp.ClientSession() as session: - async with self.http_session.get( - f"/api/v0/storage/raw/{file_hash}" - ) as response: - if response.status == 200: - await copy_async_readable_to_buffer( - response.content, output_buffer, chunk_size=16 * 1024 + async with self.http_session.get( + f"/api/v0/storage/raw/{file_hash}" + ) as response: + if response.status == 200: + await copy_async_readable_to_buffer( + response.content, output_buffer, chunk_size=16 * 1024 + ) + if response.status == 413: + ipfs_hash = ItemHash(file_hash) + if ipfs_hash.item_type == ItemType.ipfs: + return await self.download_file_ipfs_to_buffer( + file_hash, output_buffer ) - if response.status == 413: - ipfs_hash = ItemHash(file_hash) - if ItemType.from_hash(ipfs_hash) == ItemType.ipfs: - return await self.download_file_ipfs_to_buffer( - file_hash, output_buffer - ) - else: - raise FileTooLarge(f"The file from {file_hash} is too large") + else: + raise FileTooLarge(f"The file from {file_hash} is too large") async def download_file_ipfs_to_buffer( self, @@ -687,8 +663,6 @@ async def download_file_ipfs_to_buffer( await copy_async_readable_to_buffer( response.content, output_buffer, chunk_size=16 * 1024 ) - elif response.status == 413: - raise FileTooLarge() else: response.raise_for_status() diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index fdbf6095..ca7d126b 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -12,6 +12,13 @@ from aleph.sdk.conf import settings from aleph.sdk.types import GenericMessage +from typing import ( + Tuple, + Type, + TypeVar, + Protocol, +) + logger = logging.getLogger(__name__) try: @@ -76,3 +83,27 @@ def check_unix_socket_valid(unix_socket_path: str) -> bool: unix_socket_path, ) return True + + +C = TypeVar("C", str, bytes, covariant=True) +U = TypeVar("U", str, bytes, contravariant=True) + + +class AsyncReadable(Protocol[C]): + async def read(self, n: int = -1) -> C: + ... + + +class Writable(Protocol[U]): + def write(self, buffer: U) -> int: + ... + + +async def copy_async_readable_to_buffer( + readable: AsyncReadable[C], buffer: Writable[C], chunk_size: int +): + while True: + chunk = await readable.read(chunk_size) + if not chunk: + break + buffer.write(chunk) From c3d18a6c1c6eb856c324aa5d11428e03dee7ba69 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 12 Jul 2023 15:08:29 +0200 Subject: [PATCH 14/15] Fix: variable name --- src/aleph/sdk/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index ca7d126b..366ad789 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -85,12 +85,12 @@ def check_unix_socket_valid(unix_socket_path: str) -> bool: return True -C = TypeVar("C", str, bytes, covariant=True) +T = TypeVar("T", str, bytes, covariant=True) U = TypeVar("U", str, bytes, contravariant=True) -class AsyncReadable(Protocol[C]): - async def read(self, n: int = -1) -> C: +class AsyncReadable(Protocol[T]): + async def read(self, n: int = -1) -> T: ... @@ -100,7 +100,7 @@ def write(self, buffer: U) -> int: async def copy_async_readable_to_buffer( - readable: AsyncReadable[C], buffer: Writable[C], chunk_size: int + readable: AsyncReadable[T], buffer: Writable[T], chunk_size: int ): while True: chunk = await readable.read(chunk_size) From 7d4b2ba7d9752b22211696e03513bd290f229e4e Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 10 Jul 2023 11:18:41 +0200 Subject: [PATCH 15/15] Feature: Download on ipfs client --- src/aleph/sdk/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 0c536ea8..91c45c26 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -54,7 +54,7 @@ def create_archive(path: Path) -> Tuple[Path, Encoding]: return archive_path, Encoding.zip elif os.path.isfile(path): if path.suffix == ".squashfs" or ( - magic and magic.from_file(path).startswith("Squashfs filesystem") + magic and magic.from_file(path).startswith("Squashfs filesystem") ): return path, Encoding.squashfs else: @@ -85,6 +85,7 @@ def check_unix_socket_valid(unix_socket_path: str) -> bool: ) return True + T = TypeVar("T", str, bytes, covariant=True) U = TypeVar("U", str, bytes, contravariant=True) @@ -100,7 +101,7 @@ def write(self, buffer: U) -> int: async def copy_async_readable_to_buffer( - readable: AsyncReadable[T], buffer: Writable[T], chunk_size: int + readable: AsyncReadable[T], buffer: Writable[T], chunk_size: int ): while True: chunk = await readable.read(chunk_size) @@ -121,4 +122,4 @@ def enum_as_str(obj: Union[str, Enum]) -> str: if isinstance(obj, Enum): return obj.value - return obj \ No newline at end of file + return obj