Skip to content

Commit 01b1237

Browse files
committed
Add test for parallel drop and restore for snapshots
1 parent 067b121 commit 01b1237

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

tests/snapshot/test_snapshot.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import random
2+
import shutil
3+
import threading
4+
import unittest
5+
6+
from cr8.run_crate import wait_until
7+
from crate.client import connect
8+
from crate.client.exceptions import ProgrammingError
9+
from crate.qa.minio_svr import MinioServer, _is_up
10+
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, gen_id, assert_busy
11+
12+
13+
class SnapshotOperationTest(NodeProvider, unittest.TestCase):
14+
15+
PATH_DATA = 'data_test_snapshot_ops'
16+
17+
def tearDown(self):
18+
shutil.rmtree(self.PATH_DATA, ignore_errors=True)
19+
20+
def _assert_num_docs(self, conn, expected_count):
21+
c = conn.cursor()
22+
c.execute('SELECT COUNT(*) FROM doc.test')
23+
rowcount = c.fetchone()[0]
24+
self.assertEqual(rowcount, expected_count)
25+
26+
def test_snapshot_restore_and_drop_in_parallel(self):
27+
"""Test to run the drop and restore operation on two different
28+
snapshots in parallel.
29+
30+
The purpose of this test is to validate that the snapshot mechanism
31+
of CrateDB can handle the two operations in parallel. Here, Minio is
32+
used as s3 backend for the repository, but this should work on any
33+
other backend as well.
34+
"""
35+
with MinioServer() as minio:
36+
t = threading.Thread(target=minio.run)
37+
t.daemon = True
38+
t.start()
39+
wait_until(lambda: _is_up('127.0.0.1', 9000))
40+
41+
num_nodes = random.randint(3, 5)
42+
number_of_shards = random.randint(1, 3)
43+
number_of_replicas = random.randint(0, 2)
44+
num_docs = random.randint(1, 100)
45+
46+
cluster_settings = {
47+
'cluster.name': gen_id(),
48+
'path.data': self.PATH_DATA
49+
}
50+
shutil.rmtree(self.PATH_DATA, ignore_errors=True)
51+
cluster = self._new_cluster('latest-nightly', num_nodes, settings=cluster_settings)
52+
cluster.start()
53+
54+
with connect(cluster.node().http_url, error_trace=True) as conn:
55+
c = conn.cursor()
56+
wait_for_active_shards(c)
57+
c.execute('''
58+
create table doc.test(x int) clustered into ? shards with( number_of_replicas =?)
59+
''', (number_of_shards, number_of_replicas,))
60+
61+
insert_data(conn, 'doc', 'test', num_docs)
62+
63+
c.execute('''
64+
CREATE REPOSITORY repo TYPE S3
65+
WITH (access_key = 'minio',
66+
secret_key = 'miniostorage',
67+
bucket='backups',
68+
endpoint = '127.0.0.1:9000',
69+
protocol = 'http')
70+
''')
71+
72+
c.execute('CREATE SNAPSHOT repo.snapshot1 TABLE doc.test WITH (wait_for_completion = true)')
73+
c.execute('CREATE SNAPSHOT repo.snapshot2 TABLE doc.test WITH (wait_for_completion = true)')
74+
c.execute('DROP TABLE doc.test')
75+
# Drop snapshot2 while the restore of snapshot1 is still running
76+
c.execute('RESTORE SNAPSHOT repo.snapshot1 ALL WITH (wait_for_completion = false)')
77+
try:
78+
c.execute('DROP SNAPSHOT repo.snapshot2')
79+
except ProgrammingError:
80+
self.fail("Restore and Drop Snapshot operation should work in parallel")
81+
82+
assert_busy(lambda: self._assert_num_docs(conn, num_docs))
83+
84+
cluster.stop()

0 commit comments

Comments
 (0)