Skip to content

Commit c5ccb5a

Browse files
committed
Tests: Add software tests for CrateDB/pandas
1 parent a98534c commit c5ccb5a

File tree

5 files changed

+222
-1
lines changed

5 files changed

+222
-1
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ jobs:
3232
cratedb:
3333
image: crate/crate:nightly
3434
ports:
35+
- 4200:4200
3536
- 5432:5432
3637

3738
env:

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ release = [
7777
"twine<5",
7878
]
7979
test = [
80+
"crate[sqlalchemy]",
81+
"pandas<2.1",
8082
"pytest<8",
8183
"pytest-asyncio<1",
8284
"pytest-cov<5",
@@ -122,7 +124,7 @@ show_missing = true
122124

123125
[tool.ruff]
124126
line-length = 120
125-
127+
extend-ignore = ["PD901"]
126128
select = [
127129
# Bandit
128130
"S",

tests/conftest.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import pytest
2+
3+
4+
def pytest_addoption(parser):
    """
    Add the command-line options which carry the CrateDB connection details.

    The options exist so that the test suite can be invoked by cr8, see `run.sh`.

    Example::

        pytest -vvv --http-host 127.0.0.1 --http-port 4200 --psql-host 127.0.0.1 --psql-port 5432

    https://github.com/mfussenegger/cr8

    TODO: Refactor to `cratedb-toolkit` or `pytest-cratedb` in the long run.
    """
    # Option flag paired with the value used when the flag is not given.
    option_defaults = (
        ("--http-url", "localhost:4200"),
        ("--http-host", "localhost"),
        ("--http-port", "4200"),
        ("--psql-host", "localhost"),
        ("--psql-port", "5432"),
    )
    for flag, fallback in option_defaults:
        parser.addoption(flag, action="store", default=fallback)
22+
23+
24+
@pytest.fixture
def cratedb_http_url(pytestconfig):
    """Provide the value of the `--http-url` command-line option."""
    option_value = pytestconfig.getoption("--http-url")
    return option_value
27+
28+
29+
@pytest.fixture
def cratedb_http_host(pytestconfig):
    """Provide the value of the `--http-host` command-line option."""
    option_value = pytestconfig.getoption("--http-host")
    return option_value
32+
33+
34+
@pytest.fixture
def cratedb_http_port(pytestconfig):
    """Provide the value of the `--http-port` command-line option."""
    option_value = pytestconfig.getoption("--http-port")
    return option_value
37+
38+
39+
@pytest.fixture
def cratedb_psql_host(pytestconfig):
    """Provide the value of the `--psql-host` command-line option."""
    option_value = pytestconfig.getoption("--psql-host")
    return option_value
42+
43+
44+
@pytest.fixture
def cratedb_psql_port(pytestconfig):
    """Provide the value of the `--psql-port` command-line option."""
    option_value = pytestconfig.getoption("--psql-port")
    return option_value

tests/test_cratedb_pandas_read.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import pandas as pd
2+
import pytest
3+
import sqlalchemy as sa
4+
from pandas._testing import assert_frame_equal
5+
from sqlalchemy.ext.asyncio import create_async_engine
6+
7+
import sys

# The single record every read test expects to get back from the database.
REFERENCE_FRAME = pd.DataFrame.from_records([{"mountain": "Mont Blanc", "height": 4808}])
# Query issued by every read test in this module.
SQL_SELECT_STATEMENT = "SELECT mountain, height FROM sys.summits ORDER BY height DESC LIMIT 1;"


# Only skip the module on the interpreter the message refers to. An unconditional
# module-level skip would silently disable every test below on all Python versions.
if sys.version_info < (3, 8):
    pytest.skip("Does not work on Python 3.7", allow_module_level=True)
12+
13+
14+
def test_crate_read_sql(cratedb_http_host, cratedb_http_port):
    """
    Run `SQL_SELECT_STATEMENT` through the `crate://` dialect using `pd.read_sql`,
    and compare the resulting dataframe with `REFERENCE_FRAME`.
    """
    engine = sa.create_engine(
        url=f"crate://{cratedb_http_host}:{cratedb_http_port}",
        echo=True,
    )
    try:
        # The context manager closes the connection even when the query or the comparison fails.
        with engine.connect() as conn:
            df = pd.read_sql(sql=sa.text(SQL_SELECT_STATEMENT), con=conn)
            assert_frame_equal(df, REFERENCE_FRAME)
    finally:
        # Release the connection pool instead of leaving it to garbage collection.
        engine.dispose()
22+
23+
24+
def test_psycopg_read_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Run `SQL_SELECT_STATEMENT` through the synchronous `postgresql+psycopg_relaxed` dialect
    using `pd.read_sql`, and compare the resulting dataframe with `REFERENCE_FRAME`.
    """
    engine = sa.create_engine(
        url=f"postgresql+psycopg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        use_native_hstore=False,
        echo=True,
    )
    try:
        # The context manager closes the connection even when the query or the comparison fails.
        with engine.connect() as conn:
            df = pd.read_sql(sql=sa.text(SQL_SELECT_STATEMENT), con=conn)
            assert_frame_equal(df, REFERENCE_FRAME)
    finally:
        # Release the connection pool instead of leaving it to garbage collection.
        engine.dispose()
34+
35+
36+
@pytest.mark.asyncio
async def test_psycopg_async_read_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Run `SQL_SELECT_STATEMENT` through the asynchronous `postgresql+psycopg_relaxed` dialect,
    and compare the resulting dataframe with `REFERENCE_FRAME`.

    pandas is invoked through `conn.run_sync()`, by means of `read_sql_sync`.
    """
    engine = create_async_engine(
        url=f"postgresql+psycopg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        use_native_hstore=False,
        echo=True,
    )

    try:
        async with engine.begin() as conn:
            df = await conn.run_sync(read_sql_sync, sa.text(SQL_SELECT_STATEMENT))
            assert_frame_equal(df, REFERENCE_FRAME)
    finally:
        # Close pooled connections while the event loop of this test is still running.
        await engine.dispose()
48+
49+
50+
@pytest.mark.asyncio
async def test_asyncpg_read_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Run `SQL_SELECT_STATEMENT` through the `postgresql+asyncpg_relaxed` dialect,
    and compare the resulting dataframe with `REFERENCE_FRAME`.

    pandas is invoked through `conn.run_sync()`, by means of `read_sql_sync`.
    """
    engine = create_async_engine(
        url=f"postgresql+asyncpg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        echo=True,
    )

    try:
        async with engine.begin() as conn:
            df = await conn.run_sync(read_sql_sync, sa.text(SQL_SELECT_STATEMENT))
            assert_frame_equal(df, REFERENCE_FRAME)
    finally:
        # Close pooled connections while the event loop of this test is still running.
        await engine.dispose()
61+
62+
63+
def read_sql_sync(conn, stmt):
    """
    Adapter around `pd.read_sql` which accepts the connection as its first
    argument, as required by `conn.run_sync()`.

    See https://stackoverflow.com/a/70861276.
    """
    frame = pd.read_sql(sql=stmt, con=conn)
    return frame

tests/test_cratedb_pandas_write.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import pandas as pd
2+
import pytest
3+
import sqlalchemy as sa
4+
from pandas._testing import assert_frame_equal
5+
from sqlalchemy.ext.asyncio import create_async_engine
6+
7+
import sys

# The single record every write test stores into the `foo` table.
INPUT_FRAME = pd.DataFrame.from_records([{"id": 1, "name": "foo", "value": 42.42}])
# Query used to read the stored record back.
SQL_SELECT_STATEMENT = "SELECT * FROM doc.foo;"
# Statement issued between writing and reading back.
SQL_REFRESH_STATEMENT = "REFRESH TABLE doc.foo;"


# Only skip the module on the interpreter the message refers to. An unconditional
# module-level skip would silently disable every test below on all Python versions.
if sys.version_info < (3, 8):
    pytest.skip("Does not work on Python 3.7", allow_module_level=True)
13+
14+
15+
def test_crate_to_sql(cratedb_http_host, cratedb_http_port):
    """
    Store `INPUT_FRAME` into table `foo` through the `crate://` dialect using
    `DataFrame.to_sql`, then read it back with `pd.read_sql` and compare.
    """
    # Connect to database.
    engine = sa.create_engine(
        url=f"crate://{cratedb_http_host}:{cratedb_http_port}",
        echo=True,
    )
    try:
        # The context manager closes the connection even when a statement or an assertion fails.
        with engine.connect() as con:
            # Insert data using pandas.
            # NOTE(review): -1 is presumably the row count reported by the driver — confirm.
            retval = INPUT_FRAME.to_sql(name="foo", con=con, if_exists="replace", index=False)
            assert retval == -1

            # Synchronize table content.
            con.execute(sa.text(SQL_REFRESH_STATEMENT))

            # Read back and verify data using pandas.
            df = pd.read_sql(sql=sa.text(SQL_SELECT_STATEMENT), con=con)
            assert_frame_equal(df, INPUT_FRAME)
    finally:
        # Release the connection pool instead of leaving it to garbage collection.
        engine.dispose()
34+
35+
36+
@pytest.mark.skip(reason="Needs COLLATE and pg_table_is_visible")
def test_psycopg_to_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Store `INPUT_FRAME` into table `foo` through the synchronous `postgresql+psycopg_relaxed`
    dialect using `DataFrame.to_sql`, then read it back with `pd.read_sql` and compare.
    """
    # Connect to database.
    engine = sa.create_engine(
        url=f"postgresql+psycopg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        use_native_hstore=False,
        echo=True,
    )
    try:
        # The context manager closes the connection even when a statement or an assertion fails.
        with engine.connect() as conn:
            # Insert data using pandas.
            # NOTE(review): -1 is presumably the row count reported by the driver — confirm.
            retval = INPUT_FRAME.to_sql(name="foo", con=conn, if_exists="replace", index=False)
            assert retval == -1

            # Synchronize table content.
            conn.execute(sa.text(SQL_REFRESH_STATEMENT))

            # Read back and verify data using pandas.
            df = pd.read_sql(sql=sa.text(SQL_SELECT_STATEMENT), con=conn)
            assert_frame_equal(df, INPUT_FRAME)
    finally:
        # Release the connection pool instead of leaving it to garbage collection.
        engine.dispose()
58+
59+
60+
@pytest.mark.skip(reason="Needs COLLATE and pg_table_is_visible")
@pytest.mark.asyncio
async def test_psycopg_async_to_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Store `INPUT_FRAME` into table `foo` through the asynchronous `postgresql+psycopg_relaxed` dialect.

    pandas is invoked through `conn.run_sync()`, by means of `to_sql_sync`.
    """
    # Connect to database.
    engine = create_async_engine(
        url=f"postgresql+psycopg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        use_native_hstore=False,
        echo=True,
    )

    try:
        # Insert data using pandas.
        async with engine.begin() as conn:
            # NOTE(review): -1 is presumably the row count reported by the driver — confirm.
            retval = await conn.run_sync(to_sql_sync, df=INPUT_FRAME, name="foo", if_exists="replace", index=False)
            assert retval == -1

        # TODO: Read back dataframe and compare with original.
    finally:
        # Close pooled connections while the event loop of this test is still running.
        await engine.dispose()
78+
79+
80+
@pytest.mark.skip(reason="Needs COLLATE and pg_table_is_visible")
@pytest.mark.asyncio
async def test_asyncpg_to_sql(cratedb_psql_host, cratedb_psql_port):
    """
    Store `INPUT_FRAME` into table `foo` through the `postgresql+asyncpg_relaxed` dialect.

    pandas is invoked through `conn.run_sync()`, by means of `to_sql_sync`.
    """
    # Connect to database.
    engine = create_async_engine(
        url=f"postgresql+asyncpg_relaxed://crate@{cratedb_psql_host}:{cratedb_psql_port}",
        isolation_level="AUTOCOMMIT",
        echo=True,
    )

    try:
        # Insert data using pandas.
        async with engine.begin() as conn:
            # NOTE(review): -1 is presumably the row count reported by the driver — confirm.
            retval = await conn.run_sync(to_sql_sync, df=INPUT_FRAME, name="foo", if_exists="replace", index=False)
            assert retval == -1

        # TODO: Read back dataframe and compare with original.
    finally:
        # Close pooled connections while the event loop of this test is still running.
        await engine.dispose()
97+
98+
99+
def to_sql_sync(conn, df, name, **kwargs):
    """
    Adapter around `df.to_sql` which accepts the connection as its first
    argument, as required by `conn.run_sync()`.

    See https://stackoverflow.com/a/70861276.
    """
    write_frame = df.to_sql
    return write_frame(name=name, con=conn, **kwargs)

0 commit comments

Comments
 (0)