We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Right now the Python wrapper offers no convenience helpers ("sugar") for working with queries. Even waiting for a query to finish must be implemented manually.
I want to have official sugar like this:
ytsaurus/yt/python/yt/wrapper/operation_commands.py
Lines 613 to 617 in a3233e6
I have my own partial implementation:
# Some internal imports are skipped. from yt.common import YtError from yt.wrapper import YtClient from yt.wrapper.exceptions_catcher import ExceptionCatcher from yt.wrapper.operation_commands import TimeWatcher from yt.wrapper.response_stream import ResponseStream from yt.yson.yson_types import YsonMap # https://github.com/ydb-platform/ydb/blob/9bcbbd617cfcf26e791b5f16cac239f3fa1bc632/ydb/library/yql/core/yql_execution.h#L11-L15 _YQL_OPERATION_PROGRESS_STATES = [ "Started", "InProgress", "Finished", "Failed", "Aborted", ] _YQL_OPERATION_PROGRESS_STATES_SNAKE_CASE = [ "started", "in_progress", "finished", "failed", "aborted", ] def _format_yql_progress(state_counters: dict[str, int]) -> str: result = [] # Same as operation progress: https://github.com/ytsaurus/ytsaurus/blob/a3233e6e3ff64374a4f6cc7febc9302834beb4fa/yt/python/yt/wrapper/operation_commands.py#L390-L392 for camel_case_state, snake_case_state in zip(_YQL_OPERATION_PROGRESS_STATES, _YQL_OPERATION_PROGRESS_STATES_SNAKE_CASE): value = state_counters.get(camel_case_state, 0) result.append(f"{snake_case_state}={value:<5}") return " ".join(result) def _pretty_print_time(seconds: int | float) -> str: return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(seconds)) class YtQueryFailedError(YtError): def __init__(self, id: str, engine: TQueryEngine, state: str, error: YtError | None, url: str): message = "Query {0} {1}".format(id, state) attributes = {"id": id, "engine": engine, "state": state, "url": url} inner_errors = [] if error is not None: inner_errors.append(error) super().__init__(message, attributes=attributes, inner_errors=inner_errors) class YtQueryState: def __init__(self, name: str): self.name = name def is_finished(self) -> bool: return self.name in ("aborted", "completed", "failed") def is_unsuccessfully_finished(self) -> bool: return self.name in ("aborted", "failed") def is_running(self) -> bool: return self.name == "running" def is_starting(self) -> bool: return self.name in ( "draft", 
"pending", ) def __eq__(self, other: Any) -> bool: if not isinstance(other, YtQueryState): return False return self.name == str(other) def __ne__(self, other: Any) -> bool: return not (self == other) def __repr__(self) -> str: return self.name def __str__(self) -> str: return self.name class YtQuery: def __init__( self, id: str, client: YtClient, engine: TQueryEngine, stage: str | None = None, abort_exceptions: Iterable[Type[BaseException]] | None = None, finalization_actions: Iterable[Callable[[YtQueryState], None]] | None = None, ): self.id = id self.stage = stage self.client = client self.engine: TQueryEngine = engine self.client_config = get_config_from_yt_client(self.client) self.url = self.client_config.get_query_url(self.id) self.abort_exceptions: tuple[Type[BaseException], ...] = ( tuple(abort_exceptions) if abort_exceptions is not None else ( KeyboardInterrupt, TimeoutError, ) ) self.finalization_actions = finalization_actions or [] @classmethod def from_query_id( cls, query_id: str, client: YtClient, stage: str | None = None, ) -> Self: return cls( id=query_id, client=client, engine=typing_cast(TQueryEngine, client.get_query(query_id, attributes=["engine"], stage=stage)["engine"]), stage=stage, ) def get_info(self, attributes: list[str] | None = None, format: TFormat | None = None) -> YsonMap: return self.client.get_query(self.id, attributes=attributes, stage=self.stage, format=format) def get_state(self) -> YtQueryState: return YtQueryState(self.get_info(attributes=["state"])["state"]) def get_result( self, result_index: int | None = None, format: TFormat | None = None, ) -> YsonMap: return self.client.get_query_result(self.id, result_index=result_index, stage=self.stage, format=format) def read_result( self, result_index: int | None = None, format: TFormat | None = None, raw: bool | None = None, ) -> ResponseStream: return self.client.read_query_result( self.id, result_index=result_index, stage=self.stage, format=format, raw=raw ) def get_state_monitor( 
self, time_watcher: TimeWatcher, action: Callable[[], None] = lambda: None ) -> Iterable[YtQueryState]: last_state = None while True: action() state = self.get_state() yield state if state.is_finished(): break if state != last_state: time_watcher.reset() last_state = state time_watcher.wait() def abort(self, message: str | None = None): state = self.get_state() if state.is_finished(): return if state.name in ("aborting",): return # A race is possible here, but we can't do anything about it: https://github.com/ytsaurus/ytsaurus/issues/548 return self.client.abort_query(self.id, message=message, stage=self.stage) def wait(self, check_result: bool = True, print_progress: bool = True, timeout_ms: int | None = None): start_time = time.time() deadline = start_time + timeout_ms / 1000.0 if timeout_ms is not None else None timeout = None if timeout_ms is None else timeout_ms / 1000.0 query_poll_period = self.client_config.query_tracker_poll_period_ms / 1000.0 if timeout is not None and query_poll_period > timeout: query_poll_period = timeout time_watcher = TimeWatcher( min_interval=query_poll_period / 10.0, max_interval=query_poll_period, slowdown_coef=0.2 ) def _print_info(state: YtQueryState): match self.engine: case "yql": state_counters = defaultdict(int) info = self.get_info(attributes=["progress"]) yql_progress = info.get("progress", {}).get("yql_progress", {}) for node in yql_progress.values(): state_counters[node["state"]] += 1 logger.info(f"query {self.id}: {_format_yql_progress(state_counters)}") case _: logger.info(f"query {self.id}: {state}") print_info = _print_info if print_progress else lambda _: None def abort(): state = None for state in self.get_state_monitor(TimeWatcher(1.0, 1.0, 0.0), self.abort): print_info(state) assert state is not None for finalize_function in self.finalization_actions: finalize_function(state) abort_on_sigint = self.client_config.query_tracker_abort_on_sigint with ExceptionCatcher(self.abort_exceptions, abort, 
enable=abort_on_sigint): state = None for state in self.get_state_monitor(time_watcher): print_info(state) if deadline is not None: current_time = time.time() if current_time > deadline: raise TimeoutError( f"query {self.id} timed out, start_time: {_pretty_print_time(start_time)}, current_time: {_pretty_print_time(current_time)}, timeout_ms: {timeout_ms}ms, deadline: {_pretty_print_time(deadline)}" ) assert state is not None for finalize_function in self.finalization_actions: finalize_function(state) if state.is_unsuccessfully_finished(): if check_result: query_info = self.get_info(attributes=["error"]) yt_error = None if query_info["error"]["code"] != 0: yt_error = YtError.from_dict(query_info["error"]) raise YtQueryFailedError( id=self.id, engine=self.engine, state=str(state), error=yt_error, url=self.url, ) else: logger.warning(f"query {self.id} finished unsuccessfully, but result is not checked, state: {state}") else: logger.info(f"query {self.id} completed, final state: {state}") @func_with_param_spec_from(YtClient.start_query) def start_query( client: YtClient, engine: TQueryEngine, query: str, *args: Any, stage: str | None = None, **kwargs: Any, ) -> YtQuery: client_config = get_config_from_yt_client(client) query_id = client.start_query(engine, query, *args, stage=stage, **kwargs) logger.info(f"{engine.upper()} query started: {client_config.get_query_url(query_id)}") return YtQuery( id=query_id, engine=engine, stage=stage, client=client, ) def run_query( client: YtClient, engine: TQueryEngine, query: str, settings: dict[str, Any] | None = None, files: list[TQueryFile] | None = None, stage: str | None = None, annotations: dict[str, Any] | None = None, access_control_object: str | None = None, sync: bool = True, ) -> YtQuery: """ Start query. :param engine: one of "ql", "yql". 
:type engine: str :param query: text of a query :type query: str :param settings: a dictionary of settings :type settings: dict or None :param files: a YSON list of files, each of which is represented by a map with keys "name", "content", "type". Field "type" is one of "raw_inline_data", "url" :type files: list or None :param stage: query tracker stage, defaults to "production" :type stage: str :param annotations: a dictionary of annotations :type stage: dict or None :param access_control_object: access control object name :type access_control_object: str or None :param sync: wait for query to finish :type sync: bool """ yt_query = start_query( client, engine, query, settings=settings, files=files, stage=stage, annotations=annotations, access_control_object=access_control_object, ) if sync: yt_query.wait() return yt_query
2024-04-29T19:53:19.655 INFO[query] YQL query started: https://my.super.cluster/my_cluster/queries/fdfd51c4-774ad1c3-1919dde1-61e86113 2024-04-29T19:57:15.675 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T19:57:16.238 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T19:57:16.790 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T19:57:17.341 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T19:57:17.891 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T19:57:18.443 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:18.996 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:19.549 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:20.102 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:20.756 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:21.528 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:22.444 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:23.531 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:24.826 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 
2024-04-29T19:57:26.369 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:28.211 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=2 failed=0 aborted=0 2024-04-29T19:57:30.411 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=3 failed=0 aborted=0 2024-04-29T19:57:33.041 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=3 failed=0 aborted=0 2024-04-29T19:57:36.188 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=3 failed=0 aborted=0 2024-04-29T19:57:39.958 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=3 failed=0 aborted=0 2024-04-29T19:57:44.468 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=3 failed=0 aborted=0 2024-04-29T19:57:49.519 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=4 failed=0 aborted=0 2024-04-29T19:57:54.571 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=4 failed=0 aborted=0 2024-04-29T19:57:59.621 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=7 failed=0 aborted=0 2024-04-29T19:58:04.672 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=1 finished=7 failed=0 aborted=0 2024-04-29T19:58:09.724 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0 in_progress=0 finished=14 failed=0 aborted=0 2024-04-29T19:58:09.724 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7 completed, final state: completed
2024-04-29T20:08:16.506 INFO[query] YQL query started: https://my.super.cluster/my_cluster/queries/cd6cdc93-6b1497-f4288744-d9873cf1 2024-04-29T20:08:16.619 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T20:08:17.231 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T20:08:17.840 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T20:08:18.448 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0 in_progress=0 finished=0 failed=0 aborted=0 2024-04-29T20:08:19.059 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0 in_progress=0 finished=0 failed=0 aborted=0 Traceback (most recent call last): # ... File "base/py/yt/query.py", line 258, in wait raise YtQueryFailedError( twix.base.py.yt.query.YtQueryFailedError: Query cd6cdc93-6b1497-f4288744-d9873cf1 failed Query cd6cdc93-6b1497-f4288744-d9873cf1 failed YQL plugin call failed There are some issues Parse Sql Unexpected token 'kw' : cannot match to any predicted input... ***** Details: Query cd6cdc93-6b1497-f4288744-d9873cf1 failed origin eba930bd-f815-4432-8ee7-7b4d3720af8b on 2024-04-29T20:08:19.117577Z id cd6cdc93-6b1497-f4288744-d9873cf1 engine yql state failed url https://my.super.cluster/my_cluster/queries/cd6cdc93-6b1497-f4288744-d9873cf1 Query cd6cdc93-6b1497-f4288744-d9873cf1 failed origin eu-north1-c-4ct4-30b.hw.my.super.cluster on 2024-04-29T20:08:18.747946Z (pid 1, tid 126d4914eb502001, fid fffee13649df9f03) thread Control trace_id 93f21203-1701aa6d-52014f8e-92a63e18 span_id 966787753879437561 query_id cd6cdc93-6b1497-f4288744-d9873cf1 YQL plugin call failed # ...
But it is incomplete: it contains only a simple query wrapper and a wait method for it.
The text was updated successfully, but these errors were encountered:
Krisha11
No branches or pull requests
Right now the Python wrapper offers no convenience helpers ("sugar") for working with queries.
Even waiting for a query to finish must be implemented manually.
I want to have official sugar like this:
ytsaurus/yt/python/yt/wrapper/operation_commands.py
Lines 613 to 617 in a3233e6
I have my own partial implementation:
Code
Log examples
But it is incomplete: it contains only a simple query wrapper and a wait method for it.
The text was updated successfully, but these errors were encountered: