11import asyncio
22import json
33import logging
4- from typing import Dict , cast , Optional , Any , Mapping , List , Union
4+ from typing import Dict , cast , Optional , Any , List , Union
55
6- import aio_pika .abc
76from aiohttp import web
87from aleph_p2p_client import AlephP2PServiceClient
98from configmanager import Config
109from pydantic import BaseModel , Field , ValidationError
1110
12- import aleph .toolkit .json as aleph_json
13- from aleph .schemas .pending_messages import parse_message , BasePendingMessage
1411from aleph .services .ipfs import IpfsService
1512from aleph .services .p2p .pubsub import publish as pub_p2p
1613from aleph .toolkit .shield import shielded
17- from aleph .types .message_status import (
18- InvalidMessageException ,
19- MessageStatus ,
20- MessageProcessingStatus ,
21- )
2214from aleph .types .protocol import Protocol
2315from aleph .web .controllers .app_state_getters import (
2416 get_config_from_request ,
2517 get_ipfs_service_from_request ,
2618 get_p2p_client_from_request ,
27- get_mq_channel_from_request ,
2819)
29- from aleph .web .controllers .utils import mq_make_aleph_message_topic_queue
20+ from aleph .web .controllers .utils import (
21+ validate_message_dict ,
22+ broadcast_and_process_message ,
23+ PublicationStatus ,
24+ broadcast_status_to_http_status ,
25+ )
3026
3127LOGGER = logging .getLogger (__name__ )
3228
3329
34- class PublicationStatus (BaseModel ):
35- status : str
36- failed : List [Protocol ]
37-
38- @classmethod
39- def from_failures (cls , failed_publications : List [Protocol ]):
40- status = {
41- 0 : "success" ,
42- 1 : "warning" ,
43- 2 : "error" ,
44- }[len (failed_publications )]
45- return cls (status = status , failed = failed_publications )
46-
47-
48- def _validate_message_dict (message_dict : Mapping [str , Any ]) -> BasePendingMessage :
49- try :
50- return parse_message (message_dict )
51- except InvalidMessageException as e :
52- raise web .HTTPUnprocessableEntity (body = str (e ))
53-
54-
5530def _validate_request_data (config : Config , request_data : Dict ) -> None :
5631 """
5732 Validates the content of a JSON pubsub message depending on the channel
@@ -83,7 +58,7 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
8358 reason = "'data': must be deserializable as JSON."
8459 )
8560
86- _validate_message_dict (message_dict )
61+ validate_message_dict (message_dict )
8762
8863
8964async def _pub_on_p2p_topics (
@@ -142,48 +117,11 @@ async def pub_json(request: web.Request):
142117 )
143118
144119
145- async def _mq_read_one_message (
146- mq_queue : aio_pika .abc .AbstractQueue , timeout : float
147- ) -> Optional [aio_pika .abc .AbstractIncomingMessage ]:
148- """
149- Consume one element from a message queue and then return.
150- """
151-
152- queue : asyncio .Queue = asyncio .Queue ()
153-
154- async def _process_message (message : aio_pika .abc .AbstractMessage ):
155- await queue .put (message )
156-
157- consumer_tag = await mq_queue .consume (_process_message , no_ack = True )
158-
159- try :
160- return await asyncio .wait_for (queue .get (), timeout )
161- except asyncio .TimeoutError :
162- return None
163- finally :
164- await mq_queue .cancel (consumer_tag )
165-
166-
167- def _processing_status_to_http_status (status : MessageProcessingStatus ) -> int :
168- mapping = {
169- MessageProcessingStatus .PROCESSED_NEW_MESSAGE : 200 ,
170- MessageProcessingStatus .PROCESSED_CONFIRMATION : 200 ,
171- MessageProcessingStatus .FAILED_WILL_RETRY : 202 ,
172- MessageProcessingStatus .FAILED_REJECTED : 422 ,
173- }
174- return mapping [status ]
175-
176-
177120class PubMessageRequest (BaseModel ):
178121 sync : bool = False
179122 message_dict : Dict [str , Any ] = Field (alias = "message" )
180123
181124
182- class PubMessageResponse (BaseModel ):
183- publication_status : PublicationStatus
184- message_status : Optional [MessageStatus ]
185-
186-
187125@shielded
188126async def pub_message (request : web .Request ):
189127 try :
@@ -194,76 +132,14 @@ async def pub_message(request: web.Request):
194132 # Body must be valid JSON
195133 raise web .HTTPUnprocessableEntity ()
196134
197- pending_message = _validate_message_dict (request_data .message_dict )
198-
199- # In sync mode, wait for a message processing event. We need to create the queue
200- # before publishing the message on P2P topics in order to guarantee that the event
201- # will be picked up.
202- config = get_config_from_request (request )
203-
204- if request_data .sync :
205- mq_channel = await get_mq_channel_from_request (request = request , logger = LOGGER )
206- mq_queue = await mq_make_aleph_message_topic_queue (
207- channel = mq_channel ,
208- config = config ,
209- routing_key = f"*.{ pending_message .item_hash } " ,
210- )
211- else :
212- mq_queue = None
213-
214- # We publish the message on P2P topics early, for 3 reasons:
215- # 1. Just because this node is unable to process the message does not
216- # necessarily mean the message is incorrect (ex: bug in a new version).
217- # 2. If the publication fails after the processing, we end up in a situation where
218- # a message exists without being propagated to the other nodes, ultimately
219- # causing sync issues on the network.
220- # 3. The message is currently fed to this node using the P2P service client
221- # loopback mechanism.
222- ipfs_service = get_ipfs_service_from_request (request )
223- p2p_client = get_p2p_client_from_request (request )
224-
225- message_topic = config .aleph .queue_topic .value
226- failed_publications = await _pub_on_p2p_topics (
227- p2p_client = p2p_client ,
228- ipfs_service = ipfs_service ,
229- topic = message_topic ,
230- payload = aleph_json .dumps (request_data .message_dict ),
135+ pending_message = validate_message_dict (request_data .message_dict )
136+ broadcast_status = await broadcast_and_process_message (
137+ pending_message = pending_message ,
138+ message_dict = request_data .message_dict ,
139+ sync = request_data .sync ,
140+ request = request ,
141+ logger = LOGGER ,
231142 )
232- pub_status = PublicationStatus .from_failures (failed_publications )
233- if pub_status .status == "error" :
234- return web .json_response (
235- text = PubMessageResponse (
236- publication_status = pub_status , message_status = None
237- ).json (),
238- status = 500 ,
239- )
240-
241- status = PubMessageResponse (
242- publication_status = pub_status , message_status = MessageStatus .PENDING
243- )
244-
245- # When publishing in async mode, just respond with 202 (Accepted).
246- message_accepted_response = web .json_response (text = status .json (), status = 202 )
247- if not request_data .sync :
248- return message_accepted_response
249-
250- # Ignore type checking here, we know that mq_queue is set at this point
251- assert mq_queue is not None
252- response = await _mq_read_one_message (mq_queue , timeout = 30 )
253-
254- # Delete the queue immediately
255- await mq_queue .delete (if_empty = False )
256-
257- # If the message was not processed before the timeout, return a 202.
258- if response is None :
259- return message_accepted_response
260-
261- routing_key = response .routing_key
262- assert routing_key is not None # again, for type checking
263- status_str , _item_hash = routing_key .split ("." )
264- processing_status = MessageProcessingStatus (status_str )
265- status_code = _processing_status_to_http_status (processing_status )
266-
267- status .message_status = processing_status .to_message_status ()
268143
269- return web .json_response (text = status .json (), status = status_code )
144+ status_code = broadcast_status_to_http_status (broadcast_status )
145+ return web .json_response (text = broadcast_status .json (), status = status_code )
0 commit comments