diff --git a/composer/tools/README.rst b/composer/tools/README.rst
new file mode 100644
index 00000000000..ebf51a76b58
--- /dev/null
+++ b/composer/tools/README.rst
@@ -0,0 +1,120 @@
+.. This file is automatically generated. Do not edit this file directly.
+
+Google Cloud Composer Python Samples
+===============================================================================
+
+.. image:: https://gstatic.com/cloudssh/images/open-btn.png
+   :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/README.rst
+
+
+This directory contains samples for Google Cloud Composer. `Google Cloud Composer`_ is a managed Apache Airflow service that helps you create, schedule, monitor, and manage workflows. Cloud Composer automation helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.
+
+.. _Google Cloud Composer: https://cloud.google.com/composer/docs
+
+Setup
+-------------------------------------------------------------------------------
+
+Authentication
+++++++++++++++
+
+This sample requires you to have authentication set up. Refer to the
+`Authentication Getting Started Guide`_ for instructions on setting up
+credentials for applications.
+
+.. _Authentication Getting Started Guide:
+    https://cloud.google.com/docs/authentication/getting-started
+
+Install Dependencies
+++++++++++++++++++++
+
+#. Clone python-docs-samples and change directory to the sample directory you want to use.
+
+    .. code-block:: bash
+
+        $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
+
+#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions.
+
+   .. _Python Development Environment Setup Guide:
+       https://cloud.google.com/python/setup
+
+#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.
+
+    .. code-block:: bash
+
+        $ virtualenv env
+        $ source env/bin/activate
+
+#. Install the dependencies needed to run the samples.
+
+    .. code-block:: bash
+
+        $ pip install -r requirements.txt
+
+.. _pip: https://pip.pypa.io/
+.. _virtualenv: https://virtualenv.pypa.io/
+
+Samples
+-------------------------------------------------------------------------------
+
+Create a new Composer environment based on an existing environment
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+.. image:: https://gstatic.com/cloudssh/images/open-btn.png
+   :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/copy_environment.py,composer/tools/README.rst
+
+To run this sample:
+
+.. code-block:: bash
+
+    $ python copy_environment.py
+
+    usage: copy_environment.py [-h] [--running_as_service_account]
+                               [--override_machine_type OVERRIDE_MACHINE_TYPE]
+                               [--override_disk_size_gb OVERRIDE_DISK_SIZE_GB]
+                               [--override_network OVERRIDE_NETWORK]
+                               [--override_subnetwork OVERRIDE_SUBNETWORK]
+                               project location existing_env_name new_env_name
+
+    Clone a Composer environment.
+
+    positional arguments:
+      project               Google Cloud Project containing the existing Composer
+                            Environment.
+      location              Google Cloud region containing the existing Composer
+                            Environment. For example `us-central1`.
+      existing_env_name     The name of the existing Composer Environment.
+      new_env_name          The name to use for the new Composer Environment.
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      --running_as_service_account
+                            Set this flag if the script is running on a VM with
+                            the same service account as used in the Composer
+                            Environment. This avoids creating extra credentials.
+      --override_machine_type OVERRIDE_MACHINE_TYPE
+                            Optional. Overrides the machine type used for the
+                            Cloud Composer Environment. Must be a fully specified
+                            machine type URI.
+      --override_disk_size_gb OVERRIDE_DISK_SIZE_GB
+                            Optional. Overrides the disk size in GB used for the
+                            Cloud Composer Environment.
+      --override_network OVERRIDE_NETWORK
+                            Optional. Overrides the network used for the Cloud
+                            Composer Environment.
+      --override_subnetwork OVERRIDE_SUBNETWORK
+                            Optional. Overrides the subnetwork used for the
+                            Cloud Composer Environment.
+
+.. _Google Cloud SDK: https://cloud.google.com/sdk/
\ No newline at end of file
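
For orientation, an invocation of the documented CLI might look like the following; the project, region, and environment names are placeholders, and ``--override_disk_size_gb`` is shown only to illustrate one of the optional flags:

.. code-block:: bash

    $ python copy_environment.py \
          --override_disk_size_gb 100 \
          my-project us-central1 existing-env new-env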
diff --git a/composer/tools/README.rst.in b/composer/tools/README.rst.in
new file mode 100644
index 00000000000..fba2f63e42a
--- /dev/null
+++ b/composer/tools/README.rst.in
@@ -0,0 +1,26 @@
+# This file is used to generate README.rst
+
+product:
+  name: Google Cloud Composer
+  short_name: Cloud Composer
+  url: https://cloud.google.com/composer/docs
+  description: >
+    `Google Cloud Composer`_ is a managed Apache Airflow service that helps
+    you create, schedule, monitor, and manage workflows. Cloud Composer
+    automation helps you create Airflow environments quickly and use
+    Airflow-native tools, such as the powerful Airflow web interface and
+    command-line tools, so you can focus on your workflows and not your
+    infrastructure.
+
+setup:
+- auth
+- install_deps
+
+samples:
+- name: Create a new Composer environment based on an existing environment
+  file: copy_environment.py
+  show_help: True
+
+cloud_client_library: false
+
+folder: composer/tools
\ No newline at end of file
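
The copy_environment.py changes below rotate the Airflow Fernet key during the copy: values encrypted under the source environment's key must be decrypted and re-encrypted under the destination environment's key, or the new environment cannot read them. A minimal sketch of that round trip with the ``cryptography`` package (keys here are freshly generated purely for illustration, standing in for the keys ``get_fernet_key`` reads from each environment):

.. code-block:: python

    from cryptography import fernet

    # Stand-ins for the keys read out of each environment's airflow.cfg.
    old_key = fernet.Fernet(fernet.Fernet.generate_key())
    new_key = fernet.Fernet(fernet.Fernet.generate_key())

    token = old_key.encrypt(b"postgres://user:secret@host/db")
    # Rotation: decrypt with the old key, re-encrypt with the new one.
    rotated = new_key.encrypt(old_key.decrypt(token))
    assert new_key.decrypt(rotated) == b"postgres://user:secret@host/db"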
os.environ["KUBECONFIG"] = kubeconfig_file + if subprocess.call( + [ + "gcloud", + "container", + "clusters", + "get-credentials", + project_zone_cluster[2], + "--zone", + project_zone_cluster[1], + "--project", + project_zone_cluster[0] + ] + ): + print("Failed to retrieve cluster credentials: {}.".format( + gke_cluster_resource)) + sys.exit(1) + + kubernetes_client = client.CoreV1Api( + api_client=config.new_client_from_config( + config_file=kubeconfig_file)) + airflow_configmap = kubernetes_client.read_namespaced_config_map( + "airflow-configmap", "default") + config_str = airflow_configmap.data['airflow.cfg'] + with contextlib.closing(six.StringIO(config_str)) as config_buffer: + config_parser = configparser.ConfigParser() + config_parser.readfp(config_buffer) + return config_parser.get("core", "fernet_key") + except Exception as exc: + print("Failed to get fernet key for cluster: {}.".format(str(exc))) + sys.exit(1) + finally: + if tmp_dir_name: + shutil.rmtree(tmp_dir_name) + + +def reencrypt_variables_connections(old_fernet_key_str, new_fernet_key_str): + old_fernet_key = fernet.Fernet(old_fernet_key_str.encode("utf-8")) + new_fernet_key = fernet.Fernet(new_fernet_key_str.encode("utf-8")) + db = connector.connect( + host="127.0.0.1", + user="root", + database="airflow-db", + ) + variable_cursor = db.cursor() + variable_cursor.execute("SELECT id, val, is_encrypted FROM variable") + rows = variable_cursor.fetchall() + for row in rows: + id = row[0] + val = row[1] + is_encrypted = row[2] + if is_encrypted: + updated_val = new_fernet_key.encrypt( + old_fernet_key.decrypt(bytes(val))).decode() + variable_cursor.execute( + "UPDATE variable SET val=%s WHERE id=%s", (updated_val, id)) + db.commit() + + conn_cursor = db.cursor() + conn_cursor.execute( + "SELECT id, password, extra, is_encrypted, is_extra_encrypted FROM " + "connection") + rows = conn_cursor.fetchall() + for row in rows: + id = row[0] + password = row[1] + extra = row[2] + is_encrypted = row[3] + is_extra_encrypted = row[4] + if is_encrypted: + updated_password = new_fernet_key.encrypt( + old_fernet_key.decrypt(bytes(password))).decode() + conn_cursor.execute( + "UPDATE connection SET password=%s WHERE id=%s", + (updated_password, id)) + if is_extra_encrypted: + updated_extra = new_fernet_key.encrypt( + old_fernet_key.decrypt(bytes(extra))).decode() + conn_cursor.execute( + "UPDATE connection SET extra=%s WHERE id=%s", + (updated_extra, id)) + db.commit() + + def import_data( - sql_client, service_account_key, project, instance, gcs_bucket, filename + sql_client, + service_account_key, + project, + instance, + gcs_bucket, + filename, + old_fernet_key, + new_fernet_key ): tmp_dir_name = None fuse_dir = None @@ -383,7 +492,6 @@ def import_data( if subprocess.call(["mkdir", fuse_dir]): print("Failed to make temporary subdir {}.".format(fuse_dir)) sys.exit(1) - print(str(["gcsfuse", gcs_bucket, fuse_dir])) if subprocess.call(["gcsfuse", gcs_bucket, fuse_dir]): print( "Failed to fuse bucket {} with temp local directory {}".format( @@ -424,9 +532,11 @@ def import_data( ): print("Failed to import database.") sys.exit(1) + print("Reencrypting variables and connections.") + reencrypt_variables_connections(old_fernet_key, new_fernet_key) print("Database import succeeded.") - except Exception: - print("Failed to copy database.") + except Exception as exc: + print("Failed to copy database: {}".format(str(exc))) sys.exit(1) finally: if proxy_subprocess: @@ -522,6 +632,9 @@ def copy_database(project, existing_env, new_env, 
diff --git a/composer/tools/requirements.txt b/composer/tools/requirements.txt
index 906e1a24e50..3f821b30abf 100644
--- a/composer/tools/requirements.txt
+++ b/composer/tools/requirements.txt
@@ -1,3 +1,6 @@
+cryptography==2.3.1
 google-api-python-client==1.6.4
 google-auth==1.5.1
 google-cloud-storage==1.11.0
+kubernetes==7.0.0
+mysql-connector-python==8.0.12
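
The new ``kubernetes`` and ``mysql-connector-python`` pins back the ConfigMap read and database access added above. For reference, the ConfigMap lookup that ``get_fernet_key`` performs can be reproduced on its own; this sketch assumes cluster credentials are already present in the default kubeconfig (the script instead writes a temporary one via ``gcloud container clusters get-credentials``) and uses the Python 3 ``configparser`` API rather than the ``six`` shim:

.. code-block:: python

    from configparser import ConfigParser

    from kubernetes import client, config

    config.load_kube_config()  # reads ~/.kube/config
    v1 = client.CoreV1Api()
    cfg_text = v1.read_namespaced_config_map(
        "airflow-configmap", "default").data["airflow.cfg"]
    parser = ConfigParser()
    parser.read_string(cfg_text)
    print(parser.get("core", "fernet_key"))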