Skip to content

Commit

Permalink
req-changes: Switched back to auto apply bulk search approach
Browse files Browse the repository at this point in the history
  • Loading branch information
Aryamanz29 committed Jun 19, 2024
1 parent b22e465 commit 5217a51
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 30 deletions.
10 changes: 6 additions & 4 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

### Breaking changes

- Introduced a new pagination approach for `AssetClient.search()` and `FluentSearch.execute()`, known as **bulk search** (disabled by default), based on asset creation timestamps. This approach will be used when the number of results exceeds the predefined threshold (i.e: `100,000` assets). Users can explicitly run this search by setting the optional keyword argument `bulk=True`.
- Introduced a new pagination approach in `AssetClient.search()` and `FluentSearch.execute()` called **bulk search** (disabled by default). It minimizes system impact when handling large result sets. The SDK switches to this search operation automatically if results exceed a predefined threshold (i.e: `100,000` results). Users can enable bulk search explicitly by setting `bulk=True` in `AssetClient.search()` or `FluentSearch.execute()`. One side effect of this search operation is that any requested sorting of the results will be overridden. If you need to sort such a large number of results, you will have to do so after retrieving them.

- The `AssetClient.search()` and `FluentSearch.execute()` methods will now raise `InvalidRequestError` in the following scenarios:
- The `AssetClient.search()` and `FluentSearch.execute()` methods will now raise an exception (`InvalidRequestError`) in the following scenarios:

- When bulk search is enabled (`bulk=True`) and any user-specified sorting option is found in the search request, the method raises an exception. This is because the bulk search approach ignores user-specified sorting and instead reorders the results based on the creation timestamps of assets to handle large numbers of assets efficiently.
- when bulk search is enabled (`bulk=True`) and any user-specified sorting options are found in the search request.

- When bulk search is disabled (`bulk=False`) and the number of results exceeds the predefined threshold (i.e: `100,000` assets). The error message will suggest that the user re-run the search with `bulk=True` to extract a large number of records.
- when bulk search is disabled (`bulk=False`), the number of results exceeds the predefined threshold (i.e: `100,000` assets), and any user-specified sorting options are found in the search request.

_This is because the bulk search approach ignores user-specified sorting and instead reorders the results based on the creation timestamps of assets to handle large numbers of assets efficiently._

### QOL improvements

Expand Down
45 changes: 36 additions & 9 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,26 @@ def _prepare_sorts_for_bulk_search(sorts: List[SortItem]):
def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults:
"""
Search for assets using the provided criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.
:param criteria: detailing the search query, parameters, and so on to run
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:raises InvalidRequestError:
- if bulk search is enabled (`bulk=True`) and any user-specified
sorting option is found in the search request.
- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.
:raises AtlanError: on any API communication issue
:returns: the results of the search
"""
if bulk:
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
Expand All @@ -197,10 +201,17 @@ def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults
aggregations = self._get_aggregations(raw_json)
count = raw_json.get("approximateCount", 0)

if not bulk and count > IndexSearchResults._MASS_EXTRACT_THRESHOLD:
raise ErrorCode.ENABLE_BULK_FOR_MASS_EXTRACTION.exception_with_parameters(
IndexSearchResults._MASS_EXTRACT_THRESHOLD
)
if (
count > IndexSearchResults._MASS_EXTRACT_THRESHOLD
and not IndexSearchResults.presorted_by_timestamp(criteria.dsl.sort)
):
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
# Re-fetch the first page results with updated timestamp sorting
# for bulk search if count > _MASS_EXTRACT_THRESHOLD (100,000 assets)
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
return self.search(criteria)

