aioredis client with buffer queue #63

Status: Closed. 8 commits (changes shown from all commits).
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,6 +2,7 @@
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839

.coverage
.vscode

# User-specific stuff
.idea/**/workspace.xml
@@ -96,3 +97,4 @@ ENV/
.Python
dist/
*.egg-info/
poetry.lock
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -30,6 +30,10 @@ repository = "https://github.com/RedisGraph/redigraph-bulk-loader"
python = "^3.8.5"
click = "^8.0.1"
redis = "^3.5.3"
asyncclick = "8.0.1.3"
aioredis = "2.0.0a1"
redisgraph = "^2.4.0"
anyio = "^3.1.0"

[tool.poetry.dev-dependencies]
codecov = "^2.1.11"
@@ -40,7 +44,6 @@ bandit = "^1.7.0"
vulture = "^2.3"
pytest = "^6.2.4"
pytest-cov = "^2.12.1"
redisgraph = "^2.4.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
46 changes: 25 additions & 21 deletions redisgraph_bulk_loader/bulk_insert.py
@@ -1,7 +1,8 @@
import os
import sys
import redis
import click
import asyncio
import aioredis
import asyncclick as click
from timeit import default_timer as timer

sys.path.append(os.path.dirname(__file__))
@@ -26,15 +27,15 @@ def parse_schemas(cls, query_buf, path_to_csv, csv_tuples, config):

# For each input file, validate contents and convert to binary format.
# If any buffer limits have been reached, flush all enqueued inserts to Redis.
def process_entities(entities):
async def process_entities(entities):
for entity in entities:
entity.process_entities()
await entity.process_entities()
added_size = entity.binary_size
# Check to see if the addition of this data will exceed the buffer's capacity
if (entity.query_buffer.buffer_size + added_size >= entity.config.max_buffer_size
or entity.query_buffer.redis_token_count + len(entity.binary_entities) >= entity.config.max_token_count):
# Send and flush the buffer if appropriate
entity.query_buffer.send_buffer()
await entity.query_buffer.send_buffer()
# Add binary data to list and update all counts
entity.query_buffer.redis_token_count += len(entity.binary_entities)
entity.query_buffer.buffer_size += added_size
@@ -70,43 +71,46 @@ def process_entities(entities):
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
@click.option('--index', '-i', multiple=True, help='Label:Property on which to create an index')
@click.option('--full-text-index', '-f', multiple=True, help='Label:Property on which to create a full-text search index')
def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index):
if sys.version_info.major < 3 or sys.version_info.minor < 6:
raise Exception("Python >= 3.6 is required for the RedisGraph bulk loader.")
@click.option('--async-requests', '-j', default=3, help='number of async requests to be executed in parallel')
async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index, async_requests):
Contributor: If I understand correctly, this function is async because otherwise it cannot make async calls?

Author: correct
if sys.version_info < (3, 8):
raise Exception("Python >= 3.8 is required for the RedisGraph bulk loader.")

if not (any(nodes) or any(nodes_with_label)):
raise Exception("At least one node file must be specified.")
if async_requests <= 0:
raise Exception("The number of async requests must be greater than zero")

start_time = timer()

# If relations are being built, we must store unique node identifiers to later resolve endpoints.
store_node_identifiers = any(relations) or any(relations_with_type)

# Initialize configurations with command-line arguments
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar)
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar, async_requests)

# Attempt to connect to Redis server
try:
if unix_socket_path is not None:
client = redis.StrictRedis(unix_socket_path=unix_socket_path, username=user, password=password)
client = await aioredis.from_url(f"unix://{unix_socket_path}", username=user, password=password)
else:
client = redis.StrictRedis(host=host, port=port, username=user, password=password)
except redis.exceptions.ConnectionError as e:
client = await aioredis.from_url(f"redis://{host}:{port}", username=user, password=password)
except aioredis.exceptions.ConnectionError as e:
print("Could not connect to Redis server.")
raise e

# Attempt to verify that RedisGraph module is loaded
try:
module_list = client.execute_command("MODULE LIST")
module_list = await client.execute_command("MODULE", "LIST")
if not any(b'graph' in module_description for module_description in module_list):
print("RedisGraph module not loaded on connected server.")
sys.exit(1)
except redis.exceptions.ResponseError:
except aioredis.exceptions.ResponseError:
# Ignore check if the connected server does not support the "MODULE LIST" command
pass

# Verify that the graph name is not already used in the Redis database
key_exists = client.execute_command("EXISTS", graph)
key_exists = await client.execute_command("EXISTS", graph)
if key_exists:
print("Graph with name '%s', could not be created, as Redis key '%s' already exists." % (graph, graph))
sys.exit(1)
@@ -117,11 +121,11 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node
labels = parse_schemas(Label, query_buf, nodes, nodes_with_label, config)
reltypes = parse_schemas(RelationType, query_buf, relations, relations_with_type, config)

process_entities(labels)
process_entities(reltypes)
await process_entities(labels)
await process_entities(reltypes)

# Send all remaining tokens to Redis
query_buf.send_buffer()
await query_buf.flush()

end_time = timer()
query_buf.report_completion(end_time - start_time)
@@ -131,7 +135,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node
l, p = i.split(":")
print("Creating Index on Label: %s, Property: %s" % (l, p))
try:
index_create = client.execute_command("GRAPH.QUERY", graph, "CREATE INDEX ON :%s(%s)" % (l, p))
index_create = await client.execute_command("GRAPH.QUERY", graph, "CREATE INDEX ON :%s(%s)" % (l, p))
for z in index_create:
print(z[0].decode("utf-8"))
except aioredis.exceptions.ResponseError as e:
@@ -143,7 +147,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node
l, p = i.split(":")
print("Creating Full Text Search Index on Label: %s, Property: %s" % (l, p))
try:
index_create = client.execute_command("GRAPH.QUERY", graph, "CALL db.idx.fulltext.createNodeIndex('%s', '%s')" % (l, p))
index_create = await client.execute_command("GRAPH.QUERY", graph, "CALL db.idx.fulltext.createNodeIndex('%s', '%s')" % (l, p))
print(index_create[-1][0].decode("utf-8"))
except aioredis.exceptions.ResponseError as e:
print("Unable to create Full Text Search Index on Label: %s, Property %s" % (l, p))
@@ -153,4 +157,4 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node


if __name__ == '__main__':
bulk_insert()
asyncio.run(bulk_insert())
3 changes: 2 additions & 1 deletion redisgraph_bulk_loader/config.py
@@ -1,5 +1,5 @@
class Config:
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\'):
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\', async_requests=3):
"""Settings for this run of the bulk loader"""
# Maximum number of tokens per query
# 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
@@ -18,6 +18,7 @@ def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token
self.separator = separator
self.quoting = quoting
self.escapechar = None if escapechar.lower() == "none" else escapechar
self.async_requests = async_requests

# True if we are building relations as well as nodes
self.store_node_identifiers = store_node_identifiers
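A minimal usage sketch of the updated `Config` (the package-style import path is assumed; inside the loader the module is imported directly via a `sys.path` tweak):

```python
from redisgraph_bulk_loader.config import Config  # assumed import path

# Only the new knob is set here; everything else keeps its default.
config = Config(async_requests=8)
assert config.async_requests == 8
```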
6 changes: 3 additions & 3 deletions redisgraph_bulk_loader/label.py
@@ -1,6 +1,6 @@
import re
import sys
import click
import asyncclick as click
from entity_file import Type, EntityFile
from exceptions import SchemaError

@@ -50,7 +50,7 @@ def update_node_dictionary(self, identifier):
self.query_buffer.nodes[identifier] = self.query_buffer.top_node_id
self.query_buffer.top_node_id += 1

def process_entities(self):
async def process_entities(self):
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
for row in reader:
@@ -75,7 +75,7 @@ def process_entities(self):
added_size = self.binary_size + row_binary_len
if added_size >= self.config.max_token_size or self.query_buffer.buffer_size + added_size >= self.config.max_buffer_size:
self.query_buffer.labels.append(self.to_binary())
self.query_buffer.send_buffer()
await self.query_buffer.send_buffer()
self.reset_partial_binary()
# Push the label onto the query buffer again, as there are more entities to process.
self.query_buffer.labels.append(self.to_binary())
143 changes: 106 additions & 37 deletions redisgraph_bulk_loader/query_buffer.py
@@ -1,65 +1,134 @@
class QueryBuffer:
def __init__(self, graphname, client, config):
self.nodes = None
self.top_node_id = 0
import asyncio

# Redis client and data for each query
class InternalBuffer:
def __init__(self, graphname, client):
self.client = client
self.graphname = graphname

# Create a node dictionary if we're building relations and as such require unique identifiers
if config.store_node_identifiers:
self.nodes = {}
else:
self.nodes = None

# Sizes for buffer currently being constructed
self.redis_token_count = 0
self.buffer_size = 0

# The first query should include a "BEGIN" token
self.graphname = graphname
self.initial_query = True

self.node_count = 0
self.relation_count = 0

self.labels = [] # List containing all pending Label objects
self.reltypes = [] # List containing all pending RelationType objects

self.nodes_created = 0 # Total number of nodes created
self.relations_created = 0 # Total number of relations created

# TODO consider using a queue to send commands asynchronously
def send_buffer(self):

def send_buffer(self, initial_query):
"""Send all pending inserts to Redis"""
# Do nothing if we have no entities
if self.node_count == 0 and self.relation_count == 0:
return
return None

args = [self.node_count, self.relation_count, len(self.labels), len(self.reltypes)] + self.labels + self.reltypes
# Prepend a "BEGIN" token if this is the first query
if self.initial_query:
if initial_query:
args.insert(0, "BEGIN")
self.initial_query = False

result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
stats = result.split(', '.encode())
self.nodes_created += int(stats[0].split(' '.encode())[0])
self.relations_created += int(stats[1].split(' '.encode())[0])
return self.client.execute_command("GRAPH.BULK", self.graphname, *args)

self.clear_buffer()
class QueryBuffer:
def __init__(self, graphname, client, config):

# Delete all entities that have been inserted
def clear_buffer(self):
del self.labels[:]
del self.reltypes[:]
self.client = client
self.graphname = graphname
self.config = config
self.async_requests = config.async_requests

self.redis_token_count = 0
self.buffer_size = 0
self.node_count = 0
self.relation_count = 0
# A queue of internal buffers
self.internal_buffers = list()
for i in range(self.async_requests):
self.internal_buffers.append(InternalBuffer(graphname, client))
# Each buffer sent to RedisGraph returns awaitable
self.awaitables = set()
# Pop the first buffer
self.current_buffer = self.internal_buffers.pop(0)

self.initial_query = True
self.nodes_created = 0 # Total number of nodes created
self.relations_created = 0 # Total number of relations created

self.nodes = None
self.top_node_id = 0
# Create a node dictionary if we're building relations and as such require unique identifiers
if config.store_node_identifiers:
self.nodes = {}
else:
self.nodes = None

async def send_buffer(self, flush=False):
# If a flush is requested, all awaitables must complete; otherwise at least one must.
return_when_flag = asyncio.ALL_COMPLETED if flush else asyncio.FIRST_COMPLETED
coro = self.current_buffer.send_buffer(self.initial_query)
if coro is not None:
self.awaitables.add(asyncio.create_task(coro))
# Requests are flushed and awaited when:
# 1. A flush is requested.
# 2. This is the initial query with the "BEGIN" token, to avoid a race condition on async RedisGraph servers.
# 3. The number of in-flight async requests has reached the limit.
if (len(self.awaitables) == self.async_requests or self.initial_query or (flush and len(self.awaitables) > 0)):
done, pending = await asyncio.wait(self.awaitables, return_when=return_when_flag)
for d in done:
result = d.result()
stats = result.split(', '.encode())
self.nodes_created += int(stats[0].split(' '.encode())[0])
self.relations_created += int(stats[1].split(' '.encode())[0])
# Create a new buffer for each completed task.
self.internal_buffers.append(InternalBuffer(self.graphname, self.client))
# Store the pending tasks.
self.awaitables = pending
self.initial_query = False
# Pop a new buffer.
self.current_buffer = self.internal_buffers.pop(0)

async def flush(self):
await self.send_buffer(flush=True)

def report_completion(self, runtime):
print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
% (self.graphname, self.nodes_created, self.relations_created, runtime))

@property
def node_count(self):
return self.current_buffer.node_count

@node_count.setter
def node_count(self, value):
self.current_buffer.node_count = value

@property
def labels(self):
return self.current_buffer.labels

@property
def reltypes(self):
return self.current_buffer.reltypes

@property
def relation_count(self):
return self.current_buffer.relation_count

@relation_count.setter
def relation_count(self, value):
self.current_buffer.relation_count = value

@property
def redis_token_count(self):
return self.current_buffer.redis_token_count

@redis_token_count.setter
def redis_token_count(self, value):
self.current_buffer.redis_token_count = value

@property
def buffer_size(self):
return self.current_buffer.buffer_size

@buffer_size.setter
def buffer_size(self, value):
self.current_buffer.buffer_size = value
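The core pattern above, capping the number of in-flight GRAPH.BULK requests with `asyncio.wait(..., return_when=FIRST_COMPLETED)`, can be shown standalone. Here `send()` is a hypothetical stand-in for `InternalBuffer.send_buffer()`:

```python
import asyncio

async def send(i):
    # Hypothetical stand-in for one GRAPH.BULK round trip.
    await asyncio.sleep(0.01)
    return i

async def main(max_inflight=3):
    pending = set()
    for i in range(10):
        pending.add(asyncio.create_task(send(i)))
        if len(pending) == max_inflight:
            # Block until at least one request finishes, freeing a slot for the next buffer.
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                print("completed:", task.result())
    # Flush: drain everything still in flight before reporting completion.
    if pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
        for task in done:
            print("completed:", task.result())

asyncio.run(main())
```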
6 changes: 3 additions & 3 deletions redisgraph_bulk_loader/relation_type.py
@@ -1,6 +1,6 @@
import re
import struct
import click
import asyncclick as click
from entity_file import Type, EntityFile
from exceptions import CSVError, SchemaError

@@ -45,7 +45,7 @@ def post_process_header_with_schema(self, header):
if end_match:
self.end_namespace = end_match.group(1)

def process_entities(self):
async def process_entities(self):
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
for row in reader:
@@ -77,7 +77,7 @@ def process_entities(self):
added_size = self.binary_size + row_binary_len
if added_size >= self.config.max_token_size or self.query_buffer.buffer_size + added_size >= self.config.max_buffer_size:
self.query_buffer.reltypes.append(self.to_binary())
self.query_buffer.send_buffer()
await self.query_buffer.send_buffer()
self.reset_partial_binary()
# Push the reltype onto the query buffer again, as there are more entities to process.
self.query_buffer.reltypes.append(self.to_binary())