Skip to content

Commit 0a44e9d

Browse files
committed
add PostsResponse; harmonize get_messages and watch_messages parameters; refactor node.py; increase test coverage
1 parent 6cd9aef commit 0a44e9d

File tree

6 files changed

+278
-166
lines changed

6 files changed

+278
-166
lines changed

src/aleph/sdk/base.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from aleph_message.models.execution.program import Encoding
2727
from aleph_message.status import MessageStatus
2828

29+
from aleph.sdk.models import PostsResponse
2930
from aleph.sdk.types import GenericMessage, StorageEnum
3031

3132
DEFAULT_PAGE_SIZE = 200
@@ -78,7 +79,9 @@ async def get_posts(
7879
chains: Optional[Iterable[str]] = None,
7980
start_date: Optional[Union[datetime, float]] = None,
8081
end_date: Optional[Union[datetime, float]] = None,
81-
) -> Dict[str, Dict]:
82+
ignore_invalid_messages: bool = True,
83+
invalid_messages_log_level: int = logging.NOTSET,
84+
) -> PostsResponse:
8285
"""
8386
Fetch a list of posts from the network.
8487
@@ -93,6 +96,8 @@ async def get_posts(
9396
:param chains: Chains of the posts to fetch (Default: all chains)
9497
:param start_date: Earliest date to fetch messages from
9598
:param end_date: Latest date to fetch messages from
99+
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
100+
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
96101
"""
97102
pass
98103

@@ -138,9 +143,9 @@ async def get_posts_iterator(
138143
start_date=start_date,
139144
end_date=end_date,
140145
)
141-
total_items = resp["pagination_total"]
146+
total_items = resp.pagination_total
142147
page += 1
143-
for post in resp["posts"]:
148+
for post in resp.posts:
144149
yield post
145150

