From 1c9da05fa9688408c27e93a8c46148f5a7c7a51c Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Fri, 27 Jun 2025 18:16:20 +0200
Subject: [PATCH 1/3] Adds `fully_exhaust` arg for async query map.

---
 async_substrate_interface/async_substrate.py | 81 +++++++++++++++----
 async_substrate_interface/utils/decoding.py  |  2 +-
 .../asyncio_/test_substrate_interface.py     | 43 ++++++++++
 3 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index 8db4f7f..f83c74b 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -456,7 +456,6 @@ async def retrieve_next_page(self, start_key) -> list:
         )
         if len(result.records) < self.page_size:
             self.loading_complete = True
-        # Update last key from new result set to use as offset for next page
         self.last_key = result.last_key
         return result.records
 
@@ -3373,6 +3372,7 @@ async def query_map(
         page_size: int = 100,
         ignore_decoding_errors: bool = False,
         reuse_block_hash: bool = False,
+        fully_exhaust: bool = False,
     ) -> AsyncQueryMapResult:
         """
         Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -3403,6 +3403,8 @@ async def query_map(
                 decoding
             reuse_block_hash: use True if you wish to make the query using the last-used block hash. Do not mark True
                 if supplying a block_hash
+            fully_exhaust: Pull the entire result at once, rather than paginating. Only use if you need the entire query
+                map result.
 
         Returns:
             AsyncQueryMapResult object
@@ -3453,11 +3455,16 @@ async def query_map(
             page_size = max_results
 
         # Retrieve storage keys
-        response = await self.rpc_request(
-            method="state_getKeysPaged",
-            params=[prefix, page_size, start_key, block_hash],
-            runtime=runtime,
-        )
+        if not fully_exhaust:
+            response = await self.rpc_request(
+                method="state_getKeysPaged",
+                params=[prefix, page_size, start_key, block_hash],
+                runtime=runtime,
+            )
+        else:
+            response = await self.rpc_request(
+                method="state_getKeys", params=[prefix, block_hash], runtime=runtime
+            )
 
         if "error" in response:
             raise SubstrateRequestException(response["error"]["message"])
@@ -3470,18 +3477,60 @@ async def query_map(
         if len(result_keys) > 0:
             last_key = result_keys[-1]
 
-            # Retrieve corresponding value
-            response = await self.rpc_request(
-                method="state_queryStorageAt",
-                params=[result_keys, block_hash],
-                runtime=runtime,
-            )
+            # Retrieve corresponding value(s)
+            if not fully_exhaust:
+                response = await self.rpc_request(
+                    method="state_queryStorageAt",
+                    params=[result_keys, block_hash],
+                    runtime=runtime,
+                )
+                if "error" in response:
+                    raise SubstrateRequestException(response["error"]["message"])
+                for result_group in response["result"]:
+                    result = decode_query_map(
+                        result_group["changes"],
+                        prefix,
+                        runtime,
+                        param_types,
+                        params,
+                        value_type,
+                        key_hashers,
+                        ignore_decoding_errors,
+                        self.decode_ss58,
+                    )
+            else:
+                all_responses = []
+                page_batches = [
+                    result_keys[i : i + page_size]
+                    for i in range(0, len(result_keys), page_size)
+                ]
+                changes = []
+                for batch_group in [
+                    # run five concurrent batch pulls; could go higher, but it's good
+                    # to be a good citizen of the ecosystem
+                    page_batches[i : i + 5]
+                    for i in range(0, len(page_batches), 5)
+                ]:
+                    all_responses.extend(
+                        await asyncio.gather(
+                            *[
+                                self.rpc_request(
+                                    method="state_queryStorageAt",
+                                    params=[batch_keys, block_hash],
+                                    runtime=runtime,
+                                )
+                                for batch_keys in batch_group
+                            ]
+                        )
+                    )
+                for response in all_responses:
+                    if "error" in response:
+                        raise SubstrateRequestException(response["error"]["message"])
+                    for result_group in response["result"]:
+                        changes.extend(result_group["changes"])
 
-            if "error" in response:
-                raise SubstrateRequestException(response["error"]["message"])
-            for result_group in response["result"]:
                 result = decode_query_map(
-                    result_group["changes"],
+                    changes,
                     prefix,
                     runtime,
                     param_types,
diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py
index ae062a3..c3dd429 100644
--- a/async_substrate_interface/utils/decoding.py
+++ b/async_substrate_interface/utils/decoding.py
@@ -73,7 +73,7 @@ def _decode_scale_list_with_runtime(
 
 
 def decode_query_map(
-    result_group_changes,
+    result_group_changes: list,
     prefix,
     runtime: "Runtime",
     param_types,
diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py
index 39266a7..9c232f0 100644
--- a/tests/unit_tests/asyncio_/test_substrate_interface.py
+++ b/tests/unit_tests/asyncio_/test_substrate_interface.py
@@ -1,4 +1,5 @@
 import asyncio
+import time
 from unittest.mock import AsyncMock, MagicMock, ANY
 
 import pytest
@@ -182,3 +183,45 @@ async def test_ss58_conversion():
         if len(value.value) > 0:
             for decoded_key in value.value:
                 assert isinstance(decoded_key, str)
+
+
+@pytest.mark.asyncio
+async def test_fully_exhaust_query_map():
+    async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
+        block_hash = await substrate.get_chain_finalised_head()
+        non_fully_exhausted_start = time.time()
+        non_fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+        )
+        initial_records_count = len(non_fully_exhausted_qm.records)
+        assert initial_records_count <= 100  # default page size
+        exhausted_records_count = 0
+        async for _ in non_fully_exhausted_qm:
+            exhausted_records_count += 1
+        non_fully_exhausted_time = time.time() - non_fully_exhausted_start
+
+        assert len(non_fully_exhausted_qm.records) >= initial_records_count
+        fully_exhausted_start = time.time()
+        fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+            fully_exhaust=True,
+        )
+
+        fully_exhausted_time = time.time() - fully_exhausted_start
+        initial_records_count_fully_exhaust = len(fully_exhausted_qm.records)
+        assert fully_exhausted_time <= non_fully_exhausted_time, (
+            f"Fully exhausted took longer than non-fully exhausted, with "
+            f"{len(non_fully_exhausted_qm.records)} records in non-fully exhausted "
+            f"in {non_fully_exhausted_time} seconds, and {initial_records_count_fully_exhaust} in fully exhausted"
+            f" in {fully_exhausted_time} seconds. This could be caused by the fact that on this specific block, "
+            f"there are fewer records than would fill a single page. This difference should still be small."
+        )
+        fully_exhausted_records_count = 0
+        async for _ in fully_exhausted_qm:
+            fully_exhausted_records_count += 1
+        assert fully_exhausted_records_count == initial_records_count_fully_exhaust
+        assert initial_records_count_fully_exhaust == exhausted_records_count
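To illustrate the new argument in use, the sketch below mirrors the call pattern exercised by the test above. It is a minimal example, not part of the patch: the websocket endpoint is a placeholder (the tests use their own LATENT_LITE_ENTRYPOINT constant), and the pallet and storage-function names are simply the ones the tests happen to query.

import asyncio

from async_substrate_interface.async_substrate import AsyncSubstrateInterface


async def main():
    # Placeholder endpoint; substitute a real node's websocket URL.
    async with AsyncSubstrateInterface("wss://node.example:9944") as substrate:
        block_hash = await substrate.get_chain_finalised_head()

        # Default behaviour: one page of keys (100) is fetched eagerly, and
        # iterating the result triggers further paged RPC calls as needed.
        paged = await substrate.query_map(
            "SubtensorModule", "CRV3WeightCommits", block_hash=block_hash
        )

        # fully_exhaust=True: all keys are fetched via state_getKeys, then the
        # values are pulled in concurrent batches, so iteration makes no
        # further RPC calls.
        exhausted = await substrate.query_map(
            "SubtensorModule",
            "CRV3WeightCommits",
            block_hash=block_hash,
            fully_exhaust=True,
        )
        async for record in exhausted:
            ...  # process each record


asyncio.run(main())

As the patch's docstring notes, the flag only pays off when the whole map is needed; for early-exit iteration the default paged behaviour fetches less data.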
From a343edcbfbc0ce726b599c0303a73e50362d0be7 Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Mon, 30 Jun 2025 15:49:41 +0200
Subject: [PATCH 2/3] Move test

---
 .../test_async_substrate_interface.py | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py
index 969771a..319330f 100644
--- a/tests/integration_tests/test_async_substrate_interface.py
+++ b/tests/integration_tests/test_async_substrate_interface.py
@@ -1,3 +1,5 @@
+import time
+
 import pytest
 from scalecodec import ss58_encode
 
@@ -71,3 +73,45 @@ async def test_ss58_conversion():
         if len(value.value) > 0:
             for decoded_key in value.value:
                 assert isinstance(decoded_key, str)
+
+
+@pytest.mark.asyncio
+async def test_fully_exhaust_query_map():
+    async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
+        block_hash = await substrate.get_chain_finalised_head()
+        non_fully_exhausted_start = time.time()
+        non_fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+        )
+        initial_records_count = len(non_fully_exhausted_qm.records)
+        assert initial_records_count <= 100  # default page size
+        exhausted_records_count = 0
+        async for _ in non_fully_exhausted_qm:
+            exhausted_records_count += 1
+        non_fully_exhausted_time = time.time() - non_fully_exhausted_start
+
+        assert len(non_fully_exhausted_qm.records) >= initial_records_count
+        fully_exhausted_start = time.time()
+        fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+            fully_exhaust=True,
+        )
+
+        fully_exhausted_time = time.time() - fully_exhausted_start
+        initial_records_count_fully_exhaust = len(fully_exhausted_qm.records)
+        assert fully_exhausted_time <= non_fully_exhausted_time, (
+            f"Fully exhausted took longer than non-fully exhausted, with "
+            f"{len(non_fully_exhausted_qm.records)} records in non-fully exhausted "
+            f"in {non_fully_exhausted_time} seconds, and {initial_records_count_fully_exhaust} in fully exhausted"
+            f" in {fully_exhausted_time} seconds. This could be caused by the fact that on this specific block, "
+            f"there are fewer records than would fill a single page. This difference should still be small."
+        )
+        fully_exhausted_records_count = 0
+        async for _ in fully_exhausted_qm:
+            fully_exhausted_records_count += 1
+        assert fully_exhausted_records_count == initial_records_count_fully_exhaust
+        assert initial_records_count_fully_exhaust == exhausted_records_count
From 7c1ece4d60b2502711642d4bab067cf4940cf123 Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Mon, 30 Jun 2025 15:49:54 +0200
Subject: [PATCH 3/3] Ruff

---
 async_substrate_interface/utils/decoding.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py
index 3cd477b..af8d969 100644
--- a/async_substrate_interface/utils/decoding.py
+++ b/async_substrate_interface/utils/decoding.py
@@ -123,10 +123,10 @@ def concat_hash_len(key_hasher: str) -> int:
     decoded_keys = all_decoded[:middl_index]
     decoded_values = all_decoded[middl_index:]
     for kts, vts, dk, dv in zip(
-            pre_decoded_key_types,
-            pre_decoded_value_types,
-            decoded_keys,
-            decoded_values,
+        pre_decoded_key_types,
+        pre_decoded_value_types,
+        decoded_keys,
+        decoded_values,
     ):
         try:
             # strip key_hashers to use as item key
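A closing note on the batching in the first patch: rather than firing one state_queryStorageAt request per page all at once, it gathers them five at a time to stay gentle on the node. The helper below restates that bounded-gather pattern in isolation as a sketch; the function name and signature are illustrative, not library API.

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def gather_bounded(
    factories: list[Callable[[], Awaitable[T]]], batch_size: int = 5
) -> list[T]:
    # Run at most `batch_size` coroutines concurrently, preserving input order.
    results: list[T] = []
    for i in range(0, len(factories), batch_size):
        batch = factories[i : i + batch_size]
        results.extend(await asyncio.gather(*(factory() for factory in batch)))
    return results

With each page-sized key batch wrapped as a coroutine factory, this reproduces the five-at-a-time behaviour of the `fully_exhaust` branch while keeping the number of in-flight RPC calls fixed regardless of how many pages the storage map spans.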