diff --git a/.gitignore b/.gitignore index afcc796..b01f141 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ config.ini mergin .coverage htmlcov +.vscode +.env \ No newline at end of file diff --git a/README.md b/README.md index c9f1638..bd440bc 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,7 @@ To run automatic tests: export TEST_MERGIN_URL= # testing server export TEST_API_USERNAME= export TEST_API_PASSWORD= + export TEST_API_WORKSPACE= pytest-3 test/ diff --git a/dbsync.py b/dbsync.py index 51ccedf..2cfa736 100644 --- a/dbsync.py +++ b/dbsync.py @@ -16,6 +16,7 @@ import tempfile import random import uuid +import re import psycopg2 from itertools import chain @@ -34,6 +35,14 @@ class DbSyncError(Exception): pass +def _add_quotes_to_schema_name(schema: str) -> str: + matches = re.findall(r"[^a-z0-9_]", schema) + if len(matches) != 0: + schema = schema.replace("\"", "\"\"") + schema = f'"{schema}"' + return schema + + def _tables_list_to_string(tables): return ";".join(tables) @@ -230,6 +239,7 @@ def _set_db_project_comment(conn, schema, project_name, version, project_id=None def _get_db_project_comment(conn, schema): """ Get Mergin Maps project name and its current version in db schema""" cur = conn.cursor() + schema = _add_quotes_to_schema_name(schema) cur.execute("SELECT obj_description(%s::regnamespace, 'pg_namespace')", (schema, )) res = cur.fetchone()[0] try: diff --git a/test/test_basic.py b/test/test_basic.py index fe08890..9a35c71 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -7,16 +7,18 @@ import tempfile import psycopg2 +from psycopg2 import sql from mergin import MerginClient, ClientError from dbsync import dbsync_init, dbsync_pull, dbsync_push, dbsync_status, config, DbSyncError, _geodiff_make_copy, \ - _get_db_project_comment, _get_mergin_project, _get_project_id, _validate_local_project_id, config + _get_db_project_comment, _get_mergin_project, _get_project_id, _validate_local_project_id, config, _add_quotes_to_schema_name GEODIFF_EXE = os.environ.get('TEST_GEODIFF_EXE') DB_CONNINFO = os.environ.get('TEST_DB_CONNINFO') SERVER_URL = os.environ.get('TEST_MERGIN_URL') API_USER = os.environ.get('TEST_API_USERNAME') USER_PWD = os.environ.get('TEST_API_PASSWORD') +WORKSPACE = os.environ.get('TEST_API_WORKSPACE') TMP_DIR = tempfile.gettempdir() TEST_DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_data') @@ -44,8 +46,8 @@ def cleanup(mc, project, dirs): def cleanup_db(conn, schema_base, schema_main): """ Removes test schemas from previous tests """ cur = conn.cursor() - cur.execute("DROP SCHEMA IF EXISTS {} CASCADE".format(schema_base)) - cur.execute("DROP SCHEMA IF EXISTS {} CASCADE".format(schema_main)) + cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_base))) + cur.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema_main))) cur.execute("COMMIT") @@ -56,7 +58,7 @@ def init_sync_from_geopackage(mc, project_name, source_gpkg_path, ignored_tables - (re)create local project working directory and sync directory - configure DB sync and let it do the init (make copies to the database) """ - full_project_name = API_USER + "/" + project_name + full_project_name = WORKSPACE + "/" + project_name project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory sync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync') # used by dbsync db_schema_main = project_name + '_main' @@ -68,7 +70,7 @@ def init_sync_from_geopackage(mc, project_name, source_gpkg_path, ignored_tables cleanup_db(conn, db_schema_base, db_schema_main) # prepare a new Mergin Maps project - mc.create_project(project_name) + mc.create_project(project_name, namespace=WORKSPACE) mc.download_project(full_project_name, project_dir) shutil.copy(source_gpkg_path, os.path.join(project_dir, 'test_sync.gpkg')) for extra_filepath in extra_init_files: @@ -96,8 +98,8 @@ def init_sync_from_geopackage(mc, project_name, source_gpkg_path, ignored_tables dbsync_init(mc, from_gpkg=True) -def test_init_from_gpkg(mc): - project_name = 'test_init' +def test_init_from_gpkg(mc: MerginClient): + project_name = "test_init" source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') db_schema_main = project_name + '_main' @@ -108,24 +110,24 @@ def test_init_from_gpkg(mc): # test that database schemas are created + tables are populated conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() - cur.execute(f"SELECT count(*) from {db_schema_main}.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 # run again, nothing should change dbsync_init(mc, from_gpkg=True) - cur.execute(f"SELECT count(*) from {db_schema_main}.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["name"] == config.connections[0].mergin_project assert db_proj_info["version"] == 'v1' # rename base schema to mimic some mismatch - cur.execute(f"ALTER SCHEMA {db_schema_base} RENAME TO schema_tmp") + cur.execute(sql.SQL("ALTER SCHEMA {} RENAME TO schema_tmp").format(sql.Identifier(db_schema_base)).as_string(conn)) conn.commit() with pytest.raises(DbSyncError) as err: dbsync_init(mc, from_gpkg=True) assert "The 'modified' schema exists but the base schema is missing" in str(err.value) # and revert back - cur.execute(f"ALTER SCHEMA schema_tmp RENAME TO {db_schema_base}") + cur.execute(sql.SQL("ALTER SCHEMA schema_tmp RENAME TO {}").format(sql.Identifier(db_schema_base)).as_string(conn)) conn.commit() # make change in GPKG and push to server to create pending changes, it should pass but not sync @@ -134,7 +136,7 @@ def test_init_from_gpkg(mc): # remove local copy of project (to mimic loss at docker restart) shutil.rmtree(config.working_dir) dbsync_init(mc, from_gpkg=True) - cur.execute(f"SELECT count(*) from {db_schema_main}.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 3 db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["version"] == 'v1' @@ -149,18 +151,18 @@ def test_init_from_gpkg(mc): # pull server changes to db to make sure we can sync again dbsync_pull(mc) - cur.execute(f"SELECT count(*) from {db_schema_main}.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) assert cur.fetchone()[0] == 4 db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["version"] == 'v2' # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync fid = 1 - cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}") + cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) old_value = cur.fetchone()[3] - cur.execute(f"UPDATE {db_schema_main}.simple SET rating=100 WHERE fid={fid}") + cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) conn.commit() - cur.execute(f"SELECT * from {db_schema_main}.simple WHERE fid={fid}") + cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) assert cur.fetchone()[3] == 100 dbsync_init(mc, from_gpkg=True) # check geopackage has not been modified - after init we are not synced! @@ -171,18 +173,18 @@ def test_init_from_gpkg(mc): # push db changes to server (and download new version to local working dir) to make sure we can sync again dbsync_push(mc) mc.pull_project(project_dir) - gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") + gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") assert gpkg_cur.fetchone()[3] == 100 db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["version"] == 'v3' # update some feature from 'base' db to create mismatch with src geopackage and modified - cur.execute(f"SELECT * from {db_schema_base}.simple") + cur.execute(sql.SQL("SELECT * from {}.simple").format(sql.Identifier(db_schema_base))) fid = cur.fetchone()[0] old_value = cur.fetchone()[3] - cur.execute(f"UPDATE {db_schema_base}.simple SET rating=100 WHERE fid={fid}") + cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) conn.commit() - cur.execute(f"SELECT * from {db_schema_base}.simple WHERE fid={fid}") + cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_base)), (fid,)) assert cur.fetchone()[3] == 100 with pytest.raises(DbSyncError) as err: dbsync_init(mc, from_gpkg=True) @@ -195,8 +197,8 @@ def test_init_from_gpkg(mc): assert "There are pending changes in the local directory - that should never happen" in str(err.value) -def test_init_from_gpkg_with_incomplete_dir(mc): - project_name = 'test_init' +def test_init_from_gpkg_with_incomplete_dir(mc: MerginClient): + project_name = "test_init_incomplete_dir" source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') init_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', project_name) init_sync_from_geopackage(mc, project_name, source_gpkg_path) @@ -208,15 +210,16 @@ def test_init_from_gpkg_with_incomplete_dir(mc): assert os.listdir(init_project_dir) == ['test_sync.gpkg', '.mergin'] -def test_basic_pull(mc): +def test_basic_pull(mc: MerginClient): """ Test initialization and one pull from Mergin Maps to DB 1. create a Mergin Maps project using py-client with a testing gpkg 2. run init, check that everything is fine 3. make change in gpkg (copy new version), check everything is fine """ - project_name = 'test_sync_pull' + db_schema_main = project_name + "_main" + source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory @@ -226,7 +229,7 @@ def test_basic_pull(mc): # test that database schemas are created + tables are populated cur = conn.cursor() - cur.execute("SELECT count(*) from test_sync_pull_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 3 # make change in GPKG and push @@ -238,19 +241,20 @@ def test_basic_pull(mc): # check that a feature has been inserted cur = conn.cursor() - cur.execute("SELECT count(*) from test_sync_pull_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format((sql.Identifier(db_schema_main)))) assert cur.fetchone()[0] == 4 - db_proj_info = _get_db_project_comment(conn, 'test_sync_pull_base') + db_proj_info = _get_db_project_comment(conn, project_name + "_base") assert db_proj_info["version"] == 'v2' print("---") dbsync_status(mc) -def test_basic_push(mc): +def test_basic_push(mc: MerginClient): """ Initialize a project and test push of a new row from PostgreSQL to Mergin Maps""" + project_name = "test_sync_push" + db_schema_main = project_name + "_main" - project_name = 'test_sync_push' source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory @@ -260,19 +264,19 @@ def test_basic_push(mc): # test that database schemas are created + tables are populated cur = conn.cursor() - cur.execute("SELECT count(*) from test_sync_push_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 3 # make a change in PostgreSQL cur = conn.cursor() - cur.execute("INSERT INTO test_sync_push_main.simple (name, rating) VALUES ('insert in postgres', 123)") + cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) cur.execute("COMMIT") - cur.execute("SELECT count(*) from test_sync_push_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # push the change from DB to PostgreSQL dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, 'test_sync_push_base') + db_proj_info = _get_db_project_comment(conn, project_name + "_base") assert db_proj_info["version"] == 'v2' # pull new version of the project to the work project directory @@ -288,13 +292,15 @@ def test_basic_push(mc): dbsync_status(mc) -def test_basic_both(mc): +def test_basic_both(mc: MerginClient): """ Initializes a sync project and does both a change in Mergin Maps and in the database, and lets DB sync handle it: changes in PostgreSQL need to be rebased on top of changes in Mergin Maps server. """ + project_name = "test_sync_both" + db_schema_main = project_name + "_main" + db_schema_base = project_name + "_base" - project_name = 'test_sync_both' source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory @@ -304,7 +310,7 @@ def test_basic_both(mc): # test that database schemas are created + tables are populated cur = conn.cursor() - cur.execute(f"SELECT count(*) from {project_name}_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 3 # make change in GPKG and push @@ -313,17 +319,17 @@ def test_basic_both(mc): # make a change in PostgreSQL cur = conn.cursor() - cur.execute(f"INSERT INTO {project_name}_main.simple (name, rating) VALUES ('insert in postgres', 123)") + cur.execute(sql.SQL("INSERT INTO {}.simple (name, rating) VALUES ('insert in postgres', 123)").format(sql.Identifier(db_schema_main))) cur.execute("COMMIT") - cur.execute(f"SELECT count(*) from {project_name}_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 # first pull changes from Mergin Maps to DB (+rebase changes in DB) and then push the changes from DB to Mergin Maps dbsync_pull(mc) - db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base') + db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["version"] == 'v2' dbsync_push(mc) - db_proj_info = _get_db_project_comment(conn, 'test_sync_both_base') + db_proj_info = _get_db_project_comment(conn, db_schema_base) assert db_proj_info["version"] == 'v3' # pull new version of the project to the work project directory @@ -337,15 +343,15 @@ def test_basic_both(mc): # check that the insert has been applied to the DB cur = conn.cursor() - cur.execute(f"SELECT count(*) from {project_name}_main.simple") + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 5 print("---") dbsync_status(mc) -def test_init_with_skip(mc): - project_name = 'test_init_skip' +def test_init_with_skip(mc: MerginClient): + project_name = "test_init_skip" source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base_2tables.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') db_schema_main = project_name + '_main' @@ -356,16 +362,16 @@ def test_init_with_skip(mc): # test that database schemas does not have ignored table conn = psycopg2.connect(DB_CONNINFO) cur = conn.cursor() - cur.execute(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{db_schema_main}' AND tablename = 'lines');") + cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == False - cur.execute(f"SELECT count(*) from {db_schema_main}.points") + cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # run again, nothing should change dbsync_init(mc, from_gpkg=True) - cur.execute(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{db_schema_main}' AND tablename = 'lines');") + cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == False - cur.execute(f"SELECT count(*) from {db_schema_main}.points") + cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 0 # make change in GPKG and push to server to create pending changes, it should pass but not sync @@ -374,14 +380,14 @@ def test_init_with_skip(mc): # pull server changes to db to make sure only points table is updated dbsync_pull(mc) - cur.execute(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{db_schema_main}' AND tablename = 'lines');") + cur.execute(sql.SQL("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{}' AND tablename = 'lines');").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == False - cur.execute(f"SELECT count(*) from {db_schema_main}.points") + cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main))) assert cur.fetchone()[0] == 4 -def test_with_local_changes(mc): - project_name = 'test_local_changes' +def test_with_local_changes(mc: MerginClient): + project_name = "test_local_changes" source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') extra_files = [os.path.join(TEST_DATA_DIR, f) for f in ["note_1.txt", "note_3.txt", "modified_all.gpkg"]] dbsync_project_dir = os.path.join(TMP_DIR, project_name + '_dbsync', @@ -412,16 +418,16 @@ def test_with_local_changes(mc): dbsync_status(mc) -def test_recreated_project_ids(mc): - project_name = 'test_recreated_project_ids' +def test_recreated_project_ids(mc: MerginClient): + project_name = "test_recreated_project_ids" source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') project_dir = os.path.join(TMP_DIR, project_name + '_work') # working directory - full_project_name = API_USER + "/" + project_name + full_project_name = WORKSPACE + "/" + project_name init_sync_from_geopackage(mc, project_name, source_gpkg_path) # delete remote project mc.delete_project(full_project_name) # recreate project with the same name - mc.create_project(project_name) + mc.create_project(project_name, namespace=WORKSPACE) # comparing project IDs after recreating it with the same name mp = _get_mergin_project(project_dir) local_project_id = _get_project_id(mp) @@ -432,3 +438,51 @@ def test_recreated_project_ids(mc): assert local_project_id != server_project_id with pytest.raises(DbSyncError): dbsync_status(mc) + +@pytest.mark.parametrize("project_name", ['test_init_1', 'Test_Init_2', "Test 3", "Test-4"]) +def test_project_names(mc: MerginClient, project_name: str): + source_gpkg_path = os.path.join(TEST_DATA_DIR, 'base.gpkg') + project_dir = os.path.join(TMP_DIR, project_name + '_work') + db_schema_main = project_name + '_main' + db_schema_base = project_name + '_base' + + init_sync_from_geopackage(mc, project_name, source_gpkg_path) + + # test that database schemas are created + tables are populated + conn = psycopg2.connect(DB_CONNINFO) + cur = conn.cursor() + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) + assert cur.fetchone()[0] == 3 + + # make change in GPKG and push + shutil.copy(os.path.join(TEST_DATA_DIR, 'inserted_1_A.gpkg'), os.path.join(project_dir, 'test_sync.gpkg')) + mc.push_project(project_dir) + + # pull server changes to db to make sure we can sync again + dbsync_pull(mc) + cur.execute(sql.SQL("SELECT count(*) from {}.simple").format(sql.Identifier(db_schema_main)).as_string(conn)) + assert cur.fetchone()[0] == 4 + db_proj_info = _get_db_project_comment(conn, db_schema_base) + assert db_proj_info["version"] == 'v2' + + # update some feature from 'modified' db to create mismatch with src geopackage, it should pass but not sync + fid = 1 + cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + old_value = cur.fetchone()[3] + cur.execute(sql.SQL("UPDATE {}.simple SET rating=100 WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + conn.commit() + cur.execute(sql.SQL("SELECT * from {}.simple WHERE fid=%s").format(sql.Identifier(db_schema_main)), (fid,)) + assert cur.fetchone()[3] == 100 + dbsync_init(mc, from_gpkg=True) + # check geopackage has not been modified - after init we are not synced! + gpkg_conn = sqlite3.connect(os.path.join(project_dir, 'test_sync.gpkg')) + gpkg_cur = gpkg_conn.cursor() + gpkg_cur.execute(f"SELECT * FROM simple WHERE fid={fid}") + assert gpkg_cur.fetchone()[3] == old_value + # push db changes to server (and download new version to local working dir) to make sure we can sync again + dbsync_push(mc) + mc.pull_project(project_dir) + gpkg_cur.execute(f"SELECT * FROM simple WHERE fid ={fid}") + assert gpkg_cur.fetchone()[3] == 100 + db_proj_info = _get_db_project_comment(conn, db_schema_base) + assert db_proj_info["version"] == 'v3'