
Commit ee82077

first cut
1 parent f95dd2c commit ee82077

File tree

3 files changed: +175 -0 lines

api/py/ai/chronon/workflow/README.md

+63
@@ -0,0 +1,63 @@
# Orchestrators for Batch flows

## Airflow operator

```python
from airflow import DAG
from datetime import datetime

from my_custom_operators import ChrononSparkSubmitOperator
from ai.chronon.repo.types import GroupBy, Aggregation, Operation, Window, TimeUnit

default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 2, 16),
    "retries": 1
}

dag = DAG("chronon_spark_job", default_args=default_args, schedule_interval="@daily")

# Define a Chronon Thrift GroupBy object
group_by_job = GroupBy(
    sources=["datasetA"],
    keys=["user_id"],
    aggregations=[
        Aggregation(input_column="purchase_amount", operation=Operation.SUM),
        Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
    ],
    online=True
)

chronon_task = ChrononSparkSubmitOperator(
    task_id="chronon_spark_submit",
    thrift_obj=group_by_job,
    application="ai.chronon:spark_uber_2.12:0.0.8",
    conf={
        "spark.jars.packages": "ai.chronon:spark_uber_2.12:0.0.8"  # Ensure this package is available
    },
    dag=dag
)
```
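
The JSON that the operator hands to spark-submit can be inspected ahead of time by running the same Thrift serialization the operator performs internally. A minimal sketch, assuming `group_by_job` and the `GroupBy` import from the example above:

```python
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport


def to_thrift_json(thrift_obj):
    # Serialize a Thrift object (e.g. the GroupBy above) to a JSON string,
    # mirroring what ChrononSparkSubmitOperator does before submitting.
    transport = TTransport.TMemoryBuffer()
    protocol = TJSONProtocol.TJSONProtocol(transport)
    thrift_obj.write(protocol)
    return transport.getvalue().decode("utf-8")


print(to_thrift_json(group_by_job))  # the config string passed as a Spark application arg
```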
## Dagster pipeline

```python
from dagster import Definitions

from ai.chronon.repo.types import GroupBy, Aggregation, Operation, Window, TimeUnit
from chronon_spark_asset import chronon_spark_job

# Define a Chronon Thrift GroupBy object
group_by_job = GroupBy(
    sources=["datasetA"],
    keys=["user_id"],
    aggregations=[
        Aggregation(input_column="purchase_amount", operation=Operation.SUM),
        Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
    ],
    online=True
)

defs = Definitions(
    assets=[chronon_spark_job],
    resources={
        "thrift_obj": group_by_job
    }
)
```
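
Both orchestrators ship the config to Spark as Thrift-JSON in the application arguments. For debugging, that string can be read back into a `GroupBy`. A minimal round-trip sketch, reusing `to_thrift_json` and `group_by_job` from the Airflow example above; the `GroupBy` import path is assumed to match those examples and should be adjusted to wherever the Thrift-generated types live:

```python
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport

from ai.chronon.repo.types import GroupBy


def from_thrift_json(json_str):
    # Rebuild a GroupBy from the JSON string that was handed to spark-submit.
    transport = TTransport.TMemoryBuffer(json_str.encode("utf-8"))
    protocol = TJSONProtocol.TJSONProtocol(transport)
    obj = GroupBy()
    obj.read(protocol)
    return obj


restored = from_thrift_json(to_thrift_json(group_by_job))
assert restored == group_by_job  # Thrift-generated types compare field by field
```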
@@ -0,0 +1,53 @@
from dagster import asset, AssetExecutionContext
from dagster_spark import SparkSubmitTaskDefinition
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport
from ai.chronon.repo.validator import Validator


def serialize_thrift_to_json(thrift_obj):
    """
    Serializes a Thrift object to JSON for Spark.
    """
    transport = TTransport.TMemoryBuffer()
    protocol = TJSONProtocol.TJSONProtocol(transport)
    thrift_obj.write(protocol)
    return transport.getvalue().decode("utf-8")


def validate_thrift_obj(thrift_obj):
    """
    Validates the Chronon Thrift object using Chronon's Validator.
    """
    validator = Validator()
    errors = validator.validate(thrift_obj)
    if errors:
        raise ValueError(f"Chronon config validation failed: {errors}")


@asset
def chronon_spark_job(context: AssetExecutionContext, thrift_obj):
    """
    Dagster asset that:
    - Validates a Chronon Thrift object.
    - Serializes it to JSON.
    - Submits a Spark job using SparkSubmitTaskDefinition.
    """
    # Step 1: Validate
    context.log.info("Validating Chronon Thrift object...")
    validate_thrift_obj(thrift_obj)

    # Step 2: Serialize to JSON
    context.log.info("Serializing Thrift object to JSON...")
    serialized_json = serialize_thrift_to_json(thrift_obj)

    # Step 3: Submit Spark job
    context.log.info("Submitting Spark job...")
    spark_task = SparkSubmitTaskDefinition(
        name="chronon_spark_submit",
        application="ai.chronon:spark_uber_2.12:0.0.8",
        application_args=[serialized_json],
        spark_conf={
            "spark.jars.packages": "ai.chronon:spark_uber_2.12:0.0.8",
        }
    )

    return spark_task.execute(context)
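

# Hedged local sketch (not wired into Dagster): exercises only the validation
# and serialization helpers above, without submitting a Spark job. Assumes the
# GroupBy type is importable as in the README example; adjust the import to
# wherever the Thrift-generated types live in your repo.
if __name__ == "__main__":
    from ai.chronon.repo.types import GroupBy

    sample = GroupBy(sources=["datasetA"], keys=["user_id"])
    validate_thrift_obj(sample)
    print(serialize_thrift_to_json(sample)[:200])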
@@ -0,0 +1,59 @@
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from ai.chronon.repo.validator import Validator
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport
import json


class ChrononSparkSubmitOperator(SparkSubmitOperator):
    """
    Custom SparkSubmitOperator for Chronon that:
    - Validates Chronon job configuration.
    - Serializes the input Thrift object to JSON.
    - Submits the Spark job.
    """

    def __init__(self, thrift_obj, *args, **kwargs):
        """
        :param thrift_obj: A Chronon Thrift object (e.g., GroupBy, Join, Staging).
        """
        self.thrift_obj = thrift_obj
        super().__init__(*args, **kwargs)

    def validate_config(self):
        """
        Validates the Chronon Thrift object using Chronon's Validator.
        """
        validator = Validator()
        errors = validator.validate(self.thrift_obj)

        if errors:
            raise ValueError(f"Chronon config validation failed: {errors}")

    def serialize_to_json(self):
        """
        Serializes the Chronon Thrift object to JSON for Spark.
        """
        transport = TTransport.TMemoryBuffer()
        protocol = TJSONProtocol.TJSONProtocol(transport)
        self.thrift_obj.write(protocol)
        return transport.getvalue().decode("utf-8")

    def execute(self, context):
        """
        Overrides the SparkSubmitOperator execute method:
        1. Validate the Thrift object.
        2. Serialize it to JSON.
        3. Pass the JSON to SparkSubmitOperator.
        """
        self.log.info("Validating Chronon Thrift object...")
        self.validate_config()

        self.log.info("Serializing Thrift object to JSON...")
        serialized_json = self.serialize_to_json()

        # Pass serialized JSON as an argument to the Spark job
        self._application_args = [serialized_json]

        self.log.info("Submitting Spark job with Chronon config...")
        super().execute(context)
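

# Hedged smoke-test sketch (not part of the operator): patches the parent
# SparkSubmitOperator.execute so no spark-submit binary or cluster is needed;
# it only exercises validation, serialization, and argument wiring. Assumes
# the GroupBy type is importable as in the README example.
if __name__ == "__main__":
    from unittest import mock

    from ai.chronon.repo.types import GroupBy

    op = ChrononSparkSubmitOperator(
        task_id="chronon_spark_submit_smoke",
        thrift_obj=GroupBy(sources=["datasetA"], keys=["user_id"]),
        application="ai.chronon:spark_uber_2.12:0.0.8",
    )
    with mock.patch.object(SparkSubmitOperator, "execute"):
        op.execute(context={})
    print(op._application_args[0][:200])  # the serialized Thrift-JSON config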
