Skip to content

Commit 0806fb9

Browse files
authored
Merge pull request #28 from sunsingerus/master
process writer and csv pooling
2 parents 9874758 + c4f6ec1 commit 0806fb9

17 files changed

+292
-142
lines changed

main.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from src.daemon import Daemon
77

88
import sys
9+
import multiprocessing as mp
910

1011

1112
if sys.version_info[0] < 3:
@@ -17,6 +18,7 @@ class Main(Daemon):
1718
config = None
1819

1920
def __init__(self):
21+
mp.set_start_method('forkserver')
2022
self.config = CLIOpts.config()
2123
super().__init__(pidfile=self.config.pid_file())
2224

run.sh

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,22 @@
11
#!/bin/bash
22

33
python3 main.py \
4-
--src-resume --src-wait \
5-
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
6-
--dst-host=192.168.74.251 \
7-
--dst-db=db --dst-table=datatypes \
8-
--mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
9-
--csvpool --csvpool-file-path-prefix=qwe \
10-
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03
4+
--src-resume \
5+
--src-wait \
6+
--src-host=127.0.0.1 \
7+
--src-user=reader \
8+
--src-password=qwerty \
9+
--dst-host=192.168.74.251 \
10+
--csvpool \
11+
--csvpool-file-path-prefix=qwe_ \
12+
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \
13+
--mempool-max-flush-interval=600 \
14+
--mempool-max-events-num=900000
1115

12-
# --dst-file=dst.csv
16+
# --mempool
17+
# --mempool-max-events-num=3
18+
# --mempool-max-flush-interval=30
19+
# --dst-file=dst.csv
20+
# --dst-schema=db
21+
# --dst-table=datatypes
1322
# --csvpool-keep-files

src/cliopts.py

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ def config():
5252
argparser.add_argument(
5353
'--dry',
5454
action='store_true',
55-
help='Dry mode - do not do anything that can harm.'
55+
help='Dry mode - do not do anything that can harm. '
5656
'Useful for debugging.'
5757
)
5858
argparser.add_argument(
@@ -74,30 +74,30 @@ def config():
7474
argparser.add_argument(
7575
'--mempool-max-events-num',
7676
type=int,
77-
default=1000,
78-
help='max events num to pool before batch write'
77+
default=100000,
78+
help='Max events number to pool - triggering pool flush'
7979
)
8080
argparser.add_argument(
8181
'--mempool-max-flush-interval',
8282
type=int,
8383
default=60,
84-
help='max seconds num between flushes'
84+
help='Max seconds number between pool flushes'
8585
)
8686
argparser.add_argument(
8787
'--csvpool',
8888
action='store_true',
89-
help='Cache data in csv files.'
89+
help='Cache data in CSV pool files on disk. Requires memory pooling, thus enables --mempool even if it is not explicitly specified'
9090
)
9191
argparser.add_argument(
9292
'--csvpool-file-path-prefix',
9393
type=str,
94-
default='/tmp/csvpool',
95-
help='file path prefix to CSV pool files'
94+
default='/tmp/csvpool_',
95+
help='File path prefix to CSV pool files'
9696
)
9797
argparser.add_argument(
9898
'--csvpool-keep-files',
9999
action='store_true',
100-
help='Keep pool csv files.'
100+
help='Keep CSV pool files. Useful for debugging'
101101
)
102102

103103
argparser.add_argument(
@@ -156,7 +156,7 @@ def config():
156156
'--src-file',
157157
type=str,
158158
default=None,
159-
help='Source file tp read data from'
159+
help='Source file to read data from'
160160
)
161161

162162
argparser.add_argument(
@@ -190,10 +190,10 @@ def config():
190190
help='Password to be used when writing to dst'
191191
)
192192
argparser.add_argument(
193-
'--dst-db',
193+
'--dst-schema',
194194
type=str,
195195
default=None,
196-
help='Database to be used when writing to dst'
196+
help='Database/schema to be used when writing to dst'
197197
)
198198
argparser.add_argument(
199199
'--dst-table',
@@ -220,7 +220,7 @@ def config():
220220
'dry': args.dry,
221221
'daemon': args.daemon,
222222
'pid_file': args.pid_file,
223-
'mempool': args.mempool,
223+
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
224224
'mempool-max-events-num': args.mempool_max_events_num,
225225
'mempool-max-flush-interval': args.mempool_max_flush_interval,
226226
'csvpool': args.csvpool,
@@ -262,15 +262,15 @@ def config():
262262
'user': args.dst_user,
263263
'password': args.dst_password,
264264
},
265-
'dst_db': args.dst_db,
265+
'dst_schema': args.dst_schema,
266266
'dst_table': args.dst_table,
267267
},
268268
'file': {
269269
'csv_file_path': args.dst_file,
270270
'csv_file_path_prefix': args.csvpool_file_path_prefix,
271271
'csv_file_path_suffix_parts': [],
272272
'csv_keep_file': args.csvpool_keep_files,
273-
'dst_db': args.dst_db,
273+
'dst_schema': args.dst_schema,
274274
'dst_table': args.dst_table,
275275
},
276276
},

src/config.py

Lines changed: 33 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -8,8 +8,11 @@
88
from .writer.csvwriter import CSVWriter
99
from .writer.chcsvwriter import CHCSVWriter
1010
from .writer.poolwriter import PoolWriter
11+
from .writer.processwriter import ProcessWriter
12+
from .objectbuilder import ObjectBuilder
1113

1214
from .converter.csvwriteconverter import CSVWriteConverter
15+
from .converter.chwriteconverter import CHWriteConverter
1316

1417

