Skip to content

pooling writers #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

python3 main.py \
--src-resume --src-wait \
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
--dst-host=192.168.74.251 \
--dst-db=db --dst-table=datatypes \
--mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
--csvpool --csvpool-file-path-prefix=qwe \
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03

# --dst-file=dst.csv
# --csvpool-keep-files
62 changes: 60 additions & 2 deletions src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,31 @@

class CLIOpts(object):

@staticmethod
def join(lists_to_join):
# lists_to_join contains something like
# [['a=b', 'c=d'], ['e=f', 'z=x'], ]
if not isinstance(lists_to_join, list):
return None

res = {}
for lst in lists_to_join:
# lst = ['a=b', 'c=d']
for column_value_pair in lst:
# value = 'a=b'
column, value = column_value_pair.split('=', 2)
res[column] = value

# dict {
# 'col1': 'value1',
# 'col2': 'value2',
# }

if len(res) > 0:
return res
else:
return None

@staticmethod
def config():
"""
Expand Down Expand Up @@ -58,12 +83,22 @@ def config():
default=60,
help='max seconds num between flushes'
)
argparser.add_argument(
'--csvpool',
action='store_true',
help='Cache data in csv files.'
)
argparser.add_argument(
'--csvpool-file-path-prefix',
type=str,
default=None,
default='/tmp/csvpool',
help='file path prefix to CSV pool files'
)
argparser.add_argument(
'--csvpool-keep-files',
action='store_true',
help='Keep pool csv files.'
)

argparser.add_argument(
'--src-server-id',
Expand Down Expand Up @@ -167,6 +202,15 @@ def config():
help='Table to be used when writing to dst'
)

argparser.add_argument(
'--csv-column-default-value',
type=str,
nargs='*',
action='append',
default=None,
help='Table to be used when writing to dst'
)

args = argparser.parse_args()

# build options
Expand All @@ -179,7 +223,16 @@ def config():
'mempool': args.mempool,
'mempool-max-events-num': args.mempool_max_events_num,
'mempool-max-flush-interval': args.mempool_max_flush_interval,
'csvpool_file_path_prefix': args.csvpool_file_path_prefix,
'csvpool': args.csvpool,
},

'converter-config': {
'clickhouse': {

},
'csv': {
'column_default_value': CLIOpts.join(args.csv_column_default_value),
},
},

'reader-config': {
Expand Down Expand Up @@ -214,6 +267,11 @@ def config():
},
'file': {
'csv_file_path': args.dst_file,
'csv_file_path_prefix': args.csvpool_file_path_prefix,
'csv_file_path_suffix_parts': [],
'csv_keep_file': args.csvpool_keep_files,
'dst_db': args.dst_db,
'dst_table': args.dst_table,
},
},
})
15 changes: 14 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@

from .reader.mysqlreader import MySQLReader
from .reader.csvreader import CSVReader

from .writer.chwriter import CHWriter
from .writer.csvwriter import CSVWriter
from .writer.chcsvwriter import CHCSVWriter
from .writer.poolwriter import PoolWriter

from .converter.csvwriteconverter import CSVWriteConverter


class Config(object):

Expand Down Expand Up @@ -37,8 +41,17 @@ def reader(self):
return MySQLReader(**self.config['reader-config']['mysql'])

def writer_class(self):
if self.config['writer-config']['file']['csv_file_path']:

if self.config['app-config']['csvpool']:
return CSVWriter, {
**self.config['writer-config']['file'],
'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']),
'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None,
}

elif self.config['writer-config']['file']['csv_file_path']:
return CSVWriter, self.config['writer-config']['file']

else:
return CHWriter, self.config['writer-config']['clickhouse']

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import decimal


class CHDataTypeConverter(Converter):
class CHWriteConverter(Converter):

delete_empty_columns = False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import ast


class CSVEmptyValueConverter(Converter):
class CSVReadConverter(Converter):

def convert(self, event):
for column in event.row:
Expand Down
22 changes: 22 additions & 0 deletions src/converter/csvwriteconverter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from .converter import Converter


class CSVWriteConverter(Converter):

defaults = None

def __init__(self, defaults={}):
self.defaults = defaults


def convert(self, event):
if not self.defaults:
return event

for column in event.row:
if column in self.defaults and event.row[column] is None:
event.row[column] = self.defaults[column]
return event
5 changes: 5 additions & 0 deletions src/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ class Event(object):

# {'id':1, 'col1':1}
row = None

file = None

# ['id', 'col1', 'col2']
fieldnames = None
10 changes: 9 additions & 1 deletion src/pool/bbpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class BBPool(Pool):
# 'key.2': UNIX TIMESTAMP
}

buckets_count = 0

def __init__(
self,
writer_class=None,
Expand Down Expand Up @@ -115,14 +117,20 @@ def rotate_belt(self, belt_index, flush=False):

while len(self.belts[belt_index]) > buckets_num_left_on_belt:
# too many buckets on the belt
# time to rotate belt and flush the most-right-bucket
self.buckets_count += 1

buckets_num = len(self.belts[belt_index])
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
print(now, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))
print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))

# time to flush data for specified key
self.writer_params['csv_file_path_suffix_parts'] = [str(now), str(self.buckets_count)]
writer = self.writer_class(**self.writer_params)
writer.insert(self.belts[belt_index].pop())
writer.close()
writer.push()
writer.destroy()
del writer

# belt rotated
Expand Down
9 changes: 4 additions & 5 deletions src/reader/csvreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from .reader import Reader
from ..event.event import Event
from ..converter.csvemptyvalueconverter import CSVEmptyValueConverter
from ..converter.csvreadconverter import CSVReadConverter
import csv
import os

Expand All @@ -17,8 +17,8 @@ class CSVReader(Reader):
has_header = False
reader = None

def __init__(self, csv_file_path, callbacks={}):
super().__init__(callbacks=callbacks)
def __init__(self, csv_file_path, converter=None, callbacks={}):
super().__init__(converter=converter, callbacks=callbacks)

self.csv_file_path = csv_file_path
self.csvfile = open(self.csv_file_path)
Expand All @@ -44,8 +44,7 @@ def read(self):
self.fire('WriteRowsEvent', event=event)
for row in self.reader:
event.row = row
converter = CSVEmptyValueConverter()
self.fire('WriteRowsEvent.EachRow', event=converter.convert(event))
self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
except KeyboardInterrupt:
pass

Expand Down
5 changes: 4 additions & 1 deletion src/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

class Reader(object):

converter = None

callbacks = {
# called on each WriteRowsEvent
'WriteRowsEvent': [],
Expand All @@ -15,7 +17,8 @@ class Reader(object):
'ReaderIdleEvent': [],
}

def __init__(self, callbacks={}):
def __init__(self, converter=None, callbacks={}):
self.converter = converter
self.subscribe(callbacks)

def subscribe(self, callbacks):
Expand Down
54 changes: 54 additions & 0 deletions src/writer/chcsvwriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os

class CHCSVWriter(object):

host = None
port = None
user = None
password = None

def __init__(self, host=None, port=None, user=None, password=None):
self.host = host
self.port = port
self.user = user
self.password = password

def insert(self, event_or_events=None):
# event_or_events = [
# event: {
# row: {'id': 3, 'a': 3}
# },
# event: {
# row: {'id': 3, 'a': 3}
# },
# ]

for event in event_or_events:
sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
event.schema,
event.table,
', '.join(map(lambda column: '`%s`' % column, event.fieldnames)),
)

choptions = ""
if self.host:
choptions += " --host=" + self.host
if self.port:
choptions += " --port=" + str(self.port)
if self.user:
choptions += " --user=" + self.user
if self.password:
choptions += " --password=" + self.password
bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format(
event.file,
choptions,
sql,
)

print('running:', bash)
os.system(bash)

pass
4 changes: 2 additions & 2 deletions src/writer/chwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from clickhouse_driver.client import Client
from .writer import Writer
from ..event.event import Event
from ..converter.chdatatypeconverter import CHDataTypeConverter
from ..converter.chwriteconverter import CHWriteConverter


class CHWriter(Writer):
Expand Down Expand Up @@ -41,7 +41,7 @@ def insert(self, event_or_events=None):
# event_or_events is instance of Event
event_or_events = [event_or_events]

converter = CHDataTypeConverter()
converter = CHWriteConverter()

values = []
ev = None
Expand Down
Loading