diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py index 65cc48c4688..4b8f0105a76 100644 --- a/composer/workflows/airflow_db_cleanup.py +++ b/composer/workflows/airflow_db_cleanup.py @@ -53,7 +53,6 @@ import airflow from airflow import settings -from airflow.jobs.base_job import BaseJob from airflow.models import ( DAG, DagModel, @@ -101,13 +100,6 @@ # List of all the objects that will be deleted. Comment out the DB objects you # want to skip. DATABASE_OBJECTS = [ - { - "airflow_db_model": BaseJob, - "age_check_column": BaseJob.latest_heartbeat, - "keep_last": False, - "keep_last_filters": None, - "keep_last_group_by": None, - }, { "airflow_db_model": DagRun, "age_check_column": DagRun.execution_date, @@ -228,6 +220,35 @@ except Exception as e: logging.error(e) +if AIRFLOW_VERSION < ["2", "6", "0"]: + try: + from airflow.jobs.base_job import BaseJob + DATABASE_OBJECTS.append( + { + "airflow_db_model": BaseJob, + "age_check_column": BaseJob.latest_heartbeat, + "keep_last": False, + "keep_last_filters": None, + "keep_last_group_by": None, + } + ) + except Exception as e: + logging.error(e) +else: + try: + from airflow.jobs.job import Job + DATABASE_OBJECTS.append( + { + "airflow_db_model": Job, + "age_check_column": Job.latest_heartbeat, + "keep_last": False, + "keep_last_filters": None, + "keep_last_group_by": None, + } + ) + except Exception as e: + logging.error(e) + default_args = { "owner": DAG_OWNER_NAME, "depends_on_past": False,