Skip to content

Commit

Permalink
[Trino] Ensures uninterrupted schema listing even if a catalog query …
Browse files Browse the repository at this point in the history
…fails
  • Loading branch information
agl29 committed Jan 6, 2025
1 parent c54f745 commit 5f090aa
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
17 changes: 14 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import json
import time
import logging
import textwrap
from urllib.parse import urlparse

Expand All @@ -35,6 +36,8 @@
from desktop.lib.rest.resource import Resource
from notebook.connectors.base import Api, ExecutionWrapper, QueryError, ResultWrapper

LOG = logging.getLogger()


def query_error_handler(func):
def decorator(*args, **kwargs):
Expand Down Expand Up @@ -297,12 +300,18 @@ def _show_databases(self):
databases = []

for catalog in catalogs:
query_client = TrinoQuery(self.trino_request, 'SHOW SCHEMAS FROM ' + catalog)
response = query_client.execute()
databases += [f'{catalog}.{item}' for sublist in response.rows for item in sublist]
try:
query_client = TrinoQuery(self.trino_request, 'SHOW SCHEMAS FROM ' + catalog)
response = query_client.execute()
databases += [f'{catalog}.{item}' for sublist in response.rows for item in sublist]
except Exception as e:
# Log the exception and continue with the next catalog
LOG.error(f"Failed to fetch schemas from catalog {catalog}: {str(e)}")
continue

return databases

@query_error_handler
def _show_catalogs(self):
query_client = TrinoQuery(self.trino_request, 'SHOW CATALOGS')
response = query_client.execute()
Expand All @@ -311,6 +320,7 @@ def _show_catalogs(self):

return catalogs

@query_error_handler
def _show_tables(self, database):
database = self._format_identifier(database, is_db=True)
query_client = TrinoQuery(self.trino_request, 'USE ' + database)
Expand All @@ -326,6 +336,7 @@ def _show_tables(self, database):
for table in tables
]

@query_error_handler
def _get_columns(self, database, table):
database = self._format_identifier(database, is_db=True)
query_client = TrinoQuery(self.trino_request, 'USE ' + database)
Expand Down
23 changes: 23 additions & 0 deletions desktop/libs/notebook/src/notebook/connectors/trino_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,26 @@ def test_get_log(self):
result = self.trino_api.get_log(notebook, snippet)

assert result == expected_log

def test_show_databases(self):
with patch('notebook.connectors.trino.LOG.error') as Log_error:
with patch('notebook.connectors.trino.TrinoQuery') as TrinoQuery:
with patch('notebook.connectors.trino.TrinoApi._show_catalogs') as _show_catalogs:
_show_catalogs.return_value = [
'catalog1', 'catalog2'
]
query_instance = TrinoQuery.return_value
query_instance.execute.side_effect = [
MagicMock(rows=[["schema1"], ["schema2"]]), # First catalog response
Exception("Some error") # Second catalog raises an exception
]
result = self.trino_api._show_databases()

# Assert the expected output
expected_result = ['catalog1.schema1', 'catalog1.schema2']
self.assertEqual(result, expected_result)

# Assert error logging was called for the exception
Log_error.assert_called_once_with(
"Failed to fetch schemas from catalog catalog2: Some error"
)

0 comments on commit 5f090aa

Please sign in to comment.