@staticmethod
def join(lists_to_join):
    """
    Merge groups of 'column=value' strings into one dict.

    The input has the shape produced by an argparse option declared with
    nargs='*' and action='append', e.g.
    [['a=b', 'c=d'], ['e=f', 'z=x']] -> {'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'}

    When the same column is given more than once, the last value wins.

    :param lists_to_join: list of lists of 'column=value' strings
    :return: dict {'column': 'value', ...} or None when the input is not
        a list or contains no pairs at all
    :raises ValueError: when an item has no '=' separator
    """
    if not isinstance(lists_to_join, list):
        return None

    res = {}
    for lst in lists_to_join:
        # lst = ['a=b', 'c=d']
        for column_value_pair in lst:
            # Split on the FIRST '=' only: the value itself may legitimately
            # contain '=' (e.g. 'expr=a=b'), which must not break unpacking.
            column, separator, value = column_value_pair.partition('=')
            if not separator:
                raise ValueError(
                    "expected 'column=value', got %r" % (column_value_pair,)
                )
            res[column] = value

    # an empty result is reported as None so callers can test it for truth
    return res or None
+ ) argparser.add_argument( '--src-server-id', @@ -167,6 +202,15 @@ def config(): help='Table to be used when writing to dst' ) + argparser.add_argument( + '--csv-column-default-value', + type=str, + nargs='*', + action='append', + default=None, + help='Table to be used when writing to dst' + ) + args = argparser.parse_args() # build options @@ -179,7 +223,16 @@ def config(): 'mempool': args.mempool, 'mempool-max-events-num': args.mempool_max_events_num, 'mempool-max-flush-interval': args.mempool_max_flush_interval, - 'csvpool_file_path_prefix': args.csvpool_file_path_prefix, + 'csvpool': args.csvpool, + }, + + 'converter-config': { + 'clickhouse': { + + }, + 'csv': { + 'column_default_value': CLIOpts.join(args.csv_column_default_value), + }, }, 'reader-config': { @@ -214,6 +267,11 @@ def config(): }, 'file': { 'csv_file_path': args.dst_file, + 'csv_file_path_prefix': args.csvpool_file_path_prefix, + 'csv_file_path_suffix_parts': [], + 'csv_keep_file': args.csvpool_keep_files, + 'dst_db': args.dst_db, + 'dst_table': args.dst_table, }, }, }) diff --git a/src/config.py b/src/config.py index 2fc7a59..2c8b487 100644 --- a/src/config.py +++ b/src/config.py @@ -3,10 +3,14 @@ from .reader.mysqlreader import MySQLReader from .reader.csvreader import CSVReader + from .writer.chwriter import CHWriter from .writer.csvwriter import CSVWriter +from .writer.chcsvwriter import CHCSVWriter from .writer.poolwriter import PoolWriter +from .converter.csvwriteconverter import CSVWriteConverter + class Config(object): @@ -37,8 +41,17 @@ def reader(self): return MySQLReader(**self.config['reader-config']['mysql']) def writer_class(self): - if self.config['writer-config']['file']['csv_file_path']: + + if self.config['app-config']['csvpool']: + return CSVWriter, { + **self.config['writer-config']['file'], + 'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']), + 'converter': 
class CSVWriteConverter(Converter):
    """
    Converter applied to events right before they are written to CSV.

    Replaces None values of selected columns with configured defaults.
    """

    # dict {'column': 'default value'}; empty when nothing is configured
    defaults = None

    def __init__(self, defaults=None):
        """
        :param defaults: optional dict {'column': 'default value'} with the
            values to substitute for None in the named columns
        """
        # A fresh dict per instance: the former `defaults={}` default was a
        # single dict object shared by every converter built without args.
        self.defaults = {} if defaults is None else defaults

    def convert(self, event):
        """
        Fill None values in event.row with the configured defaults.

        The row is modified in place.

        :param event: event whose `row` maps column name to value
            (presumably a dict like {'id': 1, 'col1': 1} - verify against caller)
        :return: the same event object
        """
        if not self.defaults:
            # nothing configured - pass the event through untouched
            return event

        for column, default_value in self.defaults.items():
            # only columns present in the row and explicitly None are replaced
            if column in event.row and event.row[column] is None:
                event.row[column] = default_value
        return event
--- a/src/event/event.py +++ b/src/event/event.py @@ -12,3 +12,8 @@ class Event(object): # {'id':1, 'col1':1} row = None + + file = None + + # ['id', 'col1', 'col2'] + fieldnames = None \ No newline at end of file diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 6c60342..29d74f5 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -29,6 +29,8 @@ class BBPool(Pool): # 'key.2': UNIX TIMESTAMP } + buckets_count = 0 + def __init__( self, writer_class=None, @@ -115,14 +117,20 @@ def rotate_belt(self, belt_index, flush=False): while len(self.belts[belt_index]) > buckets_num_left_on_belt: # too many buckets on the belt + # time to rotate belt and flush the most-right-bucket + self.buckets_count += 1 buckets_num = len(self.belts[belt_index]) last_bucket_size = len(self.belts[belt_index][buckets_num-1]) - print(now, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) + print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) # time to flush data for specified key + self.writer_params['csv_file_path_suffix_parts'] = [str(now), str(self.buckets_count)] writer = self.writer_class(**self.writer_params) writer.insert(self.belts[belt_index].pop()) + writer.close() + writer.push() + writer.destroy() del writer # belt rotated diff --git a/src/reader/csvreader.py b/src/reader/csvreader.py index a7ccd2f..e0a439e 100644 --- a/src/reader/csvreader.py +++ b/src/reader/csvreader.py @@ -3,7 +3,7 @@ from .reader import Reader from ..event.event import Event -from ..converter.csvemptyvalueconverter import CSVEmptyValueConverter +from ..converter.csvreadconverter import CSVReadConverter import csv import os @@ -17,8 +17,8 @@ class CSVReader(Reader): has_header = False reader = None - def __init__(self, csv_file_path, callbacks={}): - 
class CHCSVWriter(object):
    """
    Writer which loads ready-made CSV files into ClickHouse by running the
    `clickhouse-client` command line tool, feeding it the file on stdin.
    """

    host = None
    port = None
    user = None
    password = None

    def __init__(self, host=None, port=None, user=None, password=None):
        """
        :param host: ClickHouse host, omitted from the command when empty
        :param port: ClickHouse port, omitted from the command when empty
        :param user: ClickHouse user, omitted from the command when empty
        :param password: ClickHouse password, omitted from the command when empty
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password

    def _client_command(self, event):
        """Build the clickhouse-client argument list which loads `event`."""
        columns = ', '.join('`%s`' % column for column in event.fieldnames)
        sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
            event.schema,
            event.table,
            columns,
        )

        # One list item per argument: nothing is parsed by a shell, so
        # spaces and quotes inside values cannot break or alter the command.
        command = ['clickhouse-client']
        if self.host:
            command.append('--host=' + str(self.host))
        if self.port:
            command.append('--port=' + str(self.port))
        if self.user:
            command.append('--user=' + str(self.user))
        if self.password:
            # NOTE(review): the password is still visible in the process
            # list; consider a client config file instead.
            command.append('--password=' + str(self.password))
        command.append('--query=' + sql)
        return command

    def insert(self, event_or_events=None):
        """
        Load the CSV file referenced by each event into ClickHouse.

        Each event is expected to provide:
            event.schema     - destination database
            event.table      - destination table
            event.fieldnames - list of column names, in CSV column order
            event.file       - path to a CSV file whose first line is a header

        Failures are reported on stdout and do not raise, so remaining
        events are still processed.

        :param event_or_events: iterable of events; None means nothing to do
        """
        # function-scope import keeps the module's import block untouched
        import subprocess

        if event_or_events is None:
            return

        for event in event_or_events:
            command = self._client_command(event)

            # never echo the password into logs
            printable = [
                '--password=***' if arg.startswith('--password=') else arg
                for arg in command
            ]
            print('running:', ' '.join(printable))

            try:
                # Unbuffered, so that skipping the header line leaves the
                # OS-level file offset exactly at the first data row; the
                # child process inherits that offset as its stdin.
                with open(event.file, 'rb', buffering=0) as csv_file:
                    csv_file.readline()
                    returncode = subprocess.run(command, stdin=csv_file).returncode
            except OSError as ex:
                # missing CSV file or missing clickhouse-client binary
                print('clickhouse-client was not run for', event.file, ':', ex)
                continue

            if returncode != 0:
                print('clickhouse-client failed with exit code', returncode, 'for', event.file)
def destroy(self):
    """
    Remove the CSV file from disk unless the writer was told to keep it.

    Safe to call more than once, and safe to call when the file was never
    created: a file that is already absent is not treated as an error.
    """
    if not self.delete:
        return

    try:
        os.remove(self.path)
    except FileNotFoundError:
        # nothing on disk to clean up - the desired end state already holds
        pass