From 5c1846c799c9dcc99680611909ba0fd5a828ace5 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 9 Sep 2023 20:13:35 +0900 Subject: [PATCH 01/16] feat: add category_of_none --- pymysqlreplication/row_event.py | 54 ++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fcd138d3..ada72031 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -459,6 +459,24 @@ def __read_binary_slice(self, binary, start, size, data_length): mask = ((1 << size) - 1) return binary & mask + def _categorize_none(self, column_data): + result = {} + for column_name, value in column_data.items(): + if value is not None: + continue + + category = "null" + + column_type = [col.type for col in self.columns if col.name == column_name][0] + if column_type in (FIELD_TYPE.DATETIME, FIELD_TYPE.DATE, FIELD_TYPE.DATETIME2): + category = "out of datetime range" + elif column_type == FIELD_TYPE.SET: + category = "empty set" + + result[column_name] = category + + return result + def _dump(self): super()._dump() print("Table: %s.%s" % (self.schema, self.table)) @@ -498,6 +516,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["category_of_none"] = self._categorize_none(row["values"]) + return row def _dump(self): @@ -506,8 +526,8 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key]) - + print("*", key, ":", row["values"][key], + "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -526,6 +546,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["category_of_none"] = self._categorize_none(row["values"]) + return row def _dump(self): @@ -534,7 +556,8 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key]) + print("*", key, ":", row["values"][key], + "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") class UpdateRowsEvent(RowsEvent): @@ -561,9 +584,15 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - row["before_values"] = self._read_column_data(self.columns_present_bitmap) + changes = {} + updated_columns = [] + column_types = {} + row["before_values"] = self._read_column_data(self.columns_present_bitmap) row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + row["before_category_of_none"] = self._categorize_none(row["before_values"]) + row["after_category_of_none"] = self._categorize_none(row["after_values"]) + return row def _dump(self): @@ -573,10 +602,21 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - print("*%s:%s=>%s" % (key, - row["before_values"][key], - row["after_values"][key])) + if key in row["before_category_of_none"]: + before_value_info = "%s(%s)" % (row["before_values"][key], + row["before_category_of_none"][key]) + else: + before_value_info = row["before_values"][key] + + if key in row["after_category_of_none"]: + after_value_info = "%s(%s)" % (row["after_values"][key], + row["after_category_of_none"][key]) + else: + after_value_info = row["after_values"][key] + print("*%s:%s=>%s" % (key, + before_value_info, + after_value_info)) class TableMapEvent(BinLogEvent): """This event describes the structure of a table. From 7d5da6774c321a6f890b42841d1cc8b7c54fc4a8 Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 14 Sep 2023 16:54:09 +0900 Subject: [PATCH 02/16] categorize test when null value --- pymysqlreplication/tests/test_basic.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index f03b5663..e01d1839 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -574,6 +574,24 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) + def test_categorize_none(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + resume_stream=False, + only_events = [WriteRowsEvent] + ) + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" + self.execute(query) + query = "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + self.execute(query) + self.execute("COMMIT") + write_rows_event = self.stream.fetchone() + self.assertIsInstance(write_rows_event, WriteRowsEvent) + self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): From 6f5939eab5c8094b168416a65806d8a224f1614b Mon Sep 17 00:00:00 2001 From: mjs Date: Thu, 14 Sep 2023 20:43:44 +0900 Subject: [PATCH 03/16] feat: add self.none_sources --- pymysqlreplication/row_event.py | 48 ++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index ada72031..fd6a1a78 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -24,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] + self.none_sources = {} #Header self.table_id = self._read_table_id() @@ -123,11 +124,14 @@ def _read_column_data(self, cols_bitmap): def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, zerofill, fixed_binary_length, i): + name = self.table_map[self.table_id].columns[i].name if BitGet(cols_bitmap, i) == 0: + self.none_sources[name] = 'cols_bitmap' return None if self._is_null(null_bitmap, null_bitmap_index): + self.none_sources[name] = 'null' return None if column.type == FIELD_TYPE.TINY: @@ -182,18 +186,27 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.BLOB: return self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: - return self.__read_datetime() + ret = self.__read_datetime() + if ret is None: + self.none_sources[name] = 'out of datetime range' + return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: - return self.__read_date() + ret = self.__read_date() + if ret is None: + self.none_sources[name] = 'out of date range' + return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.fromtimestamp( self.packet.read_uint32()) # For new date format: elif column.type == FIELD_TYPE.DATETIME2: - return self.__read_datetime2(column) + ret = self.__read_datetime2(column) + if ret is None: + self.none_sources[name] = 'out of datetime2 range' + return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) elif column.type == FIELD_TYPE.TIMESTAMP2: @@ -217,10 +230,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap # We read set columns as a bitmap telling us which options # are enabled bit_mask = self.packet.read_uint_by_size(column.size) - return set( + set_value = set( val for idx, val in enumerate(column.set_values) if bit_mask & 2 ** idx - ) or None + ) + if not set_value: + self.none_sources[column.name] = "empty set" + return None + return set_value elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -465,16 +482,8 @@ def _categorize_none(self, column_data): if value is not None: continue - category = "null" - - column_type = [col.type for col in self.columns if col.name == column_name][0] - if column_type in (FIELD_TYPE.DATETIME, FIELD_TYPE.DATE, FIELD_TYPE.DATETIME2): - category = "out of datetime range" - elif column_type == FIELD_TYPE.SET: - category = "empty set" - + category = self.none_sources.get(column_name, "null") result[column_name] = category - return result def _dump(self): @@ -584,15 +593,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - changes = {} - updated_columns = [] - column_types = {} - row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row["after_values"] = self._read_column_data(self.columns_present_bitmap2) - row["before_category_of_none"] = self._categorize_none(row["before_values"]) - row["after_category_of_none"] = self._categorize_none(row["after_values"]) - + row['before_category_of_none'] = self._categorize_none(row["before_values"]) + row['after_values'] = self._read_column_data(self.columns_present_bitmap2) + row['after_category_of_none'] = self._categorize_none(row["after_values"]) return row def _dump(self): From a74e3ff957f2786920e155570818d0e5d4ea8aa9 Mon Sep 17 00:00:00 2001 From: mjs Date: Sat, 16 Sep 2023 13:08:15 +0900 Subject: [PATCH 04/16] feat: add categorize test when invalid mode --- pymysqlreplication/tests/test_basic.py | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index e01d1839..e8818ae0 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -591,6 +591,35 @@ def test_categorize_none(self): self.assertIsInstance(write_rows_event, WriteRowsEvent) self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + def test_categorize_none_invalid(self): + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") + self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") + self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") + self.resetBinLog() + self.execute("UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL") + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + self.assertEqual(event.rows[0]["before_category_of_none"]["col0"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col1"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col2"], 'out of datetime2 range') + self.assertEqual(event.rows[0]["before_category_of_none"]["col3"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col4"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col0"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col1"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col2"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col3"], 'out of date range') + self.assertEqual(event.rows[0]["after_category_of_none"]["col4"], 'empty set') class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From b5076730d4077583f89bc2c4ab5985ce3f4723f2 Mon Sep 17 00:00:00 2001 From: starcat37 Date: Mon, 18 Sep 2023 09:19:49 +0900 Subject: [PATCH 05/16] docs: add binlog_row_image comment in read_values_name --- pymysqlreplication/row_event.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fd6a1a78..780ed04a 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -127,6 +127,8 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap name = self.table_map[self.table_id].columns[i].name if BitGet(cols_bitmap, i) == 0: + # This block is only executed when binlog_row_image = MINIMAL. + # When binlog_row_image = FULL, this block does not execute. self.none_sources[name] = 'cols_bitmap' return None From c9733f9cc3dd685a5bc135786d8f98e186c7377f Mon Sep 17 00:00:00 2001 From: starcat37 Date: Wed, 20 Sep 2023 18:39:52 +0900 Subject: [PATCH 06/16] refactor: change the name of variables and functions --- pymysqlreplication/row_event.py | 28 +++++++++++++------------- pymysqlreplication/tests/test_basic.py | 28 +++++++++++++------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 780ed04a..42b9eb97 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -478,14 +478,14 @@ def __read_binary_slice(self, binary, start, size, data_length): mask = ((1 << size) - 1) return binary & mask - def _categorize_none(self, column_data): + def _get_none_sources(self, column_data): result = {} for column_name, value in column_data.items(): if value is not None: continue - category = self.none_sources.get(column_name, "null") - result[column_name] = category + source = self.none_sources.get(column_name, "null") + result[column_name] = source return result def _dump(self): @@ -527,7 +527,7 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) - row["category_of_none"] = self._categorize_none(row["values"]) + row["none_sources"] = self._get_none_sources(row["values"]) return row @@ -538,7 +538,7 @@ def _dump(self): print("--") for key in row["values"]: print("*", key, ":", row["values"][key], - "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") + "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -557,7 +557,7 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) - row["category_of_none"] = self._categorize_none(row["values"]) + row["none_sources"] = self._get_none_sources(row["values"]) return row @@ -568,7 +568,7 @@ def _dump(self): print("--") for key in row["values"]: print("*", key, ":", row["values"][key], - "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") + "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") class UpdateRowsEvent(RowsEvent): @@ -596,9 +596,9 @@ def _fetch_one_row(self): row = {} row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row['before_category_of_none'] = self._categorize_none(row["before_values"]) - row['after_values'] = self._read_column_data(self.columns_present_bitmap2) - row['after_category_of_none'] = self._categorize_none(row["after_values"]) + row["before_none_source"] = self._get_none_sources(row["before_values"]) + row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + row["after_none_source"] = self._get_none_sources(row["after_values"]) return row def _dump(self): @@ -608,15 +608,15 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - if key in row["before_category_of_none"]: + if key in row["before_none_source"]: before_value_info = "%s(%s)" % (row["before_values"][key], - row["before_category_of_none"][key]) + row["before_none_source"][key]) else: before_value_info = row["before_values"][key] - if key in row["after_category_of_none"]: + if key in row["after_none_source"]: after_value_info = "%s(%s)" % (row["after_values"][key], - row["after_category_of_none"][key]) + row["after_none_source"][key]) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index e8818ae0..ce89aea4 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -574,13 +574,13 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) - def test_categorize_none(self): + def test_get_none(self): self.stream.close() self.stream = BinLogStreamReader( self.database, server_id=1024, resume_stream=False, - only_events = [WriteRowsEvent] + only_events=[WriteRowsEvent] ) query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" self.execute(query) @@ -589,9 +589,9 @@ def test_categorize_none(self): self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + self.assertEqual(write_rows_event.rows[0]['none_sources']['col1'], 'null') - def test_categorize_none_invalid(self): + def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") @@ -610,16 +610,16 @@ def test_categorize_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_category_of_none"]["col0"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col1"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col2"], 'out of datetime2 range') - self.assertEqual(event.rows[0]["before_category_of_none"]["col3"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col4"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col0"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col1"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col2"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col3"], 'out of date range') - self.assertEqual(event.rows[0]["after_category_of_none"]["col4"], 'empty set') + self.assertEqual(event.rows[0]["before_none_source"]["col0"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col1"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col2"], 'out of datetime2 range') + self.assertEqual(event.rows[0]["before_none_source"]["col3"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col4"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col0"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col1"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col2"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col3"], 'out of date range') + self.assertEqual(event.rows[0]["after_none_source"]["col4"], 'empty set') class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From edb52420e8fba369acfa82d857018cfe51679c30 Mon Sep 17 00:00:00 2001 From: mikaniz Date: Thu, 21 Sep 2023 23:18:28 +0900 Subject: [PATCH 07/16] refactor: change the name of variables and the string formatting --- pymysqlreplication/row_event.py | 40 ++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 42b9eb97..e16086fd 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -24,7 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] - self.none_sources = {} + self.__none_sources = {} #Header self.table_id = self._read_table_id() @@ -129,11 +129,11 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.none_sources[name] = 'cols_bitmap' + self.__none_sources[name] = 'cols_bitmap' return None if self._is_null(null_bitmap, null_bitmap_index): - self.none_sources[name] = 'null' + self.__none_sources[name] = 'null' return None if column.type == FIELD_TYPE.TINY: @@ -190,14 +190,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.none_sources[name] = 'out of datetime range' + self.__none_sources[name] = 'out of datetime range' return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.none_sources[name] = 'out of date range' + self.__none_sources[name] = 'out of date range' return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.fromtimestamp( @@ -207,7 +207,7 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.none_sources[name] = 'out of datetime2 range' + self.__none_sources[name] = 'out of datetime2 range' return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -232,14 +232,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap # We read set columns as a bitmap telling us which options # are enabled bit_mask = self.packet.read_uint_by_size(column.size) - set_value = set( + ret = set( val for idx, val in enumerate(column.set_values) if bit_mask & 2 ** idx ) - if not set_value: - self.none_sources[column.name] = "empty set" + if not ret: + self.__none_sources[column.name] = "empty set" return None - return set_value + return ret elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -484,7 +484,7 @@ def _get_none_sources(self, column_data): if value is not None: continue - source = self.none_sources.get(column_name, "null") + source = self.__none_sources.get(column_name, "null") result[column_name] = source return result @@ -537,8 +537,11 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key], - "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") + none_source = row["none_sources"][key] if key in row["none_sources"] else "" + if none_source: + print("*", key, ":", row["values"][key], f"({none_source})") + else: + print("*", key, ":", row["values"][key]) class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -567,8 +570,11 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key], - "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") + none_source = row["none_sources"][key] if key in row["none_sources"] else "" + if none_source: + print("*", key, ":", row["values"][key], f"({none_source})") + else: + print("*", key, ":", row["values"][key]) class UpdateRowsEvent(RowsEvent): @@ -620,9 +626,7 @@ def _dump(self): else: after_value_info = row["after_values"][key] - print("*%s:%s=>%s" % (key, - before_value_info, - after_value_info)) + print(f"*{key}:{before_value_info}=>{after_value_info}") class TableMapEvent(BinLogEvent): """This event describes the structure of a table. From 5107c77a25e001c71e7299cc2e01bf3a99b82d10 Mon Sep 17 00:00:00 2001 From: heehehe Date: Fri, 22 Sep 2023 17:11:22 +0900 Subject: [PATCH 08/16] refactor: modify scripts by black --- pymysqlreplication/row_event.py | 31 +++++++++++------- pymysqlreplication/tests/test_basic.py | 44 ++++++++++++++++---------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 0198ec56..9fa78bab 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -137,11 +137,11 @@ def __read_values_name( if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.__none_sources[name] = 'cols_bitmap' + self.__none_sources[name] = "cols_bitmap" return None if self._is_null(null_bitmap, null_bitmap_index): - self.__none_sources[name] = 'null' + self.__none_sources[name] = "null" return None if column.type == FIELD_TYPE.TINY: @@ -187,14 +187,14 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.__none_sources[name] = 'out of datetime range' + self.__none_sources[name] = "out of datetime range" return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.__none_sources[name] = 'out of date range' + self.__none_sources[name] = "out of date range" return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.utcfromtimestamp(self.packet.read_uint32()) @@ -203,7 +203,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.__none_sources[name] = 'out of datetime2 range' + self.__none_sources[name] = "out of datetime2 range" return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -547,12 +547,15 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - none_source = row["none_sources"][key] if key in row["none_sources"] else "" + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) if none_source: print("*", key, ":", row["values"][key], f"({none_source})") else: print("*", key, ":", row["values"][key]) + class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -580,7 +583,9 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - none_source = row["none_sources"][key] if key in row["none_sources"] else "" + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) if none_source: print("*", key, ":", row["values"][key], f"({none_source})") else: @@ -626,14 +631,18 @@ def _dump(self): print("--") for key in row["before_values"]: if key in row["before_none_source"]: - before_value_info = "%s(%s)" % (row["before_values"][key], - row["before_none_source"][key]) + before_value_info = "%s(%s)" % ( + row["before_values"][key], + row["before_none_source"][key], + ) else: before_value_info = row["before_values"][key] if key in row["after_none_source"]: - after_value_info = "%s(%s)" % (row["after_values"][key], - row["after_none_source"][key]) + after_value_info = "%s(%s)" % ( + row["after_values"][key], + row["after_none_source"][key], + ) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 31312ee3..2bc74834 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -609,23 +609,31 @@ def test_get_none(self): self.database, server_id=1024, resume_stream=False, - only_events=[WriteRowsEvent] + only_events=[WriteRowsEvent], ) query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" self.execute(query) - query = "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + query = ( + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + ) self.execute(query) self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]['none_sources']['col1'], 'null') + self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") - self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") - self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") + self.execute( + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" + ) + self.execute( + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" + ) self.resetBinLog() - self.execute("UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL") + self.execute( + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL" + ) self.execute("COMMIT") self.assertIsInstance(self.stream.fetchone(), RotateEvent) @@ -639,16 +647,20 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_none_source"]["col0"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col1"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col2"], 'out of datetime2 range') - self.assertEqual(event.rows[0]["before_none_source"]["col3"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col4"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col0"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col1"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col2"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col3"], 'out of date range') - self.assertEqual(event.rows[0]["after_none_source"]["col4"], 'empty set') + self.assertEqual(event.rows[0]["before_none_source"]["col0"], "null") + self.assertEqual(event.rows[0]["before_none_source"]["col1"], "null") + self.assertEqual( + event.rows[0]["before_none_source"]["col2"], "out of datetime2 range" + ) + self.assertEqual(event.rows[0]["before_none_source"]["col3"], "null") + self.assertEqual(event.rows[0]["before_none_source"]["col4"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col0"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col1"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col2"], "null") + self.assertEqual( + event.rows[0]["after_none_source"]["col3"], "out of date range" + ) + self.assertEqual(event.rows[0]["after_none_source"]["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From 26f036c9e0fc4f48ef3cfb6d146150acc004c895 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:04:40 +0900 Subject: [PATCH 09/16] fix: check when none sources exist Since column schema has deleted, we cannot get any none sources when optional_meta_data is False. --- pymysqlreplication/row_event.py | 12 ++++----- pymysqlreplication/tests/test_basic.py | 34 ++++++++++++++------------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 9fa78bab..769f7f93 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -619,9 +619,9 @@ def _fetch_one_row(self): row = {} row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row["before_none_source"] = self._get_none_sources(row["before_values"]) + row["before_none_sources"] = self._get_none_sources(row["before_values"]) row["after_values"] = self._read_column_data(self.columns_present_bitmap2) - row["after_none_source"] = self._get_none_sources(row["after_values"]) + row["after_none_sources"] = self._get_none_sources(row["after_values"]) return row def _dump(self): @@ -630,18 +630,18 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - if key in row["before_none_source"]: + if key in row["before_none_sources"]: before_value_info = "%s(%s)" % ( row["before_values"][key], - row["before_none_source"][key], + row["before_none_sources"][key], ) else: before_value_info = row["before_values"][key] - if key in row["after_none_source"]: + if key in row["after_none_sources"]: after_value_info = "%s(%s)" % ( row["after_values"][key], - row["after_none_source"][key], + row["after_none_sources"][key], ) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2bc74834..1019d71c 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -620,7 +620,9 @@ def test_get_none(self): self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") + + if write_rows_event.rows[0].get("none_sources"): + self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") @@ -632,7 +634,7 @@ def test_get_none_invalid(self): ) self.resetBinLog() self.execute( - "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL" + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" ) self.execute("COMMIT") @@ -647,20 +649,20 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_none_source"]["col0"], "null") - self.assertEqual(event.rows[0]["before_none_source"]["col1"], "null") - self.assertEqual( - event.rows[0]["before_none_source"]["col2"], "out of datetime2 range" - ) - self.assertEqual(event.rows[0]["before_none_source"]["col3"], "null") - self.assertEqual(event.rows[0]["before_none_source"]["col4"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col0"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col1"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col2"], "null") - self.assertEqual( - event.rows[0]["after_none_source"]["col3"], "out of date range" - ) - self.assertEqual(event.rows[0]["after_none_source"]["col4"], "empty set") + + if event.rows[0].get("before_none_sources"): + self.assertEqual(event.rows[0]["before_none_sources"]["col0"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col1"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col2"], "out of datetime2 range") + self.assertEqual(event.rows[0]["before_none_sources"]["col3"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col4"], "null") + + if event.rows[0].get("after_none_sources"): + self.assertEqual(event.rows[0]["after_none_sources"]["col0"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col1"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col2"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col3"], "out of date range") + self.assertEqual(event.rows[0]["after_none_sources"]["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From ab1c5fbd3df4e7c6437be60a9afccffaab3a8a1c Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:14:05 +0900 Subject: [PATCH 10/16] fix: do not add none_sources when column_name is None --- pymysqlreplication/row_event.py | 2 +- pymysqlreplication/tests/test_basic.py | 31 ++++++++++++++------------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 769f7f93..cb091b4d 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -487,7 +487,7 @@ def __read_binary_slice(self, binary, start, size, data_length): def _get_none_sources(self, column_data): result = {} for column_name, value in column_data.items(): - if value is not None: + if (column_name is None) or (value is not None): continue source = self.__none_sources.get(column_name, "null") diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 1019d71c..1786e2e8 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -649,20 +649,23 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - - if event.rows[0].get("before_none_sources"): - self.assertEqual(event.rows[0]["before_none_sources"]["col0"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col1"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col2"], "out of datetime2 range") - self.assertEqual(event.rows[0]["before_none_sources"]["col3"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col4"], "null") - - if event.rows[0].get("after_none_sources"): - self.assertEqual(event.rows[0]["after_none_sources"]["col0"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col1"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col2"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col3"], "out of date range") - self.assertEqual(event.rows[0]["after_none_sources"]["col4"], "empty set") + + before_none_sources = event.rows[0].get("before_none_sources") + after_none_sources = event.rows[0].get("after_none_sources") + + if before_none_sources: + self.assertEqual(before_none_sources["col0"], "null") + self.assertEqual(before_none_sources["col1"], "null") + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") + self.assertEqual(before_none_sources["col3"], "null") + self.assertEqual(before_none_sources["col4"], "null") + + if after_none_sources: + self.assertEqual(after_none_sources["col0"], "null") + self.assertEqual(after_none_sources["col1"], "null") + self.assertEqual(after_none_sources["col2"], "null") + self.assertEqual(after_none_sources["col3"], "out of date range") + self.assertEqual(after_none_sources["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From 6a506ebaa3c5e4a3788dcbc8b34c983ca7336996 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:42:33 +0900 Subject: [PATCH 11/16] refactor: get none_sources as variable --- pymysqlreplication/tests/test_basic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 1786e2e8..ed50073a 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -621,8 +621,9 @@ def test_get_none(self): write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - if write_rows_event.rows[0].get("none_sources"): - self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") + none_sources = write_rows_event.rows[0].get("none_sources") + if none_sources: + self.assertEqual(none_sources["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") From 154bf8882f5415e54a0555d1401ebc88c28b65df Mon Sep 17 00:00:00 2001 From: heehehe Date: Sun, 24 Sep 2023 00:38:01 +0900 Subject: [PATCH 12/16] feat: add TestColumnValueNoneSources testcase --- pymysqlreplication/tests/test_basic.py | 145 ++++++++++++++----------- 1 file changed, 80 insertions(+), 65 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index ed50073a..77b7c7e2 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -30,6 +30,7 @@ "TestStatementConnectionSetting", "TestRowsQueryLogEvents", "TestOptionalMetaData", + "TestColumnValueNoneSources", ] @@ -603,71 +604,6 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) - def test_get_none(self): - self.stream.close() - self.stream = BinLogStreamReader( - self.database, - server_id=1024, - resume_stream=False, - only_events=[WriteRowsEvent], - ) - query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" - self.execute(query) - query = ( - "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" - ) - self.execute(query) - self.execute("COMMIT") - write_rows_event = self.stream.fetchone() - self.assertIsInstance(write_rows_event, WriteRowsEvent) - - none_sources = write_rows_event.rows[0].get("none_sources") - if none_sources: - self.assertEqual(none_sources["col1"], "null") - - def test_get_none_invalid(self): - self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") - self.execute( - "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" - ) - self.execute( - "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" - ) - self.resetBinLog() - self.execute( - "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" - ) - self.execute("COMMIT") - - self.assertIsInstance(self.stream.fetchone(), RotateEvent) - self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - self.assertIsInstance(self.stream.fetchone(), QueryEvent) - self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - - event = self.stream.fetchone() - if self.isMySQL56AndMore(): - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) - else: - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) - self.assertIsInstance(event, UpdateRowsEvent) - - before_none_sources = event.rows[0].get("before_none_sources") - after_none_sources = event.rows[0].get("after_none_sources") - - if before_none_sources: - self.assertEqual(before_none_sources["col0"], "null") - self.assertEqual(before_none_sources["col1"], "null") - self.assertEqual(before_none_sources["col2"], "out of datetime2 range") - self.assertEqual(before_none_sources["col3"], "null") - self.assertEqual(before_none_sources["col4"], "null") - - if after_none_sources: - self.assertEqual(after_none_sources["col0"], "null") - self.assertEqual(after_none_sources["col1"], "null") - self.assertEqual(after_none_sources["col2"], "null") - self.assertEqual(after_none_sources["col3"], "out of date range") - self.assertEqual(after_none_sources["col4"], "empty set") - class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def setUp(self): @@ -1808,6 +1744,85 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestColumnValueNoneSources, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(TableMapEvent,), + ) + if not self.isMySQL8014AndMore(): + self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + + def test_get_none(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + resume_stream=False, + only_events=[WriteRowsEvent], + ) + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" + self.execute(query) + query = ( + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + ) + self.execute(query) + self.execute("COMMIT") + write_rows_event = self.stream.fetchone() + self.assertIsInstance(write_rows_event, WriteRowsEvent) + + none_sources = write_rows_event.rows[0].get("none_sources") + if none_sources: + self.assertEqual(none_sources["col1"], "null") + + def test_get_none_invalid(self): + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") + self.execute( + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" + ) + self.execute( + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" + ) + self.resetBinLog() + self.execute( + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" + ) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + + before_none_sources = event.rows[0].get("before_none_sources") + after_none_sources = event.rows[0].get("after_none_sources") + + if before_none_sources: + self.assertEqual(before_none_sources["col0"], "null") + self.assertEqual(before_none_sources["col1"], "null") + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") + self.assertEqual(before_none_sources["col3"], "null") + self.assertEqual(before_none_sources["col4"], "null") + + if after_none_sources: + self.assertEqual(after_none_sources["col0"], "null") + self.assertEqual(after_none_sources["col1"], "null") + self.assertEqual(after_none_sources["col2"], "null") + self.assertEqual(after_none_sources["col3"], "out of date range") + self.assertEqual(after_none_sources["col4"], "empty set") + + if __name__ == "__main__": import unittest From 4c797a14cc95cd3b0456c7ceccbb965996ebc617 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sun, 24 Sep 2023 00:42:55 +0900 Subject: [PATCH 13/16] feat: inherit PyMySQLReplicationVersion8TestCase in TestColumnValueNoneSources --- pymysqlreplication/tests/test_basic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 050776bf..cb5688cc 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1737,7 +1737,7 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() -class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): +class TestColumnValueNoneSources(base.PyMySQLReplicationVersion8TestCase): def setUp(self): super(TestColumnValueNoneSources, self).setUp() self.stream.close() @@ -1747,7 +1747,9 @@ def setUp(self): only_events=(TableMapEvent,), ) if not self.isMySQL8014AndMore(): - self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.skipTest( + "Mysql version is under 8.0.14 - pass TestColumnValueNoneSources" + ) self.execute("SET GLOBAL binlog_row_metadata='FULL';") def test_get_none(self): @@ -1788,6 +1790,8 @@ def test_get_none_invalid(self): self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent) + self.assertIsInstance(self.stream.fetchone(), GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), TableMapEvent) From be89cc511bca9f87ad514d9250564159a497e671 Mon Sep 17 00:00:00 2001 From: heehehe Date: Fri, 29 Sep 2023 23:07:22 +0900 Subject: [PATCH 14/16] feat: add constants/NONE_SOURCE.py --- pymysqlreplication/constants/NONE_SOURCE.py | 6 ++++++ pymysqlreplication/row_event.py | 17 +++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) create mode 100644 pymysqlreplication/constants/NONE_SOURCE.py diff --git a/pymysqlreplication/constants/NONE_SOURCE.py b/pymysqlreplication/constants/NONE_SOURCE.py new file mode 100644 index 00000000..68391003 --- /dev/null +++ b/pymysqlreplication/constants/NONE_SOURCE.py @@ -0,0 +1,6 @@ +NULL = "null" +OUT_OF_DATE_RANGE = "out of date range" +OUT_OF_DATETIME_RANGE = "out of datetime range" +OUT_OF_DATETIME2_RANGE = "out of datetime2 range" +EMPTY_SET = "empty set" +COLS_BITMAP = "cols bitmap" diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index d3fd2ed0..83fd2d25 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -9,6 +9,7 @@ from .constants import FIELD_TYPE from .constants import BINLOG from .constants import CHARSET +from .constants import NONE_SOURCE from .column import Column from .table import Table from .bitmap import BitCount, BitGet @@ -135,11 +136,11 @@ def __read_values_name( if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.__none_sources[name] = "cols_bitmap" + self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP return None if self._is_null(null_bitmap, null_bitmap_index): - self.__none_sources[name] = "null" + self.__none_sources[name] = NONE_SOURCE.NULL return None if column.type == FIELD_TYPE.TINY: @@ -185,14 +186,14 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.__none_sources[name] = "out of datetime range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.__none_sources[name] = "out of date range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.utcfromtimestamp(self.packet.read_uint32()) @@ -201,7 +202,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.__none_sources[name] = "out of datetime2 range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -232,10 +233,10 @@ def __read_values_name( if bit_mask & (1 << idx) } if not ret: - self.__none_sources[column.name] = "empty set" + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET return None return ret - self.__none_sources[column.name] = "empty set" + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET return None elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -244,7 +245,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.JSON: return self.packet.read_binary_json(column.length_size) else: - raise NotImplementedError("Unknown MySQL column type: %d" % (column.type)) + raise NotImplementedError("Unknown MySQL column type: %d" % column.type) def __add_fsp_to_time(self, time, column): """Read and add the fractional part of time From 97c9e2b1ebadf0705901f6790f94082a72ac3ff1 Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 13 Oct 2023 15:53:14 +0900 Subject: [PATCH 15/16] fix last order column only when DML When we do not know columnInformation values dictionary key is always null so overwrite value Object. Thus always values dictionary has last order column object value To avoid user confusion, it is marked and displayed as UNKNOWN_COL. --- pymysqlreplication/row_event.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 1818c149..a53596fd 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -119,7 +119,13 @@ def _read_column_data(self, cols_bitmap): column = self.columns[i] name = self.table_map[self.table_id].columns[i].name unsigned = self.table_map[self.table_id].columns[i].unsigned - + if not name: + # If you are using mysql 5.7 or mysql 8, but binlog_row_metadata = "MINIMAL", + # we do not know the column information. + # If you know column information, + # mysql 5.7 version Users Use Under 1.0 version + # mysql 8.0 version Users Set binlog_row_metadata = "FULL" + name = "UNKNOWN_COL" + str(i) values[name] = self.__read_values_name( column, null_bitmap, From f1e9df25c5a82b6b8104458486eae519d1ffe777 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 14 Oct 2023 21:34:58 +0900 Subject: [PATCH 16/16] fix: remove deprecated test class --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 8b1bec37..c1861a17 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1755,7 +1755,7 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() -class TestColumnValueNoneSources(base.PyMySQLReplicationVersion8TestCase): +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): def setUp(self): super(TestColumnValueNoneSources, self).setUp() self.stream.close()