diff --git a/.gitignore b/.gitignore index eb18180..c44086a 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,7 @@ _build # Pyenv .python-version + +# Tinybird +bl-* +out-* \ No newline at end of file diff --git a/clickhouse_mysql/event/event.py b/clickhouse_mysql/event/event.py index 836f3d2..e018e57 100644 --- a/clickhouse_mysql/event/event.py +++ b/clickhouse_mysql/event/event.py @@ -28,6 +28,9 @@ class Event(object): # table name table = None + # primary key + primary_key = None + # /path/to/csv/file.csv filename = None diff --git a/clickhouse_mysql/pool/bbpool.py b/clickhouse_mysql/pool/bbpool.py index f15c268..c36265b 100644 --- a/clickhouse_mysql/pool/bbpool.py +++ b/clickhouse_mysql/pool/bbpool.py @@ -6,6 +6,7 @@ from clickhouse_mysql.pool.pool import Pool from clickhouse_mysql.objectbuilder import ObjectBuilder +from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent # Buckets Belts' Index Generator @@ -149,7 +150,18 @@ def rotate_belt(self, belt_index, flush=False): # time to flush data for specified key #self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)]) writer = self.writer_builder.new() - writer.insert(self.belts[belt_index].pop()) + item = self.belts[belt_index].pop() + # process event based on its type + if isinstance(item[0].pymysqlreplication_event, WriteRowsEvent): + writer.insert(item) + elif isinstance(item[0].pymysqlreplication_event, DeleteRowsEvent): + writer.delete(item) + elif isinstance(item[0].pymysqlreplication_event, UpdateRowsEvent): + writer.update(item) + else: + # skip other unhandled events + pass + # writer.insert(self.belts[belt_index].pop()) writer.close() writer.push() writer.destroy() diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index e75bc34..0b5d0c3 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -11,7 +11,6 @@ class Pumper(object): writer = None def __init__(self, reader=None, writer=None): - self.reader = reader self.writer = writer @@ -19,6 +18,8 @@ def __init__(self, reader=None, writer=None): # subscribe on reader's event notifications self.reader.subscribe({ 'WriteRowsEvent': self.write_rows_event, + 'UpdateRowsEvent': self.update_rows_event, + 'DeleteRowsEvent': self.delete_rows_event, # 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, 'ReaderIdleEvent': self.reader_idle_event, }) @@ -46,5 +47,20 @@ def reader_idle_event(self): """ self.writer.flush() + def delete_rows_event(self, event=None): + """ + DeleteRowsEvent handler + :param event: + """ + self.writer.delete_row(event) + + def update_rows_event(self, event=None): + """ + UpdateRowsEvent handler + :param event: + """ + self.writer.update(event) + + if __name__ == '__main__': print("pumper") diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index f21e8b1..659ab77 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -12,7 +12,7 @@ from clickhouse_mysql.event.event import Event from clickhouse_mysql.tableprocessor import TableProcessor from clickhouse_mysql.util import Util -#from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent +from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent class MySQLReader(Reader): @@ -56,13 +56,15 @@ def __init__( self.server_id = server_id self.log_file = log_file self.log_pos = log_pos - self.schemas = None if not TableProcessor.extract_dbs(schemas,
Util.join_lists(tables, tables_prefixes)) else TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) + self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, + tables_prefixes)) else TableProcessor.extract_dbs( + schemas, Util.join_lists(tables, tables_prefixes)) self.tables = None if tables is None else TableProcessor.extract_tables(tables) self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes) self.blocking = blocking self.resume_stream = resume_stream self.nice_pause = nice_pause - self.binlog_position_file=binlog_position_file + self.binlog_position_file = binlog_position_file logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas)) if schemas is not None: @@ -86,7 +88,8 @@ def __init__( if tables_prefixes is not None: for table in tables_prefixes: logging.info(table) - logging.info("normalised tables-prefixes list. len()=%d", 0 if self.tables_prefixes is None else len(self.tables_prefixes)) + logging.info("normalised tables-prefixes list. len()=%d", + 0 if self.tables_prefixes is None else len(self.tables_prefixes)) if self.tables_prefixes is not None: for table in self.tables_prefixes: logging.info(table) @@ -101,21 +104,21 @@ def __init__( # we are interested in reading CH-repeatable events only only_events=[ # Possible events - #BeginLoadQueryEvent, + # BeginLoadQueryEvent, DeleteRowsEvent, - #ExecuteLoadQueryEvent, - #FormatDescriptionEvent, - #GtidEvent, - #HeartbeatLogEvent, - #IntvarEvent - #NotImplementedEvent, - #QueryEvent, - #RotateEvent, - #StopEvent, - #TableMapEvent, + # ExecuteLoadQueryEvent, + # FormatDescriptionEvent, + # GtidEvent, + # HeartbeatLogEvent, + # IntvarEvent + # NotImplementedEvent, + # QueryEvent, + # RotateEvent, + # StopEvent, + # TableMapEvent, UpdateRowsEvent, WriteRowsEvent, - #XidEvent, + # XidEvent, ], only_schemas=self.schemas, # in case we have any prefixes - this means we need to listen to all tables within specified schemas @@ -245,6 +248,9 @@ def process_write_rows_event(self, mysql_event): :param mysql_event: WriteRowsEvent instance :return: """ + + logging.debug("Received insert event for table: " + mysql_event.table) + if self.tables_prefixes: # we have prefixes specified # need to find whether current event is produced by table in 'looking-into-tables' list @@ -294,10 +300,81 @@ def process_write_rows_event(self, mysql_event): self.stat_write_rows_event_finalyse() def process_update_rows_event(self, mysql_event): - logging.info("Skip update rows") + + logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema) + + # for row in mysql_event.rows: + # for key in row['before_values']: + # logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key])) + + if self.tables_prefixes: + # we have prefixes specified + # need to find whether current event is produced by table in 'looking-into-tables' list + if not self.is_table_listened(mysql_event.table): + # this table is not listened + # processing is over - just skip event + return + + # statistics + #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + + if self.subscribers('UpdateRowsEvent'): + # dispatch event to subscribers + + # statistics + #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + + # dispatch Event + event = Event() + event.schema = mysql_event.schema + event.table = mysql_event.table + event.pymysqlreplication_event = 
mysql_event + + #self.process_first_event(event=event) + self.notify('UpdateRowsEvent', event=event) + + # self.stat_write_rows_event_finalyse() + + # logging.info("Skip update rows") def process_delete_rows_event(self, mysql_event): - logging.info("Skip delete rows") + logging.debug("Received delete event for table: " + mysql_event.table) + + """ + for row in mysql_event.rows: + for key in row['values']: + logging.debug("\t *", key, ":", row["values"][key]) + """ + + if self.tables_prefixes: + # we have prefixes specified + # need to find whether current event is produced by table in 'looking-into-tables' list + if not self.is_table_listened(mysql_event.table): + # this table is not listened + # processing is over - just skip event + return + + # statistics + #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + + if self.subscribers('DeleteRowsEvent'): + # dispatch event to subscribers + + # statistics + #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + + # dispatch Event + event = Event() + event.schema = mysql_event.schema + event.table = mysql_event.table + event.pymysqlreplication_event = mysql_event + + self.process_first_event(event=event) + self.notify('DeleteRowsEvent', event=event) + + # self.stat_write_rows_event_finalyse() + + # logging.info("Skip delete rows") def process_binlog_position(self, file, pos): if self.binlog_position_file: @@ -321,14 +398,16 @@ def read(self): self.stat_init_fetch_loop() try: - logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef") + logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str( + self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef") # fetch available events from MySQL for mysql_event in self.binlog_stream: # new event has come # check what to do with it - logging.debug('Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) + logging.debug( + 'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) # process event based on its type if isinstance(mysql_event, WriteRowsEvent): @@ -393,6 +472,7 @@ def read(self): logging.info('end %d', end_timestamp) logging.info('len %d', end_timestamp - self.start_timestamp) + if __name__ == '__main__': connection_settings = { 'host': '127.0.0.1', diff --git a/clickhouse_mysql/reader/reader.py b/clickhouse_mysql/reader/reader.py index 379cf5f..c4f5246 100644 --- a/clickhouse_mysql/reader/reader.py +++ b/clickhouse_mysql/reader/reader.py @@ -18,6 +18,13 @@ class Reader(Observable): # called when Reader has no data to read 'ReaderIdleEvent': [], + + # called on each DeleteRowsEvent + 'DeleteRowsEvent': [], + + # called on each UpdateRowsEvent + 'UpdateRowsEvent': [], + } def __init__(self, converter=None, callbacks={}): diff --git a/clickhouse_mysql/writer/chcsvwriter.py b/clickhouse_mysql/writer/chcsvwriter.py index caea56e..88571c3 100644 --- a/clickhouse_mysql/writer/chcsvwriter.py +++ b/clickhouse_mysql/writer/chcsvwriter.py @@ -33,7 +33,9 @@ def __init__( dst_schema += "_all" if dst_distribute and dst_table is not None: dst_table += "_all" - logging.info("CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, dst_table)) + logging.info( + "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, + dst_table)) self.host = 
connection_settings['host'] self.port = connection_settings['port'] self.user = connection_settings['user'] @@ -98,3 +100,123 @@ def insert(self, event_or_events=None): os.system(bash) pass + + def deleteRow(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to delete. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d rows', __class__, len(events)) + + for event in events: + schema = self.dst_schema if self.dst_schema else event.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + else: + table = self.dst_table if self.dst_table else event.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # NOTE: still incomplete - the row values for the WHERE clause are not available from the CSV event here + sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + schema, + table, + ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ) + + choptions = "" + if self.host: + choptions += " --host=" + shlex.quote(self.host) + if self.port: + choptions += " --port=" + str(self.port) + if self.user: + choptions += " --user=" + shlex.quote(self.user) + if self.password: + choptions += " --password=" + shlex.quote(self.password) + bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + event.filename, + choptions, + sql, + ) + + logging.info('starting clickhouse-client process for delete operation') + logging.debug('starting %s', bash) + os.system(bash) + + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + logging.info('starting clickhouse-client process for update operation') + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update.
class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s update %d rows', __class__, len(events)) + + for event in events: + schema = self.dst_schema if self.dst_schema else event.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + else: + table = self.dst_table if self.dst_table else event.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # NOTE: still incomplete - real values and a WHERE clause are not available from the CSV event here + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}'.format( + schema, + table, + ', '.join(map(lambda column: '`%s`=`%s`' % (column, column), event.fieldnames)) + ) + + choptions = "" + if self.host: + choptions += " --host=" + shlex.quote(self.host) + if self.port: + choptions += " --port=" + str(self.port) + if self.user: + choptions += " --user=" + shlex.quote(self.user) + if self.password: + choptions += " --password=" + shlex.quote(self.password) + bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + event.filename, + choptions, + sql, + ) + + logging.info('starting clickhouse-client process') + logging.debug('starting %s', bash) + os.system(bash) + + pass diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 587d48f..6cca8ef 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -10,7 +10,7 @@ from clickhouse_mysql.writer.writer import Writer from clickhouse_mysql.tableprocessor import TableProcessor -from clickhouse_mysql.event.event import Event +import datetime class CHWriter(Writer): @@ -108,10 +108,219 @@ def insert(self, event_or_events=None): logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) logging.critical('sql={}'.format(sql)) - sys.exit(0) + logging.critical('data={}'.format(rows)) + # sys.exit(0) # all DONE + def delete_row(self, event_or_events): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + logging.debug("Delete CHWriter") + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to delete. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d event(s)', __class__, len(events)) + + # verify and converts events and consolidate converted rows from all events into one batch + + rows = [] + event_converted = None + pk = None + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event.
Event: %s Class: %s', event.meta(), + __class__) + continue # for event + + event_converted = self.convert(event) + pk = event_converted.pymysqlreplication_event.primary_key + for row in event_converted: + for key in row.keys(): + # we need to convert Decimal value to str value for suitable for table structure + if type(row[key]) == Decimal: + row[key] = str(row[key]) + rows.append(row) + + logging.debug('class:%s delete %d row(s)', __class__, len(rows)) + + # determine target schema.table + + schema = self.dst_schema if self.dst_schema else event_converted.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + else: + table = self.dst_table if self.dst_table else event_converted.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, + self.dst_table)) + + # and DELETE converted rows + + sql = '' + # try: + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + # schema, + # table, + # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + # self.client.execute(sql, rows) + + sql = '' + try: + sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + schema, + table, + ' and '.join(filter(None, map( + lambda column, value: "" if column != pk else self.get_data_format(column, value), + row.keys(), row.values()))) + ) + + self.client.execute(sql) + + except Exception as ex: + logging.critical('QUERY FAILED') + logging.critical('ex={}'.format(ex)) + logging.critical('sql={}'.format(sql)) + # sys.exit(0) + + # all DONE + + """ + Get string format pattern for update and delete operations + """ + def get_data_format(self, column, value): + t = type(value) + if t == str: + return "`%s`='%s'" % (column, value.replace("'", "\\'")) + elif t is datetime.datetime: + return "`%s`='%s'" % (column, value) + else: + # int, float + return "`%s`=%s" % (column, value) + + def update(self, event_or_events): + # event_or_events = [ + # event: { + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } + # }, + # event: { + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } + # }, + # ] + + logging.debug("Update CHWriter") + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s update %d event(s)', __class__, len(events)) + + # verify and converts events and consolidate converted rows from all events into one batch + + rows = [] + event_converted = None + pk = None + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), + __class__) + continue # for event + + event_converted = self.convert(event) + pk = [event_converted.pymysqlreplication_event.primary_key] + if event_converted.table == 'assets': + pk.append('name') + pk.append('title_id') + pk.append('company_id') + pk.append('asset_type_enumeration_entry_id') + for row in event_converted.pymysqlreplication_event.rows: + for key in row['after_values'].keys(): + # we need to convert Decimal value to str value for suitable for table structure + if type(row['after_values'][key]) == Decimal: + row['after_values'][key] = str(row['after_values'][key]) + rows.append(row) + + logging.debug('class:%s update %d row(s)', __class__, len(rows)) + + # determine target schema.table + + schema = self.dst_schema if self.dst_schema else event_converted.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + else: + table = self.dst_table if self.dst_table else event_converted.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, + self.dst_table)) + + # and UPDATE converted rows + # improve performance updating just those fields which have actually changed + updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items()))) + + sql = '' + try: + # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( + # schema, + # table, + # ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + # "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + # ' and '.join(filter(None, map( + # lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), + # row['before_values'].keys(), row['before_values'].values()))) + # ) + + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( + schema, + table, + ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), updated_values.keys(), updated_values.values()))), + "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + ' and '.join(filter(None, map( + lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), + row['before_values'].keys(), row['before_values'].values()))) + ) + + logging.debug("SQL UPDATE: \n\n " + sql + "\n\n") + + self.client.execute(sql) + except Exception as ex: + logging.critical('QUERY FAILED') + logging.critical('ex={}'.format(ex)) + logging.critical('sql={}'.format(sql)) + # sys.exit(0) + + # all DONE + + + if __name__ == '__main__': connection_settings = { diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index 4ff9081..18cfda6 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -135,6 +135,18 @@ def insert(self, event_or_events): for row in event: self.writer.writerow(self.convert(row)) + def deleteRow(self, event_or_events): + """ + TODO + """ + logging.debug("Delete CSV Writer") + + def update(self, event_or_events): + """ + TODO + """ + logging.debug("Update CSV Writer") + def push(self): if not self.next_writer_builder or not 
self.fieldnames: return diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index b49e011..129f05a 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -37,6 +37,18 @@ def insert(self, event_or_events): logging.debug('class:%s insert', __class__) self.pool.insert(event_or_events) + + def delete(self, event_or_events): + """Insert delete data into Pool""" + logging.debug('class:%s delete', __class__) + self.pool.insert(event_or_events) + + def update(self, event_or_events): + """Insert update data into Pool""" + logging.debug('class:%s update', __class__) + self.pool.insert(event_or_events) + + def flush(self): self.pool.flush() diff --git a/clickhouse_mysql/writer/processwriter.py b/clickhouse_mysql/writer/processwriter.py index 226b72b..8177345 100644 --- a/clickhouse_mysql/writer/processwriter.py +++ b/clickhouse_mysql/writer/processwriter.py @@ -35,6 +35,28 @@ def process(self, event_or_events=None): writer.destroy() logging.debug('class:%s process() done', __class__) + def processDelete(self, event_or_events=None): + """Separate process body to be run""" + + logging.debug('class:%s processDelete()', __class__) + writer = self.next_writer_builder.get() + writer.deleteRow(event_or_events) + writer.close() + writer.push() + writer.destroy() + logging.debug('class:%s processDelete() done', __class__) + + def processUpdate(self, event_or_events=None): + """Separate process body to be run""" + + logging.debug('class:%s processUpdate()', __class__) + writer = self.next_writer_builder.get() + writer.update(event_or_events) + writer.close() + writer.push() + writer.destroy() + logging.debug('class:%s processUpdate() done', __class__) + def insert(self, event_or_events=None): # event_or_events = [ # event: { @@ -57,6 +79,50 @@ def insert(self, event_or_events=None): logging.debug('class:%s insert done', __class__) pass + def delete(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # start separated process with event_or_events to be deleted + + logging.debug('class:%s delete', __class__) + process = mp.Process(target=self.processDelete, args=(event_or_events,)) + + logging.debug('class:%s delete.process.start()', __class__) + process.start() + + #process.join() + logging.debug('class:%s delete done', __class__) + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # start separated process with event_or_events to be updated + + logging.debug('class:%s update', __class__) + process = mp.Process(target=self.processUpdate, args=(event_or_events,)) + + logging.debug('class:%s update.process.start()', __class__) + process.start() + + #process.join() + logging.debug('class:%s update done', __class__) + pass + def flush(self): pass diff --git a/clickhouse_mysql/writer/writer.py b/clickhouse_mysql/writer/writer.py index 11f788c..1bfaeb0 100644 --- a/clickhouse_mysql/writer/writer.py +++ b/clickhouse_mysql/writer/writer.py @@ -55,6 +55,34 @@ def insert(self, event_or_events=None): # ] pass + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } + # }, + # event: { + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } + # }, + # ] + pass + + def
delete_row(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + pass + def flush(self): pass diff --git a/init/dump-tables.sh b/init/dump-tables.sh new file mode 100755 index 0000000..25eb02b --- /dev/null +++ b/init/dump-tables.sh @@ -0,0 +1,132 @@ +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 " + exit -1 +fi + +DUMP_PATH=$1 + +source tb_tables.config + +########### +### titles +########### + +echo "Dumping titles" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction titles > $DUMP_PATH/titles.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/titles-insert-tb.sql +cat $DUMP_PATH/titles.sql | grep "INSERT INTO" >> $DUMP_PATH/titles-insert-tb.sql +sed -i 's/INSERT INTO `titles` VALUES/INSERT INTO `t_8a192b9c7ece4572a5a2fc9858e26d5c` (`id`, `name`, `licensor_id`, `created_at`, `updated_at`, `company_id`, `series_id`, `external_id`, `poster_file_name`, `poster_content_type`, `poster_file_size`, `poster_updated_at`, `episode_number`, `dirty_episode_number`, `rights_count`, `blackouts_count`, `denied_rights_count`, `images_count`, `cover_image_id`, `title_type`, `metadata_updated_at`, `promoted_content_id`, `promoted_content_type`, `soft_destroyed`, `credits_count`, `translated_attributes`, `rules_count`, `discarded`, `episode_reference_id`, `brand_id`) VALUES/g' $DUMP_PATH/titles-insert-tb.sql + +echo "Truncate titles table" +echo "truncate $TB_DATABASE.$TITLES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading titles into CH" +cat $DUMP_PATH/titles-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Titles loaded" + +read -p "Press enter to continue" + +########### +### assets +########### + +echo "Dumping assets" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction assets > $DUMP_PATH/assets.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/assets-insert-tb.sql +cat $DUMP_PATH/assets.sql | grep "INSERT INTO" >> $DUMP_PATH/assets-insert-tb.sql +sed -i 's/INSERT INTO `assets` VALUES/INSERT INTO `t_4c03fdeb4e3e4db784ead40b06ec8617` (`id`, `name`, `title_id`, `created_at`, `updated_at`, `description`, `runtime_in_milliseconds`, `metadata_updated_at`, `company_id`, `asset_type_enumeration_entry_id`, `external_id`) VALUES/g' $DUMP_PATH/assets-insert-tb.sql + +echo "Truncate assets table" +echo "truncate $TB_DATABASE.$ASSETS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading assets into CH" +cat $DUMP_PATH/assets-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Assets loaded" + +read -p "Press enter to continue" + +####################### +### Collection-entries +####################### + +echo "Dumping collection-entries" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction collection_entries > $DUMP_PATH/collections.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/collections-insert-tb.sql +cat $DUMP_PATH/collections.sql | grep "INSERT INTO" >> $DUMP_PATH/collections-insert-tb.sql +sed -i 's/INSERT INTO `collection_entries` VALUES/INSERT INTO `t_3dd7b323438943c687bd4e13a0e181a1` (`collection_id`, `title_id`, `id`, `position`) VALUES/g' 
$DUMP_PATH/collections-insert-tb.sql + +echo "Truncate collections table" +echo "truncate $TB_DATABASE.$COLLECTIONS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading collection-entries into CH" +cat $DUMP_PATH/collections-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Collection-entries loaded" + +read -p "Press enter to continue" + +############## +### Features +############## + +echo "Dumping features" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction features > $DUMP_PATH/features.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue use" +cat $DUMP_PATH/features.sql | grep "INSERT INTO" >> $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue insert" +sed -i 's/INSERT INTO `features` VALUES/INSERT INTO `t_23f41723e0eb480088cbb1c8f890a38c` (`id`, `name`, `enabled`, `company_id`, `created_at`, `updated_at`) VALUES/g' $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue sed" +echo "Truncate features table" +echo "truncate $TB_DATABASE.$FEATURES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading features into CH" +cat $DUMP_PATH/features-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Features loaded" + +read -p "Press enter to continue" + +############## +### Platforms +############## + +echo "Dumping platforms" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction platforms > $DUMP_PATH/platforms.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/platforms-insert-tb.sql +cat $DUMP_PATH/platforms.sql | grep "INSERT INTO" >> $DUMP_PATH/platforms-insert-tb.sql +sed -i 's/INSERT INTO `platforms` VALUES/INSERT INTO `t_83f598dc74254de68216a7c7735caffb` (`id`, `company_id`, `name`, `created_at`, `updated_at`, `sequence_service_titles_url`, `_deprecated_sequence_template_name`, `_deprecated_owned`, `sequence_template_url`, `metadata_constant_name`, `outlet_id`, `automatic_publication_enabled`, `metadata_updated_at`, `granted_categories`, `external_id`, `timezone`) VALUES/g' $DUMP_PATH/platforms-insert-tb.sql + +echo "Truncate platforms table" +echo "truncate $TB_DATABASE.$PLATFORMS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading platforms into CH" +cat $DUMP_PATH/platforms-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Platforms loaded" + +read -p "Press enter to continue" + +################# +### Schedulings +################# + +echo "Dumping schedulings" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction schedulings > $DUMP_PATH/schedulings.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/schedulings-insert-tb.sql +cat $DUMP_PATH/schedulings.sql | grep "INSERT INTO" >> $DUMP_PATH/schedulings-insert-tb.sql +sed -i 's/INSERT INTO `schedulings` VALUES/INSERT INTO `t_b5e541d4e73d4301ba736c427bd667c5` (`id`, `title_id`, `put_up`, `take_down`, `created_at`, `updated_at`, `cleared`, `platform_id`, `rule_id`, `workflow_offset`, `sequence_asset_url`, `sequence_asset_name`, `workflow_sent`, `status`, `asset_id`, `rule_asset_id`, `title_group_id`, `workflow_web_url`, 
`_deprecated_publication_status`, `published_at`, `_prev_put_up`, `_prev_take_down`, `_pending_simulation`, `workflow_template_url`, `original_draft_scheduling_id`, `playlist_id`, `updating_playlist`, `workflow_job_url`, `workflow_status`, `conflict_types`, `metadata_updated_at`, `company_id`, `cached_title_episode_number`, `metadata_status`, `publication_status`, `publication_status_updated_at`, `metadata_status_updated_at`, `external_id`, `disabled_at`, `scheduling_type`, `overridden_rule_attributes`, `update_in_progress`, `metadata_error_digest`) VALUES/g' $DUMP_PATH/schedulings-insert-tb.sql + +echo "Truncate schedulings table" +echo "truncate $TB_DATABASE.$SCHEDULINGS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading schedulings into CH" +cat $DUMP_PATH/schedulings-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Schedulings loaded" + +echo "Process finished!" \ No newline at end of file diff --git a/init/first-processing.sh b/init/first-processing.sh new file mode 100755 index 0000000..7daa44c --- /dev/null +++ b/init/first-processing.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 <dump_path>" + exit -1 +fi + +echo "Generate binlog position files" +./run-listeners.sh -d all +./stop-listeners.sh + +echo "Generating dumps and loading data ..." +./dump-tables.sh $1 + +echo "Starting listeners" +./run-listeners.sh -d all + +echo "Done!" \ No newline at end of file diff --git a/init/run-listeners.sh b/init/run-listeners.sh new file mode 100755 index 0000000..21a7502 --- /dev/null +++ b/init/run-listeners.sh @@ -0,0 +1,130 @@ +#!/bin/bash + +LOG_LEVEL=debug + +SOURCE_HOST=127.0.0.1 +SOURCE_PORT=3307 +DESTINATION_HOST=127.0.0.1 +SOURCE_USER=tinybird +SOURCE_PASSWD=goo7eu9AeS3i + +PID_LOG_FILE=/tmp/listeners-pid.log + +source tb_tables.config + +############################################################ +# Run a process to synchronize MySQL table using binlog. +# +# $1 --> Source schema +# $2 --> Source table +# $3 --> Destination schema +# $4 --> Destination table +# $5 --> Server id +# $6 --> Log file +# $7 --> Binlog position file +# +############################################################# +function run_listener() { + + (clickhouse-mysql --src-server-id=$5 --src-wait --src-resume --binlog-position-file $7 --nice-pause=1 --src-host=$SOURCE_HOST --src-port=$SOURCE_PORT --src-user=$SOURCE_USER --src-password=$SOURCE_PASSWD --src-schemas=$1 --src-tables=$2 --dst-host=$DESTINATION_HOST --dst-schema=$3 --dst-table=$4 --log-level=$LOG_LEVEL --pump-data 2>> $6)& + +} + +function run_schedulings() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-schedulings" + fi + + run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" + echo $! > $PID_LOG_FILE + +} + +function run_platforms() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-platforms" + fi + + run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" + echo $! >> $PID_LOG_FILE + +} + +function run_titles() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-titles" + fi + + run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" + echo $!
>> $PID_LOG_FILE +} + +function run_assets() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-assets" + fi + + run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" + echo $! >> $PID_LOG_FILE +} + +function run_features() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-features" + fi + + run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" + echo $! >> $PID_LOG_FILE +} + +function run_collections() { + if [ "$binlog" == "true" ]; then + rm -f "bl-pos-collections" + fi + + run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" + echo $! >> $PID_LOG_FILE +} + +function usage { + echo "usage: $0 -d datasource [-b clean_binlog]" + echo " -d datasource datasource to sync. Use all for synchronizing all available datasources." + echo " - all" + echo " - schedulings" + echo " - platforms" + echo " - titles" + echo " - assets" + echo " - features" + echo " - collections" + echo " -b clean_binlog clean binlog before running (true | false) False by default" + exit -1 +} + +datasource="NONE" +while getopts d:b: flag +do + case "${flag}" in + d) datasource=${OPTARG};; + b) binlog=${OPTARG};; + esac +done + +case "${datasource}" in + NONE) usage;; + all) run_schedulings binlog + run_platforms binlog + run_titles binlog + run_assets binlog + run_features binlog + run_collections binlog + ;; + schedulings) run_schedulings binlog;; + platforms) run_platforms binlog;; + titles) run_titles binlog;; + assets) run_assets binlog;; + features) run_features binlog;; + collections) run_collections binlog;; + *) usage;; +esac + +echo "PID processes in $PID_LOG_FILE" \ No newline at end of file diff --git a/init/stop-listeners.sh b/init/stop-listeners.sh new file mode 100755 index 0000000..582e97c --- /dev/null +++ b/init/stop-listeners.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +PID_LOG_FILE=/tmp/listeners-pid.log + +count_processes() { + echo `ps aux | grep clickhouse-mysql-data-reader | wc -l` +} + +total_before=$(count_processes) + +while IFS= read -r line +do + echo "$line" + kill $line +done < "$PID_LOG_FILE" + +total_after=$(count_processes) + +procs=`echo "$total_after - 1" | bc` + +if [ $total_after -gt 1 ]; then + echo "You still have $procs processes running" +fi \ No newline at end of file diff --git a/init/tb_tables.config b/init/tb_tables.config new file mode 100644 index 0000000..be59079 --- /dev/null +++ b/init/tb_tables.config @@ -0,0 +1,9 @@ +#!/bin/bash + +TB_DATABASE='d_073c5e' +TITLES_TABLE='t_8a192b9c7ece4572a5a2fc9858e26d5c' +ASSETS_TABLE='t_4c03fdeb4e3e4db784ead40b06ec8617' +COLLECTIONS_TABLE='t_3dd7b323438943c687bd4e13a0e181a1' +FEATURES_TABLE='t_23f41723e0eb480088cbb1c8f890a38c' +PLATFORMS_TABLE='t_83f598dc74254de68216a7c7735caffb' +SCHEDULINGS_TABLE='t_b5e541d4e73d4301ba736c427bd667c5' \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..da4173a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +clickhouse-driver==0.2.0 +mysql-replication==0.23 +mysqlclient==2.0.3 +PyMySQL==1.0.2 +pytz==2021.1 +tzlocal==2.1
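
Illustrative sketch (not part of the patch): the core of the new update path above is turning a binlog row image into a ClickHouse mutation, which CHWriter.update() does with get_data_format(), a set difference of before/after values, and the primary key from the replication event. The snippet below reproduces that idea in isolation; the schema, table and column names are example values borrowed from the dump scripts, and the real writer additionally stamps a `tb_upd` column and reads the primary key from the event itself.

# Minimal sketch of the mutation-SQL assembly, under the assumptions stated above.
import datetime


def format_value(column, value):
    # same idea as CHWriter.get_data_format(): quote strings and datetimes, leave numbers bare
    if isinstance(value, str):
        return "`%s`='%s'" % (column, value.replace("'", "\\'"))
    if isinstance(value, datetime.datetime):
        return "`%s`='%s'" % (column, value)
    return "`%s`=%s" % (column, value)


def build_update_sql(schema, table, pk_columns, before_values, after_values):
    # update only the columns whose value actually changed in the binlog row image
    changed = {k: v for k, v in after_values.items()
               if before_values.get(k) != v and k not in pk_columns and v is not None}
    assignments = ', '.join(format_value(c, v) for c, v in changed.items())
    # locate the row by its primary key taken from the *before* image
    where = ' and '.join(format_value(c, before_values[c]) for c in pk_columns)
    return 'ALTER TABLE `%s`.`%s` UPDATE %s WHERE %s' % (schema, table, assignments, where)


if __name__ == '__main__':
    print(build_update_sql(
        'movida_preproduction', 'titles', ['id'],
        before_values={'id': 3, 'name': 'Old title', 'rights_count': 1},
        after_values={'id': 3, 'name': 'New title', 'rights_count': 1},
    ))
    # ALTER TABLE `movida_preproduction`.`titles` UPDATE `name`='New title' WHERE `id`=3

Note that ClickHouse executes ALTER TABLE ... UPDATE/DELETE as asynchronous mutations, so they are considerably heavier than inserts and are not applied instantly.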