From dbe8345ceb18ee8cab24c550a2028a6bab3434c2 Mon Sep 17 00:00:00 2001 From: 1yam Date: Mon, 23 Oct 2023 17:03:45 +0200 Subject: [PATCH 1/3] Feature: authenticated file upload SDK Solution: update create_store to send message if storage engine --- src/aleph/sdk/client/authenticated_http.py | 87 +++++++++++++++++++++- tests/unit/test_upload.py | 39 ++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_upload.py diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 6291467a..b4a7cc6a 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -341,8 +341,19 @@ async def create_store( file_content = Path(file_path).read_bytes() if storage_engine == StorageEnum.storage: - file_hash = await self.storage_push_file(file_content=file_content) + # Upload the file and message all at once using authenticated upload. + return await self._upload_file_native( + address=address, + file_content=file_content, + guess_mime_type=guess_mime_type, + ref=ref, + extra_fields=extra_fields, + channel=channel, + sync=sync, + ) elif storage_engine == StorageEnum.ipfs: + # We do not support authenticated upload for IPFS yet. Use the legacy method + # of uploading the file first then publishing the message using POST /messages. file_hash = await self.ipfs_push_file(file_content=file_content) else: raise ValueError(f"Unknown storage engine: '{storage_engine}'") @@ -558,3 +569,77 @@ async def submit( ) message_status = await self._broadcast(message=message, sync=sync) return message, message_status + + async def _storage_push_file_with_message( + self, + file_content: bytes, + store_content: StoreContent, + channel: Optional[str] = None, + sync: bool = False, + ) -> Tuple[StoreMessage, MessageStatus]: + """Push a file to the storage service.""" + data = aiohttp.FormData() + + # Prepare the STORE message + message = await self._prepare_aleph_message( + message_type=MessageType.store, + content=store_content.dict(exclude_none=True), + channel=channel, + ) + metadata = { + "message": message.dict(exclude_none=True), + "sync": sync, + } + data.add_field( + "metadata", json.dumps(metadata), content_type="application/json" + ) + # Add the file + data.add_field("file", file_content) + + url = "/api/v0/storage/add_file" + logger.debug(f"Posting file on {url}") + + async with self.http_session.post(url, data=data) as resp: + resp.raise_for_status() + message_status = ( + MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED + ) + return message, message_status + + async def _upload_file_native( + self, + address: str, + file_content: bytes, + guess_mime_type: bool = False, + ref: Optional[str] = None, + extra_fields: Optional[dict] = None, + channel: Optional[str] = None, + sync: bool = False, + ) -> Tuple[StoreMessage, MessageStatus]: + file_hash = hashlib.sha256(file_content).hexdigest() + if magic and guess_mime_type: + mime_type = magic.from_buffer(file_content, mime=True) + else: + mime_type = None + + store_content = StoreContent( + address=address, + ref=ref, + item_type=StorageEnum.storage, + item_hash=file_hash, + mime_type=mime_type, + time=time.time(), + **extra_fields, + ) + message, _ = await self._storage_push_file_with_message( + file_content=file_content, + store_content=store_content, + channel=channel, + sync=sync, + ) + + # Some nodes may not implement authenticated file upload yet. As we cannot detect + # this easily, broadcast the message a second time to ensure publication on older + # nodes. + status = await self._broadcast(message=message, sync=sync) + return message, status diff --git a/tests/unit/test_upload.py b/tests/unit/test_upload.py new file mode 100644 index 00000000..aa4d1911 --- /dev/null +++ b/tests/unit/test_upload.py @@ -0,0 +1,39 @@ +import hashlib + +import pytest +from aleph_message.models import StoreMessage +from aleph_message.status import MessageStatus + +from aleph.sdk import AuthenticatedAlephHttpClient +from aleph.sdk.chains.common import get_fallback_private_key +from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.types import StorageEnum + + +@pytest.mark.asyncio +async def test_upload_with_message(): + pkey = get_fallback_private_key() + account = ETHAccount(private_key=pkey) + + content = b"Test pyaleph upload\n" + file_hash = hashlib.sha256(content).hexdigest() + + async with AuthenticatedAlephHttpClient(account=account, api_server="http://62.210.145.23:4024") as client: + message, status = await client.create_store( + address=account.get_address(), + file_content=content, + storage_engine=StorageEnum.storage, + sync=True, + ) + print(message, status) + + assert status == MessageStatus.PROCESSED + assert message.content.item_hash == file_hash + + server_content = await client.download_file(file_hash=file_hash) + assert server_content == content + + server_message = await client.get_message( + item_hash=message.item_hash, message_type=StoreMessage + ) + assert server_message.content.item_hash == file_hash \ No newline at end of file From c51e7ced426d862b8fba9a46cd76ea105c584648 Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 24 Oct 2023 16:37:05 +0200 Subject: [PATCH 2/3] Fix: Remove HardCoded node for test --- tests/unit/test_upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_upload.py b/tests/unit/test_upload.py index aa4d1911..9812d06a 100644 --- a/tests/unit/test_upload.py +++ b/tests/unit/test_upload.py @@ -18,7 +18,7 @@ async def test_upload_with_message(): content = b"Test pyaleph upload\n" file_hash = hashlib.sha256(content).hexdigest() - async with AuthenticatedAlephHttpClient(account=account, api_server="http://62.210.145.23:4024") as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=None) as client: message, status = await client.create_store( address=account.get_address(), file_content=content, From 1c5a6294c44fe558fe127b1814d5db74473f693e Mon Sep 17 00:00:00 2001 From: 1yam Date: Tue, 24 Oct 2023 16:58:09 +0200 Subject: [PATCH 3/3] Fix: Unit test wasn't fully mocked --- tests/unit/test_asynchronous.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_asynchronous.py b/tests/unit/test_asynchronous.py index dbccbaa6..f70a3e16 100644 --- a/tests/unit/test_asynchronous.py +++ b/tests/unit/test_asynchronous.py @@ -32,6 +32,9 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): @property def status(self): return 200 if self.sync else 202 + + async def raise_for_status(self): + ... async def json(self): message_status = "processed" if self.sync else "pending"