return IndexSearchResults(
client=self._client,
Expand Down Expand Up @@ -1846,6 +1857,16 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
self._criteria.dsl.from_ = 0 # type: ignore[attr-defined]
self._criteria.dsl.query = rewritten_query # type: ignore[attr-defined]

def _get_bulk_search_log_message(self):
return (
(
"Bulk search option is enabled. "
if self._bulk
else "Result size (%s) exceeds threshold (%s). "
)
+ "Ignoring requests for offset-based paging and using timestamp-based paging instead."
)

def _get_next_page(self):
"""
Fetches the next page of results.
Expand All @@ -1855,14 +1876,20 @@ def _get_next_page(self):
query = self._criteria.dsl.query
self._criteria.dsl.size = self._size
self._criteria.dsl.from_ = self._start
is_bulk_search = (
self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
)

if self._bulk:
if is_bulk_search:
LOGGER.debug(
self._get_bulk_search_log_message(),
self._approximate_count,
self._MASS_EXTRACT_THRESHOLD,
"Bulk search option is enabled. Ignoring requests for default offset-based "
"paging and switching to a creation timestamp-based paging approach."
"paging and switching to a creation timestamp-based paging approach.",
)
self._prepare_query_for_timestamp_paging(query)
if raw_json := super()._get_next_page_json(self._bulk):
if raw_json := super()._get_next_page_json(is_bulk_search):
self._count = raw_json.get("approximateCount", 0)
return True
return False
Expand Down
7 changes: 0 additions & 7 deletions pyatlan/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,6 @@ class ErrorCode(Enum):
"Please ensure that no sorting options are included in your search request when performing a bulk search.",
InvalidRequestError,
)
ENABLE_BULK_FOR_MASS_EXTRACTION = (
400,
"ATLAN-PYTHON-400-063",
"Number of results exceeds the predefined threshold {}. Please execute the search again with `bulk=True`.",
"Please note that this will reorder the results based on creation timestamp to efficiently handle a large number of results.", # noqa
InvalidRequestError,
)
AUTHENTICATION_PASSTHROUGH = (
401,
"ATLAN-PYTHON-401-000",
Expand Down
7 changes: 5 additions & 2 deletions pyatlan/model/fluent_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,17 +402,20 @@ def count(self, client: AtlanClient) -> int:
def execute(self, client: AtlanClient, bulk: bool = False) -> IndexSearchResults:
"""
Run the fluent search to retrieve assets that match the supplied criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.
:param client: client through which to retrieve the assets.
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:raises InvalidRequestError:
- if bulk search is enabled (`bulk=True`) and any user-specified
sorting option is found in the search request.
- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.
:raises AtlanError: on any API communication issue
:returns: an iterable list of assets that match the supplied criteria, lazily-fetched
Expand Down
24 changes: 23 additions & 1 deletion tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pytest

from pyatlan.client.asset import LOGGER
from pyatlan.client.asset import LOGGER, IndexSearchResults
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, Table
from pyatlan.model.assets.atlas_glossary_term import AtlasGlossaryTerm
Expand Down Expand Up @@ -249,6 +249,28 @@ def test_search_pagination(mock_logger, client: AtlanClient):
assert "Bulk search option is enabled." in mock_logger.call_args_list[0][0][0]
mock_logger.reset_mock()

# Test search(): when the number of results exceeds the predefined threshold,
# the SDK automatically switches to a `bulk` search option using timestamp-based pagination.
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", 1):
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.asset.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, expected_sorts, size, TOTAL_ASSETS)
assert mock_logger.call_count < TOTAL_ASSETS
assert (
"Result size (%s) exceeds threshold (%s)."
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()


def test_search_iter(client: AtlanClient):
size = 15
Expand Down
49 changes: 42 additions & 7 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,8 @@ def test_index_search_pagination(
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception suggesting the user switch to bulk search
# when the number of results exceeds the predefined threshold
# Test search(): when the number of results exceeds the predefined threshold
# it will automatically convert to a `bulk` search.
TEST_THRESHOLD = 1
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
Expand All @@ -1477,17 +1477,52 @@ def test_index_search_pagination(
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, index_search_paging_json, expected_sorts)
assert mock_api_caller._call_api.call_count == 3
assert mock_logger.call_count == 1
assert (
"Result size (%s) exceeds threshold (%s)"
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(bulk=False): Raise an exception when the number of results exceeds
# the predefined threshold and there are any user-defined sorting options present
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
index_search_paging_json,
]
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
# With some sort options
.sort(Asset.NAME.order(SortOrder.ASCENDING))
).to_request()

with pytest.raises(
InvalidRequestError,
match=(
"ATLAN-PYTHON-400-063 Number of results exceeds the predefined threshold "
f"{TEST_THRESHOLD}. Please execute the search again with `bulk=True`."
"ATLAN-PYTHON-400-063 Unable to execute "
"bulk search with user-defined sorting options. "
"Suggestion: Please ensure that no sorting options are "
"included in your search request when performing a bulk search."
),
):
results = client.search(criteria=request)
client.search(criteria=request)
assert mock_api_caller._call_api.call_count == 1
mock_api_caller.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception when
# bulk search is attempted with any user-defined sorting options
# Test search(bulk=True): Raise an exception when bulk search is enabled
# and there are any user-defined sorting options present
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
Expand Down

0 comments on commit 5217a51

Please sign in to comment.