@@ -341,8 +341,19 @@ async def create_store(
341341 file_content = Path (file_path ).read_bytes ()
342342
343343 if storage_engine == StorageEnum .storage :
344- file_hash = await self .storage_push_file (file_content = file_content )
344+ # Upload the file and message all at once using authenticated upload.
345+ return await self ._upload_file_native (
346+ address = address ,
347+ file_content = file_content ,
348+ guess_mime_type = guess_mime_type ,
349+ ref = ref ,
350+ extra_fields = extra_fields ,
351+ channel = channel ,
352+ sync = sync ,
353+ )
345354 elif storage_engine == StorageEnum .ipfs :
355+ # We do not support authenticated upload for IPFS yet. Use the legacy method
356+ # of uploading the file first then publishing the message using POST /messages.
346357 file_hash = await self .ipfs_push_file (file_content = file_content )
347358 else :
348359 raise ValueError (f"Unknown storage engine: '{ storage_engine } '" )
@@ -558,3 +569,77 @@ async def submit(
558569 )
559570 message_status = await self ._broadcast (message = message , sync = sync )
560571 return message , message_status
572+
573+ async def _storage_push_file_with_message (
574+ self ,
575+ file_content : bytes ,
576+ store_content : StoreContent ,
577+ channel : Optional [str ] = None ,
578+ sync : bool = False ,
579+ ) -> Tuple [StoreMessage , MessageStatus ]:
580+ """Push a file to the storage service."""
581+ data = aiohttp .FormData ()
582+
583+ # Prepare the STORE message
584+ message = await self ._prepare_aleph_message (
585+ message_type = MessageType .store ,
586+ content = store_content .dict (exclude_none = True ),
587+ channel = channel ,
588+ )
589+ metadata = {
590+ "message" : message .dict (exclude_none = True ),
591+ "sync" : sync ,
592+ }
593+ data .add_field (
594+ "metadata" , json .dumps (metadata ), content_type = "application/json"
595+ )
596+ # Add the file
597+ data .add_field ("file" , file_content )
598+
599+ url = "/api/v0/storage/add_file"
600+ logger .debug (f"Posting file on { url } " )
601+
602+ async with self .http_session .post (url , data = data ) as resp :
603+ resp .raise_for_status ()
604+ message_status = (
605+ MessageStatus .PENDING if resp .status == 202 else MessageStatus .PROCESSED
606+ )
607+ return message , message_status
608+
609+ async def _upload_file_native (
610+ self ,
611+ address : str ,
612+ file_content : bytes ,
613+ guess_mime_type : bool = False ,
614+ ref : Optional [str ] = None ,
615+ extra_fields : Optional [dict ] = None ,
616+ channel : Optional [str ] = None ,
617+ sync : bool = False ,
618+ ) -> Tuple [StoreMessage , MessageStatus ]:
619+ file_hash = hashlib .sha256 (file_content ).hexdigest ()
620+ if magic and guess_mime_type :
621+ mime_type = magic .from_buffer (file_content , mime = True )
622+ else :
623+ mime_type = None
624+
625+ store_content = StoreContent (
626+ address = address ,
627+ ref = ref ,
628+ item_type = StorageEnum .storage ,
629+ item_hash = file_hash ,
630+ mime_type = mime_type ,
631+ time = time .time (),
632+ ** extra_fields ,
633+ )
634+ message , _ = await self ._storage_push_file_with_message (
635+ file_content = file_content ,
636+ store_content = store_content ,
637+ channel = channel ,
638+ sync = sync ,
639+ )
640+
641+ # Some nodes may not implement authenticated file upload yet. As we cannot detect
642+ # this easily, broadcast the message a second time to ensure publication on older
643+ # nodes.
644+ status = await self ._broadcast (message = message , sync = sync )
645+ return message , status
0 commit comments