Skip to content

TST/CLN: Ensure sqlite memory connections are closed #49154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 17, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 72 additions & 63 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
from __future__ import annotations

import contextlib
from contextlib import closing
import csv
from datetime import (
Expand Down Expand Up @@ -456,8 +457,9 @@ def sqlite_iris_conn(sqlite_iris_engine):

@pytest.fixture
def sqlite_buildin():
with sqlite3.connect(":memory:") as conn:
yield conn
with contextlib.closing(sqlite3.connect(":memory:")) as closing_conn:
with closing_conn as conn:
yield conn


@pytest.fixture
Expand Down Expand Up @@ -1561,9 +1563,12 @@ def __init__(self, *args, **kwargs) -> None:
def __getattr__(self, name):
return getattr(self.conn, name)

conn = MockSqliteConnection(":memory:")
with tm.assert_produces_warning(UserWarning):
sql.read_sql("SELECT 1", conn)
def close(self):
self.conn.close()

with contextlib.closing(MockSqliteConnection(":memory:")) as conn:
with tm.assert_produces_warning(UserWarning):
sql.read_sql("SELECT 1", conn)

def test_read_sql_delegate(self):
iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
Expand Down Expand Up @@ -1945,7 +1950,7 @@ def test_datetime_date(self):
# comes back as datetime64
tm.assert_series_equal(result, expected)

def test_datetime_time(self):
def test_datetime_time(self, sqlite_buildin):
# test support for datetime.time
df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
assert df.to_sql("test_time", self.conn, index=False) == 2
Expand All @@ -1954,7 +1959,7 @@ def test_datetime_time(self):

# GH8341
# first, use the fallback to have the sqlite adapter put in place
sqlite_conn = TestSQLiteFallback.connect()
sqlite_conn = sqlite_buildin
assert sql.to_sql(df, "test_time2", sqlite_conn, index=False) == 2
res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
Expand Down Expand Up @@ -2762,21 +2767,17 @@ def tquery(query, con=None):


class TestXSQLite:
def setup_method(self):
self.conn = sqlite3.connect(":memory:")

def teardown_method(self):
self.conn.close()

def drop_table(self, table_name):
cur = self.conn.cursor()
def drop_table(self, table_name, conn):
cur = conn.cursor()
cur.execute(f"DROP TABLE IF EXISTS {sql._get_valid_sqlite_name(table_name)}")
self.conn.commit()
conn.commit()

def test_basic(self):
def test_basic(self, sqlite_buildin):
frame = tm.makeTimeDataFrame()
assert sql.to_sql(frame, name="test_table", con=self.conn, index=False) == 30
result = sql.read_sql("select * from test_table", self.conn)
assert (
sql.to_sql(frame, name="test_table", con=sqlite_buildin, index=False) == 30
)
result = sql.read_sql("select * from test_table", sqlite_buildin)

# HACK! Change this once indexes are handled properly.
result.index = frame.index
Expand All @@ -2788,47 +2789,52 @@ def test_basic(self):
frame2 = frame.copy()
new_idx = Index(np.arange(len(frame2))) + 10
frame2["Idx"] = new_idx.copy()
assert sql.to_sql(frame2, name="test_table2", con=self.conn, index=False) == 30
result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx")
assert (
sql.to_sql(frame2, name="test_table2", con=sqlite_buildin, index=False)
== 30
)
result = sql.read_sql(
"select * from test_table2", sqlite_buildin, index_col="Idx"
)
expected = frame.copy()
expected.index = new_idx
expected.index.name = "Idx"
tm.assert_frame_equal(expected, result)

def test_write_row_by_row(self):
def test_write_row_by_row(self, sqlite_buildin):
frame = tm.makeTimeDataFrame()
frame.iloc[0, 0] = np.nan
create_sql = sql.get_schema(frame, "test")
cur = self.conn.cursor()
cur = sqlite_buildin.cursor()
cur.execute(create_sql)

ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
for _, row in frame.iterrows():
fmt_sql = format_query(ins, *row)
tquery(fmt_sql, con=self.conn)
tquery(fmt_sql, con=sqlite_buildin)

self.conn.commit()
sqlite_buildin.commit()

result = sql.read_sql("select * from test", con=self.conn)
result = sql.read_sql("select * from test", con=sqlite_buildin)
result.index = frame.index
tm.assert_frame_equal(result, frame, rtol=1e-3)

def test_execute(self):
def test_execute(self, sqlite_buildin):
frame = tm.makeTimeDataFrame()
create_sql = sql.get_schema(frame, "test")
cur = self.conn.cursor()
cur = sqlite_buildin.cursor()
cur.execute(create_sql)
ins = "INSERT INTO test VALUES (?, ?, ?, ?)"

row = frame.iloc[0]
sql.execute(ins, self.conn, params=tuple(row))
self.conn.commit()
sql.execute(ins, sqlite_buildin, params=tuple(row))
sqlite_buildin.commit()

result = sql.read_sql("select * from test", self.conn)
result = sql.read_sql("select * from test", sqlite_buildin)
result.index = frame.index[:1]
tm.assert_frame_equal(result, frame[:1])

def test_schema(self):
def test_schema(self, sqlite_buildin):
frame = tm.makeTimeDataFrame()
create_sql = sql.get_schema(frame, "test")
lines = create_sql.splitlines()
Expand All @@ -2840,10 +2846,10 @@ def test_schema(self):
create_sql = sql.get_schema(frame, "test", keys=["A", "B"])
lines = create_sql.splitlines()
assert 'PRIMARY KEY ("A", "B")' in create_sql
cur = self.conn.cursor()
cur = sqlite_buildin.cursor()
cur.execute(create_sql)

def test_execute_fail(self):
def test_execute_fail(self, sqlite_buildin):
create_sql = """
CREATE TABLE test
(
Expand All @@ -2853,14 +2859,14 @@ def test_execute_fail(self):
PRIMARY KEY (a, b)
);
"""
cur = self.conn.cursor()
cur = sqlite_buildin.cursor()
cur.execute(create_sql)

sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn)
sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', sqlite_buildin)
sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', sqlite_buildin)

