diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 0594afda252d4..9adada8afb2c2 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -18,6 +18,7 @@ """ from __future__ import annotations +import contextlib from contextlib import closing import csv from datetime import ( @@ -456,8 +457,9 @@ def sqlite_iris_conn(sqlite_iris_engine): @pytest.fixture def sqlite_buildin(): - with sqlite3.connect(":memory:") as conn: - yield conn + with contextlib.closing(sqlite3.connect(":memory:")) as closing_conn: + with closing_conn as conn: + yield conn @pytest.fixture @@ -1561,9 +1563,12 @@ def __init__(self, *args, **kwargs) -> None: def __getattr__(self, name): return getattr(self.conn, name) - conn = MockSqliteConnection(":memory:") - with tm.assert_produces_warning(UserWarning): - sql.read_sql("SELECT 1", conn) + def close(self): + self.conn.close() + + with contextlib.closing(MockSqliteConnection(":memory:")) as conn: + with tm.assert_produces_warning(UserWarning): + sql.read_sql("SELECT 1", conn) def test_read_sql_delegate(self): iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn) @@ -1945,7 +1950,7 @@ def test_datetime_date(self): # comes back as datetime64 tm.assert_series_equal(result, expected) - def test_datetime_time(self): + def test_datetime_time(self, sqlite_buildin): # test support for datetime.time df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"]) assert df.to_sql("test_time", self.conn, index=False) == 2 @@ -1954,7 +1959,7 @@ def test_datetime_time(self): # GH8341 # first, use the fallback to have the sqlite adapter put in place - sqlite_conn = TestSQLiteFallback.connect() + sqlite_conn = sqlite_buildin assert sql.to_sql(df, "test_time2", sqlite_conn, index=False) == 2 res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn) ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f")) @@ -2762,21 +2767,17 @@ def tquery(query, con=None): class TestXSQLite: - def setup_method(self): - self.conn = sqlite3.connect(":memory:") - - def teardown_method(self): - self.conn.close() - - def drop_table(self, table_name): - cur = self.conn.cursor() + def drop_table(self, table_name, conn): + cur = conn.cursor() cur.execute(f"DROP TABLE IF EXISTS {sql._get_valid_sqlite_name(table_name)}") - self.conn.commit() + conn.commit() - def test_basic(self): + def test_basic(self, sqlite_buildin): frame = tm.makeTimeDataFrame() - assert sql.to_sql(frame, name="test_table", con=self.conn, index=False) == 30 - result = sql.read_sql("select * from test_table", self.conn) + assert ( + sql.to_sql(frame, name="test_table", con=sqlite_buildin, index=False) == 30 + ) + result = sql.read_sql("select * from test_table", sqlite_buildin) # HACK! Change this once indexes are handled properly. result.index = frame.index @@ -2788,47 +2789,52 @@ def test_basic(self): frame2 = frame.copy() new_idx = Index(np.arange(len(frame2))) + 10 frame2["Idx"] = new_idx.copy() - assert sql.to_sql(frame2, name="test_table2", con=self.conn, index=False) == 30 - result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx") + assert ( + sql.to_sql(frame2, name="test_table2", con=sqlite_buildin, index=False) + == 30 + ) + result = sql.read_sql( + "select * from test_table2", sqlite_buildin, index_col="Idx" + ) expected = frame.copy() expected.index = new_idx expected.index.name = "Idx" tm.assert_frame_equal(expected, result) - def test_write_row_by_row(self): + def test_write_row_by_row(self, sqlite_buildin): frame = tm.makeTimeDataFrame() frame.iloc[0, 0] = np.nan create_sql = sql.get_schema(frame, "test") - cur = self.conn.cursor() + cur = sqlite_buildin.cursor() cur.execute(create_sql) ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" for _, row in frame.iterrows(): fmt_sql = format_query(ins, *row) - tquery(fmt_sql, con=self.conn) + tquery(fmt_sql, con=sqlite_buildin) - self.conn.commit() + sqlite_buildin.commit() - result = sql.read_sql("select * from test", con=self.conn) + result = sql.read_sql("select * from test", con=sqlite_buildin) result.index = frame.index tm.assert_frame_equal(result, frame, rtol=1e-3) - def test_execute(self): + def test_execute(self, sqlite_buildin): frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, "test") - cur = self.conn.cursor() + cur = sqlite_buildin.cursor() cur.execute(create_sql) ins = "INSERT INTO test VALUES (?, ?, ?, ?)" row = frame.iloc[0] - sql.execute(ins, self.conn, params=tuple(row)) - self.conn.commit() + sql.execute(ins, sqlite_buildin, params=tuple(row)) + sqlite_buildin.commit() - result = sql.read_sql("select * from test", self.conn) + result = sql.read_sql("select * from test", sqlite_buildin) result.index = frame.index[:1] tm.assert_frame_equal(result, frame[:1]) - def test_schema(self): + def test_schema(self, sqlite_buildin): frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, "test") lines = create_sql.splitlines() @@ -2840,10 +2846,10 @@ def test_schema(self): create_sql = sql.get_schema(frame, "test", keys=["A", "B"]) lines = create_sql.splitlines() assert 'PRIMARY KEY ("A", "B")' in create_sql - cur = self.conn.cursor() + cur = sqlite_buildin.cursor() cur.execute(create_sql) - def test_execute_fail(self): + def test_execute_fail(self, sqlite_buildin): create_sql = """ CREATE TABLE test ( @@ -2853,14 +2859,14 @@ def test_execute_fail(self): PRIMARY KEY (a, b) ); """ - cur = self.conn.cursor() + cur = sqlite_buildin.cursor() cur.execute(create_sql) - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) - sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn) + sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', sqlite_buildin) + sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', sqlite_buildin) with pytest.raises(sql.DatabaseError, match="Execution failed on sql"): - sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn) + sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', sqlite_buildin) def test_execute_closed_connection(self): create_sql = """ @@ -2872,28 +2878,28 @@ def test_execute_closed_connection(self): PRIMARY KEY (a, b) ); """ - cur = self.conn.cursor() - cur.execute(create_sql) + with contextlib.closing(sqlite3.connect(":memory:")) as conn: + cur = conn.cursor() + cur.execute(create_sql) - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) - self.conn.close() + sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', conn) msg = "Cannot operate on a closed database." with pytest.raises(sqlite3.ProgrammingError, match=msg): - tquery("select * from test", con=self.conn) + tquery("select * from test", con=conn) - def test_keyword_as_column_names(self): + def test_keyword_as_column_names(self, sqlite_buildin): df = DataFrame({"From": np.ones(5)}) - assert sql.to_sql(df, con=self.conn, name="testkeywords", index=False) == 5 + assert sql.to_sql(df, con=sqlite_buildin, name="testkeywords", index=False) == 5 - def test_onecolumn_of_integer(self): + def test_onecolumn_of_integer(self, sqlite_buildin): # GH 3628 # a column_of_integers dataframe should transfer well to sql mono_df = DataFrame([1, 2], columns=["c0"]) - assert sql.to_sql(mono_df, con=self.conn, name="mono_df", index=False) == 2 + assert sql.to_sql(mono_df, con=sqlite_buildin, name="mono_df", index=False) == 2 # computing the sum via sql - con_x = self.conn + con_x = sqlite_buildin the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df")) # it should not fail, and gives 3 ( Issue #3628 ) assert the_sum == 3 @@ -2901,7 +2907,7 @@ def test_onecolumn_of_integer(self): result = sql.read_sql("select * from mono_df", con_x) tm.assert_frame_equal(result, mono_df) - def test_if_exists(self): + def test_if_exists(self, sqlite_buildin): df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]}) df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]}) table_name = "table_if_exists" @@ -2911,70 +2917,73 @@ def test_if_exists(self): with pytest.raises(ValueError, match=msg): sql.to_sql( frame=df_if_exists_1, - con=self.conn, + con=sqlite_buildin, name=table_name, if_exists="notvalidvalue", ) - self.drop_table(table_name) + self.drop_table(table_name, sqlite_buildin) # test if_exists='fail' sql.to_sql( - frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail" + frame=df_if_exists_1, con=sqlite_buildin, name=table_name, if_exists="fail" ) msg = "Table 'table_if_exists' already exists" with pytest.raises(ValueError, match=msg): sql.to_sql( - frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail" + frame=df_if_exists_1, + con=sqlite_buildin, + name=table_name, + if_exists="fail", ) # test if_exists='replace' sql.to_sql( frame=df_if_exists_1, - con=self.conn, + con=sqlite_buildin, name=table_name, if_exists="replace", index=False, ) - assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")] + assert tquery(sql_select, con=sqlite_buildin) == [(1, "A"), (2, "B")] assert ( sql.to_sql( frame=df_if_exists_2, - con=self.conn, + con=sqlite_buildin, name=table_name, if_exists="replace", index=False, ) == 3 ) - assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")] - self.drop_table(table_name) + assert tquery(sql_select, con=sqlite_buildin) == [(3, "C"), (4, "D"), (5, "E")] + self.drop_table(table_name, sqlite_buildin) # test if_exists='append' assert ( sql.to_sql( frame=df_if_exists_1, - con=self.conn, + con=sqlite_buildin, name=table_name, if_exists="fail", index=False, ) == 2 ) - assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")] + assert tquery(sql_select, con=sqlite_buildin) == [(1, "A"), (2, "B")] assert ( sql.to_sql( frame=df_if_exists_2, - con=self.conn, + con=sqlite_buildin, name=table_name, if_exists="append", index=False, ) == 3 ) - assert tquery(sql_select, con=self.conn) == [ + assert tquery(sql_select, con=sqlite_buildin) == [ (1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), ] - self.drop_table(table_name) + self.drop_table(table_name, sqlite_buildin)