Parallel Cypher import #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tonijurjevic96 wants to merge 8 commits into main from parallel_cypher_import
Commits (8):

- 1a7b69d  Added basic example showcasing read and write queries and how to inte… (tonijurjevic96)
- 9b5fdf5  Changed README to be correct format for PR (tonijurjevic96)
- 11840cd  Fixed typo in README (tonijurjevic96)
- 7fe49dc  Added example for parallel import of Cypher files using Python multip… (tonijurjevic96)
- ffb1106  Added parallel import using GQLAlchemy (tonijurjevic96)
- 2456a58  Updated README (tonijurjevic96)
- 983af72  Added changing into in memmory analytical, dropping graph and creatio… (tonijurjevic96)
- a9bb06e  Adjusted structure to main branch (tonijurjevic96)
Files changed:
# Multiprocess import with Memgraph Example

This example demonstrates how to split a Cypher file into node and relationship statements and how to use multiple processes to load the data efficiently.

## 🧠 What This Example Does

The scripts perform the following actions:

1. **Run `cypher_file_splitter_script.py`** - a helper script that splits the Pokec dataset into node and relationship queries and saves them into a separate folder (the statement shapes it expects are sketched below).
2. **Run `multiprocessing_import_test.py`**
   - Create the proper indices beforehand; in this case:
     - `CREATE INDEX ON :User;`
     - `CREATE INDEX ON :User(id);`
   - The script first loads nodes, then relationships, and uses 8 processes for the parallel import.
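For reference, the splitter classifies statements purely by prefix: statements beginning with `CREATE (:` are treated as node statements and statements beginning with `MATCH` as relationship statements. Below is a minimal sketch of driving the splitter on a toy input; the file names, properties, and relationship type are illustrative assumptions, not the actual Pokec schema.

```python
# Minimal sketch: build a tiny toy input and run the splitter on it.
# "toy_import.cypher" and "toy_split_queries" are hypothetical names; the real
# example uses pokec_medium_import.cypher and split_queries.
from cypher_file_splitter_script import split_cypher_file

sample = (
    'CREATE (:User {id: 1});\n'
    'CREATE (:User {id: 2});\n'
    'MATCH (a:User {id: 1}), (b:User {id: 2}) CREATE (a)-[:CONNECTED_TO]->(b);\n'
)

# Write the toy Cypher file and split it into node and relationship chunks
with open("toy_import.cypher", "w") as f:
    f.write(sample)

split_cypher_file("toy_import.cypher", "toy_split_queries")
```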
## 🚀 How to Run Memgraph with Docker

To run Memgraph Community using Docker:

```bash
docker run -it --rm -p 7687:7687 memgraph/memgraph:3.2
```

## 🛠 Requirements

Install dependencies with:

```bash
pip install -r requirements.txt
```

Your `requirements.txt` should include:

```
gqlalchemy
```

## 🧪 How to Run the Scripts

Once Memgraph is running:

```bash
python3 cypher_file_splitter_script.py
python3 multiprocessing_import_test.py
```

## 🔖 Version Compatibility

This example was built and tested with:

- **Memgraph v3.2**

If you run into any issues or have questions, feel free to reach out on the [Memgraph Discord server](https://discord.gg/memgraph). We're happy to help!

## 🏢 Enterprise or Community?

This example works with **Memgraph Community Edition**.
import/cypher/parralel_import_with_mgconsole/cypher_file_splitter_script.py (new file, 59 additions, 0 deletions)
```python
import os


# Function to split the Cypher file into node and relationship queries
def split_cypher_file(input_file, output_directory):
    # Read the content of the large Cypher file
    with open(input_file, "r") as f:
        content = f.read()

    # Split the file content by ';' (Cypher statements are terminated with a semicolon)
    queries = content.split(";")

    # Initialize lists to hold node and relationship queries
    node_queries = []
    relationship_queries = []

    # Process the queries
    for query in queries:
        query = query.strip()  # Remove leading/trailing whitespace
        if query.startswith("CREATE (:"):  # Node creation queries
            node_queries.append(query)
        elif query.startswith("MATCH"):  # Relationship creation queries
            relationship_queries.append(query)

    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Split and write the relationship queries into 8 smaller files
    chunk_size_relations = len(relationship_queries) // 8
    for i in range(8):
        start_index = i * chunk_size_relations
        end_index = (i + 1) * chunk_size_relations if i != 7 else len(relationship_queries)  # Ensure the last chunk gets any remainder
        chunk = relationship_queries[start_index:end_index]

        # Write each chunk of relationships to a separate file
        with open(os.path.join(output_directory, f"relationships_part_{i+1}.cypher"), "w") as f:
            for query in chunk:
                f.write(query + ";\n")

    print(f"Relationship queries split into {output_directory} directory.")

    # Split the node queries into 8 smaller files
    chunk_size_nodes = len(node_queries) // 8
    for i in range(8):
        start_index = i * chunk_size_nodes
        end_index = (i + 1) * chunk_size_nodes if i != 7 else len(node_queries)  # Ensure the last chunk gets any remainder
        chunk = node_queries[start_index:end_index]

        # Write each chunk to a separate file
        with open(os.path.join(output_directory, f"nodes_part_{i+1}.cypher"), "w") as f:
            for query in chunk:
                f.write(query + ";\n")

    print(f"Node queries split into {output_directory} directory.")


if __name__ == "__main__":
    input_file = "pokec_medium_import.cypher"  # Your large Cypher file
    output_directory = "split_queries"  # Output directory to store the split files
    split_cypher_file(input_file, output_directory)
```
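After the splitter runs with its defaults, `split_queries` should contain 16 chunk files (8 node parts and 8 relationship parts). A quick sanity check, assuming the default output directory:

```python
import os

# List the generated chunk files (assumes the splitter was run with its default output directory)
print(sorted(os.listdir("split_queries")))
```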
import/cypher/parralel_import_with_mgconsole/multiprocessing_import_test.py (new file, 98 additions, 0 deletions)
```python
import multiprocessing
import os
import time
from gqlalchemy import Memgraph


# Function to run a Cypher file using gqlalchemy
def run_cypher_file(cypher_file):
    # Establish a connection to Memgraph using gqlalchemy
    memgraph = Memgraph(host='127.0.0.1', port=7687)

    try:
        # Open the Cypher file and read it line by line
        with open(cypher_file, "r") as f:
            for line in f:
                line = line.strip()  # Remove any surrounding whitespace or newlines
                if line:  # Ensure the line isn't empty
                    # Debugging: Print the query to verify its contents
                    print(f"Executing query: {line}")

                    # Execute each Cypher query using gqlalchemy
                    result = list(memgraph.execute_and_fetch(line))
                    print(f"Query executed successfully: {line}")
                    # Optional: print the result for debugging
                    print(f"Result: {result}")
                else:
                    print(f"Skipping empty line in file {cypher_file}")
    except Exception as e:
        print(f"Error executing queries in {cypher_file}: {str(e)}")


# Run queries in parallel using multiprocessing
def run_in_parallel(cypher_files):
    processes = []
    for cypher_file in cypher_files:
        process = multiprocessing.Process(target=run_cypher_file, args=(cypher_file,))
        processes.append(process)
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()


if __name__ == "__main__":
    # Record the start time before execution begins
    start_time = time.time()

    # Establish a connection to Memgraph using gqlalchemy
    memgraph = Memgraph(host='127.0.0.1', port=7687)

    # Switch to in-memory analytical storage mode to maximize import performance
    query = """
    STORAGE MODE IN_MEMORY_ANALYTICAL;
    """
    result = list(memgraph.execute_and_fetch(query))

    # Clear any previous data in the graph
    query = """
    DROP GRAPH;
    """
    result = list(memgraph.execute_and_fetch(query))

    # Create a label index
    query = """
    CREATE INDEX ON :User;
    """
    result = list(memgraph.execute_and_fetch(query))

    # Create a label+property index, important for importing relationships
    query = """
    CREATE INDEX ON :User(id);
    """
    result = list(memgraph.execute_and_fetch(query))

    # List of node Cypher files to run in parallel
    node_files = [f"split_queries/nodes_part_{i+1}.cypher" for i in range(8)]

    # Run node creation queries in parallel
    run_in_parallel(node_files)

    # List of relationship Cypher files to run in parallel
    relationship_files = [f"split_queries/relationships_part_{i+1}.cypher" for i in range(8)]

    # Run relationship creation queries in parallel
    run_in_parallel(relationship_files)

    # Record the end time after execution finishes
    end_time = time.time()

    # Calculate the time taken
    execution_time = end_time - start_time
    print(f"Execution time: {execution_time:.2f} seconds")
```
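Once the import script finishes, a quick way to verify the result is to count what landed in Memgraph. This is a minimal sketch using the same GQLAlchemy connection settings as the script; the exact counts depend on the dataset.

```python
from gqlalchemy import Memgraph

# Count imported nodes and relationships (assumes Memgraph is still running on the default port)
memgraph = Memgraph(host='127.0.0.1', port=7687)

nodes = next(memgraph.execute_and_fetch("MATCH (n) RETURN count(n) AS nodes;"))["nodes"]
rels = next(memgraph.execute_and_fetch("MATCH ()-[r]->() RETURN count(r) AS rels;"))["rels"]

print(f"Imported {nodes} nodes and {rels} relationships.")
```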
requirements.txt (new file, 1 addition): `GQLAlchemy==1.7.0`
# Creating and reading nodes with Memgraph Example

This example demonstrates how to use read and write queries via the `gqlalchemy` Python client.

## 🧠 What This Example Does

The script performs the following actions:

1. **Connects to a running Memgraph instance** using `gqlalchemy`.
2. **Creates a `Person` node** with multiple properties.
3. **Executes the query**, showcasing how to run write queries.
4. **Creates and executes a read query**, showcasing how to read data from Memgraph.
5. **Shows how to work with the returned `Node` objects** (see the condensed sketch below):
   - How to access labels.
   - How to access properties.
   - How to access a specific property.
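For reference, the access pattern described above looks roughly like this. This is a condensed sketch of the full script that appears later in this PR; it assumes a local Memgraph on the default port.

```python
from gqlalchemy import Memgraph

memgraph = Memgraph(host='127.0.0.1', port=7687)

# Write: create a Person node
memgraph.execute('CREATE (n:Person {name: "Marko", surname: "Polo", age: 65})')

# Read: fetch nodes back and inspect labels and properties
for row in memgraph.execute_and_fetch("MATCH (n:Person) RETURN n;"):
    node = row["n"]
    print(node._labels)              # all labels of the node
    print(node._properties)          # all properties as a dict
    print(node._properties["age"])   # a single property
```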
## 🚀 How to Run Memgraph with Docker

To run Memgraph Community using Docker:

```bash
docker run -it --rm -p 7687:7687 memgraph/memgraph:3.2
```

## 🛠 Requirements

Install dependencies with:

```bash
pip install -r requirements.txt
```

Your `requirements.txt` should include:

```
gqlalchemy
```

## 🧪 How to Run the Script

Once Memgraph is running:

```bash
python3 creating_and_reading_nodes.py
```

## 🔖 Version Compatibility

This example was built and tested with:

- **Memgraph v3.2**

If you run into any issues or have questions, feel free to reach out on the [Memgraph Discord server](https://discord.gg/memgraph). We're happy to help!

## 🏢 Enterprise or Community?

This example works with **Memgraph Community Edition**.
python/querying/creating_and_reading_nodes/creating_and_reading_nodes.py (new file, 26 additions, 0 deletions)
```python
from gqlalchemy import Memgraph, Node

# Establish a connection to Memgraph
memgraph = Memgraph(host='127.0.0.1', port=7687)

# Create a Person node with the properties name, surname and age
query = """
CREATE (n:Person {name: "Marko", surname: "Polo", age: 65})
"""
# Execute the query
memgraph.execute(query)

query2 = """
MATCH (n) RETURN n;
"""
# Execute the query and fetch the result
results = list(memgraph.execute_and_fetch(query2))

# Print the result
for result in results:
    # Accessing labels for each node
    print("Labels: ", result["n"]._labels)
    # Accessing properties for each node
    print("Properties: ", result["n"]._properties)
    # Accessing a specific property of a node
    print("Specific property: ", result["n"]._properties["age"])
```
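To re-run the example from a clean state, any existing data can be removed first. This is a minimal sketch, assuming the same connection settings as the script above.

```python
from gqlalchemy import Memgraph

# Delete all existing nodes (and their relationships) before re-running the example
memgraph = Memgraph(host='127.0.0.1', port=7687)
memgraph.execute("MATCH (n) DETACH DELETE n;")
```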
requirements.txt (new file, 1 addition): `GQLAlchemy==1.7.0`
Also, the script can execute `STORAGE MODE IN_MEMORY_ANALYTICAL`, `DROP GRAPH`, and the creation of indices.