Skip to content

process writer and csv pooling #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from src.daemon import Daemon

import sys
import multiprocessing as mp


if sys.version_info[0] < 3:
Expand All @@ -17,6 +18,7 @@ class Main(Daemon):
config = None

def __init__(self):
mp.set_start_method('forkserver')
self.config = CLIOpts.config()
super().__init__(pidfile=self.config.pid_file())

Expand Down
25 changes: 17 additions & 8 deletions run.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
#!/bin/bash

python3 main.py \
--src-resume --src-wait \
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
--dst-host=192.168.74.251 \
--dst-db=db --dst-table=datatypes \
--mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
--csvpool --csvpool-file-path-prefix=qwe \
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03
--src-resume \
--src-wait \
--src-host=127.0.0.1 \
--src-user=reader \
--src-password=qwerty \
--dst-host=192.168.74.251 \
--csvpool \
--csvpool-file-path-prefix=qwe_ \
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \
--mempool-max-flush-interval=600 \
--mempool-max-events-num=900000

# --dst-file=dst.csv
# --mempool
# --mempool-max-events-num=3
# --mempool-max-flush-interval=30
# --dst-file=dst.csv
# --dst-schema=db
# --dst-table=datatypes
# --csvpool-keep-files
28 changes: 14 additions & 14 deletions src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def config():
argparser.add_argument(
'--dry',
action='store_true',
help='Dry mode - do not do anything that can harm.'
help='Dry mode - do not do anything that can harm. '
'Useful for debugging.'
)
argparser.add_argument(
Expand All @@ -74,30 +74,30 @@ def config():
argparser.add_argument(
'--mempool-max-events-num',
type=int,
default=1000,
help='max events num to pool before batch write'
default=100000,
help='Max events number to pool - triggering pool flush'
)
argparser.add_argument(
'--mempool-max-flush-interval',
type=int,
default=60,
help='max seconds num between flushes'
help='Max seconds number between pool flushes'
)
argparser.add_argument(
'--csvpool',
action='store_true',
help='Cache data in csv files.'
help='Cache data in CSV pool files on disk. Requires memory pooling, thus enables --mempool even if it is not explicitly specified'
)
argparser.add_argument(
'--csvpool-file-path-prefix',
type=str,
default='/tmp/csvpool',
help='file path prefix to CSV pool files'
default='/tmp/csvpool_',
help='File path prefix to CSV pool files'
)
argparser.add_argument(
'--csvpool-keep-files',
action='store_true',
help='Keep pool csv files.'
help='Keep CSV pool files. Useful for debugging'
)

argparser.add_argument(
Expand Down Expand Up @@ -156,7 +156,7 @@ def config():
'--src-file',
type=str,
default=None,
help='Source file tp read data from'
help='Source file to read data from'
)

argparser.add_argument(
Expand Down Expand Up @@ -190,10 +190,10 @@ def config():
help='Password to be used when writing to dst'
)
argparser.add_argument(
'--dst-db',
'--dst-schema',
type=str,
default=None,
help='Database to be used when writing to dst'
help='Database/schema to be used when writing to dst'
)
argparser.add_argument(
'--dst-table',
Expand All @@ -220,7 +220,7 @@ def config():
'dry': args.dry,
'daemon': args.daemon,
'pid_file': args.pid_file,
'mempool': args.mempool,
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
'mempool-max-events-num': args.mempool_max_events_num,
'mempool-max-flush-interval': args.mempool_max_flush_interval,
'csvpool': args.csvpool,
Expand Down Expand Up @@ -262,15 +262,15 @@ def config():
'user': args.dst_user,
'password': args.dst_password,
},
'dst_db': args.dst_db,
'dst_schema': args.dst_schema,
'dst_table': args.dst_table,
},
'file': {
'csv_file_path': args.dst_file,
'csv_file_path_prefix': args.csvpool_file_path_prefix,
'csv_file_path_suffix_parts': [],
'csv_keep_file': args.csvpool_keep_files,
'dst_db': args.dst_db,
'dst_schema': args.dst_schema,
'dst_table': args.dst_table,
},
},
Expand Down
46 changes: 33 additions & 13 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
from .writer.csvwriter import CSVWriter
from .writer.chcsvwriter import CHCSVWriter
from .writer.poolwriter import PoolWriter
from .writer.processwriter import ProcessWriter
from .objectbuilder import ObjectBuilder

from .converter.csvwriteconverter import CSVWriteConverter
from .converter.chwriteconverter import CHWriteConverter


