
Spike/argocd demo #205


Draft · wants to merge 37 commits into base: main

37 commits
- b6f5ae5 wip (maltesander, Apr 11, 2025)
- 3e983a0 argocd / airflow stack working (maltesander, Apr 11, 2025)
- d64a417 added spark op (maltesander, Apr 11, 2025)
- 64daa28 wip (maltesander, Apr 11, 2025)
- 15612f7 fix sealed secret location (maltesander, Apr 11, 2025)
- e81624d fix demo branches (maltesander, Apr 11, 2025)
- 822dee5 fixes (maltesander, Apr 11, 2025)
- 4bf2626 add role and binding for airflow / spark (maltesander, Apr 11, 2025)
- b1e33bc remove ns (maltesander, Apr 11, 2025)
- f8ab043 test minio (maltesander, Apr 11, 2025)
- 3d8e664 fix sync policy (maltesander, Apr 11, 2025)
- 150fd76 testing (maltesander, Apr 11, 2025)
- f05edcd add airflow logs minio (maltesander, Apr 12, 2025)
- 3cedcfa fixes (maltesander, Apr 12, 2025)
- 871d459 extend cert expiry to 10 years (maltesander, Apr 13, 2025)
- e486d9c split stack & demo (maltesander, Apr 13, 2025)
- 46f48b4 install all operators (maltesander, Apr 13, 2025)
- 5dca1f8 fixes (maltesander, Apr 13, 2025)
- 95f9e5a use sealed secrets for minio / postgres (maltesander, Apr 13, 2025)
- 74b893c add zookeeper (maltesander, Apr 13, 2025)
- 57f70cc fix path (maltesander, Apr 13, 2025)
- cbdb400 fix path 2 (maltesander, Apr 13, 2025)
- 0b43683 fix secret name (maltesander, Apr 13, 2025)
- 3fa0691 fix credentials (maltesander, Apr 13, 2025)
- d2b82f7 attempt to fix secret (maltesander, Apr 13, 2025)
- a7a95fc seal minio connection (maltesander, Apr 13, 2025)
- 450775e fix secret (maltesander, Apr 13, 2025)
- 9319fcf try fix postgres secret (maltesander, Apr 13, 2025)
- 84ee9a5 fix env override (maltesander, Apr 13, 2025)
- 4846f00 fix overrides (maltesander, Apr 13, 2025)
- 8f7d766 fix container name (maltesander, Apr 13, 2025)
- 1c8b5a5 fix overrides (maltesander, Apr 13, 2025)
- 4fcc2a1 enable gitsync (maltesander, Apr 13, 2025)
- f52cb08 fix git sync (maltesander, Apr 13, 2025)
- 32011e8 move yaml out of dags git sync (maltesander, Apr 13, 2025)
- 2efbab2 set resources (maltesander, Apr 13, 2025)
- 945cbf2 linter (maltesander, Apr 13, 2025)
36 changes: 36 additions & 0 deletions demos/argo-cd/applications/airflow-postgres.yaml
@@ -0,0 +1,36 @@
---
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: airflow-postgres
spec:
  project: airflow
  destination:
    server: https://kubernetes.default.svc
    namespace: stackable-airflow
  sources:
    - repoURL: "registry-1.docker.io/bitnamicharts"
      path: postgresql
      # helm inspect chart oci://registry-1.docker.io/bitnamicharts/postgresql
      targetRevision: 16.6.3 # 17.4.0
      chart: postgresql
      helm:
        # TODO this breaks naming as long as we use the airflow stack yaml which needs this svc name
        releaseName: postgresql-airflow
        valuesObject:
          commonLabels:
            stackable.tech/vendor: Stackable
          auth:
            database: airflow
            username: airflow
            existingSecret: postgresql-credentials
    - repoURL: https://github.com/stackabletech/demos.git
      # TODO: adapt to release-25.3
      targetRevision: "spike/argocd-demo"
      path: demos/argo-cd/manifests/airflow-postgres/
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
    automated:
      selfHeal: true
      prune: true
21 changes: 21 additions & 0 deletions demos/argo-cd/applications/airflow.yaml
@@ -0,0 +1,21 @@
---
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: airflow
spec:
  project: airflow
  destination:
    server: https://kubernetes.default.svc
    namespace: stackable-airflow
  source:
    repoURL: https://github.com/stackabletech/demos.git
    # TODO: adapt to release-25.3
    targetRevision: "spike/argocd-demo"
    path: demos/argo-cd/manifests/airflow/
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
    automated:
      selfHeal: true
      prune: true
21 changes: 21 additions & 0 deletions demos/argo-cd/applications/minio.yaml
@@ -0,0 +1,21 @@
---
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: minio
spec:
  project: minio
  destination:
    server: https://kubernetes.default.svc
    namespace: minio
  source:
    repoURL: https://github.com/stackabletech/demos.git
    # TODO: adapt to release-25.3
    targetRevision: "spike/argocd-demo"
    path: demos/argo-cd/manifests/minio/
  syncPolicy:
    syncOptions:
      - CreateNamespace=true
    automated:
      selfHeal: true
      prune: true
20 changes: 20 additions & 0 deletions demos/argo-cd/dags/date_demo.py
@@ -0,0 +1,20 @@
"""Example DAG returning the current date"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='date_demo',
    schedule_interval='0-59 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=5),
    tags=['example'],
    params={},
) as dag:

    run_this = BashOperator(
        task_id='run_every_minute',
        bash_command='date',
    )
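The cron expression `0-59 * * * *` in this DAG is equivalent to `* * * * *`: the minute field is the explicit range 0-59, so the task fires every minute (matching the `run_every_minute` task id). A standalone sketch of how such a minute field expands, purely for illustration (this is not how Airflow parses cron):

```python
def expand_minute_field(field: str) -> list:
    """Expand a cron minute field that is either '*' or a simple 'lo-hi' range."""
    if field == "*":
        return list(range(60))
    lo, hi = (int(x) for x in field.split("-"))
    # Cron ranges are inclusive on both ends.
    return list(range(lo, hi + 1))

# '0-59' covers every minute of the hour, exactly like '*'.
assert expand_minute_field("0-59") == expand_minute_field("*")
```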
177 changes: 177 additions & 0 deletions demos/argo-cd/dags/pyspark_pi.py
@@ -0,0 +1,177 @@
"""Example DAG demonstrating how to apply a Kubernetes resource from Airflow running in-cluster"""
import os
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Dict, Optional, Sequence

import yaml
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base import BaseSensorOperator
from kubernetes import client

if TYPE_CHECKING:
    from airflow.utils.context import Context


class SparkKubernetesOperator(BaseOperator):
    template_fields: Sequence[str] = ('application_file', 'namespace')
    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        application_file: str,
        namespace: Optional[str] = None,
        kubernetes_conn_id: str = 'kubernetes_in_cluster',
        api_group: str = 'spark.stackable.tech',
        api_version: str = 'v1alpha1',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.application_file = application_file
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.api_group = api_group
        self.api_version = api_version
        self.plural = "sparkapplications"

    def execute(self, context: 'Context'):
        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        self.log.info("Creating SparkApplication...")
        response = hook.create_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural=self.plural,
            body=self.application_file,
            namespace=self.namespace,
        )
        return response


class SparkKubernetesSensor(BaseSensorOperator):
    template_fields = ("application_name", "namespace")
    # See https://github.com/stackabletech/spark-k8s-operator/pull/460/files#diff-d737837121132af6b60f50279a78464b05dcfd06c05d1d090f4198a5e962b5f6R371
    # Unknown is set immediately so it must be excluded from the failed states.
    # Note the trailing commas: these must be one-element tuples, not bare strings,
    # otherwise `in` below would do substring matching and the tuple concatenation
    # in poke() would silently become the string "FailedSucceeded".
    FAILURE_STATES = ("Failed",)
    SUCCESS_STATES = ("Succeeded",)

    def __init__(
        self,
        *,
        application_name: str,
        attach_log: bool = False,
        namespace: Optional[str] = None,
        kubernetes_conn_id: str = 'kubernetes_in_cluster',
        api_group: str = 'spark.stackable.tech',
        api_version: str = 'v1alpha1',
        poke_interval: float = 60,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.application_name = application_name
        self.attach_log = attach_log
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        self.api_group = api_group
        self.api_version = api_version
        self.poke_interval = poke_interval

    def _log_driver(self, application_state: str, response: dict) -> None:
        if not self.attach_log:
            return
        status_info = response["status"]
        if "driverInfo" not in status_info:
            return
        driver_info = status_info["driverInfo"]
        if "podName" not in driver_info:
            return
        driver_pod_name = driver_info["podName"]
        namespace = response["metadata"]["namespace"]
        log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
        try:
            log = ""
            for line in self.hook.get_pod_logs(driver_pod_name, namespace=namespace):
                log += line.decode()
            log_method(log)
        except client.rest.ApiException as e:
            self.log.warning(
                "Could not read logs for pod %s. It may have been disposed.\n"
                "Make sure timeToLiveSeconds is set on your SparkApplication spec.\n"
                "underlying exception: %s",
                driver_pod_name,
                e,
            )

    def poke(self, context: Dict) -> bool:
        self.log.info("Poking: %s", self.application_name)
        response = self.hook.get_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural="sparkapplications",
            name=self.application_name,
            namespace=self.namespace,
        )
        try:
            application_state = response["status"]["phase"]
        except KeyError:
            self.log.debug(f"SparkApplication status could not be established: {response}")
            return False
        if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
            self._log_driver(application_state, response)
        if application_state in self.FAILURE_STATES:
            raise AirflowException(f"SparkApplication failed with state: {application_state}")
        elif application_state in self.SUCCESS_STATES:
            self.log.info("SparkApplication ended successfully")
            return True
        else:
            self.log.info("SparkApplication is still in state: %s", application_state)
            return False


with DAG(
    dag_id='sparkapp_dag',
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    params={"example_key": "example_value"},
) as dag:

    def load_body_to_dict(body):
        try:
            body_dict = yaml.safe_load(body)
        except yaml.YAMLError as e:
            raise AirflowException(f"Exception when loading resource definition: {e}\n")
        return body_dict

    yaml_path = os.path.join(os.environ.get('AIRFLOW__CORE__DAGS_FOLDER'), '../manifests/spark-k8s/pyspark_pi.yaml')

    with open(yaml_path, 'r') as file:
        crd = file.read()
    with open('/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as file:
        ns = file.read()

    document = load_body_to_dict(crd)
    application_name = 'pyspark-pi-' + datetime.utcnow().strftime('%Y%m%d%H%M%S')
    document.update({'metadata': {'name': application_name, 'namespace': ns}})

    t1 = SparkKubernetesOperator(
        task_id='spark_pi_submit',
        namespace=ns,
        application_file=document,
        do_xcom_push=True,
        dag=dag,
    )

    t2 = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        namespace=ns,
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        poke_interval=5,
        dag=dag,
    )

    t1 >> t2
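The terminal-state check in `SparkKubernetesSensor.poke` is easy to get subtly wrong: the state collections must be one-element tuples (note the trailing comma), because with bare strings `in` performs substring matching and `FAILURE_STATES + SUCCESS_STATES` concatenates to `"FailedSucceeded"`. A standalone sketch of the intended logic, runnable without Airflow or a cluster (the `ValueError` stands in for `AirflowException`):

```python
FAILURE_STATES = ("Failed",)    # trailing comma: tuple, not string
SUCCESS_STATES = ("Succeeded",)

def poke_once(application_state: str) -> bool:
    """Mirror the sensor's outcome: True = finished, False = keep polling."""
    if application_state in FAILURE_STATES:
        raise ValueError(f"SparkApplication failed with state: {application_state}")
    if application_state in SUCCESS_STATES:
        return True
    # "Unknown" and any in-flight phase fall through to another poll.
    return False

assert poke_once("Succeeded") is True
assert poke_once("Unknown") is False  # Unknown must not count as failed
```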
@@ -0,0 +1,19 @@
---
apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
  creationTimestamp: null
  name: postgresql-credentials
  namespace: stackable-airflow
spec:
  encryptedData:
    database: AgAlOcdpjYAaeiHbIwmGM3lb7okU4DROr+4bs7oQBPBY924XdamqlSw4Y7V8svvHRqD5Hz8SRx0SY2dkLaBQZBaXKV8qaMaVdqX0zK8zvoIbw/bQFXMTFpPq/kUD/kBdTwGJdAd/VpdY1m4Dcn247AhsQuNbqcZ8xibH2IW1e41f1/5nidHY+0a0O2qrSjAU8RxBAuBlLS6ZZiT2vmgahe6CUarJ3Dzr8InZRmtgwT7F8Zzli6cqgOPDsUg768KwDOIxpO+tnOT9b57SAbLUn5SOedFbAodWgQAg913URTiPOCrt5Loyp9B4tQXv1s74WFmgPlEkQmavkIyQg3R2Yki0QhkFysOFtp41ilRFTvgauxV3irryTv/frgA+XJDwfYexG2whgWH/VbT/HHIMoHBuRD864lpWZ/qDdLhFx7A5Eflli+YO+xuOQP4eFEsWeVYtcBNEi8Epy86LBiDCttCYWJ96XEmriqTpBebjxpaihxfCnPEbq5bkLv9RhO9vgzgYPgceLvPJ9GSpSJFv2s+ohvvJYyeAiuNLRkPR++tHFaofCVnuIfbeFgS1WVi/F4TrNsq2JlNfUnrzGTkc1CIkx/u02cE7K+Smfi/Dz30iDE7A+TLxDz7pguozMHX0eQCYDJ4ehehkArfH/XXe/ZH7Hb5MV6nMV32ZxkNi4x5iMdIsuLUdXJHWqSi0Ue9cZFBsP2rqmr5S
    password: AgB/x1/DNC9uYWCTH1xu17qSr9GrnTcQ4LD5BCTu/5wUd/TK2rWM8zHce0mF97e6km+FyT+G96utiRThutNkAOpzT7xrZwkp45F0HqF0aD+NOYMzMn4duVLaK7JXLm6u7187Ci6kFMkmZ+Npb94wX+iUa+kgTQTkMP1HsdgQj4c8gj1ldpnoUDQt8/wsTF/gEukXD14E0BbEMkRTPh8PzkiirlLVWCff3YauQVRKXL2yWa79Fo47zkZ/WMJXyB8PVBQot2OIDL1ygddTBklBoP6ts2MOo3G7blmQWu5cgS/+IAw837pEWpDg5+UnUkM1+OQ+52PWLRXPbiSMFFlcbZIHwuASx7lRRm482q+kfJBb4oALd7EPHmjcBg1lV1iR74NetJ2wkMMcqnvkITxw8U8To1ZKseBpNH7mBWB9u2lvpjT5EWMOveVLCq6NwU2IcdgHhUaMByrxq5I3u1k1Y1hZvM0txQp8Mr/Zas/EEJ/w3sN+i9O/JWIonRx84Y9OJxIQEXManz7LzzzIxXikCvenm/8oQMfPHa7w5Scdb6sEQKsFXZ+aMQMcUTp7xuZbYzuX/7CmVdPHlFOhAXGAbRE2J739uWmyMVQkJ9cHSKzbCJdXDD3+oJB42Xz37tJg08xvMGBTPJjJoKnXfHIPS2yQISg1VgfyWNg/T2LZLuqt5htpUbQsWsL6K9/z7qtsama5pUiVTsmu
    postgres-password: AgBpGdazaNMQ2j2jI1sK5D1PCFvhXn/kH7Xhj2P2GgHesCxml2WN8B99dtk2O81BDt7IYNqadhwdXe+pmvJU48qkbfcmgE+Kbjbbdo8bv9GxMQcYPPH0ahqIfFAxNgMBHnbQ0Zxy+a3vzNiXLeD4FAwU0Pxa7kSwED0/EiyfmZcvph2J5JTi4FwhU2ITycMwsjQnt7KVw1SGWMZkro6HoJ9wGSj7GRSeHX7HEMBCehf8TOMEo1Emy6IqYHAywdcNgfBEoLvgUUFElBxYLshCP2NTTCU2o374Xw2fiErjiR7Q075Mshpxl4bqGf2HgGT44Lsn5KDupPNnV7FPx0xTk3OmmE5Ja5gUAeLYU3bF8B7DSm61ltQnkL+FQ5sy3tjCTsH3wyYbg4zVoQlJuyRXJdG48Y0aHkQHwMpZlZEkB8IWGZZ9Kd9UKTTFnS3odca8OhdWYQ28ARz4Mj0Jio0dEO/63EyCJZiL4/Ck4rj8I49Wz832GjHVkMNFqthK6jN+HmlGcKXRKRMFLyIrvH7mUv1EuOsvh2+qtrsGGWSYwfETmb1J/iUp7X2+KhPnn6GP/poXLKeeDtiRfRSXTP70vRDllOMgSRTorRxS44FD/ac3KVA4RlpA+Ho+XU4QhYnNx8Q6haTQINY8Yypw/JRAPO4l379EFZtFdCSrKZ9XmRPsSZkZPZZJSVUe2ByGs/SScOR1xf1pwdPjWoOd
    username: AgC716fxKUrMXCsRsWeiSTliHKP8NrwBr0TD7/w2F1HO93pqjSVU5/6hKOiFdNllbyPRudSHAeYWqJhQxxl5tS9kSaEp29zkxAUnmNwfLL3MKdCohxz2LZzC7saaOdjDkX2x8jcy68STPupsdg5bX40D4tO/s8qXnWWZJrW0ht/J+orgjbnfHk51JBMRBh6WD43yKLOBTHGO5U/TqRKg20kn+jDm2hcdec8axcjn2E57B3wrL8tvUcVRElO0XOcLHHBrav/BumRkwS4bMGqa+Vcarrr8U9MuQXVUzbCuwt326q/Kg862ozYY9uWAVtUy+Yfb4SAcEUC9Y+Ioo776H3YIr6q8P6RI+lFNmz4Q8AFDN9D5IRNjQ/D73llHw944dYRJl8CVjoJJYvRXv8z+4GzL7O+eF+smx4EhcY3sFxjkNCv+s+EDbUXvQR0ULqTEc5WZlBqp9TQEvzBwN3CVYO0k76FQYdjoU6xb/32gALe6zMxOLTliR9axqDvCL3OCj97/HJy8RLL2sYH82NZ1sGHqfTkTKYGk24Zfzd+a+p0PDbU14ZBuXKWBDkzNaVdpLvWhtdBczarfVp4Z2b6IutbT2C43fcmi7WYJOtV5em1sGhKOtbEy+wC+r5GKr6qRV58jm88cwtqZgm6IirDt/bzJDqhPndBzW5iQZzkko/1Tr4y/6oaTleTcmkDlSg2Cn8q/R7MNYUjT
  template:
    metadata:
      creationTimestamp: null
      name: postgresql-credentials
      namespace: stackable-airflow
    type: Opaque
36 changes: 36 additions & 0 deletions demos/argo-cd/manifests/airflow/airflow-spark-clusterrole.yaml
@@ -0,0 +1,36 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: airflow-spark-clusterrole
rules:
  - apiGroups:
      - spark.stackable.tech
    resources:
      - sparkapplications
    verbs:
      - create
      - get
      - list
  - apiGroups:
      - airflow.stackable.tech
    resources:
      - airflowdbs
    verbs:
      - create
      - get
      - list
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - watch
      - list
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
    verbs:
      - list
@@ -0,0 +1,13 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-clusterrole-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: airflow-spark-clusterrole
subjects:
  - apiGroup: rbac.authorization.k8s.io
    kind: Group
    name: system:serviceaccounts
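This binding's subject is the group `system:serviceaccounts`, which Kubernetes attaches to every service-account token in the cluster, so the role reaches the Airflow service account (whatever its name) without listing it explicitly. A small illustrative sketch of that group membership, not Kubernetes source code:

```python
def groups_for_service_account(namespace: str) -> set:
    """Well-known groups Kubernetes puts on any service-account token."""
    return {
        "system:serviceaccounts",                # every service account, cluster-wide
        f"system:serviceaccounts:{namespace}",   # every service account in this namespace
        "system:authenticated",
    }

# Any service account in stackable-airflow matches the binding's subject group.
assert "system:serviceaccounts" in groups_for_service_account("stackable-airflow")
```

Binding to `system:serviceaccounts` is deliberately broad; scoping to `system:serviceaccounts:stackable-airflow` would restrict the grant to the demo namespace.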