Skip to content

Commit d33d24b

Browse files
authored
Merge pull request pandas-dev#520 from dimosped/MDP-3095-fix-list-symbols-new
Fix for list_symbols reaching memory limit of MongoDB
2 parents 268b799 + 4c67132 commit d33d24b

File tree

4 files changed

+64
-21
lines changed

4 files changed

+64
-21
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### 1.62
44
* Bugfix: #517 VersionStore: append does not duplicate data in certain corner cases
5+
* Bugfix: #519 VersionStore: list_symbols speed improvement and fix for memory limit exceed
56

67
### 1.61 (2018-3-2)
78
* Feature: #288 Mapping reads and writes over chunks in chunkstore

arctic/store/version_store.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
133133
query['symbol'] = {'$regex': regex}
134134
if kwargs:
135135
for k, v in six.iteritems(kwargs):
136+
# TODO: this doesn't work as expected as it ignores the versions with metadata.deleted set
137+
# as a result it will return symbols with matching metadata which have been deleted
138+
# Maybe better add a match step in the pipeline instead of making it part of the query
136139
query['metadata.' + k] = v
137140
if snapshot is not None:
138141
try:
@@ -148,22 +151,32 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
148151
# Match based on user criteria first
149152
pipeline.append({'$match': query})
150153
pipeline.extend([
151-
# Id is by insert time which matches version order
152-
{'$sort': bson.SON([('symbol', pymongo.DESCENDING), ('version', pymongo.DESCENDING)])},
153-
# Group by 'symbol'
154-
{'$group': {'_id': '$symbol',
155-
'deleted': {'$first': '$metadata.deleted'},
156-
},
157-
},
158-
# Don't include symbols which are part of some snapshot, but really deleted...
159-
{'$match': {'deleted': {'$ne': True}}},
160-
{'$project': {'_id': 0,
161-
'symbol': '$_id',
162-
}
163-
}])
164-
165-
results = self._versions.aggregate(pipeline)
166-
return sorted([x['symbol'] for x in results])
154+
# version_custom value is: 2*version + (0 if deleted else 1)
155+
# This is used to optimize aggregation query:
156+
# - avoid sorting
157+
# - be able to rely on the latest version (max) for the deleted status
158+
#
159+
# Be aware of that if you don't use custom sort or if use a sort before $group which utilizes
160+
# exactly an existing index, the $group will do best effort to utilize this index:
161+
# - https://jira.mongodb.org/browse/SERVER-4507
162+
{'$group': {
163+
'_id': '$symbol',
164+
'version_custom': {
165+
'$max': {
166+
'$add': [
167+
{'$multiply': ['$version', 2]},
168+
{'$cond': [{'$eq': ['$metadata.deleted', True]}, 1, 0]}
169+
]
170+
}
171+
},
172+
}},
173+
# Don't include symbols which are part of some snapshot, but really deleted...
174+
{'$match': {'version_custom': {'$mod': [2, 0]}}}
175+
])
176+
177+
# We may hit the group memory limit (100MB), so use allowDiskUse to circumvent this
178+
# - https://docs.mongodb.com/manual/reference/operator/aggregation/group/#group-memory-limit
179+
return sorted([x['_id'] for x in self._versions.aggregate(pipeline, allowDiskUse=True)])
167180

168181
@mongo_retry
169182
def has_symbol(self, symbol, as_of=None):

tests/integration/store/test_version_store.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,27 @@ def test_list_symbols_newer_version_with_lower_id(library):
992992
assert symbol not in library.list_symbols()
993993

994994

995+
def test_list_symbols_write_snapshot_write_delete(library):
996+
library.write('asdf', {'foo': 'bar'})
997+
symbol = 'sym_a'
998+
library.write(symbol, {'foo2': 'bar2'}, prune_previous_version=False)
999+
library.snapshot('s1')
1000+
library.write(symbol, {'foo3': 'bar2'}, prune_previous_version=False)
1001+
library.delete(symbol)
1002+
# at this point we have one version retained from 's1' and
1003+
# one more version added with delete (version with foo3 is pruned)
1004+
# The list_symbols should return only 'asdf'
1005+
assert library.list_symbols() == ['asdf']
1006+
1007+
1008+
def test_list_symbols_delete_write(library):
1009+
symbol = 'sym_a'
1010+
library.write(symbol, {'foo': 'bar2'}, prune_previous_version=False)
1011+
library.delete(symbol)
1012+
library.write(symbol, {'foo2': 'bar2'}, prune_previous_version=False)
1013+
assert library.list_symbols() == [symbol]
1014+
1015+
9951016
def test_date_range_large(library):
9961017
index = [dt(2017,1,1)]*20000 + [dt(2017,1,2)]*20000
9971018
data = np.random.random((40000, 10))

tests/unit/store/test_version_store.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,20 @@ def test_list_symbols_default_pipeline():
268268
VersionStore.list_symbols(vs)
269269

270270
pipeline = [
271-
{'$sort': bson.SON([('symbol', pymongo.DESCENDING), ('version', pymongo.DESCENDING)])},
272-
{'$group': {'_id': '$symbol', 'deleted': {'$first': '$metadata.deleted'}}},
273-
{'$match': {'deleted': {'$ne': True}}},
274-
{'$project': {'_id': 0, 'symbol': '$_id'}}
271+
{'$group': {
272+
'_id': '$symbol',
273+
'version_custom': {
274+
'$max': {
275+
'$add': [
276+
{'$multiply': ['$version', 2]},
277+
{'$cond': [{'$eq': ['$metadata.deleted', True]}, 1, 0]}
278+
]
279+
}
280+
},
281+
}},
282+
{'$match': {'version_custom': {'$mod': [2, 0]}}}
275283
]
276-
versions.aggregate.assert_called_once_with(pipeline)
284+
versions.aggregate.assert_called_once_with(pipeline, allowDiskUse=True)
277285

278286

279287
def test_snapshot_duplicate_raises_exception():

0 commit comments

Comments
 (0)