class Config(object):
Expand Down Expand Up @@ -40,30 +43,47 @@ def reader(self):
else:
return MySQLReader(**self.config['reader-config']['mysql'])

def writer_class(self):
def converter_builder(self):
if not self.config['converter-config']['csv']['column_default_value']:
# no default values for CSV columns provided
return None

return ObjectBuilder(
instance=CSVWriteConverter(
defaults=self.config['converter-config']['csv']['column_default_value']
))

def writer_builder(self):
if self.config['app-config']['csvpool']:
return CSVWriter, {
**self.config['writer-config']['file'],
'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']),
'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None,
}
return ObjectBuilder(class_name=ProcessWriter, constructor_params={
'next_writer_builder': ObjectBuilder(class_name=CSVWriter, constructor_params={
**self.config['writer-config']['file'],
'next_writer_builder': ObjectBuilder(
class_name=CHCSVWriter,
constructor_params=self.config['writer-config']['clickhouse']['connection_settings']
),
'converter_builder': self.converter_builder(),
})
})

elif self.config['writer-config']['file']['csv_file_path']:
return CSVWriter, self.config['writer-config']['file']
return ObjectBuilder(class_name=CSVWriter, constructor_params={
**self.config['writer-config']['file'],
'converter_builder': self.converter_builder(),
})

else:
return CHWriter, self.config['writer-config']['clickhouse']
return ObjectBuilder(class_name=CHWriter, constructor_params={
**self.config['writer-config']['clickhouse'],
'converter_builder': ObjectBuilder(instance=CHWriteConverter()),
})

def writer(self):
writer_class, writer_params = self.writer_class()

if self.config['app-config']['mempool']:
return PoolWriter(
writer_class=writer_class,
writer_params=writer_params,
writer_builder=self.writer_builder(),
max_pool_size=self.config['app-config']['mempool-max-events-num'],
max_flush_interval=self.config['app-config']['mempool-max-flush-interval'],
)
else:
return writer_class(**writer_params)
return self.writer_builder().get()
27 changes: 14 additions & 13 deletions src/converter/chwriteconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,27 @@ class CHWriteConverter(Converter):
def convert(self, event):

columns_to_delete = []
for column_name in event.row:
# print(column_name, row['values'][column_name], type(row['values'][column_name]))

if self.delete_empty_columns and (event.row[column_name] is None):
# print("Skip None value for column", column_name)
columns_to_delete.append(column_name)
for column in event.row:
if (event.row[column] is None) and self.delete_empty_columns:
# include empty column to the list of to be deleted columns
columns_to_delete.append(column)
# move to next column
continue

for t in self.types_to_convert:
if isinstance(event.row[column_name], t):
# print("Converting column", column_name, "of type", type(event.row[column_name]),
# event.row[column_name])
event.row[column_name] = str(event.row[column_name])
# print("res", event.row[column_name])
if isinstance(event.row[column], t):
# print("Converting column", column, "of type", type(event.row[column]),
# event.row[column])
event.row[column] = str(event.row[column])
# print("res", event.row[column])
break
else:
# print("Using asis column", column_name, "of type", type(event.row[column_name]))
# print("Using asis column", column, "of type", type(event.row[column]))
pass

for column_to_delete in columns_to_delete:
event.row.pop(column_to_delete)
# delete columns according to the list
for column in columns_to_delete:
event.row.pop(column)

return event
14 changes: 11 additions & 3 deletions src/converter/csvwriteconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@

class CSVWriteConverter(Converter):

# default values for columns - dict
defaults = None

def __init__(self, defaults={}):
def __init__(self, defaults=None):
self.defaults = defaults


def convert(self, event):
# no defaults - nothing to convert
if not self.defaults:
return event

# defaults are empty - nothing to convert
if len(self.defaults) < 1:
return event

# have defaults
for column in event.row:
if column in self.defaults and event.row[column] is None:
# replace None column with default value
if event.row[column] is None and column in self.defaults:
event.row[column] = self.defaults[column]

return event
31 changes: 31 additions & 0 deletions src/objectbuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


class ObjectBuilder(object):

class_name = None
constructor_params = None
instance = None

def __init__(self, class_name=None, constructor_params=None, instance=None):
self.class_name = class_name
self.constructor_params = constructor_params
self.instance = instance

def param(self, name, value):
if not self.constructor_params:
self.constructor_params = {}
self.constructor_params[name] = value

def get(self):
if not self.class_name:
# no class name - return instance, it may be None
return self.instance

# have class name

if self.constructor_params:
return self.class_name(**self.constructor_params)
else:
return self.class_name()
Loading