Skip to content

Commit

Permalink
Job state (lifecycle management) (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejmaciejko-gid authored Mar 19, 2024
1 parent 1334cd4 commit 02cca86
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- Handling job_state (running, suspended)

## [1.3.10] - 2024-03-06

- Handling flink job upgrade_mode (savepoint, stateless)
Expand Down
13 changes: 7 additions & 6 deletions dbt/adapters/flink/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
QueryHints,
QueryHintsParser,
QueryMode,
UpgradeMode,
JobState,
)

from flink.sqlgateway.client import FlinkSqlGatewayClient
Expand Down Expand Up @@ -103,25 +105,24 @@ def execute(self, sql: str, bindings: Optional[Sequence[Any]] = None) -> None:
sql = sql.format(*[self._convert_binding(binding) for binding in bindings])
self.last_query_hints: QueryHints = QueryHintsParser.parse(sql)
execution_config = self.last_query_hints.execution_config
start_from_savepoint = False
if execution_config:
if not self.last_query_hints.test_query:
with_savepoint = self.last_query_hints.upgrade_mode == "savepoint"
with_savepoint = self.last_query_hints.upgrade_mode == UpgradeMode.SAVEPOINT
savepoint_path = FlinkJobManager(self.session).stop_job(
execution_config, with_savepoint
)
if savepoint_path:
logger.debug("Savepoint path {}", savepoint_path)
execution_config[ExecutionConfig.SAVEPOINT_PATH] = savepoint_path
start_from_savepoint = True
if not start_from_savepoint:
logger.debug("Job starting without savepoint")
execution_config.pop(ExecutionConfig.SAVEPOINT_PATH, None)

if self.last_query_hints.drop_statement:
logger.debug("Executing drop statement: {}", self.last_query_hints.drop_statement)
FlinkCursor(self.session).execute(self.last_query_hints.drop_statement)

if JobState.SUSPENDED == self.last_query_hints.job_state:
logger.info("Job suspended")
return

