diff --git a/tests/test_bypass_serialization.py b/tests/test_bypass_serialization.py new file mode 100644 index 000000000..5f73e1d2e --- /dev/null +++ b/tests/test_bypass_serialization.py @@ -0,0 +1,57 @@ +import pytest +import datajoint as dj +import numpy as np +from . import PREFIX +from numpy.testing import assert_array_equal + +test_blob = np.array([1, 2, 3]) + + +class Input(dj.Lookup): + definition = """ + id: int + --- + data: blob + """ + contents = [(0, test_blob)] + + +class Output(dj.Manual): + definition = """ + id: int + --- + data: blob + """ + + +@pytest.fixture +def schema_in(connection_test): + schema = dj.Schema( + PREFIX + "_test_bypass_serialization_in", + context=dict(Input=Input), + connection=connection_test, + ) + schema(Input) + yield schema + schema.drop() + + +@pytest.fixture +def schema_out(connection_test): + schema = dj.Schema( + PREFIX + "_test_blob_bypass_serialization_out", + context=dict(Output=Output), + connection=connection_test, + ) + schema(Output) + yield schema + schema.drop() + + +def test_bypass_serialization(schema_in, schema_out): + dj.blob.bypass_serialization = True + contents = Input.fetch(as_dict=True) + assert isinstance(contents[0]["data"], bytes) + Output.insert(contents) + dj.blob.bypass_serialization = False + assert_array_equal(Input.fetch1("data"), Output.fetch1("data")) diff --git a/tests/test_cascading_delete.py b/tests/test_cascading_delete.py new file mode 100644 index 000000000..8646edeca --- /dev/null +++ b/tests/test_cascading_delete.py @@ -0,0 +1,119 @@ +import pytest +import datajoint as dj +from .schema_simple import A, B, D, E, L, Website, Profile +from .schema import ComplexChild, ComplexParent + + +@pytest.fixture +def schema_simp_pop(schema_simp): + A().insert(A.contents, skip_duplicates=True) + L().insert(L.contents, skip_duplicates=True) + B().populate() + D().populate() + E().populate() + yield schema_simp + + +class TestDelete: + def test_delete_tree(self, schema_simp_pop): + assert not dj.config["safemode"], "safemode must be off for testing" + assert ( + L() and A() and B() and B.C() and D() and E() and E.F(), + "schema is not populated", + ) + A().delete() + assert not A() or B() or B.C() or D() or E() or E.F(), "incomplete delete" + + def test_stepwise_delete(self, schema_simp_pop): + assert not dj.config["safemode"], "safemode must be off for testing" + assert L() and A() and B() and B.C(), "schema population failed" + B.C().delete(force=True) + assert not B.C(), "failed to delete child tables" + B().delete() + assert ( + not B() + ), "failed to delete from the parent table following child table deletion" + + def test_delete_tree_restricted(self, schema_simp_pop): + assert not dj.config["safemode"], "safemode must be off for testing" + assert ( + L() and A() and B() and B.C() and D() and E() and E.F() + ), "schema is not populated" + cond = "cond_in_a" + rel = A() & cond + rest = dict( + A=len(A()) - len(rel), + B=len(B() - rel), + C=len(B.C() - rel), + D=len(D() - rel), + E=len(E() - rel), + F=len(E.F() - rel), + ) + rel.delete() + assert not ( + rel or B() & rel or B.C() & rel or D() & rel or E() & rel or (E.F() & rel) + ), "incomplete delete" + assert len(A()) == rest["A"], "invalid delete restriction" + assert len(B()) == rest["B"], "invalid delete restriction" + assert len(B.C()) == rest["C"], "invalid delete restriction" + assert len(D()) == rest["D"], "invalid delete restriction" + assert len(E()) == rest["E"], "invalid delete restriction" + assert len(E.F()) == rest["F"], "invalid delete restriction" + + def test_delete_lookup(self, schema_simp_pop): + assert not dj.config["safemode"], "safemode must be off for testing" + assert ( + bool(L() and A() and B() and B.C() and D() and E() and E.F()), + "schema is not populated", + ) + L().delete() + assert not bool(L() or D() or E() or E.F()), "incomplete delete" + A().delete() # delete all is necessary because delete L deletes from subtables. + + def test_delete_lookup_restricted(self, schema_simp_pop): + assert not dj.config["safemode"], "safemode must be off for testing" + assert ( + L() and A() and B() and B.C() and D() and E() and E.F(), + "schema is not populated", + ) + rel = L() & "cond_in_l" + original_count = len(L()) + deleted_count = len(rel) + rel.delete() + assert len(L()) == original_count - deleted_count + + def test_delete_complex_keys(self, schema_any): + """ + https://github.com/datajoint/datajoint-python/issues/883 + https://github.com/datajoint/datajoint-python/issues/886 + """ + assert not dj.config["safemode"], "safemode must be off for testing" + parent_key_count = 8 + child_key_count = 1 + restriction = dict( + {"parent_id_{}".format(i + 1): i for i in range(parent_key_count)}, + **{ + "child_id_{}".format(i + 1): (i + parent_key_count) + for i in range(child_key_count) + } + ) + assert len(ComplexParent & restriction) == 1, "Parent record missing" + assert len(ComplexChild & restriction) == 1, "Child record missing" + (ComplexParent & restriction).delete() + assert len(ComplexParent & restriction) == 0, "Parent record was not deleted" + assert len(ComplexChild & restriction) == 0, "Child record was not deleted" + + def test_delete_master(self, schema_simp_pop): + Profile().populate_random() + Profile().delete() + + def test_delete_parts(self, schema_simp_pop): + """test issue #151""" + with pytest.raises(dj.DataJointError): + Profile().populate_random() + Website().delete() + + def test_drop_part(self, schema_simp_pop): + """test issue #374""" + with pytest.raises(dj.DataJointError): + Website().drop()