Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions tests/test_bypass_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pytest
import datajoint as dj
import numpy as np
from . import PREFIX
from numpy.testing import assert_array_equal

test_blob = np.array([1, 2, 3])


class Input(dj.Lookup):
definition = """
id: int
---
data: blob
"""
contents = [(0, test_blob)]


class Output(dj.Manual):
definition = """
id: int
---
data: blob
"""


@pytest.fixture
def schema_in(connection_test):
schema = dj.Schema(
PREFIX + "_test_bypass_serialization_in",
context=dict(Input=Input),
connection=connection_test,
)
schema(Input)
yield schema
schema.drop()


@pytest.fixture
def schema_out(connection_test):
schema = dj.Schema(
PREFIX + "_test_blob_bypass_serialization_out",
context=dict(Output=Output),
connection=connection_test,
)
schema(Output)
yield schema
schema.drop()


def test_bypass_serialization(schema_in, schema_out):
dj.blob.bypass_serialization = True
contents = Input.fetch(as_dict=True)
assert isinstance(contents[0]["data"], bytes)
Output.insert(contents)
dj.blob.bypass_serialization = False
assert_array_equal(Input.fetch1("data"), Output.fetch1("data"))
119 changes: 119 additions & 0 deletions tests/test_cascading_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import pytest
import datajoint as dj
from .schema_simple import A, B, D, E, L, Website, Profile
from .schema import ComplexChild, ComplexParent


@pytest.fixture
def schema_simp_pop(schema_simp):
A().insert(A.contents, skip_duplicates=True)
L().insert(L.contents, skip_duplicates=True)
B().populate()
D().populate()
E().populate()
yield schema_simp


class TestDelete:
def test_delete_tree(self, schema_simp_pop):
assert not dj.config["safemode"], "safemode must be off for testing"
assert (
L() and A() and B() and B.C() and D() and E() and E.F(),
"schema is not populated",
)
A().delete()
assert not A() or B() or B.C() or D() or E() or E.F(), "incomplete delete"

def test_stepwise_delete(self, schema_simp_pop):
assert not dj.config["safemode"], "safemode must be off for testing"
assert L() and A() and B() and B.C(), "schema population failed"
B.C().delete(force=True)
assert not B.C(), "failed to delete child tables"
B().delete()
assert (
not B()
), "failed to delete from the parent table following child table deletion"

def test_delete_tree_restricted(self, schema_simp_pop):
assert not dj.config["safemode"], "safemode must be off for testing"
assert (
L() and A() and B() and B.C() and D() and E() and E.F()
), "schema is not populated"
cond = "cond_in_a"
rel = A() & cond
rest = dict(
A=len(A()) - len(rel),
B=len(B() - rel),
C=len(B.C() - rel),
D=len(D() - rel),
E=len(E() - rel),
F=len(E.F() - rel),
)
rel.delete()
assert not (
rel or B() & rel or B.C() & rel or D() & rel or E() & rel or (E.F() & rel)
), "incomplete delete"
assert len(A()) == rest["A"], "invalid delete restriction"
assert len(B()) == rest["B"], "invalid delete restriction"
assert len(B.C()) == rest["C"], "invalid delete restriction"
assert len(D()) == rest["D"], "invalid delete restriction"
assert len(E()) == rest["E"], "invalid delete restriction"
assert len(E.F()) == rest["F"], "invalid delete restriction"

def test_delete_lookup(self, schema_simp_pop):
assert not dj.config["safemode"], "safemode must be off for testing"
assert (
bool(L() and A() and B() and B.C() and D() and E() and E.F()),
"schema is not populated",
)
L().delete()
assert not bool(L() or D() or E() or E.F()), "incomplete delete"
A().delete() # delete all is necessary because delete L deletes from subtables.

def test_delete_lookup_restricted(self, schema_simp_pop):
assert not dj.config["safemode"], "safemode must be off for testing"
assert (
L() and A() and B() and B.C() and D() and E() and E.F(),
"schema is not populated",
)
rel = L() & "cond_in_l"
original_count = len(L())
deleted_count = len(rel)
rel.delete()
assert len(L()) == original_count - deleted_count

def test_delete_complex_keys(self, schema_any):
"""
https://github.com/datajoint/datajoint-python/issues/883
https://github.com/datajoint/datajoint-python/issues/886
"""
assert not dj.config["safemode"], "safemode must be off for testing"
parent_key_count = 8
child_key_count = 1
restriction = dict(
{"parent_id_{}".format(i + 1): i for i in range(parent_key_count)},
**{
"child_id_{}".format(i + 1): (i + parent_key_count)
for i in range(child_key_count)
}
)
assert len(ComplexParent & restriction) == 1, "Parent record missing"
assert len(ComplexChild & restriction) == 1, "Child record missing"
(ComplexParent & restriction).delete()
assert len(ComplexParent & restriction) == 0, "Parent record was not deleted"
assert len(ComplexChild & restriction) == 0, "Child record was not deleted"

def test_delete_master(self, schema_simp_pop):
Profile().populate_random()
Profile().delete()

def test_delete_parts(self, schema_simp_pop):
"""test issue #151"""
with pytest.raises(dj.DataJointError):
Profile().populate_random()
Website().delete()

def test_drop_part(self, schema_simp_pop):
"""test issue #374"""
with pytest.raises(dj.DataJointError):
Website().drop()