Skip to content

update internal cached mapping upon graph version change #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions redisgraph/graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .util import *
from .query_result import QueryResult

class Graph(object):
"""
Graph, collection of nodes and edges.
Expand All @@ -9,39 +10,65 @@ def __init__(self, name, redis_con):
"""
Create a new graph.
"""
self.name = name
self.name = name # Graph key
self.version = None # Graph version
self.redis_con = redis_con
self.nodes = {}
self.edges = []
self._labels = [] # List of node labels.
self._properties = [] # List of properties.
self._relationshipTypes = [] # List of relation types.

def _clear_schema(self):
self._labels = []
self._properties = []
self._relationshipTypes = []

def _refresh_schema(self):
    """Rebuild the full client-side schema cache from the server.

    Clears the cached mappings first, then re-fetches labels,
    relationship types, and property keys (one server call each).
    """
    self._clear_schema()
    self._refresh_labels()
    self._refresh_relations()
    self._refresh_attributes()

def _refresh_labels(self):
    """Fetch node labels from the server and cache the id -> label mapping.

    ``self.labels()`` returns one single-element row per label; the cache
    keeps only column 0 of each row, indexed by internal label id.
    """
    self._labels = [row[0] for row in self.labels()]

def _refresh_relations(self):
    """Fetch relationship types from the server and cache the id -> type mapping.

    ``self.relationshipTypes()`` returns one single-element row per type;
    the cache keeps only column 0 of each row, indexed by internal type id.
    """
    self._relationshipTypes = [row[0] for row in self.relationshipTypes()]

def _refresh_attributes(self):
    """Fetch property keys from the server and cache the id -> property mapping.

    ``self.propertyKeys()`` returns one single-element row per property key;
    the cache keeps only column 0 of each row, indexed by internal key id.
    """
    self._properties = [row[0] for row in self.propertyKeys()]

def get_label(self, idx):
    """Return the label name mapped to internal label id ``idx``.

    On a cache miss the label cache is refreshed from the server ONCE
    and the lookup retried; the original text refreshed twice (the old
    inline unpack plus ``_refresh_labels``), doubling the server calls.

    Raises:
        IndexError: if ``idx`` is unknown even after the refresh.
    """
    try:
        label = self._labels[idx]
    except IndexError:
        # Cache miss: the graph gained labels since the last sync.
        self._refresh_labels()
        label = self._labels[idx]
    return label

def get_relation(self, idx):
    """Return the relationship-type name mapped to internal type id ``idx``.

    On a cache miss the relation cache is refreshed from the server ONCE
    and the lookup retried; the original text refreshed twice (the old
    inline unpack plus ``_refresh_relations``), doubling the server calls.

    Raises:
        IndexError: if ``idx`` is unknown even after the refresh.
    """
    try:
        relationshipType = self._relationshipTypes[idx]
    except IndexError:
        # Cache miss: new relationship types were introduced since last sync.
        self._refresh_relations()
        relationshipType = self._relationshipTypes[idx]
    return relationshipType

Expand All @@ -50,12 +77,7 @@ def get_property(self, idx):
propertie = self._properties[idx]
except IndexError:
# Refresh properties.
props = self.propertyKeys()
# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]

self._refresh_attributes()
propertie = self._properties[idx]
return propertie

Expand Down Expand Up @@ -151,6 +173,7 @@ def delete(self):
"""
Deletes graph.
"""
self._clear_schema()
return self.redis_con.execute_command("GRAPH.DELETE", self.name)

def merge(self, pattern):
Expand Down
75 changes: 46 additions & 29 deletions redisgraph/query_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,28 @@
from prettytable import PrettyTable
from redis import ResponseError

# Statistic field names exactly as they appear in the GRAPH.QUERY reply footer.
LABELS_ADDED = 'Labels added'
NODES_CREATED = 'Nodes created'
NODES_DELETED = 'Nodes deleted'
RELATIONSHIPS_DELETED = 'Relationships deleted'
PROPERTIES_SET = 'Properties set'
RELATIONSHIPS_CREATED = 'Relationships created'
INDICES_CREATED = "Indices created"
INDICES_DELETED = "Indices deleted"
CACHED_EXECUTION = "Cached execution"
INTERNAL_EXECUTION_TIME = 'internal execution time'
GRAPH_VERSION = 'Graph version'

# Every statistic scanned when parsing a query reply (see parse_statistics).
STATS = [LABELS_ADDED, NODES_CREATED, PROPERTIES_SET, RELATIONSHIPS_CREATED,
NODES_DELETED, RELATIONSHIPS_DELETED, INDICES_CREATED, INDICES_DELETED,
CACHED_EXECUTION, INTERNAL_EXECUTION_TIME, GRAPH_VERSION]

class ResultSetColumnTypes(object):
    """Column type codes reported in the GRAPH.QUERY result-set header."""
    COLUMN_UNKNOWN = 0
    COLUMN_SCALAR = 1
    # Unused as of RedisGraph v2.1.0, retained for backwards compatibility.
    COLUMN_NODE = 2
    COLUMN_RELATION = 3


class ResultSetScalarTypes(object):
VALUE_UNKNOWN = 0
VALUE_NULL = 1
Expand All @@ -25,16 +39,6 @@ class ResultSetScalarTypes(object):
VALUE_PATH = 9

class QueryResult(object):
LABELS_ADDED = 'Labels added'
NODES_CREATED = 'Nodes created'
NODES_DELETED = 'Nodes deleted'
RELATIONSHIPS_DELETED = 'Relationships deleted'
PROPERTIES_SET = 'Properties set'
RELATIONSHIPS_CREATED = 'Relationships created'
INDICES_CREATED = "Indices created"
INDICES_DELETED = "Indices deleted"
CACHED_EXECUTION = "Cached execution"
INTERNAL_EXECUTION_TIME = 'internal execution time'

def __init__(self, graph, response):
self.graph = graph
Expand All @@ -48,9 +52,18 @@ def __init__(self, graph, response):
if len(response) == 1:
self.parse_statistics(response[0])
else:
self.parse_results(response)
# start by parsing statistics, see if the reported graph version
# matches the one we have
self.parse_statistics(response[-1]) # Last element.

if(graph.version != self.graph_version):
# graph version miss-match, this is an indication that
# the graph schema was modified, sync it.
graph.version = self.graph_version
graph._refresh_schema()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that `_refresh_schema` should return the graph version retrieved from the last procedure reply. The schema can change between two procedure calls, yet you would update only according to the latest procedure reply.

Suggested change
if(graph.version != self.graph_version):
# graph version miss-match, this is an indication that
# the graph schema was modified, sync it.
graph.version = self.graph_version
graph._refresh_schema()
graph_version = self.graph_version
while(graph.version != graph_version):
# graph version miss-match, this is an indication that
# the graph schema was modified, sync it.
graph.version = graph_version
graph_version = graph._refresh_schema()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking your comment to the extreme, I think the mapping used in the original compact reply can become obsolete by the time the first procedure call is made. Although the client cache might be in sync with the server, the mapping between the current result-set and the cache might be completely incorrect — in which case I suggest reissuing the query.


self.parse_results(response)

def parse_results(self, raw_result_set):
self.header = self.parse_header(raw_result_set)

Expand All @@ -63,10 +76,12 @@ def parse_results(self, raw_result_set):
def parse_statistics(self, raw_statistics):
    """Decode the raw statistics segment of a reply into ``self.statistics``.

    The original text retained a stale ``stats = [self.LABELS_ADDED, ...]``
    list referencing class attributes that were removed (the constants are
    now module-level); that dead assignment is dropped here.
    """
    self.statistics = {}

    # Entries may arrive as bytes from redis; normalize to str in place so
    # the substring matching in _get_value works uniformly.
    for idx, stat in enumerate(raw_statistics):
        if isinstance(stat, bytes):
            raw_statistics[idx] = stat.decode()

    for s in STATS:
        v = self._get_value(s, raw_statistics)
        if v is not None:
            self.statistics[s] = v
Expand Down Expand Up @@ -223,53 +238,55 @@ def is_empty(self):
@staticmethod
def _get_value(prop, statistics):
for stat in statistics:
if isinstance(stat, bytes):
stat = stat.decode()
if prop in stat:
return float(stat.split(': ')[1].split(' ')[0])


return None

def _get_stat(self, stat):
    """Return the named statistic from self.statistics, or 0 if not reported."""
    return self.statistics[stat] if stat in self.statistics else 0

# NOTE: the scraped diff left each property with TWO return statements — the
# old ``self.CONST`` form (whose class attributes were removed in this change)
# followed by the new module-constant form. The stale first return would raise
# AttributeError at runtime; only the module-constant form is kept below.

@property
def labels_added(self):
    """Number of labels added by the query."""
    return self._get_stat(LABELS_ADDED)

@property
def nodes_created(self):
    """Number of nodes created by the query."""
    return self._get_stat(NODES_CREATED)

@property
def nodes_deleted(self):
    """Number of nodes deleted by the query."""
    return self._get_stat(NODES_DELETED)

@property
def properties_set(self):
    """Number of properties set by the query."""
    return self._get_stat(PROPERTIES_SET)

@property
def relationships_created(self):
    """Number of relationships created by the query."""
    return self._get_stat(RELATIONSHIPS_CREATED)

@property
def relationships_deleted(self):
    """Number of relationships deleted by the query."""
    return self._get_stat(RELATIONSHIPS_DELETED)

@property
def indices_created(self):
    """Number of indices created by the query."""
    return self._get_stat(INDICES_CREATED)

@property
def indices_deleted(self):
    """Number of indices deleted by the query."""
    return self._get_stat(INDICES_DELETED)

@property
def cached_execution(self):
    """True when the server reports the execution plan came from its cache."""
    return self._get_stat(CACHED_EXECUTION) == 1

@property
def run_time_ms(self):
    """Server-side execution time in milliseconds."""
    return self._get_stat(INTERNAL_EXECUTION_TIME)

@property
def graph_version(self):
    """Graph schema version reported alongside the result (0 if absent)."""
    return self._get_stat(GRAPH_VERSION)

70 changes: 70 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,5 +228,75 @@ def test_query_timeout(self):
# Expecting an error.
pass

def test_cache_sync(self):
# This test verifies that client internal graph schema cache stays
# in sync with the graph schema
#
# Client B will try to get Client A out of sync by:
# 1. deleting the graph
# 2. reconstructing the graph in a different order; this will cause
#    a difference in the current mapping between string IDs and the
# mapping Client A is aware of
#
# Client A should pick up on the changes by comparing graph versions
# and resyncing its cache.

A = Graph('cache-sync', self.r)
B = Graph('cache-sync', self.r)

# Build order:
# 1. introduce label 'L' and 'K'
# 2. introduce attribute 'x' and 'q'
# 3. introduce relationship-type 'R' and 'S'

A.query("CREATE (:L)")
B.query("CREATE (:K)")
A.query("MATCH (n) SET n.x = 1")
B.query("MATCH (n) SET n.q = 1")
A.query("MATCH (n) CREATE (n)-[:R]->()")
B.query("MATCH (n) CREATE (n)-[:S]->()")

# Cause client A to populate its cache
A.query("MATCH (n)-[e]->() RETURN n, e")

assert(len(A._labels) == 2)
assert(len(A._properties) == 2)
assert(len(A._relationshipTypes) == 2)
assert(A._labels[0] == 'L')
assert(A._labels[1] == 'K')
assert(A._properties[0] == 'x')
assert(A._properties[1] == 'q')
assert(A._relationshipTypes[0] == 'R')
assert(A._relationshipTypes[1] == 'S')

# Have client B reconstruct the graph in a different order.
B.delete()

# Build order:
# 1. introduce relationship-type 'R'
# 2. introduce label 'L'
# 3. introduce attribute 'x'
Comment on lines +276 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see logic below

B.query("CREATE ()-[:S]->()")
B.query("CREATE ()-[:R]->()")
B.query("CREATE (:K)")
B.query("CREATE (:L)")
B.query("MATCH (n) SET n.q = 1")
B.query("MATCH (n) SET n.x = 1")

# A's internal cached mapping is now out of sync
# issue a query and make sure A's cache is synced.
A.query("MATCH (n)-[e]->() RETURN n, e")

assert(len(A._labels) == 2)
assert(len(A._properties) == 2)
assert(len(A._relationshipTypes) == 2)
assert(A._labels[0] == 'K')
assert(A._labels[1] == 'L')
assert(A._properties[0] == 'q')
assert(A._properties[1] == 'x')
assert(A._relationshipTypes[0] == 'S')
assert(A._relationshipTypes[1] == 'R')

if __name__ == '__main__':
unittest.main()