From b6990961961a0f4236ff1de35b6243ec6158b9b2 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Sun, 13 Jun 2021 19:43:03 +0300 Subject: [PATCH 1/8] aioredis client with buffer queue --- redisgraph_bulk_loader/bulk_insert.py | 39 +++---- redisgraph_bulk_loader/label.py | 6 +- redisgraph_bulk_loader/query_buffer.py | 143 ++++++++++++++++++------ redisgraph_bulk_loader/relation_type.py | 6 +- test/test_bulk_loader.py | 84 +++++++------- test/test_bulk_update.py | 48 ++++---- 6 files changed, 198 insertions(+), 128 deletions(-) diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 536549b..f4d2b98 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -1,7 +1,7 @@ import os import sys -import redis -import click +import aioredis +import asyncclick as click from timeit import default_timer as timer sys.path.append(os.path.dirname(__file__)) @@ -26,15 +26,15 @@ def parse_schemas(cls, query_buf, path_to_csv, csv_tuples, config): # For each input file, validate contents and convert to binary format. # If any buffer limits have been reached, flush all enqueued inserts to Redis. -def process_entities(entities): +async def process_entities(entities): for entity in entities: - entity.process_entities() + await entity.process_entities() added_size = entity.binary_size # Check to see if the addition of this data will exceed the buffer's capacity if (entity.query_buffer.buffer_size + added_size >= entity.config.max_buffer_size or entity.query_buffer.redis_token_count + len(entity.binary_entities) >= entity.config.max_token_count): # Send and flush the buffer if appropriate - entity.query_buffer.send_buffer() + await entity.query_buffer.send_buffer() # Add binary data to list and update all counts entity.query_buffer.redis_token_count += len(entity.binary_entities) entity.query_buffer.buffer_size += added_size @@ -70,7 +70,8 @@ def process_entities(entities): @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)') @click.option('--index', '-i', multiple=True, help='Label:Propery on which to create an index') @click.option('--full-text-index', '-f', multiple=True, help='Label:Propery on which to create an full text search index') -def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index): +@click.option('--async-requests', '-A', default=3, help='amount of async requests to be executed in parallel' ) +async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index, async_requests): if sys.version_info.major < 3 or sys.version_info.minor < 6: raise Exception("Python >= 3.6 is required for the RedisGraph bulk loader.") @@ -88,40 +89,40 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node # Attempt to connect to Redis server try: if unix_socket_path is not None: - client = redis.StrictRedis(unix_socket_path=unix_socket_path, username=user, password=password) + client = await aioredis.from_url(f"unix://{unix_socket_path}", username=user, password=password) else: - client = 
redis.StrictRedis(host=host, port=port, username=user, password=password) - except redis.exceptions.ConnectionError as e: - print("Could not connect to Redis server.") + client = await aioredis.from_url(f"redis://{host}:{port}", username=user, password=password) + except aioredis.exceptions.ConnectionError as e: + print("Could not connect to Redis ser`ver.") raise e # Attempt to verify that RedisGraph module is loaded try: - module_list = client.execute_command("MODULE LIST") + module_list = await client.execute_command("MODULE", "LIST") if not any(b'graph' in module_description for module_description in module_list): print("RedisGraph module not loaded on connected server.") sys.exit(1) - except redis.exceptions.ResponseError: + except aioredis.exceptions.ResponseError: # Ignore check if the connected server does not support the "MODULE LIST" command pass # Verify that the graph name is not already used in the Redis database - key_exists = client.execute_command("EXISTS", graph) + key_exists = await client.execute_command("EXISTS", graph) if key_exists: print("Graph with name '%s', could not be created, as Redis key '%s' already exists." % (graph, graph)) sys.exit(1) - query_buf = QueryBuffer(graph, client, config) + query_buf = QueryBuffer(graph, client, config, async_requests) # Read the header rows of each input CSV and save its schema. labels = parse_schemas(Label, query_buf, nodes, nodes_with_label, config) reltypes = parse_schemas(RelationType, query_buf, relations, relations_with_type, config) - process_entities(labels) - process_entities(reltypes) + await process_entities(labels) + await process_entities(reltypes) # Send all remaining tokens to Redis - query_buf.send_buffer() + await query_buf.flush() end_time = timer() query_buf.report_completion(end_time - start_time) @@ -131,7 +132,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node l, p = i.split(":") print("Creating Index on Label: %s, Property: %s" % (l, p)) try: - index_create = client.execute_command("GRAPH.QUERY", graph, "CREATE INDEX ON :%s(%s)" % (l, p)) + index_create = await client.execute_command("GRAPH.QUERY", graph, "CREATE INDEX ON :%s(%s)" % (l, p)) for z in index_create: print(z[0].decode("utf-8")) except redis.exceptions.ResponseError as e: @@ -143,7 +144,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node l, p = i.split(":") print("Creating Full Text Search Index on Label: %s, Property: %s" % (l, p)) try: - index_create = client.execute_command("GRAPH.QUERY", graph, "CALL db.idx.fulltext.createNodeIndex('%s', '%s')" % (l, p)) + index_create = await client.execute_command("GRAPH.QUERY", graph, "CALL db.idx.fulltext.createNodeIndex('%s', '%s')" % (l, p)) print(index_create[-1][0].decode("utf-8")) except redis.exceptions.ResponseError as e: print("Unable to create Full Text Search Index on Label: %s, Property %s" % (l, p)) diff --git a/redisgraph_bulk_loader/label.py b/redisgraph_bulk_loader/label.py index 1dfc0f4..8a17fa1 100644 --- a/redisgraph_bulk_loader/label.py +++ b/redisgraph_bulk_loader/label.py @@ -1,6 +1,6 @@ import re import sys -import click +import asyncclick as click from entity_file import Type, EntityFile from exceptions import SchemaError @@ -50,7 +50,7 @@ def update_node_dictionary(self, identifier): self.query_buffer.nodes[identifier] = self.query_buffer.top_node_id self.query_buffer.top_node_id += 1 - def process_entities(self): + async def process_entities(self): entities_created = 0 with click.progressbar(self.reader, 
length=self.entities_count, label=self.entity_str) as reader: for row in reader: @@ -75,7 +75,7 @@ def process_entities(self): added_size = self.binary_size + row_binary_len if added_size >= self.config.max_token_size or self.query_buffer.buffer_size + added_size >= self.config.max_buffer_size: self.query_buffer.labels.append(self.to_binary()) - self.query_buffer.send_buffer() + await self.query_buffer.send_buffer() self.reset_partial_binary() # Push the label onto the query buffer again, as there are more entities to process. self.query_buffer.labels.append(self.to_binary()) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index e3b2934..a7df509 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -1,65 +1,134 @@ -class QueryBuffer: - def __init__(self, graphname, client, config): - self.nodes = None - self.top_node_id = 0 +import asyncio - # Redis client and data for each query +class InternalBuffer: + def __init__(self, graphname, client): self.client = client self.graphname = graphname - # Create a node dictionary if we're building relations and as such require unique identifiers - if config.store_node_identifiers: - self.nodes = {} - else: - self.nodes = None - # Sizes for buffer currently being constructed self.redis_token_count = 0 self.buffer_size = 0 - # The first query should include a "BEGIN" token - self.graphname = graphname - self.initial_query = True - self.node_count = 0 self.relation_count = 0 self.labels = [] # List containing all pending Label objects self.reltypes = [] # List containing all pending RelationType objects - - self.nodes_created = 0 # Total number of nodes created - self.relations_created = 0 # Total number of relations created - - # TODO consider using a queue to send commands asynchronously - def send_buffer(self): + + def send_buffer(self, initial_query): """Send all pending inserts to Redis""" # Do nothing if we have no entities if self.node_count == 0 and self.relation_count == 0: - return + return None args = [self.node_count, self.relation_count, len(self.labels), len(self.reltypes)] + self.labels + self.reltypes # Prepend a "BEGIN" token if this is the first query - if self.initial_query: + if initial_query: args.insert(0, "BEGIN") - self.initial_query = False - result = self.client.execute_command("GRAPH.BULK", self.graphname, *args) - stats = result.split(', '.encode()) - self.nodes_created += int(stats[0].split(' '.encode())[0]) - self.relations_created += int(stats[1].split(' '.encode())[0]) + return self.client.execute_command("GRAPH.BULK", self.graphname, *args) - self.clear_buffer() +class QueryBuffer: + def __init__(self, graphname, client, config, async_requests): - # Delete all entities that have been inserted - def clear_buffer(self): - del self.labels[:] - del self.reltypes[:] + self.client = client + self.graphname = graphname + self.config = config + self.async_requests = async_requests - self.redis_token_count = 0 - self.buffer_size = 0 - self.node_count = 0 - self.relation_count = 0 + # A queue of internal buffers + self.internal_buffers = list() + for i in range(async_requests): + self.internal_buffers.append(InternalBuffer(graphname, client)) + # Each buffer sent to RedisGraph returns awaitable + self.awaitables = set() + # Pop the first buffer + self.current_buffer = self.internal_buffers.pop(0) + + self.initial_query = True + self.nodes_created = 0 # Total number of nodes created + self.relations_created = 0 # Total number of relations 
created + + self.nodes = None + self.top_node_id = 0 + # Create a node dictionary if we're building relations and as such require unique identifiers + if config.store_node_identifiers: + self.nodes = {} + else: + self.nodes = None + + async def send_buffer(self, flush=False): + # If flush is needed all of the awaitables need to be complete, otherwise at least one is needed. + return_when_flag = asyncio.ALL_COMPLETED if flush is True else asyncio.FIRST_COMPLETED + awaitable = self.current_buffer.send_buffer(self.initial_query) + if awaitable is not None: + self.awaitables.add(awaitable) + # Requests are flushed and awaited when: + # 1. Flush is needed. + # 2. Initial query with BEGIN token, to avoid race condition on async RedisGraph servers. + # 3. The amount of async requests has reached the limit. + if(len(self.awaitables) == self.async_requests or self.initial_query is True or flush == True): + done, pending = await asyncio.wait(self.awaitables, return_when = return_when_flag) + for d in done: + result = d.result() + stats = result.split(', '.encode()) + self.nodes_created += int(stats[0].split(' '.encode())[0]) + self.relations_created += int(stats[1].split(' '.encode())[0]) + # Create a new buffer of each completed task. + self.internal_buffers.append(InternalBuffer(self.graphname, self.client)) + # Store the pending tasks. + self.awaitables = pending + self.initial_query = False + # Pop a new buffer. + self.current_buffer = self.internal_buffers.pop(0) + + async def flush(self): + await self.send_buffer(flush=True) def report_completion(self, runtime): print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" % (self.graphname, self.nodes_created, self.relations_created, runtime)) + + @property + def node_count(self): + return self.current_buffer.node_count + + @node_count.setter + def node_count(self, value): + self.current_buffer.node_count = value + + @property + def buffer_size(self): + return self.current_buffer.buffer_size + + @property + def labels(self): + return self.current_buffer.labels + + @property + def reltypes(self): + return self.current_buffer.reltypes + + @property + def relation_count(self): + return self.current_buffer.relation_count + + @relation_count.setter + def relation_count(self, value): + self.current_buffer.relation_count = value + + @property + def redis_token_count(self): + return self.current_buffer.redis_token_count + + @redis_token_count.setter + def redis_token_count(self, value): + self.current_buffer.redis_token_count = value + + @property + def buffer_size(self): + return self.current_buffer.buffer_size + + @buffer_size.setter + def buffer_size(self, value): + self.current_buffer.buffer_size = value diff --git a/redisgraph_bulk_loader/relation_type.py b/redisgraph_bulk_loader/relation_type.py index 83f9718..4c58ffb 100644 --- a/redisgraph_bulk_loader/relation_type.py +++ b/redisgraph_bulk_loader/relation_type.py @@ -1,6 +1,6 @@ import re import struct -import click +import asyncclick as click from entity_file import Type, EntityFile from exceptions import CSVError, SchemaError @@ -45,7 +45,7 @@ def post_process_header_with_schema(self, header): if end_match: self.end_namespace = end_match.group(1) - def process_entities(self): + async def process_entities(self): entities_created = 0 with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: for row in reader: @@ -77,7 +77,7 @@ def process_entities(self): added_size = self.binary_size + row_binary_len if added_size >= 
self.config.max_token_size or self.query_buffer.buffer_size + added_size >= self.config.max_buffer_size: self.query_buffer.reltypes.append(self.to_binary()) - self.query_buffer.send_buffer() + await self.query_buffer.send_buffer() self.reset_partial_binary() # Push the reltype onto the query buffer again, as there are more entities to process. self.query_buffer.reltypes.append(self.to_binary()) diff --git a/test/test_bulk_loader.py b/test/test_bulk_loader.py index 5d817af..8394a6f 100644 --- a/test/test_bulk_loader.py +++ b/test/test_bulk_loader.py @@ -6,7 +6,7 @@ import redis import unittest from redisgraph import Graph -from click.testing import CliRunner +from asyncclick.testing import CliRunner from redisgraph_bulk_loader.bulk_insert import bulk_insert # Globals for validating example graph @@ -27,7 +27,7 @@ def row_count(in_csv): return idx -class TestBulkLoader(unittest.TestCase): +class TestBulkLoader(unittest.IsolatedAsyncioTestCase): @classmethod def setUpClass(cls): """ @@ -48,8 +48,8 @@ def tearDownClass(cls): def validate_exception(self, res, expected_msg): self.assertNotEqual(res.exit_code, 0) self.assertIn(expected_msg, str(res.exception)) - - def test01_social_graph(self): + + async def test01_social_graph(self): """Build the graph in 'example' and validate the created graph.""" global person_count global country_count @@ -71,7 +71,7 @@ def test01_social_graph(self): knows_count = str(row_count(knows_file)) visited_count = str(row_count(visited_file)) - res = runner.invoke(bulk_insert, ['--nodes', person_file, + res = await runner.invoke(bulk_insert, ['--nodes', person_file, '--nodes', country_file, '--relations', knows_file, '--relations', visited_file, @@ -181,7 +181,7 @@ def test01_social_graph(self): ['Valerie Abigail Arad', 'pleasure', 'Russia']] self.assertEqual(query_result.result_set, expected_result) - def test02_private_identifiers(self): + async def test02_private_identifiers(self): """Validate that private identifiers are not added to the graph.""" graphname = "tmpgraph1" # Write temporary files @@ -198,7 +198,7 @@ def test02_private_identifiers(self): out.writerow([5, 3]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--relations', '/tmp/relations.tmp', graphname], catch_exceptions=False) @@ -214,7 +214,7 @@ def test02_private_identifiers(self): for propname in query_result.header: self.assertNotIn('_identifier', propname) - def test03_reused_identifier(self): + async def test03_reused_identifier(self): """Expect failure on reused identifiers.""" graphname = "tmpgraph2" # Write temporary files @@ -230,7 +230,7 @@ def test03_reused_identifier(self): out.writerow([0, 3]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--relations', '/tmp/relations.tmp', graphname], catch_exceptions=False) @@ -240,14 +240,14 @@ def test03_reused_identifier(self): # Run the script again without creating relations runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', graphname], catch_exceptions=False) # The script should succeed and create 3 nodes self.assertEqual(res.exit_code, 0) self.assertIn('3 nodes created', res.output) - def test04_batched_build(self): + async def test04_batched_build(self): """ Create a graph using many batches. 
Reuses the inputs of test01_social_graph @@ -262,7 +262,7 @@ def test04_batched_build(self): visited_file = csv_path + 'VISITED.csv' csv_path = os.path.dirname(os.path.abspath(__file__)) + '/../../demo/bulk_insert/resources/' # Build the social graph again with a max token count of 1. - res = runner.invoke(bulk_insert, ['--nodes', person_file, + res = await runner.invoke(bulk_insert, ['--nodes', person_file, '--nodes', country_file, '--relations', knows_file, '--relations', visited_file, @@ -292,7 +292,7 @@ def test04_batched_build(self): new_result = new_graph.query('MATCH (a)-[e:KNOWS]->(b) RETURN a.name, e, b.name ORDER BY e.relation, a.name') self.assertEqual(original_result.result_set, new_result.result_set) - def test05_script_failures(self): + async def test05_script_failures(self): """Validate that the bulk loader fails gracefully on invalid inputs and arguments""" graphname = "tmpgraph3" @@ -303,7 +303,7 @@ def test05_script_failures(self): out.writerow([0]) # Wrong number of properites runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', graphname]) # The script should fail because a row has the wrong number of fields @@ -321,7 +321,7 @@ def test05_script_failures(self): out.writerow([0]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--relations', '/tmp/relations.tmp', graphname]) @@ -334,14 +334,14 @@ def test05_script_failures(self): out.writerow([0, "fakeidentifier"]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--relations', '/tmp/relations.tmp', graphname]) # The script should fail because an invalid node identifier was used self.validate_exception(res, "fakeidentifier") - def test06_property_types(self): + async def test06_property_types(self): """Verify that numeric, boolean, and string types are properly handled""" graphname = "tmpgraph4" @@ -360,7 +360,7 @@ def test06_property_types(self): out.writerow([7, 0.2, 'edge_prop']) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--relations', '/tmp/relations.tmp', graphname], catch_exceptions=False) @@ -377,7 +377,7 @@ def test06_property_types(self): # The graph should have the correct types for all properties self.assertEqual(query_result.result_set, expected_result) - def test07_utf8(self): + async def test07_utf8(self): """Verify that numeric, boolean, and null types are properly handled""" graphname = "tmpgraph5" # Write temporary files @@ -395,7 +395,7 @@ def test07_utf8(self): out.writerow([8, '美國人']) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', graphname], catch_exceptions=False) assert res.exit_code == 0 @@ -417,7 +417,7 @@ def test07_utf8(self): for i, j in zip(query_result.result_set, expected_strs): self.assertEqual(repr(i), repr(j)) - def test08_nonstandard_separators(self): + async def test08_nonstandard_separators(self): """Validate use of non-comma delimiters in input files.""" graphname = "tmpgraph6" @@ -432,7 +432,7 @@ def test08_nonstandard_separators(self): out.writerow(row) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = 
await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--separator', '|', graphname], catch_exceptions=False) @@ -447,7 +447,7 @@ def test08_nonstandard_separators(self): # The graph should have the correct types for all properties self.assertEqual(query_result.result_set, expected_result) - def test09_schema(self): + async def test09_schema(self): """Validate that the enforce-schema argument is respected""" graphname = "tmpgraph7" @@ -458,7 +458,7 @@ def test09_schema(self): out.writerow([1, 1]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--enforce-schema', graphname], catch_exceptions=False) @@ -473,7 +473,7 @@ def test09_schema(self): # The graph should have the correct types for all properties self.assertEqual(query_result.result_set, expected_result) - def test10_invalid_schema(self): + async def test10_invalid_schema(self): """Validate that errors are emitted properly with an invalid CSV schema.""" graphname = "expect_fail" @@ -487,14 +487,14 @@ def test10_invalid_schema(self): runner = CliRunner() # Try to parse all cells as integers - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--enforce-schema', graphname]) # Expect an error. self.validate_exception(res, "Could not parse") - def test11_schema_ignore_columns(self): + async def test11_schema_ignore_columns(self): """Validate that columns with the type IGNORE are not inserted.""" graphname = "ignore_graph" @@ -505,7 +505,7 @@ def test11_schema_ignore_columns(self): out.writerow(['str2', 1]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--enforce-schema', graphname], catch_exceptions=False) @@ -521,7 +521,7 @@ def test11_schema_ignore_columns(self): self.assertEqual(query_result.result_set[0][0].properties, node_1) self.assertEqual(query_result.result_set[1][0].properties, node_2) - def test12_no_null_values(self): + async def test12_no_null_values(self): """Validate that NULL inputs are not inserted.""" graphname = "null_graph" @@ -532,7 +532,7 @@ def test12_no_null_values(self): out.writerow(['str2', None]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', graphname], catch_exceptions=False) self.assertEqual(res.exit_code, 0) @@ -547,7 +547,7 @@ def test12_no_null_values(self): self.assertEqual(query_result.result_set[0][0].properties, node_1) self.assertEqual(query_result.result_set[1][0].properties, node_2) - def test13_id_namespaces(self): + async def test13_id_namespaces(self): """Validate that ID namespaces allow for scoped identifiers.""" graphname = "namespace_graph" @@ -570,7 +570,7 @@ def test13_id_namespaces(self): out.writerow([1, 1]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes-with-label', 'User', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes-with-label', 'User', '/tmp/nodes.tmp', '--nodes-with-label', 'Post', '/tmp/nodes2.tmp', '--relations-with-type', 'AUTHOR', '/tmp/relations.tmp', '--enforce-schema', @@ -587,7 +587,7 @@ def test13_id_namespaces(self): ['1', 'Filipe', 'User', '1', 40, 'Post']] self.assertEqual(query_result.result_set, expected_result) - def test14_array_properties_inferred(self): + async def test14_array_properties_inferred(self): 
"""Validate that array properties are correctly inserted.""" graphname = "arr_graph" @@ -598,7 +598,7 @@ def test14_array_properties_inferred(self): out.writerow(['str2', """['prop1', ['nested_1', 'nested_2'], 5]"""]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--separator', '|', graphname], catch_exceptions=False) @@ -613,7 +613,7 @@ def test14_array_properties_inferred(self): self.assertEqual(query_result.result_set[0][0].properties, node_1) self.assertEqual(query_result.result_set[1][0].properties, node_2) - def test15_array_properties_schema_enforced(self): + async def test15_array_properties_schema_enforced(self): """Validate that array properties are correctly inserted with an enforced schema.""" graphname = "arr_graph_with_schema" @@ -624,7 +624,7 @@ def test15_array_properties_schema_enforced(self): out.writerow(['str2', """['prop1', ['nested_1', 'nested_2'], 5]"""]) runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + res = await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--separator', '|', '--enforce-schema', graphname], catch_exceptions=False) @@ -640,7 +640,7 @@ def test15_array_properties_schema_enforced(self): self.assertEqual(query_result.result_set[0][0].properties, node_1) self.assertEqual(query_result.result_set[1][0].properties, node_2) - def test16_error_on_schema_failure(self): + async def test16_error_on_schema_failure(self): """Validate that the loader errors on processing non-conformant CSVs with an enforced schema.""" graphname = "schema_error" @@ -652,7 +652,7 @@ def test16_error_on_schema_failure(self): try: runner = CliRunner() - runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', + await runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp', '--separator', '|', '--enforce-schema', graphname], catch_exceptions=False) @@ -662,7 +662,7 @@ def test16_error_on_schema_failure(self): self.assertEqual(sys.exc_info()[0].__name__, 'SchemaError') self.assertIn("Could not parse 'strval' as an array", str(e)) - def test17_ensure_index_is_created(self): + async def test17_ensure_index_is_created(self): graphname = "index_test" with open('/tmp/nodes_index.tmp', mode='w') as csv_file: out = csv.writer(csv_file, delimiter='|') @@ -672,7 +672,7 @@ def test17_ensure_index_is_created(self): csv_file.close() runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes-with-label', 'Person', '/tmp/nodes_index.tmp', + res = await runner.invoke(bulk_insert, ['--nodes-with-label', 'Person', '/tmp/nodes_index.tmp', '--separator', '|', '--index', 'Person:age', '--enforce-schema', @@ -686,7 +686,7 @@ def test17_ensure_index_is_created(self): res = r.execute_command("GRAPH.EXPLAIN", graphname, 'MATCH (p:Person) WHERE p.age > 16 RETURN p') self.assertIn(' Index Scan | (p:Person)', res) - def test18_ensure_full_text_index_is_created(self): + async def test18_ensure_full_text_index_is_created(self): graphname = "index_full_text_test" with open('/tmp/nodes_full_text_index.tmp', mode='w') as csv_file: out = csv.writer(csv_file, delimiter='|') @@ -698,7 +698,7 @@ def test18_ensure_full_text_index_is_created(self): csv_file.close() runner = CliRunner() - res = runner.invoke(bulk_insert, ['--nodes-with-label', 'Monkeys', '/tmp/nodes_full_text_index.tmp', + res = await runner.invoke(bulk_insert, ['--nodes-with-label', 'Monkeys', '/tmp/nodes_full_text_index.tmp', '--full-text-index', 'Monkeys:name', '--enforce-schema', 
graphname], catch_exceptions=False) diff --git a/test/test_bulk_update.py b/test/test_bulk_update.py index 3d5b3da..9bf370b 100644 --- a/test/test_bulk_update.py +++ b/test/test_bulk_update.py @@ -5,11 +5,11 @@ import redis import unittest from redisgraph import Graph -from click.testing import CliRunner +from asyncclick.testing import CliRunner from redisgraph_bulk_loader.bulk_update import bulk_update -class TestBulkUpdate(unittest.TestCase): +class TestBulkUpdate(unittest.IsolatedAsyncioTestCase): @classmethod def setUpClass(cls): """ @@ -24,7 +24,7 @@ def tearDownClass(cls): os.remove('/tmp/csv.tmp') cls.redis_con.flushall() - def test01_simple_updates(self): + async def test01_simple_updates(self): """Validate that bulk updates work on an empty graph.""" graphname = "tmpgraph1" # Write temporary files @@ -36,7 +36,7 @@ def test01_simple_updates(self): out.writerow([3, "c"]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (:L {id: row[0], name: row[1]})', graphname], catch_exceptions=False) @@ -55,7 +55,7 @@ def test01_simple_updates(self): self.assertEqual(query_result.result_set, expected_result) # Attempt to re-insert the entities using MERGE. - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'MERGE (:L {id: row[0], name: row[1]})', graphname], catch_exceptions=False) @@ -65,7 +65,7 @@ def test01_simple_updates(self): self.assertNotIn('Nodes created', res.output) self.assertNotIn('Properties set', res.output) - def test02_traversal_updates(self): + async def test02_traversal_updates(self): """Validate that bulk updates can create edges and perform traversals.""" graphname = "tmpgraph1" # Write temporary files @@ -79,7 +79,7 @@ def test02_traversal_updates(self): # Create a graph of the form: # (a)-->(b)-->(c), (a)-->(c) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'MATCH (src {id: row[0]}) CREATE (src)-[:R]->(dest:L {id: row[1], name: row[2]})', graphname], catch_exceptions=False) @@ -97,7 +97,7 @@ def test02_traversal_updates(self): ["c", "c2"]] self.assertEqual(query_result.result_set, expected_result) - def test03_datatypes(self): + async def test03_datatypes(self): """Validate that all RedisGraph datatypes are supported by the bulk updater.""" graphname = "tmpgraph2" # Write temporary files @@ -106,7 +106,7 @@ def test03_datatypes(self): out.writerow([0, 1.5, "true", "string", "[1, 'nested_str']"]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (a:L) SET a.intval = row[0], a.doubleval = row[1], a.boolval = row[2], a.stringval = row[3], a.arrayval = row[4]', '--no-header', graphname], catch_exceptions=False) @@ -122,7 +122,7 @@ def test03_datatypes(self): expected_result = [[0, 1.5, True, "string", "[1,'nested_str']"]] self.assertEqual(query_result.result_set, expected_result) - def test04_custom_delimiter(self): + async def test04_custom_delimiter(self): """Validate that non-comma delimiters produce the correct results.""" graphname = "tmpgraph3" # Write temporary files @@ -134,7 +134,7 @@ def test04_custom_delimiter(self): out.writerow([3, "c"]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await 
runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (:L {id: row[0], name: row[1]})', '--separator', '|', graphname], catch_exceptions=False) @@ -154,7 +154,7 @@ def test04_custom_delimiter(self): self.assertEqual(query_result.result_set, expected_result) # Attempt to re-insert the entities using MERGE. - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'MERGE (:L {id: row[0], name: row[1]})', '--separator', '|', graphname], catch_exceptions=False) @@ -165,7 +165,7 @@ def test04_custom_delimiter(self): self.assertNotIn('Nodes created', res.output) self.assertNotIn('Properties set', res.output) - def test05_custom_variable_name(self): + async def test05_custom_variable_name(self): """Validate that the user can specify the name of the 'row' query variable.""" graphname = "variable_name" runner = CliRunner() @@ -173,7 +173,7 @@ def test05_custom_variable_name(self): csv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../example/') person_file = os.path.join(csv_path, 'Person.csv') # Build the social graph again with a max token count of 1. - res = runner.invoke(bulk_update, ['--csv', person_file, + res = await runner.invoke(bulk_update, ['--csv', person_file, '--query', 'CREATE (p:Person) SET p.name = line[0], p.age = line[1], p.gender = line[2], p.status = line[3]', '--variable-name', 'line', graphname], catch_exceptions=False) @@ -203,7 +203,7 @@ def test05_custom_variable_name(self): ['Valerie Abigail Arad', 31, 'female', 'married']] self.assertEqual(query_result.result_set, expected_result) - def test06_no_header(self): + async def test06_no_header(self): """Validate that the '--no-header' option works properly.""" graphname = "tmpgraph4" # Write temporary files @@ -214,7 +214,7 @@ def test06_no_header(self): out.writerow([3, "c"]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (:L {id: row[0], name: row[1]})', '--no-header', graphname], catch_exceptions=False) @@ -233,7 +233,7 @@ def test06_no_header(self): [5, "b"]] self.assertEqual(query_result.result_set, expected_result) - def test07_batched_update(self): + async def test07_batched_update(self): """Validate that updates performed over multiple batches produce the correct results.""" graphname = "batched_update" @@ -245,7 +245,7 @@ def test07_batched_update(self): out.writerow([prop_str]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (:L {prop: row[0]})', '--no-header', '--max-token-size', 1, @@ -263,7 +263,7 @@ def test07_batched_update(self): expected_result = [[prop_str]] self.assertEqual(query_result.result_set, expected_result) - def test08_runtime_error(self): + async def test08_runtime_error(self): """Validate that run-time errors are captured by the bulk updater.""" graphname = "tmpgraph5" @@ -272,7 +272,7 @@ def test08_runtime_error(self): out = csv.writer(csv_file) out.writerow(["a"]) runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'MERGE (:L {val: NULL})', '--no-header', graphname]) @@ -280,11 +280,11 @@ def test08_runtime_error(self): self.assertNotEqual(res.exit_code, 0) self.assertIn("Cannot merge node", str(res.exception)) - def 
test09_compile_time_error(self): + async def test09_compile_time_error(self): """Validate that malformed queries trigger an early exit from the bulk updater.""" graphname = "tmpgraph5" runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/csv.tmp', '--query', 'CREATE (:L {val: row[0], val2: undefined_identifier})', '--no-header', graphname]) @@ -292,13 +292,13 @@ def test09_compile_time_error(self): self.assertNotEqual(res.exit_code, 0) self.assertIn("undefined_identifier not defined", str(res.exception)) - def test10_invalid_inputs(self): + async def test10_invalid_inputs(self): """Validate that the bulk updater handles invalid inputs incorrectly.""" graphname = "tmpgraph6" # Attempt to insert a non-existent CSV file. runner = CliRunner() - res = runner.invoke(bulk_update, ['--csv', '/tmp/fake_file.csv', + res = await runner.invoke(bulk_update, ['--csv', '/tmp/fake_file.csv', '--query', 'MERGE (:L {val: NULL})', graphname]) From a3de93e0777be1ca6cad94a6ef4855287cb0a8e5 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Sun, 13 Jun 2021 19:44:49 +0300 Subject: [PATCH 2/8] requirments.txt fix --- requirements.txt | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88666d1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +asyncclick +aioredis==2.0.0a1 +redis>=3.5.3 +redisgraph From 2331e7ed718fc7a08411abc27c7258b66c78d867 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Sun, 13 Jun 2021 23:55:03 +0300 Subject: [PATCH 3/8] python 3.8 --- .gitignore | 1 + redisgraph_bulk_loader/bulk_insert.py | 2 +- requirements.txt | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 53f735b..9a93c85 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 .coverage +.vscode # User-specific stuff .idea/**/workspace.xml diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index f4d2b98..21d096b 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -93,7 +93,7 @@ async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes else: client = await aioredis.from_url(f"redis://{host}:{port}", username=user, password=password) except aioredis.exceptions.ConnectionError as e: - print("Could not connect to Redis ser`ver.") + print("Could not connect to Redis server.") raise e # Attempt to verify that RedisGraph module is loaded diff --git a/requirements.txt b/requirements.txt index 88666d1..b2afc2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ asyncclick +click +anyio aioredis==2.0.0a1 redis>=3.5.3 redisgraph From 2bcef8955a2d0e1e0bc2697ee7a50b576f88f447 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Tue, 15 Jun 2021 13:26:10 +0300 Subject: [PATCH 4/8] fixed PR comments --- redisgraph_bulk_loader/bulk_insert.py | 12 +++++++----- redisgraph_bulk_loader/config.py | 3 ++- redisgraph_bulk_loader/query_buffer.py | 12 ++++++------ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 21d096b..34a3814 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -70,13 +70,15 @@ async def process_entities(entities): @click.option('--max-token-size', '-t', 
default=500, help='max size of each token in megabytes (default 500, max 512)') @click.option('--index', '-i', multiple=True, help='Label:Propery on which to create an index') @click.option('--full-text-index', '-f', multiple=True, help='Label:Propery on which to create an full text search index') -@click.option('--async-requests', '-A', default=3, help='amount of async requests to be executed in parallel' ) +@click.option('--async-requests', '-A', default=3, help='number of async requests to be executed in parallel' ) async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index, async_requests): - if sys.version_info.major < 3 or sys.version_info.minor < 6: - raise Exception("Python >= 3.6 is required for the RedisGraph bulk loader.") + if sys.version_info.major < 3 or sys.version_info.minor < 8: + raise Exception("Python >= 3.8 is required for the RedisGraph bulk loader.") if not (any(nodes) or any(nodes_with_label)): raise Exception("At least one node file must be specified.") + if async_requests <= 0: + raise Exception("The number of async requests must be greater than zero") start_time = timer() @@ -84,7 +86,7 @@ async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes store_node_identifiers = any(relations) or any(relations_with_type) # Initialize configurations with command-line arguments - config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar) + config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar, async_requests) # Attempt to connect to Redis server try: @@ -112,7 +114,7 @@ async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes print("Graph with name '%s', could not be created, as Redis key '%s' already exists." % (graph, graph)) sys.exit(1) - query_buf = QueryBuffer(graph, client, config, async_requests) + query_buf = QueryBuffer(graph, client, config) # Read the header rows of each input CSV and save its schema. labels = parse_schemas(Label, query_buf, nodes, nodes_with_label, config) diff --git a/redisgraph_bulk_loader/config.py b/redisgraph_bulk_loader/config.py index fb7966c..c1e2f9d 100644 --- a/redisgraph_bulk_loader/config.py +++ b/redisgraph_bulk_loader/config.py @@ -1,5 +1,5 @@ class Config: - def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\'): + def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\', async_requests =3 ): """Settings for this run of the bulk loader""" # Maximum number of tokens per query # 1024 * 1024 is the hard-coded Redis maximum. 
We'll set a slightly lower limit so @@ -18,6 +18,7 @@ def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token self.separator = separator self.quoting = quoting self.escapechar = None if escapechar.lower() == "none" else escapechar + self.async_requests = async_requests # True if we are building relations as well as nodes self.store_node_identifiers = store_node_identifiers diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index a7df509..1289be3 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -29,16 +29,16 @@ def send_buffer(self, initial_query): return self.client.execute_command("GRAPH.BULK", self.graphname, *args) class QueryBuffer: - def __init__(self, graphname, client, config, async_requests): + def __init__(self, graphname, client, config): self.client = client self.graphname = graphname self.config = config - self.async_requests = async_requests + self.async_requests = config.async_requests # A queue of internal buffers self.internal_buffers = list() - for i in range(async_requests): + for i in range(self.async_requests): self.internal_buffers.append(InternalBuffer(graphname, client)) # Each buffer sent to RedisGraph returns awaitable self.awaitables = set() @@ -60,9 +60,9 @@ def __init__(self, graphname, client, config, async_requests): async def send_buffer(self, flush=False): # If flush is needed all of the awaitables need to be complete, otherwise at least one is needed. return_when_flag = asyncio.ALL_COMPLETED if flush is True else asyncio.FIRST_COMPLETED - awaitable = self.current_buffer.send_buffer(self.initial_query) - if awaitable is not None: - self.awaitables.add(awaitable) + coro = self.current_buffer.send_buffer(self.initial_query) + if coro is not None: + self.awaitables.add(asyncio.create_task(coro)) # Requests are flushed and awaited when: # 1. Flush is needed. # 2. Initial query with BEGIN token, to avoid race condition on async RedisGraph servers. 
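The batching scheme that patches 1 and 4 converge on is a bounded pool of in-flight GRAPH.BULK requests: each full buffer is wrapped in a task with asyncio.create_task(), and asyncio.wait() recycles a slot as soon as any request finishes (FIRST_COMPLETED), or drains everything on the final flush (ALL_COMPLETED). Below is a minimal standalone sketch of that pattern, not loader code; fake_bulk_request, the pool size of 3, and the batch count are illustrative assumptions.

import asyncio
import random

async def fake_bulk_request(batch_id):
    # Stand-in for client.execute_command("GRAPH.BULK", graphname, *args).
    await asyncio.sleep(random.random() / 10)
    return batch_id

async def run(async_requests=3, batches=10):
    in_flight = set()
    results = []
    for batch_id in range(batches):
        in_flight.add(asyncio.create_task(fake_bulk_request(batch_id)))
        # Only block once the parallelism limit is reached, as QueryBuffer.send_buffer does.
        if len(in_flight) == async_requests:
            done, in_flight = await asyncio.wait(in_flight, return_when=asyncio.FIRST_COMPLETED)
            results.extend(task.result() for task in done)
    # Final flush: drain whatever is still pending, as QueryBuffer.flush does.
    if in_flight:
        done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
        results.extend(task.result() for task in done)
    print(sorted(results))

asyncio.run(run())

Recycling on FIRST_COMPLETED keeps CSV parsing overlapped with server-side inserts rather than stalling until an entire batch of requests has completed.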
From d92f8b9ae97574f1bb28fe44fba3c7dc32cb9d43 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Tue, 15 Jun 2021 13:27:28 +0300 Subject: [PATCH 5/8] after rebase --- requirements.txt | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index b2afc2d..0000000 --- a/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -asyncclick -click -anyio -aioredis==2.0.0a1 -redis>=3.5.3 -redisgraph From 58a3f34341e821ad2339b9c064b8d0e3e3ac4d86 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Tue, 15 Jun 2021 13:29:52 +0300 Subject: [PATCH 6/8] fixed tox file --- .gitignore | 1 + pyproject.toml | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 9a93c85..8370d5d 100644 --- a/.gitignore +++ b/.gitignore @@ -97,3 +97,4 @@ ENV/ .Python dist/ *.egg-info/ +poetry.lock \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 993dab5..67dd391 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,10 @@ repository = "https://github.com/RedisGraph/redigraph-bulk-loader" python = "^3.8.5" click = "^8.0.1" redis = "^3.5.3" +asyncclick = "8.0.1.3" +aioredis= "2.0.0a1" +redisgraph = "^2.4.0" +anyio = "^3.1.0" [tool.poetry.dev-dependencies] codecov = "^2.1.11" @@ -40,7 +44,6 @@ bandit = "^1.7.0" vulture = "^2.3" pytest = "^6.2.4" pytest-cov = "^2.12.1" -redisgraph = "^2.4.0" [build-system] requires = ["poetry-core>=1.0.0"] From 35ae1ff51c32bb7ea70c4130a1b80e5cec096794 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Tue, 15 Jun 2021 15:29:01 +0300 Subject: [PATCH 7/8] cosmetics --- redisgraph_bulk_loader/bulk_insert.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 34a3814..79f8773 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -1,5 +1,6 @@ import os import sys +import asyncio import aioredis import asyncclick as click from timeit import default_timer as timer @@ -70,7 +71,7 @@ async def process_entities(entities): @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)') @click.option('--index', '-i', multiple=True, help='Label:Propery on which to create an index') @click.option('--full-text-index', '-f', multiple=True, help='Label:Propery on which to create an full text search index') -@click.option('--async-requests', '-A', default=3, help='number of async requests to be executed in parallel' ) +@click.option('--async-requests', '-j', default=3, help='number of async requests to be executed in parallel') async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index, async_requests): if sys.version_info.major < 3 or sys.version_info.minor < 8: raise Exception("Python >= 3.8 is required for the RedisGraph bulk loader.") @@ -156,4 +157,4 @@ async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes if __name__ == '__main__': - bulk_insert() + asyncio.run(bulk_insert()) From f1245c51a57c2d6d1df4121bde8927917f0cb1a3 Mon Sep 17 00:00:00 2001 From: DvirDukhan Date: Wed, 16 Jun 2021 09:31:30 +0300 Subject: [PATCH 8/8] fixed a bug on flush with 1 sized list --- 
redisgraph_bulk_loader/query_buffer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index 1289be3..b54ba97 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -67,7 +67,7 @@ async def send_buffer(self, flush=False): # 1. Flush is needed. # 2. Initial query with BEGIN token, to avoid race condition on async RedisGraph servers. # 3. The amount of async requests has reached the limit. - if(len(self.awaitables) == self.async_requests or self.initial_query is True or flush == True): + if(len(self.awaitables) == self.async_requests or self.initial_query is True or (flush == True and len(self.awaitables) > 0)): done, pending = await asyncio.wait(self.awaitables, return_when = return_when_flag) for d in done: result = d.result() @@ -79,8 +79,8 @@ async def send_buffer(self, flush=False): # Store the pending tasks. self.awaitables = pending self.initial_query = False - # Pop a new buffer. - self.current_buffer = self.internal_buffers.pop(0) + # Pop a new buffer. + self.current_buffer = self.internal_buffers.pop(0) async def flush(self): await self.send_buffer(flush=True)
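The guard added here, flush == True and len(self.awaitables) > 0, protects the flush path when nothing is left in flight, which is what happens once every buffer has already been awaited (for example when running with a single async request): asyncio.wait() raises ValueError when handed an empty collection. A small stdlib-only demonstration of that failure mode, independent of the loader and of Redis:

import asyncio

async def flush_demo():
    pending = set()  # analogous to QueryBuffer.awaitables once every request has completed
    try:
        await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
    except ValueError as err:
        # asyncio refuses to wait on an empty set, so an unconditional await
        # on the flush path would crash; the patch awaits only when work is pending.
        print("asyncio.wait rejected the empty set:", err)

asyncio.run(flush_demo())

The relocated pop of self.current_buffer, which reads as having moved inside the same branch, likewise appears intended to take a replacement buffer only after a completed one has been returned to the pool, which matters once the pool holds a single entry.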