From 9a253bc913d08fd8b3e0e105112a24177beaa910 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Tue, 7 Oct 2025 13:09:35 +0200 Subject: [PATCH 1/2] Fix corrupted data before processing --- src/aleph/services/storage/fileystem_engine.py | 15 ++++++++++++++- src/aleph/storage.py | 11 ++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/aleph/services/storage/fileystem_engine.py b/src/aleph/services/storage/fileystem_engine.py index 4a209f4c1..8f9c30631 100644 --- a/src/aleph/services/storage/fileystem_engine.py +++ b/src/aleph/services/storage/fileystem_engine.py @@ -23,7 +23,20 @@ async def read(self, filename: str) -> Optional[bytes]: async def write(self, filename: str, content: bytes): file_path = self.folder / filename - file_path.write_bytes(content) + temp_path = self.folder / f"{filename}.tmp" + + try: + # Write to temporary file first + temp_path.write_bytes(content) + + # Atomic rename - this operation is atomic on POSIX systems + # If crash happens before this, temp file exists but target doesn't + temp_path.replace(file_path) + + except Exception: + # Clean up temp file if write failed + temp_path.unlink(missing_ok=True) + raise async def delete(self, filename: str): file_path = self.folder / filename diff --git a/src/aleph/storage.py b/src/aleph/storage.py index d9a4d19f2..858055338 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -8,6 +8,7 @@ from hashlib import sha256 from typing import Any, Final, Optional, cast +import orjson from aleph_message.models import ItemType import aleph.toolkit.json as aleph_json @@ -194,8 +195,16 @@ async def get_hash_content( # Try to retrieve the data from the DB, then from the network or IPFS. content = await self.storage_engine.read(filename=content_hash) + if content is not None: - source = ContentSource.DB + # check json and fix if corrupted + try: + json_content = aleph_json.loads(content) + source = ContentSource.DB + except orjson.JSONDecodeError as e: + LOGGER.warning("Can't decode JSON, Change source...") + await self.storage_engine.delete(filename=content_hash) + content = None if content is None and use_network: content = await self._fetch_content_from_network( From c3731132c722cc4c1d7f3acae12da3cab81ec069 Mon Sep 17 00:00:00 2001 From: Ali EL BROUDI Date: Wed, 8 Oct 2025 10:52:54 +0200 Subject: [PATCH 2/2] move the fix on the correct location (message processing) --- src/aleph/storage.py | 48 +++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 858055338..dd7707ad6 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -8,7 +8,6 @@ from hashlib import sha256 from typing import Any, Final, Optional, cast -import orjson from aleph_message.models import ItemType import aleph.toolkit.json as aleph_json @@ -85,14 +84,39 @@ async def get_message_content( try: content = aleph_json.loads(item_content) - except aleph_json.DecodeError as e: - error_msg = f"Can't decode JSON: {e}" - LOGGER.warning(error_msg) - raise InvalidContent(error_msg) - except json.decoder.JSONDecodeError as e: + except (aleph_json.DecodeError, json.decoder.JSONDecodeError) as e: error_msg = f"Can't decode JSON: {e}" LOGGER.warning(error_msg) - raise InvalidContent(error_msg) + # If content was from local cache and is corrupted, delete it and retry + if source == ContentSource.DB and item_type in ( + ItemType.ipfs, + ItemType.storage, + ): + LOGGER.warning( + f"Corrupted cached content for {item_hash}, deleting and retrying from network" + ) + await self.storage_engine.delete(filename=item_hash) + # Retry fetching from network/IPFS + hash_content = await self.get_hash_content( + item_hash, + engine=ItemType(item_type), + use_network=True, + use_ipfs=True, + ) + item_content = hash_content.value + source = hash_content.source + # Try parsing again + try: + content = aleph_json.loads(item_content) + except ( + aleph_json.DecodeError, + json.decoder.JSONDecodeError, + ) as retry_error: + raise InvalidContent( + f"Content still invalid after retry: {retry_error}" + ) from retry_error + else: + raise InvalidContent(error_msg) return MessageContent( hash=item_hash, @@ -195,16 +219,8 @@ async def get_hash_content( # Try to retrieve the data from the DB, then from the network or IPFS. content = await self.storage_engine.read(filename=content_hash) - if content is not None: - # check json and fix if corrupted - try: - json_content = aleph_json.loads(content) - source = ContentSource.DB - except orjson.JSONDecodeError as e: - LOGGER.warning("Can't decode JSON, Change source...") - await self.storage_engine.delete(filename=content_hash) - content = None + source = ContentSource.DB if content is None and use_network: content = await self._fetch_content_from_network(