with pytest.raises(sql.DatabaseError, match="Execution failed on sql"):
sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)
sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', sqlite_buildin)

def test_execute_closed_connection(self):
create_sql = """
Expand All @@ -2872,36 +2878,36 @@ def test_execute_closed_connection(self):
PRIMARY KEY (a, b)
);
"""
cur = self.conn.cursor()
cur.execute(create_sql)
with contextlib.closing(sqlite3.connect(":memory:")) as conn:
cur = conn.cursor()
cur.execute(create_sql)

sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
self.conn.close()
sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', conn)

msg = "Cannot operate on a closed database."
with pytest.raises(sqlite3.ProgrammingError, match=msg):
tquery("select * from test", con=self.conn)
tquery("select * from test", con=conn)

def test_keyword_as_column_names(self):
def test_keyword_as_column_names(self, sqlite_buildin):
df = DataFrame({"From": np.ones(5)})
assert sql.to_sql(df, con=self.conn, name="testkeywords", index=False) == 5
assert sql.to_sql(df, con=sqlite_buildin, name="testkeywords", index=False) == 5

def test_onecolumn_of_integer(self):
def test_onecolumn_of_integer(self, sqlite_buildin):
# GH 3628
# a column_of_integers dataframe should transfer well to sql

mono_df = DataFrame([1, 2], columns=["c0"])
assert sql.to_sql(mono_df, con=self.conn, name="mono_df", index=False) == 2
assert sql.to_sql(mono_df, con=sqlite_buildin, name="mono_df", index=False) == 2
# computing the sum via sql
con_x = self.conn
con_x = sqlite_buildin
the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df"))
# it should not fail, and gives 3 ( Issue #3628 )
assert the_sum == 3

result = sql.read_sql("select * from mono_df", con_x)
tm.assert_frame_equal(result, mono_df)

def test_if_exists(self):
def test_if_exists(self, sqlite_buildin):
df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]})
table_name = "table_if_exists"
Expand All @@ -2911,70 +2917,73 @@ def test_if_exists(self):
with pytest.raises(ValueError, match=msg):
sql.to_sql(
frame=df_if_exists_1,
con=self.conn,
con=sqlite_buildin,
name=table_name,
if_exists="notvalidvalue",
)
self.drop_table(table_name)
self.drop_table(table_name, sqlite_buildin)

# test if_exists='fail'
sql.to_sql(
frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail"
frame=df_if_exists_1, con=sqlite_buildin, name=table_name, if_exists="fail"
)
msg = "Table 'table_if_exists' already exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(
frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail"
frame=df_if_exists_1,
con=sqlite_buildin,
name=table_name,
if_exists="fail",
)
# test if_exists='replace'
sql.to_sql(
frame=df_if_exists_1,
con=self.conn,
con=sqlite_buildin,
name=table_name,
if_exists="replace",
index=False,
)
assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
assert tquery(sql_select, con=sqlite_buildin) == [(1, "A"), (2, "B")]
assert (
sql.to_sql(
frame=df_if_exists_2,
con=self.conn,
con=sqlite_buildin,
name=table_name,
if_exists="replace",
index=False,
)
== 3
)
assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")]
self.drop_table(table_name)
assert tquery(sql_select, con=sqlite_buildin) == [(3, "C"), (4, "D"), (5, "E")]
self.drop_table(table_name, sqlite_buildin)

# test if_exists='append'
assert (
sql.to_sql(
frame=df_if_exists_1,
con=self.conn,
con=sqlite_buildin,
name=table_name,
if_exists="fail",
index=False,
)
== 2
)
assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
assert tquery(sql_select, con=sqlite_buildin) == [(1, "A"), (2, "B")]
assert (
sql.to_sql(
frame=df_if_exists_2,
con=self.conn,
con=sqlite_buildin,
name=table_name,
if_exists="append",
index=False,
)
== 3
)
assert tquery(sql_select, con=self.conn) == [
assert tquery(sql_select, con=sqlite_buildin) == [
(1, "A"),
(2, "B"),
(3, "C"),
(4, "D"),
(5, "E"),
]
self.drop_table(table_name)
self.drop_table(table_name, sqlite_buildin)