
Commit 05d9885

feat: add support for AWS Athena
1 parent 44e458d

10 files changed: +279 −4 lines

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
```diff
@@ -7,9 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+
+## [0.10.34] - 2025-08-06
+
 ### Added
 
-- `datacontract test` now supports testing HTTP APIs.
+- `datacontract test` now supports HTTP APIs.
+- `datacontract test` now supports Athena.
 
 ### Fixed
```

README.md

Lines changed: 41 additions & 2 deletions
````diff
@@ -222,6 +222,7 @@ A list of available extras:
 
 | Dependency | Installation Command |
 |-------------------------|--------------------------------------------|
+| Amazon Athena | `pip install datacontract-cli[athena]` |
 | Avro Support | `pip install datacontract-cli[avro]` |
 | Google BigQuery | `pip install datacontract-cli[bigquery]` |
 | Databricks Integration | `pip install datacontract-cli[databricks]` |
@@ -366,6 +367,7 @@ Credentials are provided with environment variables.
 Supported server types:
 
 - [s3](#S3)
+- [athena](#athena)
 - [bigquery](#bigquery)
 - [azure](#azure)
 - [sqlserver](#sqlserver)
@@ -436,6 +438,41 @@ servers:
 | `DATACONTRACT_S3_SESSION_TOKEN` | `AQoDYXdzEJr...` | AWS temporary session token (optional) |
 
 
+#### Athena
+
+Data Contract CLI can test data in AWS Athena that is stored in S3.
+It supports different file formats, such as Iceberg, Parquet, JSON, and CSV.
+
+##### Example
+
+datacontract.yaml
+```yaml
+servers:
+  athena:
+    type: athena
+    catalog: awsdatacatalog # awsdatacatalog is the default setting
+    schema: icebergdemodb # in Athena, this is called "database"
+    regionName: eu-central-1
+    stagingDir: s3://my-bucket/athena-results/
+models:
+  my_table: # corresponds to a table or view name
+    type: table
+    fields:
+      my_column_1: # corresponds to a column
+        type: string
+        config:
+          physicalType: varchar
+```
+
+##### Environment Variables
+
+| Environment Variable | Example | Description |
+|-------------------------------------|---------------------------------|----------------------------------------|
+| `DATACONTRACT_S3_REGION` | `eu-central-1` | Region of Athena service |
+| `DATACONTRACT_S3_ACCESS_KEY_ID` | `AKIAXV5Q5QABCDEFGH` | AWS Access Key ID |
+| `DATACONTRACT_S3_SECRET_ACCESS_KEY` | `93S7LRrJcqLaaaa/XXXXXXXXXXXXX` | AWS Secret Access Key |
+| `DATACONTRACT_S3_SESSION_TOKEN` | `AQoDYXdzEJr...` | AWS temporary session token (optional) |
+
 
 #### Google Cloud Storage (GCS)
 
@@ -898,8 +935,10 @@ models:
 │ --engine TEXT [engine] The engine used for great │
 │ expection run. │
 │ [default: None] │
-│ --template PATH [custom] The file path of Jinja │
-│ template. │
+│ --template PATH The file path or URL of a template. │
+│ For Excel format: path/URL to custom │
+│ Excel template. For custom format: │
+│ path to Jinja template. │
 │ [default: None] │
 │ --help Show this message and exit. │
 ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
````
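
With the `athena` server block and credentials in place, the new server type can be exercised via the Python API as well as the CLI. A minimal sketch, mirroring the test added later in this commit; the `server` keyword argument is an assumption here (it selects a server block by name):

```python
# Minimal sketch: run the contract tests against the "athena" server entry.
# Mirrors tests/test_test_athena_iceberg.py from this commit; the `server`
# keyword is assumed to pick the server block by name.
from datacontract.data_contract import DataContract

data_contract = DataContract(data_contract_file="datacontract.yaml", server="athena")
run = data_contract.test()
print(run.pretty())
assert run.result == "passed"
```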

datacontract/engines/soda/check_soda_execute.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -2,6 +2,8 @@
 import typing
 import uuid
 
+from datacontract.engines.soda.connections.athena import to_athena_soda_configuration
+
 if typing.TYPE_CHECKING:
     from pyspark.sql import SparkSession
 
@@ -106,6 +108,10 @@ def check_soda_execute(
         soda_configuration_str = to_trino_soda_configuration(server)
         scan.add_configuration_yaml_str(soda_configuration_str)
         scan.set_data_source_name(server.type)
+    elif server.type == "athena":
+        soda_configuration_str = to_athena_soda_configuration(server)
+        scan.add_configuration_yaml_str(soda_configuration_str)
+        scan.set_data_source_name(server.type)
 
     else:
         run.checks.append(
```

datacontract/engines/soda/connections/athena.py

Lines changed: 79 additions & 0 deletions
```python
import os

import yaml

from datacontract.model.exceptions import DataContractException


def to_athena_soda_configuration(server):
    s3_region = os.getenv("DATACONTRACT_S3_REGION")
    s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID")
    s3_secret_access_key = os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY")
    s3_session_token = os.getenv("DATACONTRACT_S3_SESSION_TOKEN")

    # Validate required parameters
    if not s3_access_key_id:
        raise DataContractException(
            type="athena-connection",
            name="missing_access_key_id",
            reason="AWS access key ID is required. Set the DATACONTRACT_S3_ACCESS_KEY_ID environment variable.",
            engine="datacontract",
        )

    if not s3_secret_access_key:
        raise DataContractException(
            type="athena-connection",
            name="missing_secret_access_key",
            reason="AWS secret access key is required. Set the DATACONTRACT_S3_SECRET_ACCESS_KEY environment variable.",
            engine="datacontract",
        )

    if not hasattr(server, "schema_") or not server.schema_:
        raise DataContractException(
            type="athena-connection",
            name="missing_schema",
            reason="Schema is required for Athena connection. Specify the schema where your tables exist in the server configuration.",
            engine="datacontract",
        )

    if not hasattr(server, "stagingDir") or not server.stagingDir:
        raise DataContractException(
            type="athena-connection",
            name="missing_s3_staging_dir",
            reason="S3 staging directory is required for Athena connection. This should be the Amazon S3 Query Result Location (e.g., 's3://my-bucket/athena-results/').",
            engine="datacontract",
        )

    # Validate S3 staging directory format
    if not server.stagingDir.startswith("s3://"):
        raise DataContractException(
            type="athena-connection",
            name="invalid_s3_staging_dir",
            reason=f"S3 staging directory must start with 's3://'. Got: {server.stagingDir}. Example: 's3://my-bucket/athena-results/'",
            engine="datacontract",
        )

    data_source = {
        "type": "athena",
        "access_key_id": s3_access_key_id,
        "secret_access_key": s3_secret_access_key,
        "schema": server.schema_,
        "staging_dir": server.stagingDir,
    }

    if s3_region:
        data_source["region_name"] = s3_region
    elif server.region_name:
        data_source["region_name"] = server.region_name

    if server.catalog:
        # Optional: the name of the data source, also referred to as a catalog.
        # The default value is `awsdatacatalog`.
        data_source["catalog"] = server.catalog

    if s3_session_token:
        data_source["aws_session_token"] = s3_session_token

    soda_configuration = {f"data_source {server.type}": data_source}

    soda_configuration_str = yaml.dump(soda_configuration)
    return soda_configuration_str
```
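
For reference, a minimal sketch (not part of the commit) of what `to_athena_soda_configuration` emits for the README example. The `SimpleNamespace` stub is an assumption standing in for the datacontract `Server` model; it only carries the attributes the function reads:

```python
# Minimal sketch: render the Soda data source configuration for an Athena server.
# SimpleNamespace stands in for the datacontract Server model (assumption).
import os
from types import SimpleNamespace

from datacontract.engines.soda.connections.athena import to_athena_soda_configuration

os.environ.setdefault("DATACONTRACT_S3_ACCESS_KEY_ID", "AKIAXV5Q5QABCDEFGH")
os.environ.setdefault("DATACONTRACT_S3_SECRET_ACCESS_KEY", "93S7LRrJcqLaaaa/XXX")

server = SimpleNamespace(
    type="athena",
    catalog="awsdatacatalog",
    schema_="icebergdemodb",
    region_name="eu-central-1",  # used only when DATACONTRACT_S3_REGION is not set
    stagingDir="s3://my-bucket/athena-results/",
)

print(to_athena_soda_configuration(server))
# Roughly (keys sorted by yaml.dump; values depend on your environment):
# data_source athena:
#   access_key_id: AKIAXV5Q5QABCDEFGH
#   catalog: awsdatacatalog
#   region_name: eu-central-1
#   schema: icebergdemodb
#   secret_access_key: 93S7LRrJcqLaaaa/XXX
#   staging_dir: s3://my-bucket/athena-results/
#   type: athena
```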

datacontract/export/sql_type_converter.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -3,6 +3,9 @@
 
 
 def convert_to_sql_type(field: Field, server_type: str) -> str:
+    if field.config and "physicalType" in field.config:
+        return field.config["physicalType"]
+
     if server_type == "snowflake":
         return convert_to_snowflake(field)
     elif server_type == "postgres":
@@ -19,6 +22,7 @@ def convert_to_sql_type(field: Field, server_type: str) -> str:
         return convert_type_to_bigquery(field)
     elif server_type == "trino":
         return convert_type_to_trino(field)
+
     return field.type
 
 
```
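
The `physicalType` override lets a contract author pin the exact database type and bypass the per-server mapping. A minimal sketch (not part of the commit), assuming `Field` comes from `datacontract.model.data_contract_specification` and accepts `type` and `config` keyword arguments:

```python
# Minimal sketch of the physicalType override added above.
from datacontract.export.sql_type_converter import convert_to_sql_type
from datacontract.model.data_contract_specification import Field

pinned = Field(type="string", config={"physicalType": "varchar"})
print(convert_to_sql_type(pinned, "athena"))  # -> "varchar": config wins over any server mapping

plain = Field(type="string")
print(convert_to_sql_type(plain, "athena"))   # -> "string": no athena branch, falls back to field.type
```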

datacontract/imports/odcs_v3_importer.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -131,6 +131,7 @@ def import_servers(odcs: OpenDataContractStandard) -> Dict[str, Server] | None:
         server.host = odcs_server.host
         server.port = odcs_server.port
         server.catalog = odcs_server.catalog
+        server.stagingDir = odcs_server.stagingDir
         server.topic = getattr(odcs_server, "topic", None)
         server.http_path = getattr(odcs_server, "http_path", None)
         server.token = getattr(odcs_server, "token", None)
```

pyproject.toml

Lines changed: 5 additions & 1 deletion
```diff
@@ -92,6 +92,10 @@ sqlserver = [
     "soda-core-sqlserver>=3.3.20,<3.6.0"
 ]
 
+athena = [
+    "soda-core-athena>=3.3.20,<3.6.0"
+]
+
 trino = [
     "soda-core-trino>=3.3.20,<3.6.0"
 ]
@@ -122,7 +126,7 @@ protobuf = [
 ]
 
 all = [
-    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf]"
+    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf]"
 ]
 
 # for development, we pin all libraries to an exact version
```
Lines changed: 70 additions & 0 deletions
````markdown
Setup:

# Create an S3 bucket for Iceberg data

```
s3://datacontract-iceberg-demo
```

# Create an S3 bucket for Athena results

```
s3://entropy-data-demo-athena-results-dfhsiuya
```

# Create a Glue database

In Athena run:
```
CREATE DATABASE icebergdemodb
```

# Create an Iceberg table

In Athena run:
```
CREATE TABLE athena_iceberg_table_partitioned (
  color string,
  date string,
  name string,
  price bigint,
  product string,
  ts timestamp)
PARTITIONED BY (day(ts))
LOCATION 's3://datacontract-iceberg-demo/ice_warehouse/iceberg_db/athena_iceberg_table/'
TBLPROPERTIES (
  'table_type' = 'ICEBERG'
)
```

# Add some data to the Iceberg table

In Athena run:
```
INSERT INTO "icebergdemodb"."athena_iceberg_table_partitioned" VALUES (
  'red', '222022-07-19T03:47:29', 'PersonNew', 178, 'Tuna', now()
)
```

# Add a new IAM user

No permissions needed.

E.g. `datacontract-cli-unittests`

# Create an access key for this IAM user

Use type `other`.
Save the credentials in a `.env` file:
```
DATACONTRACT_S3_ACCESS_KEY_ID=AKIA...
DATACONTRACT_S3_SECRET_ACCESS_KEY=...
```

# Give permissions to the IAM user

In Glue ->
https://eu-central-1.console.aws.amazon.com/glue/home?region=eu-central-1#/v2/iam-permissions/select-users

Select the S3 bucket.

Create the standard role `AWSGlueServiceRole`.
````

tests/fixtures/athena-iceberg/iceberg_example.odcs.yaml

Lines changed: 42 additions & 0 deletions
```yaml
apiVersion: v3.0.1
kind: DataContract
id: iceberg-example
name: Iceberg Example
version: 0.0.1
status: active
customProperties:
  - property: owner
    value: data--ai
description: {}
servers:
  - server: athena
    type: athena
    description: Iceberg files on S3
    catalog: awsdatacatalog # awsdatacatalog is the default catalog in Athena
    schema: icebergdemodb # called database in Athena
    regionName: eu-central-1
    stagingDir: s3://entropy-data-demo-athena-results-dfhsiuya/cli
schema:
  - name: athena_iceberg_table_partitioned
    logicalType: object
    properties:
      - name: color
        logicalType: string
        required: true
        unique: true
        physicalType: varchar
      - name: date
        logicalType: string
        physicalType: varchar
      - name: name
        logicalType: string
        physicalType: varchar
      - name: price
        logicalType: integer
        physicalType: bigint
      - name: product
        logicalType: string
        physicalType: varchar
      - name: ts
        logicalType: date
        physicalType: timestamp(6)
```

tests/test_test_athena_iceberg.py

Lines changed: 26 additions & 0 deletions
```python
import logging
import os

import pytest
from dotenv import load_dotenv

from datacontract.data_contract import DataContract

logging.basicConfig(level=logging.INFO, force=True)
load_dotenv(override=True)
datacontract = "fixtures/athena-iceberg/iceberg_example.odcs.yaml"


@pytest.mark.skipif(
    os.environ.get("DATACONTRACT_S3_ACCESS_KEY_ID") is None
    or os.environ.get("DATACONTRACT_S3_SECRET_ACCESS_KEY") is None,
    reason="Requires DATACONTRACT_S3_ACCESS_KEY_ID and DATACONTRACT_S3_SECRET_ACCESS_KEY to be set",
)
def test_test_athena_iceberg(monkeypatch):
    data_contract = DataContract(data_contract_file=datacontract)

    run = data_contract.test()

    print(run.pretty())
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)
```