146151
@abstractmethod
@@ -183,7 +188,7 @@ async def get_messages(
183188
:param page: Page to fetch, begins at 1 (Default: 1)
184189
:param message_type: Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET"
185190
:param content_types: Filter by content type
186-
:param content_keys: Filter by content key
191+
:param content_keys: Filter by aggregate key
187192
:param refs: If set, only fetch posts that reference these hashes (in the "refs" field)
188193
:param addresses: Addresses of the posts to fetch (Default: all addresses)
189194
:param tags: Tags of the posts to fetch (Default: all tags)
@@ -192,7 +197,7 @@ async def get_messages(
192197
:param chains: Filter by sender address chain
193198
:param start_date: Earliest date to fetch messages from
194199
:param end_date: Latest date to fetch messages from
195-
:param ignore_invalid_messages: Ignore invalid messages (Default: False)
200+
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
196201
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
197202
"""
198203
pass
@@ -271,6 +276,7 @@ def watch_messages(
271276
self,
272277
message_type: Optional[MessageType] = None,
273278
content_types: Optional[Iterable[str]] = None,
279+
content_keys: Optional[Iterable[str]] = None,
274280
refs: Optional[Iterable[str]] = None,
275281
addresses: Optional[Iterable[str]] = None,
276282
tags: Optional[Iterable[str]] = None,
@@ -285,6 +291,7 @@ def watch_messages(
285291
286292
:param message_type: Type of message to watch
287293
:param content_types: Content types to watch
294+
:param content_keys: Filter by aggregate key
288295
:param refs: References to watch
289296
:param addresses: Addresses to watch
290297
:param tags: Tags to watch

src/aleph/sdk/client.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
MessageNotFoundError,
6060
MultipleMessagesError,
6161
)
62-
from .models import MessagesResponse
62+
from .models import MessagesResponse, PostsResponse
6363
from .utils import check_unix_socket_valid, get_message_type_value
6464

6565
logger = logging.getLogger(__name__)
@@ -215,7 +215,7 @@ def get_posts(
215215
chains: Optional[Iterable[str]] = None,
216216
start_date: Optional[Union[datetime, float]] = None,
217217
end_date: Optional[Union[datetime, float]] = None,
218-
) -> Dict[str, Dict]:
218+
) -> PostsResponse:
219219
return self._wrap(
220220
self.async_session.get_posts,
221221
pagination=pagination,
@@ -575,7 +575,18 @@ async def get_posts(
575575
chains: Optional[Iterable[str]] = None,
576576
start_date: Optional[Union[datetime, float]] = None,
577577
end_date: Optional[Union[datetime, float]] = None,
578-
) -> Dict[str, Dict]:
578+
ignore_invalid_messages: bool = True,
579+
invalid_messages_log_level: int = logging.NOTSET,
580+
) -> PostsResponse:
581+
ignore_invalid_messages = (
582+
True if ignore_invalid_messages is None else ignore_invalid_messages
583+
)
584+
invalid_messages_log_level = (
585+
logging.NOTSET
586+
if invalid_messages_log_level is None
587+
else invalid_messages_log_level
588+
)
589+
579590
params: Dict[str, Any] = dict(pagination=pagination, page=page)
580591

581592
if types is not None:
@@ -604,7 +615,36 @@ async def get_posts(
604615

605616
async with self.http_session.get("/api/v0/posts.json", params=params) as resp:
606617
resp.raise_for_status()
607-
return await resp.json()
618+
response_json = await resp.json()
619+
posts_raw = response_json["posts"]
620+
621+
# All posts may not be valid according to the latest specification in
622+
# aleph-message. This allows the user to specify how errors should be handled.
623+
posts: List[AlephMessage] = []
624+
for post_raw in posts_raw:
625+
try:
626+
message = parse_message(post_raw)
627+
posts.append(message)
628+
except KeyError as e:
629+
if not ignore_invalid_messages:
630+
raise e
631+
logger.log(
632+
level=invalid_messages_log_level,
633+
msg=f"KeyError: Field '{e.args[0]}' not found",
634+
)
635+
except ValidationError as e:
636+
if not ignore_invalid_messages:
637+
raise e
638+
if invalid_messages_log_level:
639+
logger.log(level=invalid_messages_log_level, msg=e)
640+
641+
return PostsResponse(
642+
posts=posts,
643+
pagination_page=response_json["pagination_page"],
644+
pagination_total=response_json["pagination_total"],
645+
pagination_per_page=response_json["pagination_per_page"],
646+
pagination_item=response_json["pagination_item"],
647+
)
608648

609649
async def download_file_to_buffer(
610650
self,
@@ -807,6 +847,7 @@ async def watch_messages(
807847
self,
808848
message_type: Optional[MessageType] = None,
809849
content_types: Optional[Iterable[str]] = None,
850+
content_keys: Optional[Iterable[str]] = None,
810851
refs: Optional[Iterable[str]] = None,
811852
addresses: Optional[Iterable[str]] = None,
812853
tags: Optional[Iterable[str]] = None,
@@ -822,6 +863,8 @@ async def watch_messages(
822863
params["msgType"] = message_type.value
823864
if content_types is not None:
824865
params["contentTypes"] = ",".join(content_types)
866+
if content_keys is not None:
867+
params["contentKeys"] = ",".join(content_keys)
825868
if refs is not None:
826869
params["refs"] = ",".join(refs)
827870
if addresses is not None:

src/aleph/sdk/models.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
from typing import List
22

3-
from aleph_message.models import AlephMessage
3+
from aleph_message.models import AlephMessage, PostMessage
44
from pydantic import BaseModel
55

66

7-
class MessagesResponse(BaseModel):
8-
"""Response from an Aleph node API on the path /api/v0/messages.json"""
9-
10-
messages: List[AlephMessage]
7+
class PaginationResponse(BaseModel):
118
pagination_page: int
129
pagination_total: int
1310
pagination_per_page: int
1411
pagination_item: str
12+
13+
14+
class MessagesResponse(PaginationResponse):
15+
"""Response from an Aleph node API on the path /api/v0/messages.json"""
16+
17+
messages: List[AlephMessage]
18+
pagination_item = "messages"
19+
20+
21+
class PostsResponse(PaginationResponse):
22+
"""Response from an Aleph node API on the path /api/v0/posts.json"""
23+
24+
posts: List[PostMessage]
25+
pagination_item = "posts"

0 commit comments

Comments
 (0)