self._set_query_mode()
logger.info("Executing statement:\n{}\nExecution config:\n{}", sql, execution_config)
operation_handle = FlinkSqlGatewayClient.execute_statement(
Expand Down
16 changes: 14 additions & 2 deletions dbt/adapters/flink/query_hints_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ class QueryMode(Enum):
STREAMING = "streaming"


class UpgradeMode(Enum):
    """Values accepted by the ``upgrade_mode`` query hint.

    ``QueryHints`` builds a member from the lower-cased hint text, so the
    member values are the exact strings expected inside the hint.
    """

    # When selected, the handler asks FlinkJobManager.stop_job to stop the job
    # with a savepoint.
    SAVEPOINT = "savepoint"
    # Used by QueryHints when no upgrade_mode hint is present.
    STATELESS = "stateless"


class JobState(Enum):
    """Values accepted by the ``job_state`` query hint.

    ``QueryHints`` builds a member from the lower-cased hint text, so the
    member values are the exact strings expected inside the hint.
    """

    # Used by QueryHints when no job_state hint is present.
    RUNNING = "running"
    # When selected, the handler logs "Job suspended" and returns before
    # submitting the statement.
    SUSPENDED = "suspended"


class QueryHints:
fetch_max: Optional[int] = None
fetch_timeout_ms: Optional[int] = None
Expand Down Expand Up @@ -36,8 +46,10 @@ def __init__(self, hints=None):
self.execution_config[key_val[0]] = key_val[1]
if "drop_statement" in hints:
self.drop_statement = hints["drop_statement"]
if "upgrade_mode" in hints:
self.upgrade_mode = hints["upgrade_mode"]
self.upgrade_mode = UpgradeMode(
hints.get("upgrade_mode", UpgradeMode.STATELESS.value).lower()
)
self.job_state = JobState(hints.get("job_state", JobState.RUNNING.value).lower())


class QueryHintsParser:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
{% set execution_config = config.get('default_execution_config', {}) %}
{% set _dummy = execution_config.update(config.get('execution_config', {})) %}
{% set upgrade_mode = config.get('upgrade_mode', 'stateless') %}
{% set job_state = config.get('job_state', 'running') %}

{{ sql_header if sql_header is not none }}
/** upgrade_mode('{{upgrade_mode}}') */
/** upgrade_mode('{{upgrade_mode}}') */ /** job_state('{{job_state}}') */
{% if execution_config %}/** execution_config('{% for cfg_name in execution_config %}{{cfg_name}}={{execution_config[cfg_name]}}{% if not loop.last %};{% endif %}{% endfor %}') */{% endif %}
/** drop_statement('drop {% if temporary: -%}temporary {%- endif %}table if exists {{ this.render() }}') */
/** drop_statement('drop {% if temporary: -%}temporary {%- endif %}table if exists `{{ this.render() }}`') */
create {% if temporary: -%}temporary {%- endif %}table
{{ this.render() }}
{% if type %}/** mode('{{type}}')*/{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}
/** drop_statement('drop view if exists {{ this.render() }}') */
/** drop_statement('drop view if exists `{{ this.render() }}`') */
create view /*TODO {{ relation }}*/ {{ this.render() }} {% if type %}/** mode('{{type}}')*/{% endif %} as (
{{ sql }}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{% set watermark_properties = node.config.get('watermark') %}
{% set type = node.config.get('type', None) %}
{% set table_column_ids = node.columns.keys() %}
/** drop_statement('DROP TABLE IF EXISTS {{ node.identifier }}') */
/** drop_statement('DROP TABLE IF EXISTS `{{ node.identifier }}`') */
CREATE TABLE {{ node.identifier }} {% if type %}/** mode('{{type}}')*/{% endif %} (
{% for column_id in table_column_ids %}
{%- if node.columns[column_id]["column_type"] == 'metadata' %} `{{ node.columns[column_id]["name"] }}` {{ node.columns[column_id]["data_type"] }} METADATA {% if node.columns[column_id]["expression"] %} FROM '{{node.columns[column_id]["expression"]}}' {% endif %}
Expand Down
26 changes: 23 additions & 3 deletions tests/adapters/flink/test_query_hints_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from dbt.adapters.flink.query_hints_parser import QueryHintsParser, QueryMode
from dbt.adapters.flink.query_hints_parser import (
QueryHintsParser,
QueryMode,
JobState,
UpgradeMode,
)


class TestQueryHintsParser:
Expand Down Expand Up @@ -39,11 +44,26 @@ def test_drop_statement(self):
sql = "/** drop_statement('DROP TABLE IF EXISTS TABLE_A') */ CREATE TABLE TABLE_A (id STRING)"
hints = QueryHintsParser.parse(sql)
assert hints.drop_statement == "DROP TABLE IF EXISTS TABLE_A"

def test_upgrade_mode(self):
    """An explicit ``upgrade_mode('savepoint')`` hint parses to ``UpgradeMode.SAVEPOINT``."""
    sql = "/** upgrade_mode('savepoint') */ CREATE TABLE TABLE_X (id String) AS SELECT * FROM Y"
    hints = QueryHintsParser.parse(sql)
    # QueryHints stores upgrade_mode as an UpgradeMode member; a plain Enum
    # member never compares equal to its raw string value, so the assertion
    # must target the enum member rather than "savepoint".
    assert hints.upgrade_mode == UpgradeMode.SAVEPOINT

def test_default_upgrade_mode(self):
    """A statement without an ``upgrade_mode`` hint parses to ``UpgradeMode.STATELESS``."""
    parsed = QueryHintsParser.parse("CREATE TABLE TABLE_X (id String) AS SELECT * FROM Y")
    expected_mode = UpgradeMode.STATELESS
    assert parsed.upgrade_mode == expected_mode

def test_job_state(self):
    """An explicit ``job_state('suspended')`` hint parses to ``JobState.SUSPENDED``."""
    parsed = QueryHintsParser.parse(
        "/** job_state('suspended') */ CREATE TABLE TABLE_X (id String) AS SELECT * FROM Y"
    )
    expected_state = JobState.SUSPENDED
    assert parsed.job_state == expected_state

def test_default_job_state(self):
    """A statement without a ``job_state`` hint parses to ``JobState.RUNNING``."""
    parsed = QueryHintsParser.parse("CREATE TABLE TABLE_X (id String) AS SELECT * FROM Y")
    expected_state = JobState.RUNNING
    assert parsed.job_state == expected_state

def test_multiple_hints_in_single_comment(self):
sql = "select /** fetch_max(10) fetch_timeout_ms(1000) */ from input"
Expand Down

0 comments on commit 02cca86

Please sign in to comment.