diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 5c54f97d..f3c76853 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -380,16 +380,10 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytes): - if len(param) > 8000: # Assuming VARBINARY(MAX) for long byte arrays - return ( - ddbc_sql_const.SQL_VARBINARY.value, - ddbc_sql_const.SQL_C_BINARY.value, - len(param), - 0, - False, - ) + # Use VARBINARY for Python bytes/bytearray since they are variable-length by nature. + # This avoids storage waste from BINARY's zero-padding and matches Python's semantics. return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, len(param), 0, @@ -397,16 +391,10 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytearray): - if len(param) > 8000: # Assuming VARBINARY(MAX) for long byte arrays - return ( - ddbc_sql_const.SQL_VARBINARY.value, - ddbc_sql_const.SQL_C_BINARY.value, - len(param), - 0, - True, - ) + # Use VARBINARY for Python bytes/bytearray since they are variable-length by nature. + # This avoids storage waste from BINARY's zero-padding and matches Python's semantics. return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, len(param), 0, @@ -848,6 +836,8 @@ def _select_best_sample_value(column): return max(non_nulls, key=lambda s: len(str(s))) if all(isinstance(v, datetime.datetime) for v in non_nulls): return datetime.datetime.now() + if all(isinstance(v, (bytes, bytearray)) for v in non_nulls): + return max(non_nulls, key=lambda b: len(b)) if all(isinstance(v, datetime.date) for v in non_nulls): return datetime.date.today() return non_nulls[0] # fallback diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index a45b288b..864a42a9 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -667,7 +667,7 @@ def test_longvarbinary(cursor, db_connection): db_connection.commit() cursor.execute("INSERT INTO #pytest_longvarbinary_test (longvarbinary_column) VALUES (?), (?)", [bytearray("ABCDEFGHI", 'utf-8'), bytes("123!@#", 'utf-8')]) db_connection.commit() - expectedRows = 3 + expectedRows = 2 # Only 2 rows are inserted # fetchone test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = [] @@ -675,12 +675,12 @@ def test_longvarbinary(cursor, db_connection): rows.append(cursor.fetchone()) assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" + assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = cursor.fetchall() assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" + assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" except Exception as e: pytest.fail(f"SQL_LONGVARBINARY parsing test failed: {e}") finally: @@ -1265,6 +1265,56 @@ def test_executemany_null_vs_empty_string(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty") db_connection.commit() +def test_executemany_binary_data_edge_cases(cursor, db_connection): + """Test executemany with binary data and empty byte arrays""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_binary_test ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_binary_test") + db_connection.commit() + + # Test data with binary data and empty bytes + test_data = [ + (1, b''), # Empty bytes + (2, b'hello'), # Regular bytes + (3, b''), # Empty bytes again + (4, b'\x00\x01\x02'), # Binary data with null bytes + (5, b''), # Empty bytes + (6, None), # NULL + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_binary_test VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_binary_test ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" + if expected_row[1] is None: + assert actual[1] is None, f"Row {i}: Expected NULL, got {actual[1]}" + else: + assert actual[1] == expected_row[1], f"Row {i}: Binary data mismatch expected {expected_row[1]}, got {actual[1]}" + except Exception as e: + pytest.fail(f"Executemany with binary data edge cases failed: {e}") + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_binary_test") + db_connection.commit() + + def test_nextset(cursor): """Test nextset""" cursor.execute("SELECT * FROM #pytest_all_data_types WHERE id = 1;") @@ -6221,6 +6271,283 @@ def test_executemany_utf16_length_validation(cursor, db_connection): drop_table_if_exists(cursor, "#pytest_utf16_validation") db_connection.commit() +def test_binary_data_over_8000_bytes(cursor, db_connection): + """Test binary data larger than 8000 bytes - document current driver limitations""" + try: + # Create test table with VARBINARY(MAX) to handle large data + drop_table_if_exists(cursor, "#pytest_large_binary") + cursor.execute(""" + CREATE TABLE #pytest_large_binary ( + id INT, + large_binary VARBINARY(MAX) + ) + """) + + # Test the current driver limitations: + # 1. Parameters cannot be > 8192 bytes + # 2. Fetch buffer is limited to 4096 bytes + + large_data = b'A' * 10000 # 10,000 bytes - exceeds parameter limit + + # This should fail with the current driver parameter limitation + try: + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (1, large_data)) + pytest.fail("Expected streaming parameter error for data > 8192 bytes") + except RuntimeError as e: + error_msg = str(e) + assert "Streaming parameters is not yet supported" in error_msg, f"Expected streaming parameter error, got: {e}" + assert "8192 bytes" in error_msg, f"Expected 8192 bytes limit mentioned, got: {e}" + + # Test data that fits within both parameter and fetch limits (< 4096 bytes) + medium_data = b'B' * 3000 # 3,000 bytes - under both limits + small_data = b'C' * 1000 # 1,000 bytes - well under limits + + # These should work fine + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (1, medium_data)) + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (2, small_data)) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, large_binary FROM #pytest_large_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 2, f"Expected 2 rows, got {len(results)}" + assert len(results[0][1]) == 3000, f"Expected 3000 bytes, got {len(results[0][1])}" + assert len(results[1][1]) == 1000, f"Expected 1000 bytes, got {len(results[1][1])}" + assert results[0][1] == medium_data, "Medium binary data mismatch" + assert results[1][1] == small_data, "Small binary data mismatch" + + print("Note: Driver currently limits parameters to < 8192 bytes and fetch buffer to 4096 bytes.") + + except Exception as e: + pytest.fail(f"Binary data over 8000 bytes test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_large_binary") + db_connection.commit() + +def test_all_empty_binaries(cursor, db_connection): + """Test table with only empty binary values""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_all_empty_binary") + cursor.execute(""" + CREATE TABLE #pytest_all_empty_binary ( + id INT, + empty_binary VARBINARY(100) + ) + """) + + # Insert multiple rows with only empty binary data + test_data = [ + (1, b''), + (2, b''), + (3, b''), + (4, b''), + (5, b''), + ] + + cursor.executemany("INSERT INTO #pytest_all_empty_binary VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify all data is empty binary + cursor.execute("SELECT id, empty_binary FROM #pytest_all_empty_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 5, f"Expected 5 rows, got {len(results)}" + for i, row in enumerate(results, 1): + assert row[0] == i, f"ID mismatch for row {i}" + assert row[1] == b'', f"Row {i} should have empty binary, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i} should return bytes type, got {type(row[1])}" + assert len(row[1]) == 0, f"Row {i} should have zero-length binary" + + except Exception as e: + pytest.fail(f"All empty binaries test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_all_empty_binary") + db_connection.commit() + +def test_mixed_bytes_and_bytearray_types(cursor, db_connection): + """Test mixing bytes and bytearray types in same column with executemany""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_mixed_binary_types") + cursor.execute(""" + CREATE TABLE #pytest_mixed_binary_types ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Test data mixing bytes and bytearray for the same column + test_data = [ + (1, b'bytes_data'), # bytes type + (2, bytearray(b'bytearray_1')), # bytearray type + (3, b'more_bytes'), # bytes type + (4, bytearray(b'bytearray_2')), # bytearray type + (5, b''), # empty bytes + (6, bytearray()), # empty bytearray + (7, bytearray(b'\x00\x01\x02\x03')), # bytearray with null bytes + (8, b'\x04\x05\x06\x07'), # bytes with null bytes + ] + + # Execute with mixed types + cursor.executemany("INSERT INTO #pytest_mixed_binary_types VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_mixed_binary_types ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 8, f"Expected 8 rows, got {len(results)}" + + # Check each row - note that SQL Server returns everything as bytes + expected_values = [ + b'bytes_data', + b'bytearray_1', + b'more_bytes', + b'bytearray_2', + b'', + b'', + b'\x00\x01\x02\x03', + b'\x04\x05\x06\x07', + ] + + for i, (row, expected) in enumerate(zip(results, expected_values)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + assert row[1] == expected, f"Row {i+1}: expected {expected}, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i+1} should return bytes type, got {type(row[1])}" + + except Exception as e: + pytest.fail(f"Mixed bytes and bytearray types test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_mixed_binary_types") + db_connection.commit() + +def test_binary_mostly_small_one_large(cursor, db_connection): + """Test binary column with mostly small/empty values but one large value (within driver limits)""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_mixed_size_binary") + cursor.execute(""" + CREATE TABLE #pytest_mixed_size_binary ( + id INT, + binary_data VARBINARY(MAX) + ) + """) + + # Create large binary value within both parameter and fetch limits (< 4096 bytes) + large_binary = b'X' * 3500 # 3,500 bytes - under both limits + + # Test data with mostly small/empty values and one large value + test_data = [ + (1, b''), # Empty + (2, b'small'), # Small value + (3, b''), # Empty again + (4, large_binary), # Large value (3,500 bytes) + (5, b'tiny'), # Small value + (6, b''), # Empty + (7, b'short'), # Small value + (8, b''), # Empty + ] + + # Execute with mixed sizes + cursor.executemany("INSERT INTO #pytest_mixed_size_binary VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_mixed_size_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 8, f"Expected 8 rows, got {len(results)}" + + # Check each row + expected_lengths = [0, 5, 0, 3500, 4, 0, 5, 0] + for i, (row, expected_len) in enumerate(zip(results, expected_lengths)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + assert len(row[1]) == expected_len, f"Row {i+1}: expected length {expected_len}, got {len(row[1])}" + + # Special check for the large value + if i == 3: # Row 4 (index 3) has the large value + assert row[1] == large_binary, f"Row 4 should have large binary data" + + # Test that we can query the large value specifically + cursor.execute("SELECT binary_data FROM #pytest_mixed_size_binary WHERE id = 4") + large_result = cursor.fetchone() + assert len(large_result[0]) == 3500, "Large binary should be 3,500 bytes" + assert large_result[0] == large_binary, "Large binary data should match" + + print("Note: Large binary test uses 3,500 bytes due to current driver limits (8192 param, 4096 fetch).") + + except Exception as e: + pytest.fail(f"Binary mostly small one large test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_mixed_size_binary") + db_connection.commit() + +def test_only_null_and_empty_binary(cursor, db_connection): + """Test table with only NULL and empty binary values to ensure fallback doesn't produce size=0""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_null_empty_binary") + cursor.execute(""" + CREATE TABLE #pytest_null_empty_binary ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Test data with only NULL and empty values + test_data = [ + (1, None), # NULL + (2, b''), # Empty bytes + (3, None), # NULL + (4, b''), # Empty bytes + (5, None), # NULL + (6, b''), # Empty bytes + ] + + # Execute with only NULL and empty values + cursor.executemany("INSERT INTO #pytest_null_empty_binary VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_null_empty_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row + expected_values = [None, b'', None, b'', None, b''] + for i, (row, expected) in enumerate(zip(results, expected_values)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + + if expected is None: + assert row[1] is None, f"Row {i+1} should be NULL, got {row[1]}" + else: + assert row[1] == b'', f"Row {i+1} should be empty bytes, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i+1} should return bytes type, got {type(row[1])}" + assert len(row[1]) == 0, f"Row {i+1} should have zero length" + + # Test specific queries to ensure NULL vs empty distinction + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE binary_data IS NULL") + null_count = cursor.fetchone()[0] + assert null_count == 3, f"Expected 3 NULL values, got {null_count}" + + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE binary_data IS NOT NULL") + not_null_count = cursor.fetchone()[0] + assert not_null_count == 3, f"Expected 3 non-NULL values, got {not_null_count}" + + # Test that empty binary values have length 0 (not confused with NULL) + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE DATALENGTH(binary_data) = 0") + empty_count = cursor.fetchone()[0] + assert empty_count == 3, f"Expected 3 empty binary values, got {empty_count}" + + except Exception as e: + pytest.fail(f"Only NULL and empty binary test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_null_empty_binary") + db_connection.commit() + def test_close(db_connection): """Test closing the cursor""" try: