diff --git a/api/py/ai/chronon/workflow/README.md b/api/py/ai/chronon/workflow/README.md
new file mode 100644
index 000000000..102afd36e
--- /dev/null
+++ b/api/py/ai/chronon/workflow/README.md
@@ -0,0 +1,63 @@
+# Orchestrators for Batch flows
+## Airflow operator
+```python
+from airflow import DAG
+from datetime import datetime
+from ai.chronon.workflow.spark_submit import ChrononSparkSubmitOperator
+from ai.chronon.api.ttypes import GroupBy, Aggregation, Operation, Window, TimeUnit
+
+default_args = {
+    "owner": "airflow",
+    "start_date": datetime(2025, 2, 16),
+    "retries": 1
+}
+
+dag = DAG("chronon_spark_job", default_args=default_args, schedule_interval="@daily")
+
+# Define a Chronon Thrift GroupBy object
+group_by_job = GroupBy(
+    sources=["datasetA"],
+    keys=["user_id"],
+    aggregations=[
+        Aggregation(input_column="purchase_amount", operation=Operation.SUM),
+        Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
+    ],
+    online=True
+)
+
+chronon_task = ChrononSparkSubmitOperator(
+    task_id="chronon_spark_submit",
+    thrift_obj=group_by_job,
+    application='ai.chronon:spark_uber_2.12:0.0.8',
+    conf={
+        'spark.jars.packages': 'ai.chronon:spark_uber_2.12:0.0.8' # Ensure this package is available
+    },
+    dag=dag
+)
+```
+
+## Dagster pipeline
+```python
+from dagster import Definitions
+from ai.chronon.api.ttypes import GroupBy, Aggregation, Operation, Window, TimeUnit
+from ai.chronon.workflow.spark_asset import chronon_spark_job
+
+# Define a Chronon Thrift GroupBy object
+group_by_job = GroupBy(
+    sources=["datasetA"],
+    keys=["user_id"],
+    aggregations=[
+        Aggregation(input_column="purchase_amount", operation=Operation.SUM),
+        Aggregation(input_column="purchase_amount", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
+    ],
+    online=True
+)
+
+defs = Definitions(
+    assets=[chronon_spark_job],
+    resources={
+        "thrift_obj": group_by_job
+    }
+)
+
+```
diff --git a/api/py/ai/chronon/workflow/spark_asset.py b/api/py/ai/chronon/workflow/spark_asset.py
new file mode 100644
index 000000000..75aa057ae
--- /dev/null
+++ b/api/py/ai/chronon/workflow/spark_asset.py
@@ -0,0 +1,53 @@
+from dagster import asset, AssetExecutionContext
+from dagster_spark import SparkSubmitTaskDefinition
+from thrift.protocol import TJSONProtocol
+from thrift.transport import TTransport
+from ai.chronon.repo.validator import Validator
+
+def serialize_thrift_to_json(thrift_obj):
+    """
+    Serializes a Thrift object to JSON for Spark.
+    """
+    transport = TTransport.TMemoryBuffer()
+    protocol = TJSONProtocol.TJSONProtocol(transport)
+    thrift_obj.write(protocol)
+    return transport.getvalue().decode("utf-8")
+
+def validate_thrift_obj(thrift_obj):
+    """
+    Validates the Chronon Thrift object using Chronon's Validator.
+    """
+    validator = Validator()
+    errors = validator.validate(thrift_obj)
+    if errors:
+        raise ValueError(f"Chronon config validation failed: {errors}")
+
+@asset
+def chronon_spark_job(context: AssetExecutionContext, thrift_obj):
+    """
+    Dagster asset that:
+    - Validates a Chronon Thrift object.
+    - Serializes it to JSON.
+    - Submits a Spark job using SparkSubmitTaskDefinition.
+    """
+
+    # Step 1: Validate
+    context.log.info("Validating Chronon Thrift object...")
+    validate_thrift_obj(thrift_obj)
+
+    # Step 2: Serialize to JSON
+    context.log.info("Serializing Thrift object to JSON...")
+    serialized_json = serialize_thrift_to_json(thrift_obj)
+
+    # Step 3: Submit Spark job
+    context.log.info("Submitting Spark job...")
+    spark_task = SparkSubmitTaskDefinition(
+        name="chronon_spark_submit",
+        application="ai.chronon:spark_uber_2.12:0.0.8",
+        application_args=[serialized_json],
+        spark_conf={
+            "spark.jars.packages": "ai.chronon:spark_uber_2.12:0.0.8",
+        }
+    )
+
+    return spark_task.execute(context)
diff --git a/api/py/ai/chronon/workflow/spark_submit.py b/api/py/ai/chronon/workflow/spark_submit.py
new file mode 100644
index 000000000..859167b89
--- /dev/null
+++ b/api/py/ai/chronon/workflow/spark_submit.py
@@ -0,0 +1,58 @@
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+from ai.chronon.repo.validator import Validator
+from thrift.protocol import TJSONProtocol
+from thrift.transport import TTransport
+
+
+class ChrononSparkSubmitOperator(SparkSubmitOperator):
+    """
+    Custom SparkSubmitOperator for Chronon that:
+    - Validates Chronon job configuration.
+    - Serializes the input Thrift object to JSON.
+    - Submits the Spark job.
+    """
+
+    def __init__(self, thrift_obj, *args, **kwargs):
+        """
+        :param thrift_obj: A Chronon Thrift object (e.g., GroupBy, Join, Staging).
+        """
+        self.thrift_obj = thrift_obj
+        super().__init__(*args, **kwargs)
+
+    def validate_config(self):
+        """
+        Validates the Chronon Thrift object using Chronon's Validator.
+        """
+        validator = Validator()
+        errors = validator.validate(self.thrift_obj)
+
+        if errors:
+            raise ValueError(f"Chronon config validation failed: {errors}")
+
+    def serialize_to_json(self):
+        """
+        Serializes the Chronon Thrift object to JSON for Spark.
+        """
+        transport = TTransport.TMemoryBuffer()
+        protocol = TJSONProtocol.TJSONProtocol(transport)
+        self.thrift_obj.write(protocol)
+        return transport.getvalue().decode("utf-8")
+
+    def execute(self, context):
+        """
+        Overrides SparkSubmitOperator execute method:
+        1. Validate the Thrift object.
+        2. Serialize it to JSON.
+        3. Pass JSON to SparkSubmitOperator.
+        """
+        self.log.info("Validating Chronon Thrift object...")
+        self.validate_config()
+
+        self.log.info("Serializing Thrift object to JSON...")
+        serialized_json = self.serialize_to_json()
+
+        # Pass serialized JSON as an argument to the Spark job
+        self._application_args = [serialized_json]
+
+        self.log.info("Submitting Spark job with Chronon config...")
+        super().execute(context)