diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 394ec2e3..764e101f 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -1725,6 +1725,10 @@ def fetchone(self) -> Union[None, Row]: self.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(self.hstmt)) if ret == ddbc_sql_const.SQL_NO_DATA.value: + # No more data available + if self._next_row_index == 0 and self.description is not None: + # This is an empty result set, set rowcount to 0 + self.rowcount = 0 return None # Update internal position after successful fetch @@ -1733,6 +1737,8 @@ def fetchone(self) -> Union[None, Row]: self._next_row_index += 1 else: self._increment_rownumber() + + self.rowcount = self._next_row_index # Create and return a Row object, passing column name map if available column_map = getattr(self, '_column_name_map', None) @@ -1775,6 +1781,12 @@ def fetchmany(self, size: int = None) -> List[Row]: # advance counters by number of rows actually returned self._next_row_index += len(rows_data) self._rownumber = self._next_row_index - 1 + + # Centralize rowcount assignment after fetch + if len(rows_data) == 0 and self._next_row_index == 0: + self.rowcount = 0 + else: + self.rowcount = self._next_row_index # Convert raw data to Row objects column_map = getattr(self, '_column_name_map', None) @@ -1807,6 +1819,12 @@ def fetchall(self) -> List[Row]: if rows_data and self._has_result_set: self._next_row_index += len(rows_data) self._rownumber = self._next_row_index - 1 + + # Centralize rowcount assignment after fetch + if len(rows_data) == 0 and self._next_row_index == 0: + self.rowcount = 0 + else: + self.rowcount = self._next_row_index # Convert raw data to Row objects column_map = getattr(self, '_column_name_map', None) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 17c149b2..97600c17 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -9647,6 +9647,190 @@ def test_primarykeys_cleanup(cursor, db_connection): except Exception as e: pytest.fail(f"Test cleanup failed: {e}") +def test_rowcount_after_fetch_operations(cursor, db_connection): + """Test that rowcount is updated correctly after various fetch operations.""" + try: + # Create a test table + cursor.execute("CREATE TABLE #rowcount_fetch_test (id INT PRIMARY KEY, name NVARCHAR(100))") + + # Insert some test data + cursor.execute("INSERT INTO #rowcount_fetch_test VALUES (1, 'Row 1')") + cursor.execute("INSERT INTO #rowcount_fetch_test VALUES (2, 'Row 2')") + cursor.execute("INSERT INTO #rowcount_fetch_test VALUES (3, 'Row 3')") + cursor.execute("INSERT INTO #rowcount_fetch_test VALUES (4, 'Row 4')") + cursor.execute("INSERT INTO #rowcount_fetch_test VALUES (5, 'Row 5')") + db_connection.commit() + + # Test fetchone + cursor.execute("SELECT * FROM #rowcount_fetch_test ORDER BY id") + # Initially, rowcount should be -1 after a SELECT statement + assert cursor.rowcount == -1, "rowcount should be -1 right after SELECT statement" + + # After fetchone, rowcount should be 1 + row = cursor.fetchone() + assert row is not None, "Should fetch one row" + assert cursor.rowcount == 1, "rowcount should be 1 after fetchone" + + # After another fetchone, rowcount should be 2 + row = cursor.fetchone() + assert row is not None, "Should fetch second row" + assert cursor.rowcount == 2, "rowcount should be 2 after second fetchone" + + # Test fetchmany + cursor.execute("SELECT * FROM #rowcount_fetch_test ORDER BY id") + assert cursor.rowcount == -1, "rowcount should be -1 right after SELECT statement" + + # After fetchmany(2), rowcount should be 2 + rows = cursor.fetchmany(2) + assert len(rows) == 2, "Should fetch two rows" + assert cursor.rowcount == 2, "rowcount should be 2 after fetchmany(2)" + + # After another fetchmany(2), rowcount should be 4 + rows = cursor.fetchmany(2) + assert len(rows) == 2, "Should fetch two more rows" + assert cursor.rowcount == 4, "rowcount should be 4 after second fetchmany(2)" + + # Test fetchall + cursor.execute("SELECT * FROM #rowcount_fetch_test ORDER BY id") + assert cursor.rowcount == -1, "rowcount should be -1 right after SELECT statement" + + # After fetchall, rowcount should be the total number of rows fetched (5) + rows = cursor.fetchall() + assert len(rows) == 5, "Should fetch all rows" + assert cursor.rowcount == 5, "rowcount should be 5 after fetchall" + + # Test mixed fetch operations + cursor.execute("SELECT * FROM #rowcount_fetch_test ORDER BY id") + + # Fetch one row + row = cursor.fetchone() + assert row is not None, "Should fetch one row" + assert cursor.rowcount == 1, "rowcount should be 1 after fetchone" + + # Fetch two more rows with fetchmany + rows = cursor.fetchmany(2) + assert len(rows) == 2, "Should fetch two more rows" + assert cursor.rowcount == 3, "rowcount should be 3 after fetchone + fetchmany(2)" + + # Fetch remaining rows with fetchall + rows = cursor.fetchall() + assert len(rows) == 2, "Should fetch remaining two rows" + assert cursor.rowcount == 5, "rowcount should be 5 after fetchone + fetchmany(2) + fetchall" + + # Test fetchall on an empty result + cursor.execute("SELECT * FROM #rowcount_fetch_test WHERE id > 100") + rows = cursor.fetchall() + assert len(rows) == 0, "Should fetch zero rows" + assert cursor.rowcount == 0, "rowcount should be 0 after fetchall on empty result" + + finally: + # Clean up + try: + cursor.execute("DROP TABLE #rowcount_fetch_test") + db_connection.commit() + except: + pass + +def test_rowcount_guid_table(cursor, db_connection): + """Test rowcount with GUID/uniqueidentifier columns to match the GitHub issue scenario.""" + try: + # Create a test table similar to the one in the GitHub issue + cursor.execute("CREATE TABLE #test_log (id uniqueidentifier PRIMARY KEY DEFAULT NEWID(), message VARCHAR(100))") + + # Insert test data + cursor.execute("INSERT INTO #test_log (message) VALUES ('Log 1')") + cursor.execute("INSERT INTO #test_log (message) VALUES ('Log 2')") + cursor.execute("INSERT INTO #test_log (message) VALUES ('Log 3')") + db_connection.commit() + + # Execute SELECT query + cursor.execute("SELECT * FROM #test_log") + assert cursor.rowcount == -1, "Rowcount should be -1 after a SELECT statement (before fetch)" + + # Test fetchall + rows = cursor.fetchall() + assert len(rows) == 3, "Should fetch 3 rows" + assert cursor.rowcount == 3, "Rowcount should be 3 after fetchall" + + # Execute SELECT again + cursor.execute("SELECT * FROM #test_log") + + # Test fetchmany + rows = cursor.fetchmany(2) + assert len(rows) == 2, "Should fetch 2 rows" + assert cursor.rowcount == 2, "Rowcount should be 2 after fetchmany(2)" + + # Fetch remaining row + rows = cursor.fetchall() + assert len(rows) == 1, "Should fetch 1 remaining row" + assert cursor.rowcount == 3, "Rowcount should be 3 after fetchmany(2) + fetchall" + + # Execute SELECT again + cursor.execute("SELECT * FROM #test_log") + + # Test individual fetchone calls + row1 = cursor.fetchone() + assert row1 is not None, "First row should not be None" + assert cursor.rowcount == 1, "Rowcount should be 1 after first fetchone" + + row2 = cursor.fetchone() + assert row2 is not None, "Second row should not be None" + assert cursor.rowcount == 2, "Rowcount should be 2 after second fetchone" + + row3 = cursor.fetchone() + assert row3 is not None, "Third row should not be None" + assert cursor.rowcount == 3, "Rowcount should be 3 after third fetchone" + + row4 = cursor.fetchone() + assert row4 is None, "Fourth row should be None (no more rows)" + assert cursor.rowcount == 3, "Rowcount should remain 3 when fetchone returns None" + + finally: + # Clean up + try: + cursor.execute("DROP TABLE #test_log") + db_connection.commit() + except: + pass + +def test_rowcount(cursor, db_connection): + """Test rowcount after various operations""" + try: + cursor.execute("CREATE TABLE #pytest_test_rowcount (id INT IDENTITY(1,1) PRIMARY KEY, name NVARCHAR(100))") + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_test_rowcount (name) VALUES ('JohnDoe1');") + assert cursor.rowcount == 1, "Rowcount should be 1 after first insert" + + cursor.execute("INSERT INTO #pytest_test_rowcount (name) VALUES ('JohnDoe2');") + assert cursor.rowcount == 1, "Rowcount should be 1 after second insert" + + cursor.execute("INSERT INTO #pytest_test_rowcount (name) VALUES ('JohnDoe3');") + assert cursor.rowcount == 1, "Rowcount should be 1 after third insert" + + cursor.execute(""" + INSERT INTO #pytest_test_rowcount (name) + VALUES + ('JohnDoe4'), + ('JohnDoe5'), + ('JohnDoe6'); + """) + assert cursor.rowcount == 3, "Rowcount should be 3 after inserting multiple rows" + + cursor.execute("SELECT * FROM #pytest_test_rowcount;") + assert cursor.rowcount == -1, "Rowcount should be -1 after a SELECT statement (before fetch)" + + # After fetchall, rowcount should be updated to match the number of rows fetched + rows = cursor.fetchall() + assert len(rows) == 6, "Should have fetched 6 rows" + assert cursor.rowcount == 6, "Rowcount should be updated to 6 after fetchall" + + db_connection.commit() + except Exception as e: + pytest.fail(f"Rowcount test failed: {e}") + finally: + cursor.execute("DROP TABLE #pytest_test_rowcount") + def test_specialcolumns_setup(cursor, db_connection): """Create test tables for testing rowIdColumns and rowVerColumns""" try: