Skip to content
88 changes: 88 additions & 0 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Cursor:
description: Sequence of 7-item sequences describing one result column.
rowcount: Number of rows produced or affected by the last execute operation.
arraysize: Number of rows to fetch at a time with fetchmany().
rownumber: Current 0-based index of the cursor in the result set.

Methods:
__init__(connection_str) -> None.
Expand Down Expand Up @@ -74,6 +75,10 @@ def __init__(self, connection) -> None:
# Is a list instead of a bool coz bools in Python are immutable.
# Hence, we can't pass around bools by reference & modify them.
# Therefore, it must be a list with exactly one bool element.

# rownumber attribute
self._rownumber = None # 0-based index of cursor position in result set
self._has_result_set = False # Track if we have an active result set

def _is_unicode_string(self, param):
"""
Expand Down Expand Up @@ -536,6 +541,70 @@ def _map_data_type(self, sql_type):
# Add more mappings as needed
}
return sql_to_python_type.get(sql_type, str)

@property
def rownumber(self):
"""
DB-API extension: Current 0-based index of the cursor in the result set.

Returns:
int or None: The current 0-based index of the cursor in the result set,
or None if the index cannot be determined.

Warning:
This is a DB-API extension and may not be portable across different
database modules.
"""
# Use mssql_python logging system instead of standard warnings
log('warning', "DB-API extension cursor.rownumber used")

# Return None if cursor is closed or no result set is available
if self.closed or not self._has_result_set:
return None

return self._rownumber

def _reset_rownumber(self):
"""Reset the rownumber tracking when starting a new result set."""
self._rownumber = 0
self._has_result_set = True

def _increment_rownumber(self):
"""
Increment the rownumber by 1.

This should be called after each fetch operation to keep track of the current row index.
"""
if self._has_result_set:
if self._rownumber is None:
self._rownumber = 0
else:
self._rownumber += 1
else:
raise InterfaceError("Cannot increment rownumber: no active result set.")

# Will be used when we add support for scrollable cursors
def _decrement_rownumber(self):
"""
Decrement the rownumber by 1.

This could be used for error recovery or cursor positioning operations.
"""
if self._has_result_set and self._rownumber is not None:
if self._rownumber > 0:
self._rownumber -= 1
# If already at 0, don't go negative
else:
raise InterfaceError("Cannot decrement rownumber: no active result set.")

def _clear_rownumber(self):
"""
Clear the rownumber tracking.

This should be called when the result set is cleared or when the cursor is reset.
"""
self._rownumber = None
self._has_result_set = False

def __iter__(self):
"""
Expand Down Expand Up @@ -655,6 +724,12 @@ def execute(

# Initialize description after execution
self._initialize_description()

# Reset rownumber for new result set (only for SELECT statements)
if self.description: # If we have column descriptions, it's likely a SELECT
self._reset_rownumber()
else:
self._clear_rownumber()

# Return self for method chaining
return self
Expand Down Expand Up @@ -772,6 +847,9 @@ def fetchone(self) -> Union[None, Row]:
if ret == ddbc_sql_const.SQL_NO_DATA.value:
return None

# Increment rownumber for successful fetch
self._increment_rownumber()

# Create and return a Row object
return Row(row_data, self.description)

Expand All @@ -797,6 +875,11 @@ def fetchmany(self, size: int = None) -> List[Row]:
rows_data = []
ret = ddbc_bindings.DDBCSQLFetchMany(self.hstmt, rows_data, size)

# Update rownumber for the number of rows fetched
if rows_data and self._has_result_set:
for _ in rows_data:
self._increment_rownumber()

# Convert raw data to Row objects
return [Row(row_data, self.description) for row_data in rows_data]

Expand All @@ -813,6 +896,11 @@ def fetchall(self) -> List[Row]:
rows_data = []
ret = ddbc_bindings.DDBCSQLFetchAll(self.hstmt, rows_data)

# Update rownumber for the number of rows fetched
if rows_data and self._has_result_set:
for _ in rows_data:
self._increment_rownumber()

# Convert raw data to Row objects
return [Row(row_data, self.description) for row_data in rows_data]

Expand Down
148 changes: 148 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,154 @@ def test_execute_chaining_compatibility_examples(cursor, db_connection):
except:
pass

def test_rownumber_basic_functionality(cursor, db_connection):
"""Test basic rownumber functionality"""
try:
# Create test table
cursor.execute("CREATE TABLE #test_rownumber (id INT, value VARCHAR(50))")
db_connection.commit()

# Insert test data
for i in range(5):
cursor.execute("INSERT INTO #test_rownumber VALUES (?, ?)", i, f"value_{i}")
db_connection.commit()

# Execute query and check initial rownumber
cursor.execute("SELECT * FROM #test_rownumber ORDER BY id")

# Note: Since we're now using log('warning', ...) instead of warnings.warn(),
# we can't easily capture the log messages in tests without additional setup.
# The warning will be logged to the configured logger instead.
initial_rownumber = cursor.rownumber

# Initial rownumber should be 0 (before any fetch)
assert initial_rownumber == 0, f"Initial rownumber should be 0, got {initial_rownumber}"

# Fetch first row and check rownumber
row1 = cursor.fetchone()
assert cursor.rownumber == 1, f"After fetching 1 row, rownumber should be 1, got {cursor.rownumber}"
assert row1[0] == 0, "First row should have id 0"

# Fetch second row and check rownumber
row2 = cursor.fetchone()
assert cursor.rownumber == 2, f"After fetching 2 rows, rownumber should be 2, got {cursor.rownumber}"
assert row2[0] == 1, "Second row should have id 1"

# Fetch remaining rows and check rownumber progression
row3 = cursor.fetchone()
assert cursor.rownumber == 3, f"After fetching 3 rows, rownumber should be 3, got {cursor.rownumber}"

row4 = cursor.fetchone()
assert cursor.rownumber == 4, f"After fetching 4 rows, rownumber should be 4, got {cursor.rownumber}"

row5 = cursor.fetchone()
assert cursor.rownumber == 5, f"After fetching 5 rows, rownumber should be 5, got {cursor.rownumber}"

# Try to fetch beyond result set
no_more_rows = cursor.fetchone()
assert no_more_rows is None, "Should return None when no more rows"
assert cursor.rownumber == 5, f"Rownumber should remain 5 after exhausting result set, got {cursor.rownumber}"

finally:
try:
cursor.execute("DROP TABLE #test_rownumber")
db_connection.commit()
except:
pass

def test_rownumber_warning_logged(cursor, db_connection):
"""Test that accessing rownumber logs a warning message"""
import logging
from mssql_python.helpers import get_logger

try:
# Create test table
cursor.execute("CREATE TABLE #test_rownumber_log (id INT)")
db_connection.commit()
cursor.execute("INSERT INTO #test_rownumber_log VALUES (1)")
db_connection.commit()

# Execute query
cursor.execute("SELECT * FROM #test_rownumber_log")

# Set up logging capture
logger = get_logger()
if logger:
# Create a test handler to capture log messages
import io
log_stream = io.StringIO()
test_handler = logging.StreamHandler(log_stream)
test_handler.setLevel(logging.WARNING)

# Add our test handler
logger.addHandler(test_handler)

try:
# Access rownumber (should trigger warning log)
rownumber = cursor.rownumber

# Check if warning was logged
log_contents = log_stream.getvalue()
assert "DB-API extension cursor.rownumber used" in log_contents, \
f"Expected warning message not found in logs: {log_contents}"

# Verify rownumber functionality still works
assert rownumber == 0, f"Expected rownumber 0, got {rownumber}"

finally:
# Clean up: remove our test handler
logger.removeHandler(test_handler)
else:
# If no logger configured, just test that rownumber works
rownumber = cursor.rownumber
assert rownumber == 0, f"Expected rownumber 0, got {rownumber}"

finally:
try:
cursor.execute("DROP TABLE #test_rownumber_log")
db_connection.commit()
except:
pass

def test_rownumber_closed_cursor(cursor, db_connection):
"""Test rownumber behavior with closed cursor"""
# Create a separate cursor for this test
test_cursor = db_connection.cursor()

try:
# Create test table
test_cursor.execute("CREATE TABLE #test_rownumber_closed (id INT)")
db_connection.commit()

# Insert data and execute query
test_cursor.execute("INSERT INTO #test_rownumber_closed VALUES (1)")
test_cursor.execute("SELECT * FROM #test_rownumber_closed")

# Verify rownumber works before closing
assert test_cursor.rownumber == 0, "Rownumber should work before closing"

# Close the cursor
test_cursor.close()

# Test that rownumber returns None for closed cursor
# Note: This will still log a warning, but that's expected behavior
rownumber = test_cursor.rownumber
assert rownumber is None, "Rownumber should be None for closed cursor"

finally:
# Clean up
try:
if not test_cursor.closed:
test_cursor.execute("DROP TABLE #test_rownumber_closed")
db_connection.commit()
test_cursor.close()
else:
# Use the main cursor to clean up
cursor.execute("DROP TABLE IF EXISTS #test_rownumber_closed")
db_connection.commit()
except:
pass

def test_close(db_connection):
"""Test closing the cursor"""
try:
Expand Down