1518
class Config(object):
@@ -40,30 +43,47 @@ def reader(self):
4043
else:
4144
return MySQLReader(**self.config['reader-config']['mysql'])
4245

43-
def writer_class(self):
46+
def converter_builder(self):
47+
if not self.config['converter-config']['csv']['column_default_value']:
48+
# no default values for CSV columns provided
49+
return None
4450

51+
return ObjectBuilder(
52+
instance=CSVWriteConverter(
53+
defaults=self.config['converter-config']['csv']['column_default_value']
54+
))
55+
56+
def writer_builder(self):
4557
if self.config['app-config']['csvpool']:
46-
return CSVWriter, {
47-
**self.config['writer-config']['file'],
48-
'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']),
49-
'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None,
50-
}
58+
return ObjectBuilder(class_name=ProcessWriter, constructor_params={
59+
'next_writer_builder': ObjectBuilder(class_name=CSVWriter, constructor_params={
60+
**self.config['writer-config']['file'],
61+
'next_writer_builder': ObjectBuilder(
62+
class_name=CHCSVWriter,
63+
constructor_params=self.config['writer-config']['clickhouse']['connection_settings']
64+
),
65+
'converter_builder': self.converter_builder(),
66+
})
67+
})
5168

5269
elif self.config['writer-config']['file']['csv_file_path']:
53-
return CSVWriter, self.config['writer-config']['file']
70+
return ObjectBuilder(class_name=CSVWriter, constructor_params={
71+
**self.config['writer-config']['file'],
72+
'converter_builder': self.converter_builder(),
73+
})
5474

5575
else:
56-
return CHWriter, self.config['writer-config']['clickhouse']
76+
return ObjectBuilder(class_name=CHWriter, constructor_params={
77+
**self.config['writer-config']['clickhouse'],
78+
'converter_builder': ObjectBuilder(instance=CHWriteConverter()),
79+
})
5780

5881
def writer(self):
59-
writer_class, writer_params = self.writer_class()
60-
6182
if self.config['app-config']['mempool']:
6283
return PoolWriter(
63-
writer_class=writer_class,
64-
writer_params=writer_params,
84+
writer_builder=self.writer_builder(),
6585
max_pool_size=self.config['app-config']['mempool-max-events-num'],
6686
max_flush_interval=self.config['app-config']['mempool-max-flush-interval'],
6787
)
6888
else:
69-
return writer_class(**writer_params)
89+
return self.writer_builder().get()

src/converter/chwriteconverter.py

Lines changed: 14 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -28,26 +28,27 @@ class CHWriteConverter(Converter):
2828
def convert(self, event):
2929

3030
columns_to_delete = []
31-
for column_name in event.row:
32-
# print(column_name, row['values'][column_name], type(row['values'][column_name]))
3331

34-
if self.delete_empty_columns and (event.row[column_name] is None):
35-
# print("Skip None value for column", column_name)
36-
columns_to_delete.append(column_name)
32+
for column in event.row:
33+
if (event.row[column] is None) and self.delete_empty_columns:
34+
# include empty column to the list of to be deleted columns
35+
columns_to_delete.append(column)
36+
# move to next column
3737
continue
3838

3939
for t in self.types_to_convert:
40-
if isinstance(event.row[column_name], t):
41-
# print("Converting column", column_name, "of type", type(event.row[column_name]),
42-
# event.row[column_name])
43-
event.row[column_name] = str(event.row[column_name])
44-
# print("res", event.row[column_name])
40+
if isinstance(event.row[column], t):
41+
# print("Converting column", column, "of type", type(event.row[column]),
42+
# event.row[column])
43+
event.row[column] = str(event.row[column])
44+
# print("res", event.row[column])
4545
break
4646
else:
47-
# print("Using asis column", column_name, "of type", type(event.row[column_name]))
47+
# print("Using asis column", column, "of type", type(event.row[column]))
4848
pass
4949

50-
for column_to_delete in columns_to_delete:
51-
event.row.pop(column_to_delete)
50+
# delete columns according to the list
51+
for column in columns_to_delete:
52+
event.row.pop(column)
5253

5354
return event

src/converter/csvwriteconverter.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -6,17 +6,25 @@
66

77
class CSVWriteConverter(Converter):
88

9+
# default values for columns - dict
910
defaults = None
1011

11-
def __init__(self, defaults={}):
12+
def __init__(self, defaults=None):
1213
self.defaults = defaults
1314

14-
1515
def convert(self, event):
16+
# no defaults - nothing to convert
1617
if not self.defaults:
1718
return event
1819

20+
# defaults are empty - nothing to convert
21+
if len(self.defaults) < 1:
22+
return event
23+
24+
# have defaults
1925
for column in event.row:
20-
if column in self.defaults and event.row[column] is None:
26+
# replace None column with default value
27+
if event.row[column] is None and column in self.defaults:
2128
event.row[column] = self.defaults[column]
29+
2230
return event

src/objectbuilder.py

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
5+
class ObjectBuilder(object):
6+
7+
class_name = None
8+
constructor_params = None
9+
instance = None
10+
11+
def __init__(self, class_name=None, constructor_params=None, instance=None):
12+
self.class_name = class_name
13+
self.constructor_params = constructor_params
14+
self.instance = instance
15+
16+
def param(self, name, value):
17+
if not self.constructor_params:
18+
self.constructor_params = {}
19+
self.constructor_params[name] = value
20+
21+
def get(self):
22+
if not self.class_name:
23+
# no class name - return instance, it may be None
24+
return self.instance
25+
26+
# have class name
27+
28+
if self.constructor_params:
29+
return self.class_name(**self.constructor_params)
30+
else:
31+
return self.class_name()

0 commit comments

Comments
 (0)