Skip to content

table builder #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,17 @@ def __init__(self):
# mp.set_start_method('forkserver')

def run(self):
pumper = Pumper(
reader=self.config.reader(),
writer=self.config.writer(),
)
pumper.run()
if self.config.is_table_templates():
templates = self.config.table_builder().templates()
for db in templates:
for table in templates[db]:
print(db, ':', table, ':', templates[db][table])
else:
pumper = Pumper(
reader=self.config.reader(),
writer=self.config.writer(),
)
pumper.run()

def start(self):
if self.config.is_daemon():
Expand Down
16 changes: 16 additions & 0 deletions src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ def config():
action='store_true',
help='Keep CSV pool files. Useful for debugging'
)
argparser.add_argument(
'--table-templates',
action='store_true',
help='Prepare table templates.'
)

argparser.add_argument(
'--src-server-id',
Expand Down Expand Up @@ -262,12 +267,14 @@ def config():

# build options
return Config ({

'app-config': {
'config-file': args.config_file,
'log-file': args.log_file,
'log-level': CLIOpts.log_level_from_string(args.log_level),
'dry': args.dry,
'daemon': args.daemon,
'table-templates': args.table_templates,
'pid_file': args.pid_file,
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
'mempool-max-events-num': args.mempool_max_events_num,
Expand All @@ -284,6 +291,15 @@ def config():
},
},

'tablebuilder-config': {
'host': args.src_host,
'port': args.src_port,
'user': args.src_user,
'password': args.src_password,
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
},

'reader-config': {
'mysql': {
'connection_settings': {
Expand Down
8 changes: 7 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .converter.csvwriteconverter import CSVWriteConverter
from .converter.chwriteconverter import CHWriteConverter

from .tablebuilder import TableBuilder

class Config(object):

Expand Down Expand Up @@ -46,6 +46,12 @@ def is_daemon(self):
def is_pool(self):
return self.config['app-config']['mempool']

def is_table_templates(self):
return self.config['app-config']['table-templates']

def table_builder(self):
return TableBuilder(**self.config['tablebuilder-config'])

def reader(self):
if self.config['reader-config']['file']['csv_file_path']:
return CSVReader(**self.config['reader-config']['file'])
Expand Down
121 changes: 94 additions & 27 deletions src/tablebuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,95 @@
import logging
import MySQLdb


class TableBuilder(object):

connection = None
cursor = None

def template(self, host, user, password=None, db=None, tables=None):
host = None
port = None
user = None
password = None
dbs = None
tables = None

def __init__(self, host, port, user, password=None, dbs=None, tables=None):
self.host = host
self.port = port
self.user = user
self.password = password
self.dbs = dbs
self.tables = tables

def templates(self):
"""
Create templates for specified MySQL tables. In case no tables specified all tables from specified db are templated

:param host: string MySQL host
:param user: string MySQL user
:param password: string MySQL password
:param dbs: list of string MySQL datatabse/ May be omitted, in this case tables has to contain full table names, Ex.: db.table1
:param tables: list of string list of table names. May be short (in case db specified) or full (in the form db.table, in case no db specified)
:return: dict of CREATE TABLE () templates
"""
res = {}

db = None

try:
db = self.dbs[0]
except:
pass

# sanity check
if db is None and tables is None:
return None
if db is None and self.tables is None:
return res

# MySQL connections
self.connection = MySQLdb.connect(
host=host,
user=user,
passwd=password,
host=self.host,
user=self.user,
passwd=self.password,
db=db,
)
self.cursor = self.connection.cursor()

# in case to tables specified - list all tables of the DB specified
if db is not None and tables is None:
if db is not None and self.tables is None:
self.cursor.execute("USE " + db)
tables = []
self.tables = []
self.cursor.execute("SHOW TABLES") # execute 'SHOW TABLES' (but data is not returned)
for (table_name,) in self.cursor:
tables.append(table_name)

# tables can be something like 'db1, db2, db3'
# make [db1, db2, db3]
if isinstance(tables, str):
tables = [table.strip() for table in tables.split(',')]

for table in tables:
print(self.table(table, db))

def table(self, table_name, db=None):
self.tables.append(table_name)

# create dict of table templates
for table in self.tables:
if not db in res:
res[db] = {}
res[db][table] = self.create_table_template(table, db)

# {
# 'db': {
# 'table1': 'CREATE TABLE(...)...',
# 'table2': 'CREATE TABLE(...)...',
# }
# }
return res

def create_table_template(self, table_name, db=None):
"""
Produce template for CH's
CREATE TABLE(
...
columns specification
...
) ENGINE = MergeTree(_SPECIFY_DateField_HERE, (SPECIFY_INDEX_FIELD1, SPECIFY_INDEX_FIELD2, ...etc...), 8192)
for specified MySQL's table
:param table_name: string - name of the table in MySQL which will be used as a base for CH's CREATE TABLE template
:param db: string - name of the DB in MySQL
:return: string - almost-ready-to-use CREATE TABLE statement
"""

# `db`.`table` or just `table`
name = '`{0}`.`{1}`'.format(db, table_name) if db else '`{0}`'.format(table_name)
Expand All @@ -52,10 +105,10 @@ def table(self, table_name, db=None):
for (_field, _type, _null, _key, _default, _extra,) in self.cursor:
# Field | Type | Null | Key | Default | Extra

# build ready-to-sql column specification
# build ready-to-sql column specification Ex.:
# `integer_1` Nullable(Int32)
# `u_integer_1` Nullable(UInt32)
ch_columns.append('`{0}` {1}'.format(_field, self.map(mysql_type=_type, null=_null,)))
ch_columns.append('`{0}` {1}'.format(_field, self.map_type(mysql_type=_type, nullable=_null, )))

sql = """
CREATE TABLE {0} (
Expand All @@ -67,7 +120,17 @@ def table(self, table_name, db=None):
)
return sql

def map(self, mysql_type, null=False):
def map_type(self, mysql_type, nullable=False):
"""
Map MySQL type (as a string from DESC table statement) to CH type (as string)
:param mysql_type: string MySQL type (from DESC statement). Ex.: 'INT(10) UNSIGNED', 'BOOLEAN'
:param nullable: bool|string True|'yes' is this field nullable
:return: string CH's type specification directly usable in CREATE TABLE statement. Ex.:
Nullable(Int32)
Nullable(UInt32)
"""

# deal with UPPER CASE strings for simplicity
mysql_type = mysql_type.upper()

# Numeric Types
Expand Down Expand Up @@ -146,22 +209,26 @@ def map(self, mysql_type, null=False):
ch_type = 'UNKNOWN'

# Deal with NULLs
if isinstance(null, bool):
if null:
if isinstance(nullable, bool):
# for bool - simple statement
if nullable:
ch_type = 'Nullable(' + ch_type + ')'
elif isinstance(null, str):
if null.upper() == "YES":
elif isinstance(nullable, str):
# also accept case-insencitive string 'yes'
if nullable.upper() == "YES":
ch_type = 'Nullable(' + ch_type + ')'

return ch_type

if __name__ == '__main__':
tb = TableBuilder()
tb.template(
templates = tb.templates(
host='127.0.0.1',
user='reader',
password='qwerty',
db='db',
# tables='datatypes, enum_datatypes, json_datatypes',
tables=['datatypes', 'enum_datatypes', 'json_datatypes'],
)
for table in templates:
print(table, '=', templates[table])