Skip to content

Workflow operators for Batch scenarios #925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions api/py/ai/chronon/workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Orchestrators for Batch flows
## Airflow operator
```python
from airflow import DAG
from datetime import datetime
from ai.chronon.workflow.spark_submit import ChrononSparkSubmitOperator
from ai.chronon.api.ttypes import GroupBy, Aggregation, Operation, Window, TimeUnit

default_args = {
"owner": "airflow",
"start_date": datetime(2025, 2, 16),
"retries": 1
}

dag = DAG("chronon_spark_job", default_args=default_args, schedule_interval="@daily")

# Define a Chronon Thrift GroupBy object
group_by_job = GroupBy(
sources=["datasetA"],
keys=["user_id"],
aggregations=[
Aggregation(input_column="purchase_amount", operation=Operation.SUM),
Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
],
online=True
)

chronon_task = ChrononSparkSubmitOperator(
task_id="chronon_spark_submit",
thrift_obj=group_by_job,
application='ai.chronon:spark_uber_2.12:0.0.8',
conf={
'spark.jars.packages': 'ai.chronon:spark_uber_2.12:0.0.8' # Ensure this package is available
},
dag=dag
)
```

## Dagster pipeline
```python
from dagster import Definitions
from ai.chronon.api.ttypes import GroupBy, Aggregation, Operation, Window, TimeUnit
from ai.chronon.workflow.spark_asset import chronon_spark_job

# Define a Chronon Thrift GroupBy object
group_by_job = GroupBy(
sources=["datasetA"],
keys=["user_id"],
aggregations=[
Aggregation(input_column="purchase_amount", operation=Operation.SUM),
Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
],
online=True
)

defs = Definitions(
assets=[chronon_spark_job],
resources={
"thrift_obj": group_by_job
}
)

```
53 changes: 53 additions & 0 deletions api/py/ai/chronon/workflow/spark_asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dagster import asset, AssetExecutionContext
from dagster_spark import SparkSubmitTaskDefinition
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport
from ai.chronon.repo.validator import Validator

def serialize_thrift_to_json(thrift_obj):
    """
    Return *thrift_obj* encoded as a UTF-8 JSON string.

    Uses the Thrift JSON protocol over an in-memory buffer so the result can
    be passed to Spark as a plain command-line argument.
    """
    buffer = TTransport.TMemoryBuffer()
    json_protocol = TJSONProtocol.TJSONProtocol(buffer)
    thrift_obj.write(json_protocol)
    raw_bytes = buffer.getvalue()
    return raw_bytes.decode("utf-8")

def validate_thrift_obj(thrift_obj):
    """
    Validate *thrift_obj* with Chronon's Validator.

    Raises ValueError listing the findings when validation reports any errors;
    returns None otherwise.
    """
    errors = Validator().validate(thrift_obj)
    if not errors:
        return
    raise ValueError(f"Chronon config validation failed: {errors}")

@asset
def chronon_spark_job(context: AssetExecutionContext, thrift_obj):
    """
    Dagster asset that runs a Chronon job on Spark.

    Pipeline: validate the supplied Chronon Thrift object, serialize it to
    JSON, then submit it via SparkSubmitTaskDefinition and return the
    submission result.
    """
    # Fail fast on an invalid Chronon config before touching Spark.
    context.log.info("Validating Chronon Thrift object...")
    validate_thrift_obj(thrift_obj)

    # The serialized config is handed to the Spark job as its sole argument.
    context.log.info("Serializing Thrift object to JSON...")
    payload = serialize_thrift_to_json(thrift_obj)

    context.log.info("Submitting Spark job...")
    submit_task = SparkSubmitTaskDefinition(
        name="chronon_spark_submit",
        application="ai.chronon:spark_uber_2.12:0.0.8",
        application_args=[payload],
        spark_conf={
            "spark.jars.packages": "ai.chronon:spark_uber_2.12:0.0.8",
        },
    )
    return submit_task.execute(context)
58 changes: 58 additions & 0 deletions api/py/ai/chronon/workflow/spark_submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from ai.chronon.repo.validator import Validator
from thrift.protocol import TJSONProtocol
from thrift.transport import TTransport


class ChrononSparkSubmitOperator(SparkSubmitOperator):
    """
    SparkSubmitOperator specialized for Chronon jobs.

    On execute it validates the supplied Chronon Thrift object, serializes it
    to JSON with the Thrift JSON protocol, and passes that JSON to
    spark-submit as the application argument.
    """

    def __init__(self, thrift_obj, *args, **kwargs):
        """
        :param thrift_obj: A Chronon Thrift object (e.g., GroupBy, Join, Staging).
        """
        self.thrift_obj = thrift_obj
        super().__init__(*args, **kwargs)

    def validate_config(self):
        """Raise ValueError if Chronon's Validator reports any errors."""
        errors = Validator().validate(self.thrift_obj)
        if errors:
            raise ValueError(f"Chronon config validation failed: {errors}")

    def serialize_to_json(self):
        """Return the Thrift object encoded as a UTF-8 JSON string."""
        buffer = TTransport.TMemoryBuffer()
        self.thrift_obj.write(TJSONProtocol.TJSONProtocol(buffer))
        return buffer.getvalue().decode("utf-8")

    def execute(self, context):
        """
        Validate the Thrift object, serialize it to JSON, then delegate the
        actual submission to SparkSubmitOperator.execute.
        """
        self.log.info("Validating Chronon Thrift object...")
        self.validate_config()

        self.log.info("Serializing Thrift object to JSON...")
        payload = self.serialize_to_json()

        # NOTE(review): writes SparkSubmitOperator's private `_application_args`
        # attribute — confirm this matches the installed apache-spark provider
        # version (newer releases expose a public `application_args` instead).
        self._application_args = [payload]

        self.log.info("Submitting Spark job with Chronon config...")
        super().execute(context)