
Commit 6735c04

Forward pointers implementation (pandas-dev#651)
* initial commit for fw pointers implementation
* Finalized implementation of forward pointers
* incorporated PR comments for fw pointers implementation
* fw pointers: chunks_ids to chunk_ids
* fix for forward pointers implementation to be robust over mongo_retry with only a subset of segments written
* minor fix for initialization of ARCTIC_FORWARD_POINTERS
* added implementation of cleanup/pruning supporting forward pointers while remaining backwards compatible
* updated FW pointers implementation to allow any transition from/to ENABLED/DISABLED/HYBRID per write/append/read (a configuration sketch follows the file stats below)
* added comments explaining forward pointers variables and methods
* added check for the number of gathered segments when updating fw pointers, raising operation errors upon failure
* forward pointers implementation with SHAs instead of IDs
* completed fully functional implementation of SHA-based forward pointers
* pruning compatible with forward pointers enabled/disabled/hybrid
* don't strip() twice in version str
* remove unnecessary import
* set back the pruning timeout to 120
* set back the pruning timeout to 120
* added the numerical value of the arctic version used to create a version, in the version metadata
* fixed index check integration test
* fixed multiple bugs with concat_and_write, corrected the pruning and fixed multiple tests after enabling all VersionStore tests to run in all three forward pointers modes
* fixed all integration tests for VersionStore and fixed a bug where pruning with FW pointers ran too early, as the version is not yet inserted when pruning happens
* fixed the last two broken integration tests
* fixed the cleanup logic for forward pointers to retain chunks created within the past 24h. Added fw pointers to multiple other tests to verify functionality
* moved the publish_changes and prune calls back to their original order
* fixed a Python 3 compatibility issue with pruning
* updated changelog
* fixed Binary(b'aaa') != b'aaa' in Python 3
* fixed the last remaining failed tests for Python 3 related to bson.binary.Binary comparison with binary
1 parent 883455c commit 6735c04

15 files changed (+2367, -1613 lines)
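
The forward-pointers mode is selected process-wide via environment variables that arctic/_util.py (diff below) resolves at import time. A minimal usage sketch, assuming a local MongoDB host and an illustrative library name; the environment must be configured before arctic is imported:

import os

# Assumption: set these before importing arctic, since arctic._util reads them at import time
os.environ['ARCTIC_FORWARD_POINTERS_CFG'] = 'HYBRID'       # one of ENABLED / DISABLED / HYBRID
os.environ['ARCTIC_FORWARD_POINTERS_RECONCILE'] = '1'      # hybrid mode only: cross-verify forward vs legacy pointers

from arctic import Arctic

store = Arctic('localhost')                 # hypothetical MongoDB host
store.initialize_library('user.example')    # hypothetical library name
library = store['user.example']             # subsequent writes/appends use the configured mode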

CHANGES.md

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,8 @@
 ### 1.73
 * Bugfix: #658 Write/append errors for Panel objects from older pandas versions
 * Feature: #653 Add version meta-info in arctic module
+* Feature: #663 Include arctic numerical version in the metadata of the version document
+* Feature: #650 Implemented forward pointers for chunks in VersionStore (modes: enabled/disabled/hybrid)
 
 ### 1.72 (2018-11-06)
 * Feature: #577 Added implementation for incremental serializer for numpy records

arctic/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -2,20 +2,24 @@
 
 from .arctic import Arctic, register_library_type
 from .arctic import VERSION_STORE, TICK_STORE, CHUNK_STORE
-from .store.version_store import register_versioned_storage
+from .store.version_store import register_versioned_storage, register_version
 from .store._pandas_ndarray_store import PandasDataFrameStore, PandasSeriesStore, PandasPanelStore
 from .store._ndarray_store import NdarrayStore
 
 try:
     from pkg_resources import get_distribution
     str_version = get_distribution(__name__).version.strip()
     int_parts = tuple(int(x) for x in str_version.split('.'))
+    num_version = sum([1000 ** i * v for i, v in enumerate(reversed(int_parts))])
+    register_version(str_version, num_version)
 except Exception:
     __version__ = None
     __version_parts__ = tuple()
+    __version_numerical__ = 0
 else:
     __version__ = str_version
     __version_parts__ = int_parts
+    __version_numerical__ = num_version
 
 
 register_versioned_storage(PandasDataFrameStore)
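
A worked example of the numerical version registered above (the version string is illustrative); each dotted component is packed into a base-1000 digit:

str_version = '1.73.0'                                       # illustrative
int_parts = tuple(int(x) for x in str_version.split('.'))    # (1, 73, 0)
num_version = sum(1000 ** i * v for i, v in enumerate(reversed(int_parts)))
assert num_version == 1073000                                # 1 * 1000**2 + 73 * 1000 + 0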

arctic/_util.py

Lines changed: 29 additions & 0 deletions
@@ -1,7 +1,9 @@
 import logging
+import os
 
 import numpy as np
 import pymongo
+from enum import Enum
 from pandas import DataFrame
 from pandas.util.testing import assert_frame_equal
 
@@ -15,6 +17,33 @@
 _use_new_count_api = None
 
 
+# This enum provides all the available modes of operation for forward pointers
+class FwPointersCfg(Enum):
+    ENABLED = 0   # use only forward pointers, don't update segment parent references
+    DISABLED = 1  # operate in legacy mode: update segment parent references, don't add forward pointers
+    HYBRID = 2    # maintain both forward pointers and parent references in segments; reads prefer forward pointers
+
+
+# The version document key used to store the SHAs of the segments referenced by the version
+FW_POINTERS_REFS_KEY = 'SEGMENT_SHAS'
+# The version document key for storing the FW pointers configuration used to create this version
+FW_POINTERS_CONFIG_KEY = 'FW_POINTERS_CONFIG'
+# This variable has effect only in hybrid mode, and controls whether forward and legacy pointers are cross-verified
+ARCTIC_FORWARD_POINTERS_RECONCILE = bool(os.environ.get('ARCTIC_FORWARD_POINTERS_RECONCILE'))
+try:
+    # Controls the mode of operation for FW pointers, has effect on any new versions created
+    ARCTIC_FORWARD_POINTERS_CFG = FwPointersCfg[(os.environ.get('ARCTIC_FORWARD_POINTERS_CFG',
+                                                                FwPointersCfg.DISABLED.name).upper())]
+except Exception:
+    logger.exception("Failed to configure forward pointers with configuration {}".format(
+        os.environ.get('ARCTIC_FORWARD_POINTERS_CFG')))
+    ARCTIC_FORWARD_POINTERS_CFG = FwPointersCfg.DISABLED
+
+
+def get_fwptr_config(version):
+    return FwPointersCfg[version.get(FW_POINTERS_CONFIG_KEY, FwPointersCfg.DISABLED.name)]
+
+
 def _detect_new_count_api():
     try:
         mongo_v = [int(v) for v in pymongo.version.split('.')]
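
A small sketch of how the per-version configuration key added above is read back; the version documents here are hypothetical:

from arctic._util import FW_POINTERS_CONFIG_KEY, FwPointersCfg, get_fwptr_config

old_version = {'symbol': 'EURUSD', 'version': 7}                   # hypothetical: written by an older arctic
new_version = {'symbol': 'EURUSD', 'version': 8,                   # hypothetical: written with this change
               FW_POINTERS_CONFIG_KEY: FwPointersCfg.HYBRID.name}

assert get_fwptr_config(old_version) is FwPointersCfg.DISABLED     # missing key falls back to DISABLED
assert get_fwptr_config(new_version) is FwPointersCfg.HYBRID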

arctic/exceptions.py

Lines changed: 2 additions & 0 deletions
@@ -40,9 +40,11 @@ class DataIntegrityException(ArcticException):
     """
     pass
 
+
 class ArcticSerializationException(ArcticException):
     pass
 
+
 class ConcurrentModificationException(DataIntegrityException):
     pass

arctic/store/_ndarray_store.py

Lines changed: 215 additions & 67 deletions
Large diffs are not rendered by default.
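
Although the _ndarray_store.py diff is not rendered, it is where the write path records the forward pointers. A rough sketch of the resulting version-document shape, inferred only from FW_POINTERS_REFS_KEY / FW_POINTERS_CONFIG_KEY above and get_symbol_alive_shas() below; every field value here is illustrative, not taken from the actual diff:

import hashlib
from bson import Binary

# Illustrative SHAs standing in for the checksums of the symbol's segment documents
segment_shas = [Binary(hashlib.sha1('segment-{}'.format(i).encode()).digest()) for i in range(3)]

version_doc = {
    'symbol': 'EURUSD',                # illustrative
    'version': 8,                      # illustrative
    'FW_POINTERS_CONFIG': 'ENABLED',   # FW_POINTERS_CONFIG_KEY: mode used when creating this version
    'SEGMENT_SHAS': segment_shas,      # FW_POINTERS_REFS_KEY: SHAs of the segments this version points to
}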

arctic/store/_version_store_utils.py

Lines changed: 72 additions & 9 deletions
@@ -11,7 +11,7 @@
 from pandas.compat import pickle_compat
 from pymongo.errors import OperationFailure
 
-from arctic._util import mongo_count
+from arctic._util import mongo_count, FW_POINTERS_REFS_KEY, FW_POINTERS_CONFIG_KEY, FwPointersCfg, get_fwptr_config
 
 
 def _split_arrs(array_2d, slices):
@@ -46,15 +46,28 @@ def checksum(symbol, doc):
     return Binary(sha.digest())
 
 
-def cleanup(arctic_lib, symbol, version_ids):
-    """
-    Helper method for cleaning up chunks from a version store
-    """
-    collection = arctic_lib.get_top_level_collection()
+def get_symbol_alive_shas(symbol, versions_coll):
+    return set(Binary(x) for x in versions_coll.distinct(FW_POINTERS_REFS_KEY, {'symbol': symbol}))
+
+
+def _cleanup_fw_pointers(collection, symbol, version_ids, versions_coll, shas_to_delete, do_clean=True):
+    shas_to_delete = set(shas_to_delete) if shas_to_delete else set()
+
+    if not version_ids or not shas_to_delete:
+        return shas_to_delete
+
+    symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)
+
+    # This is the set of SHAs which are not referenced by any forward pointers
+    shas_safe_to_delete = shas_to_delete - symbol_alive_shas
+
+    if do_clean and shas_safe_to_delete:
+        collection.delete_many({'symbol': symbol, 'sha': {'$in': list(shas_safe_to_delete)}})
+
+    return shas_safe_to_delete
+
 
-    # Remove any chunks which contain just the parents, at the outset
-    # We do this here, because $pullALL will make an empty array: []
-    # and the index which contains the parents field will fail the unique constraint.
+def _cleanup_parent_pointers(collection, symbol, version_ids):
     for v in version_ids:
         # Remove all documents which only contain the parent
         collection.delete_many({'symbol': symbol,
@@ -69,6 +82,56 @@ def cleanup(arctic_lib, symbol, version_ids):
     collection.delete_one({'symbol': symbol, 'parent': []})
 
 
+def _cleanup_mixed(symbol, collection, version_ids, versions_coll):
+    # Pull the deleted version IDs from the parents field
+    collection.update_many({'symbol': symbol, 'parent': {'$in': version_ids}}, {'$pullAll': {'parent': version_ids}})
+
+    # All-inclusive set of segments which are pointed to by at least one version (SHA forward pointers)
+    symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)
+
+    spec = {'symbol': symbol, 'parent': []}
+    if symbol_alive_shas:
+        # Unfortunately, while this query uses the (symbol, sha) index to locate the documents, the filtering
+        # by "parent: []" is applied server-side after fetching, which pollutes the WiredTiger cache
+        # TODO: add a new index for the segments collection: (symbol, sha, parent)
+        spec['sha'] = {'$nin': list(symbol_alive_shas)}
+    collection.delete_many(spec)
+
+
+def _get_symbol_pointer_cfgs(symbol, versions_coll):
+    return set(get_fwptr_config(v)
+               for v in versions_coll.find({'symbol': symbol}, projection={FW_POINTERS_CONFIG_KEY: 1}))
+
+
+def cleanup(arctic_lib, symbol, version_ids, versions_coll, shas_to_delete=None, pointers_cfgs=None):
+    """
+    Helper method for cleaning up chunks from a version store
+    """
+    pointers_cfgs = set(pointers_cfgs) if pointers_cfgs else set()
+    collection = arctic_lib.get_top_level_collection()
+    version_ids = list(version_ids)
+
+    # Check whether the symbol's versions were created only with forward pointers, only with parent
+    # pointers (old style), or with a mix. Keep in mind that the new version is not yet inserted.
+    all_symbol_pointers_cfgs = _get_symbol_pointer_cfgs(symbol, versions_coll)
+    all_symbol_pointers_cfgs.update(pointers_cfgs)
+
+    # All the versions of the symbol have been created with old arctic or with forward pointers disabled.
+    # This path preserves backwards compatibility with the legacy parent-pointers implementation.
+    if all_symbol_pointers_cfgs == {FwPointersCfg.DISABLED} or not all_symbol_pointers_cfgs:
+        _cleanup_parent_pointers(collection, symbol, version_ids)
+        return
+
+    # All the versions of the symbol we wish to delete have been created with forward pointers
+    if FwPointersCfg.DISABLED not in all_symbol_pointers_cfgs:
+        _cleanup_fw_pointers(collection, symbol, version_ids, versions_coll,
+                             shas_to_delete=shas_to_delete, do_clean=True)
+        return
+
+    # Reaching here means the symbol has versions with mixed forward-pointer and legacy parent-pointer configurations
+    _cleanup_mixed(symbol, collection, version_ids, versions_coll)
+
+
 def version_base_or_id(version):
     return version.get('base_version_id', version['_id'])
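
A condensed sketch of the dispatch logic in the new cleanup() above: the union of forward-pointer configurations observed across the symbol's versions (plus any configurations passed in for versions not yet inserted) picks the cleanup path. The helper name below is illustrative:

from arctic._util import FwPointersCfg


def choose_cleanup_path(all_symbol_pointers_cfgs):
    # Mirrors the branching in cleanup(): legacy-only, forward-pointers-only, or mixed
    if not all_symbol_pointers_cfgs or all_symbol_pointers_cfgs == {FwPointersCfg.DISABLED}:
        return 'legacy'    # prune parent references only
    if FwPointersCfg.DISABLED not in all_symbol_pointers_cfgs:
        return 'forward'   # delete segments whose SHAs no live version references
    return 'mixed'         # pull parents, then delete segments unreferenced by any alive SHA


assert choose_cleanup_path(set()) == 'legacy'
assert choose_cleanup_path({FwPointersCfg.ENABLED, FwPointersCfg.HYBRID}) == 'forward'
assert choose_cleanup_path({FwPointersCfg.DISABLED, FwPointersCfg.ENABLED}) == 'mixed'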
