From da97d62615f28f41c065be324e388a59e21245f7 Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Sun, 12 Sep 2021 13:43:16 +0300 Subject: [PATCH 1/8] move to async batches --- pyproject.toml | 1 + redisgraph_bulk_loader/bulk_insert.py | 1 + redisgraph_bulk_loader/query_buffer.py | 23 ++++++++++++++++++----- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 25155a0..321954e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ vulture = "^2.3" pytest = "^6.2.4" pytest-cov = "^2.12.1" redisgraph = "^2.4.0" +pathos>=0.2.8 [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 33077eb..f383277 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -39,6 +39,7 @@ def process_entities(entities): # Add binary data to list and update all counts entity.query_buffer.redis_token_count += len(entity.binary_entities) entity.query_buffer.buffer_size += added_size + entity.query_buffer.wait_pool() ################################################################################ diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index e3b2934..e9db429 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -1,3 +1,10 @@ +from pathos.pools import ThreadPool as Pool + +def run(client, graphname, args): + result = client.execute_command("GRAPH.BULK", graphname, *args) + stats = result.split(', '.encode()) + return stats + class QueryBuffer: def __init__(self, graphname, client, config): self.nodes = None @@ -30,7 +37,9 @@ def __init__(self, graphname, client, config): self.nodes_created = 0 # Total number of nodes created self.relations_created = 0 # Total number of relations created - # TODO consider using a queue to send commands asynchronously + self.pool = Pool(nodes=1) + self.tasks = [] + def send_buffer(self): """Send all pending inserts to Redis""" # Do nothing if we have no entities @@ -43,10 +52,8 @@ def send_buffer(self): args.insert(0, "BEGIN") self.initial_query = False - result = self.client.execute_command("GRAPH.BULK", self.graphname, *args) - stats = result.split(', '.encode()) - self.nodes_created += int(stats[0].split(' '.encode())[0]) - self.relations_created += int(stats[1].split(' '.encode())[0]) + task = self.pool.apipe(run, self.client, self.graphname, args) + self.tasks.append(task) self.clear_buffer() @@ -59,6 +66,12 @@ def clear_buffer(self): self.buffer_size = 0 self.node_count = 0 self.relation_count = 0 + + def wait_pool(self): + for task in self.tasks: + stats = task.get() + self.nodes_created += int(stats[0].split(' '.encode())[0]) + self.relations_created += int(stats[1].split(' '.encode())[0]) def report_completion(self, runtime): print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" From 63a3690d30168b2d0cc9915e977b369901a35718 Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Sun, 12 Sep 2021 13:44:31 +0300 Subject: [PATCH 2/8] fix build --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 321954e..a87ea79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ vulture = "^2.3" pytest = "^6.2.4" pytest-cov = "^2.12.1" redisgraph = "^2.4.0" -pathos>=0.2.8 +pathos = "^0.2.8" [build-system] requires = ["poetry-core>=1.0.0"] From f14820224e127579aaa7a70714215658fbe94b7a Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Sun, 12 Sep 2021 14:10:24 +0300 Subject: [PATCH 3/8] fix tests --- redisgraph_bulk_loader/bulk_insert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index f383277..f561fa9 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -39,7 +39,6 @@ def process_entities(entities): # Add binary data to list and update all counts entity.query_buffer.redis_token_count += len(entity.binary_entities) entity.query_buffer.buffer_size += added_size - entity.query_buffer.wait_pool() ################################################################################ @@ -146,6 +145,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, ssl_keyfile # Send all remaining tokens to Redis query_buf.send_buffer() + query_buf.wait_pool() end_time = timer() query_buf.report_completion(end_time - start_time) From 959df607e43a9713eec7bb8c7069afed78df961e Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Sun, 12 Sep 2021 15:50:40 +0300 Subject: [PATCH 4/8] fix perf --- redisgraph_bulk_loader/label.py | 2 +- redisgraph_bulk_loader/relation_type.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redisgraph_bulk_loader/label.py b/redisgraph_bulk_loader/label.py index 0ba4876..8b43d9f 100644 --- a/redisgraph_bulk_loader/label.py +++ b/redisgraph_bulk_loader/label.py @@ -56,7 +56,7 @@ def update_node_dictionary(self, identifier): def process_entities(self): entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: + with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader: for row in reader: self.validate_row(row) diff --git a/redisgraph_bulk_loader/relation_type.py b/redisgraph_bulk_loader/relation_type.py index e917e46..72d0753 100644 --- a/redisgraph_bulk_loader/relation_type.py +++ b/redisgraph_bulk_loader/relation_type.py @@ -47,7 +47,7 @@ def post_process_header_with_schema(self, header): def process_entities(self): entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: + with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader: for row in reader: self.validate_row(row) try: From a44ddc95b5f6aea17ada96f318488e9211481106 Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Mon, 13 Sep 2021 13:41:58 +0300 Subject: [PATCH 5/8] cap async tasks --- redisgraph_bulk_loader/query_buffer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index e9db429..b77d27d 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -54,6 +54,8 @@ def send_buffer(self): task = self.pool.apipe(run, self.client, self.graphname, args) self.tasks.append(task) + if len(self.tasks) >= 5: + self.wait_pool() self.clear_buffer() @@ -72,6 +74,7 @@ def wait_pool(self): stats = task.get() self.nodes_created += int(stats[0].split(' '.encode())[0]) self.relations_created += int(stats[1].split(' '.encode())[0]) + self.tasks.clear() def report_completion(self, runtime): print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" From c47e0a674c01aad913c13269baaea23d2237448c Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Tue, 14 Sep 2021 09:58:22 +0300 Subject: [PATCH 6/8] pull 1 task at time --- redisgraph_bulk_loader/query_buffer.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index b77d27d..0d58454 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -53,9 +53,7 @@ def send_buffer(self): self.initial_query = False task = self.pool.apipe(run, self.client, self.graphname, args) - self.tasks.append(task) - if len(self.tasks) >= 5: - self.wait_pool() + self.add_task(task) self.clear_buffer() @@ -69,13 +67,24 @@ def clear_buffer(self): self.node_count = 0 self.relation_count = 0 + def add_task(self, task): + self.tasks.append(task) + if len(self.tasks) == 5: + task = self.tasks[0] + stats = task.get() + self.update_stats(stats) + self.tasks.pop(0) + def wait_pool(self): for task in self.tasks: stats = task.get() - self.nodes_created += int(stats[0].split(' '.encode())[0]) - self.relations_created += int(stats[1].split(' '.encode())[0]) + self.update_stats(stats) self.tasks.clear() + def update_stats(self, stats): + self.nodes_created += int(stats[0].split(' '.encode())[0]) + self.relations_created += int(stats[1].split(' '.encode())[0]) + def report_completion(self, runtime): print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" % (self.graphname, self.nodes_created, self.relations_created, runtime)) From e1f4c7310c13391074e83b1d912c218aa3028444 Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Tue, 14 Sep 2021 10:40:29 +0300 Subject: [PATCH 7/8] Update query_buffer.py --- redisgraph_bulk_loader/query_buffer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index 0d58454..39722a4 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -70,10 +70,9 @@ def clear_buffer(self): def add_task(self, task): self.tasks.append(task) if len(self.tasks) == 5: - task = self.tasks[0] + task = self.tasks.pop(0) stats = task.get() - self.update_stats(stats) - self.tasks.pop(0) + self.update_stats(stats) def wait_pool(self): for task in self.tasks: From 53c90100a05e585fa7d47976bf28bae9639a8cbe Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Tue, 14 Sep 2021 10:41:19 +0300 Subject: [PATCH 8/8] Update query_buffer.py --- redisgraph_bulk_loader/query_buffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index 39722a4..cad974f 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -72,7 +72,7 @@ def add_task(self, task): if len(self.tasks) == 5: task = self.tasks.pop(0) stats = task.get() - self.update_stats(stats) + self.update_stats(stats) def wait_pool(self): for task in self.tasks: