From 39caf871a66ab7f43fe20be58a17d20e1a59b119 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Tue, 31 Oct 2017 00:23:34 +0300 Subject: [PATCH 1/6] count flushed buckets --- src/pool/bbpool.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 6c60342..4c1b2a7 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -29,6 +29,8 @@ class BBPool(Pool): # 'key.2': UNIX TIMESTAMP } + buckets_count = 0 + def __init__( self, writer_class=None, @@ -115,10 +117,12 @@ def rotate_belt(self, belt_index, flush=False): while len(self.belts[belt_index]) > buckets_num_left_on_belt: # too many buckets on the belt + # time to rotate belt and flush the most-right-bucket + self.buckets_count += 1 buckets_num = len(self.belts[belt_index]) last_bucket_size = len(self.belts[belt_index][buckets_num-1]) - print(now, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) + print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) # time to flush data for specified key writer = self.writer_class(**self.writer_params) From dd3a44345f40470349b7fc6f07d8251eb553bec0 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Tue, 31 Oct 2017 12:44:55 +0300 Subject: [PATCH 2/6] chain of writers --- src/cliopts.py | 10 +++++++- src/config.py | 11 +++++++- src/event/event.py | 5 ++++ src/pool/bbpool.py | 2 ++ src/writer/chcsvwriter.py | 53 +++++++++++++++++++++++++++++++++++++++ src/writer/csvwriter.py | 29 +++++++++++++++++++-- src/writer/writer.py | 5 ++++ 7 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 src/writer/chcsvwriter.py diff --git a/src/cliopts.py b/src/cliopts.py index 418f11d..c270223 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -58,10 +58,15 @@ def config(): default=60, help='max seconds num between flushes' ) + argparser.add_argument( + '--csvpool', + action='store_true', + help='Cache data in csv files.' + ) argparser.add_argument( '--csvpool-file-path-prefix', type=str, - default=None, + default='/tmp/csvpool', help='file path prefix to CSV pool files' ) @@ -179,6 +184,7 @@ def config(): 'mempool': args.mempool, 'mempool-max-events-num': args.mempool_max_events_num, 'mempool-max-flush-interval': args.mempool_max_flush_interval, + 'csvpool': args.csvpool, 'csvpool_file_path_prefix': args.csvpool_file_path_prefix, }, @@ -214,6 +220,8 @@ def config(): }, 'file': { 'csv_file_path': args.dst_file, + 'dst_db': args.dst_db, + 'dst_table': args.dst_table, }, }, }) diff --git a/src/config.py b/src/config.py index 2fc7a59..f31800e 100644 --- a/src/config.py +++ b/src/config.py @@ -5,6 +5,7 @@ from .reader.csvreader import CSVReader from .writer.chwriter import CHWriter from .writer.csvwriter import CSVWriter +from .writer.chcsvwriter import CHCSVWriter from .writer.poolwriter import PoolWriter @@ -37,8 +38,16 @@ def reader(self): return MySQLReader(**self.config['reader-config']['mysql']) def writer_class(self): - if self.config['writer-config']['file']['csv_file_path']: + + if self.config['app-config']['csvpool']: + return CSVWriter, { + **self.config['writer-config']['file'], + 'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']), + } + + elif self.config['writer-config']['file']['csv_file_path']: return CSVWriter, self.config['writer-config']['file'] + else: return CHWriter, self.config['writer-config']['clickhouse'] diff --git a/src/event/event.py b/src/event/event.py index 64c922b..1e388a3 100644 --- a/src/event/event.py +++ b/src/event/event.py @@ -12,3 +12,8 @@ class Event(object): # {'id':1, 'col1':1} row = None + + file = None + + # ['id', 'col1', 'col2'] + fieldnames = None \ No newline at end of file diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 4c1b2a7..9943795 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -127,6 +127,8 @@ def rotate_belt(self, belt_index, flush=False): # time to flush data for specified key writer = self.writer_class(**self.writer_params) writer.insert(self.belts[belt_index].pop()) + writer.close() + writer.push() del writer # belt rotated diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py new file mode 100644 index 0000000..1eed680 --- /dev/null +++ b/src/writer/chcsvwriter.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os + +class CHCSVWriter(object): + + host = None + port = None + user = None + password = None + + def __init__(self, host=None, port=None, user=None, password=None): + self.host = host + self.port = port + self.user = user + self.password = password + + def insert(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + for event in event_or_events: + sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( + event.schema, + event.table, + ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ) + + choptions = "" + if self.host: + choptions += " --host=" + self.host + if self.port: + choptions += " --port=" + str(self.port) + if self.user: + choptions += " --user=" + self.user + if self.password: + choptions += " --password=" + self.password + bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + event.file, + choptions, + sql, + ) + + print(bash) + + pass diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index c952bb0..d23ec50 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -12,10 +12,22 @@ class CSVWriter(Writer): file = None path = None writer = None + dst_db = None + dst_table = None + fieldnames = None header_written = False - def __init__(self, csv_file_path): + def __init__( + self, + csv_file_path=None, + dst_db=None, + dst_table=None, + next=None, + ): self.path = csv_file_path + self.dst_db = dst_db + self.dst_table = dst_table + self.next = next def opened(self): return bool(self.file) @@ -56,15 +68,28 @@ def insert(self, event_or_events): self.open() if not self.writer: - self.writer = csv.DictWriter(self.file, fieldnames=sorted(event_or_events[0].row.keys())) + self.fieldnames = sorted(event_or_events[0].row.keys()) + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) if not self.header_written: self.writer.writeheader() for event in event_or_events: self.writer.writerow(event.row) + def push(self): + if not self.next: + return + + event = Event() + event.schema = self.dst_db + event.table = self.dst_table + event.file = self.path + event.fieldnames = self.fieldnames + self.next.insert([event]) + def close(self): if self.opened(): + self.file.flush() self.file.close() self.file = None self.writer = None diff --git a/src/writer/writer.py b/src/writer/writer.py index d0afd52..4f893e7 100644 --- a/src/writer/writer.py +++ b/src/writer/writer.py @@ -4,6 +4,8 @@ class Writer(object): + next = None + def opened(self): pass @@ -24,5 +26,8 @@ def insert(self, event_or_events=None): def flush(self): pass + def push(self): + pass + def close(self): pass From b36d8708c6abc6669b5d692dff299a91f54a6742 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Tue, 31 Oct 2017 17:10:33 +0300 Subject: [PATCH 3/6] default value for columns --- src/cliopts.py | 43 ++++++++++++++++++++++++++++++ src/config.py | 4 +++ src/converter/csvwriteconverter.py | 22 +++++++++++++++ src/writer/chcsvwriter.py | 3 ++- src/writer/csvwriter.py | 5 +++- 5 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 src/converter/csvwriteconverter.py diff --git a/src/cliopts.py b/src/cliopts.py index c270223..737d907 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -7,6 +7,31 @@ class CLIOpts(object): + @staticmethod + def join(lists_to_join): + # lists_to_join contains something like + # [['a=b', 'c=d'], ['e=f', 'z=x'], ] + if not isinstance(lists_to_join, list): + return None + + res = {} + for lst in lists_to_join: + # lst = ['a=b', 'c=d'] + for column_value_pair in lst: + # value = 'a=b' + column, value = column_value_pair.split('=', 2) + res[column] = value + + # dict { + # 'col1': 'value1', + # 'col2': 'value2', + # } + + if len(res) > 0: + return res + else: + return None + @staticmethod def config(): """ @@ -172,6 +197,15 @@ def config(): help='Table to be used when writing to dst' ) + argparser.add_argument( + '--csv-column-default-value', + type=str, + nargs='*', + action='append', + default=None, + help='Table to be used when writing to dst' + ) + args = argparser.parse_args() # build options @@ -188,6 +222,15 @@ def config(): 'csvpool_file_path_prefix': args.csvpool_file_path_prefix, }, + 'converter-config': { + 'clickhouse': { + + }, + 'csv': { + 'column_default_value': CLIOpts.join(args.csv_column_default_value), + }, + }, + 'reader-config': { 'mysql': { 'connection_settings': { diff --git a/src/config.py b/src/config.py index f31800e..2c8b487 100644 --- a/src/config.py +++ b/src/config.py @@ -3,11 +3,14 @@ from .reader.mysqlreader import MySQLReader from .reader.csvreader import CSVReader + from .writer.chwriter import CHWriter from .writer.csvwriter import CSVWriter from .writer.chcsvwriter import CHCSVWriter from .writer.poolwriter import PoolWriter +from .converter.csvwriteconverter import CSVWriteConverter + class Config(object): @@ -43,6 +46,7 @@ def writer_class(self): return CSVWriter, { **self.config['writer-config']['file'], 'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']), + 'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None, } elif self.config['writer-config']['file']['csv_file_path']: diff --git a/src/converter/csvwriteconverter.py b/src/converter/csvwriteconverter.py new file mode 100644 index 0000000..41d07d2 --- /dev/null +++ b/src/converter/csvwriteconverter.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .converter import Converter + + +class CSVWriteConverter(Converter): + + defaults = None + + def __init__(self, defaults={}): + self.defaults = defaults + + + def convert(self, event): + if not self.defaults: + return event + + for column in event.row: + if column in self.defaults and event.row[column] is None: + event.row[column] = self.defaults[column] + return event diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py index 1eed680..a386788 100644 --- a/src/writer/chcsvwriter.py +++ b/src/writer/chcsvwriter.py @@ -48,6 +48,7 @@ def insert(self, event_or_events=None): sql, ) - print(bash) + print('running:', bash) + os.system(bash) pass diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index d23ec50..74f4898 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -16,6 +16,7 @@ class CSVWriter(Writer): dst_table = None fieldnames = None header_written = False + converter = None def __init__( self, @@ -23,11 +24,13 @@ def __init__( dst_db=None, dst_table=None, next=None, + converter=None, ): self.path = csv_file_path self.dst_db = dst_db self.dst_table = dst_table self.next = next + self.converter = converter def opened(self): return bool(self.file) @@ -74,7 +77,7 @@ def insert(self, event_or_events): self.writer.writeheader() for event in event_or_events: - self.writer.writerow(event.row) + self.writer.writerow(self.converter.convert(event).row if self.converter else event.row) def push(self): if not self.next: From c14507771f561c6bbb9401e2bd7508ec5f1643a4 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Tue, 31 Oct 2017 18:01:48 +0300 Subject: [PATCH 4/6] converters normalization --- .../{chdatatypeconverter.py => chwriteconverter.py} | 2 +- .../{csvemptyvalueconverter.py => csvreadconverter.py} | 2 +- src/reader/csvreader.py | 9 ++++----- src/reader/reader.py | 5 ++++- src/writer/chwriter.py | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) rename src/converter/{chdatatypeconverter.py => chwriteconverter.py} (97%) rename src/converter/{csvemptyvalueconverter.py => csvreadconverter.py} (91%) diff --git a/src/converter/chdatatypeconverter.py b/src/converter/chwriteconverter.py similarity index 97% rename from src/converter/chdatatypeconverter.py rename to src/converter/chwriteconverter.py index c7e850e..4fb6df5 100644 --- a/src/converter/chdatatypeconverter.py +++ b/src/converter/chwriteconverter.py @@ -7,7 +7,7 @@ import decimal -class CHDataTypeConverter(Converter): +class CHWriteConverter(Converter): delete_empty_columns = False diff --git a/src/converter/csvemptyvalueconverter.py b/src/converter/csvreadconverter.py similarity index 91% rename from src/converter/csvemptyvalueconverter.py rename to src/converter/csvreadconverter.py index 80e18dc..70ab90d 100644 --- a/src/converter/csvemptyvalueconverter.py +++ b/src/converter/csvreadconverter.py @@ -5,7 +5,7 @@ import ast -class CSVEmptyValueConverter(Converter): +class CSVReadConverter(Converter): def convert(self, event): for column in event.row: diff --git a/src/reader/csvreader.py b/src/reader/csvreader.py index a7ccd2f..e0a439e 100644 --- a/src/reader/csvreader.py +++ b/src/reader/csvreader.py @@ -3,7 +3,7 @@ from .reader import Reader from ..event.event import Event -from ..converter.csvemptyvalueconverter import CSVEmptyValueConverter +from ..converter.csvreadconverter import CSVReadConverter import csv import os @@ -17,8 +17,8 @@ class CSVReader(Reader): has_header = False reader = None - def __init__(self, csv_file_path, callbacks={}): - super().__init__(callbacks=callbacks) + def __init__(self, csv_file_path, converter=None, callbacks={}): + super().__init__(converter=converter, callbacks=callbacks) self.csv_file_path = csv_file_path self.csvfile = open(self.csv_file_path) @@ -44,8 +44,7 @@ def read(self): self.fire('WriteRowsEvent', event=event) for row in self.reader: event.row = row - converter = CSVEmptyValueConverter() - self.fire('WriteRowsEvent.EachRow', event=converter.convert(event)) + self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event) except KeyboardInterrupt: pass diff --git a/src/reader/reader.py b/src/reader/reader.py index 1af518a..547edc1 100644 --- a/src/reader/reader.py +++ b/src/reader/reader.py @@ -4,6 +4,8 @@ class Reader(object): + converter = None + callbacks = { # called on each WriteRowsEvent 'WriteRowsEvent': [], @@ -15,7 +17,8 @@ class Reader(object): 'ReaderIdleEvent': [], } - def __init__(self, callbacks={}): + def __init__(self, converter=None, callbacks={}): + self.converter = converter self.subscribe(callbacks) def subscribe(self, callbacks): diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py index 671c7bd..c331630 100644 --- a/src/writer/chwriter.py +++ b/src/writer/chwriter.py @@ -4,7 +4,7 @@ from clickhouse_driver.client import Client from .writer import Writer from ..event.event import Event -from ..converter.chdatatypeconverter import CHDataTypeConverter +from ..converter.chwriteconverter import CHWriteConverter class CHWriter(Writer): @@ -41,7 +41,7 @@ def insert(self, event_or_events=None): # event_or_events is instance of Event event_or_events = [event_or_events] - converter = CHDataTypeConverter() + converter = CHWriteConverter() values = [] ev = None From 65d777f3ed7c44c8460a762a43bc3ee138bc8b6b Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Tue, 31 Oct 2017 19:22:21 +0300 Subject: [PATCH 5/6] csv pool file sequence --- run.sh | 12 ++++++++++++ src/cliopts.py | 3 ++- src/pool/bbpool.py | 1 + src/writer/csvwriter.py | 9 +++++++++ 4 files changed, 24 insertions(+), 1 deletion(-) create mode 100755 run.sh diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..4e00348 --- /dev/null +++ b/run.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +python3 main.py \ + --src-resume --src-wait \ + --src-host=127.0.0.1 --src-user=reader --src-password=qwerty \ + --dst-host=192.168.74.251 \ + --dst-db=db --dst-table=datatypes \ + --mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \ + --csvpool --csvpool-file-path-prefix=qwe \ + --csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 + +# --dst-file=dst.csv \ diff --git a/src/cliopts.py b/src/cliopts.py index 737d907..ce80c21 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -219,7 +219,6 @@ def config(): 'mempool-max-events-num': args.mempool_max_events_num, 'mempool-max-flush-interval': args.mempool_max_flush_interval, 'csvpool': args.csvpool, - 'csvpool_file_path_prefix': args.csvpool_file_path_prefix, }, 'converter-config': { @@ -263,6 +262,8 @@ def config(): }, 'file': { 'csv_file_path': args.dst_file, + 'csv_file_path_prefix': args.csvpool_file_path_prefix, + 'csv_file_path_suffix_parts': [], 'dst_db': args.dst_db, 'dst_table': args.dst_table, }, diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 9943795..47766a7 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -125,6 +125,7 @@ def rotate_belt(self, belt_index, flush=False): print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) # time to flush data for specified key + self.writer_params['csv_file_path_suffix_parts'] = [str(now), str(self.buckets_count)] writer = self.writer_class(**self.writer_params) writer.insert(self.belts[belt_index].pop()) writer.close() diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index 74f4898..57edfd3 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -17,21 +17,30 @@ class CSVWriter(Writer): fieldnames = None header_written = False converter = None + path_prefix = None + path_suffix_parts = [] def __init__( self, csv_file_path=None, + csv_file_path_prefix=None, + csv_file_path_suffix_parts=[], dst_db=None, dst_table=None, next=None, converter=None, ): self.path = csv_file_path + self.path_prefix = csv_file_path_prefix + self.path_suffix_parts = csv_file_path_suffix_parts self.dst_db = dst_db self.dst_table = dst_table self.next = next self.converter = converter + if self.path is None: + self.path = self.path_prefix + '_' + '_'.join(self.path_suffix_parts) + '.csv' + def opened(self): return bool(self.file) From 65836b7df8024a2ea8d1121c981bfeca70669471 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Wed, 1 Nov 2017 00:09:31 +0300 Subject: [PATCH 6/6] keep or delete pool files --- run.sh | 3 ++- src/cliopts.py | 6 ++++++ src/pool/bbpool.py | 1 + src/writer/csvwriter.py | 7 +++++++ src/writer/writer.py | 3 +++ 5 files changed, 19 insertions(+), 1 deletion(-) diff --git a/run.sh b/run.sh index 4e00348..fd8218e 100755 --- a/run.sh +++ b/run.sh @@ -9,4 +9,5 @@ python3 main.py \ --csvpool --csvpool-file-path-prefix=qwe \ --csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 -# --dst-file=dst.csv \ +# --dst-file=dst.csv +# --csvpool-keep-files diff --git a/src/cliopts.py b/src/cliopts.py index ce80c21..fc78332 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -94,6 +94,11 @@ def config(): default='/tmp/csvpool', help='file path prefix to CSV pool files' ) + argparser.add_argument( + '--csvpool-keep-files', + action='store_true', + help='Keep pool csv files.' + ) argparser.add_argument( '--src-server-id', @@ -264,6 +269,7 @@ def config(): 'csv_file_path': args.dst_file, 'csv_file_path_prefix': args.csvpool_file_path_prefix, 'csv_file_path_suffix_parts': [], + 'csv_keep_file': args.csvpool_keep_files, 'dst_db': args.dst_db, 'dst_table': args.dst_table, }, diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 47766a7..29d74f5 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -130,6 +130,7 @@ def rotate_belt(self, belt_index, flush=False): writer.insert(self.belts[belt_index].pop()) writer.close() writer.push() + writer.destroy() del writer # belt rotated diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index 57edfd3..d425e25 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -19,12 +19,14 @@ class CSVWriter(Writer): converter = None path_prefix = None path_suffix_parts = [] + delete = False def __init__( self, csv_file_path=None, csv_file_path_prefix=None, csv_file_path_suffix_parts=[], + csv_keep_file=False, dst_db=None, dst_table=None, next=None, @@ -40,6 +42,7 @@ def __init__( if self.path is None: self.path = self.path_prefix + '_' + '_'.join(self.path_suffix_parts) + '.csv' + self.delete = not csv_keep_file def opened(self): return bool(self.file) @@ -106,6 +109,10 @@ def close(self): self.file = None self.writer = None + def destroy(self): + if self.delete: + os.remove(self.path) + if __name__ == '__main__': path = 'file.csv' diff --git a/src/writer/writer.py b/src/writer/writer.py index 4f893e7..b6159c7 100644 --- a/src/writer/writer.py +++ b/src/writer/writer.py @@ -29,5 +29,8 @@ def flush(self): def push(self): pass + def destroy(self): + pass + def close(self): pass