Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to release 2.3.0 #352

Merged
merged 6 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
## 2.3.0 (June 19, 2024)

### Breaking changes

- Introduced a new pagination approach in `AssetClient.search()` and `FluentSearch.execute()` called **bulk search** (disabled by default). It minimizes system impact when handling large result sets. The SDK switches to this search operation automatically if results exceed a predefined threshold (i.e: `100,000` results). Alternatively, users can enable bulk search explicitly by setting `bulk=True` in `AssetClient.search()` or `FluentSearch.execute()`. One side effect of this search operation is that any requested sorting of the results will be overridden. If you need to sort such a large number of results, you will have to do so after retrieving them.
Aryamanz29 marked this conversation as resolved.
Show resolved Hide resolved

- The `AssetClient.search()` and `FluentSearch.execute()` methods will now raise an exception (`InvalidRequestError`) in the following scenarios:

- when bulk search is enabled (`bulk=True`) and any user-specified sorting options are found in the search request.

- when bulk search is disabled (`bulk=False`), the number of results exceeds the predefined threshold (i.e: `100,000` assets), and any user-specified sorting options are found in the search request.

_This is because the bulk search approach ignores user-specified sorting and instead reorders the results based on the creation timestamps of assets to handle large numbers of assets efficiently._

### QOL improvements

- Pinned `urllib3>=1.26.0,<3` and moved `networkx` to the dev requirements to avoid potential version mismatches.

## 2.2.4 (June 11, 2024)

### New features
Expand Down
50 changes: 41 additions & 9 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,26 @@ def _prepare_sorts_for_bulk_search(sorts: List[SortItem]):
def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults:
"""
Search for assets using the provided criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.

:param criteria: detailing the search query, parameters, and so on to run
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:returns: the results of the search
:raises InvalidRequestError:

- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue
:returns: the results of the search
"""
if bulk:
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
Expand All @@ -190,10 +201,17 @@ def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults
aggregations = self._get_aggregations(raw_json)
count = raw_json.get("approximateCount", 0)

if not bulk and count > IndexSearchResults._MASS_EXTRACT_THRESHOLD:
raise ErrorCode.ENABLE_BULK_FOR_MASS_EXTRACTION.exception_with_parameters(
IndexSearchResults._MASS_EXTRACT_THRESHOLD
)
if (
count > IndexSearchResults._MASS_EXTRACT_THRESHOLD
and not IndexSearchResults.presorted_by_timestamp(criteria.dsl.sort)
):
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
# Re-fetch the first page results with updated timestamp sorting
# for bulk search if count > _MASS_EXTRACT_THRESHOLD (100,000 assets)
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
return self.search(criteria)

return IndexSearchResults(
client=self._client,
Expand Down Expand Up @@ -1839,6 +1857,16 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
self._criteria.dsl.from_ = 0 # type: ignore[attr-defined]
self._criteria.dsl.query = rewritten_query # type: ignore[attr-defined]

def _get_bulk_search_log_message(self):
return (
(
"Bulk search option is enabled. "
if self._bulk
else "Result size (%s) exceeds threshold (%s). "
)
+ "Ignoring requests for offset-based paging and using timestamp-based paging instead."
)

def _get_next_page(self):
"""
Fetches the next page of results.
Expand All @@ -1848,14 +1876,18 @@ def _get_next_page(self):
query = self._criteria.dsl.query
self._criteria.dsl.size = self._size
self._criteria.dsl.from_ = self._start
is_bulk_search = (
self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
)

if self._bulk:
if is_bulk_search:
LOGGER.debug(
"Bulk search option is enabled. Ignoring requests for default offset-based "
"paging and switching to a creation timestamp-based paging approach."
self._get_bulk_search_log_message(),
Aryamanz29 marked this conversation as resolved.
Show resolved Hide resolved
self._approximate_count,
self._MASS_EXTRACT_THRESHOLD,
)
self._prepare_query_for_timestamp_paging(query)
if raw_json := super()._get_next_page_json(self._bulk):
if raw_json := super()._get_next_page_json(is_bulk_search):
self._count = raw_json.get("approximateCount", 0)
return True
return False
Expand Down
7 changes: 0 additions & 7 deletions pyatlan/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,6 @@ class ErrorCode(Enum):
"Please ensure that no sorting options are included in your search request when performing a bulk search.",
InvalidRequestError,
)
ENABLE_BULK_FOR_MASS_EXTRACTION = (
400,
"ATLAN-PYTHON-400-063",
"Number of results exceeds the predefined threshold {}. Please execute the search again with `bulk=True`.",
"Please note that this will reorder the results based on creation timestamp to efficiently handle a large number of results.", # noqa
InvalidRequestError,
)
AUTHENTICATION_PASSTHROUGH = (
401,
"ATLAN-PYTHON-401-000",
Expand Down
11 changes: 11 additions & 0 deletions pyatlan/model/fluent_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,22 @@ def count(self, client: AtlanClient) -> int:
def execute(self, client: AtlanClient, bulk: bool = False) -> IndexSearchResults:
"""
Run the fluent search to retrieve assets that match the supplied criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.

:param client: client through which to retrieve the assets.
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:raises InvalidRequestError:

- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue
:returns: an iterable list of assets that match the supplied criteria, lazily-fetched
"""
return client.asset.search(criteria=self.to_request(), bulk=bulk)
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/model/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ def validate_order(cls, v, values):

class DSL(AtlanObject):
from_: int = Field(default=0, alias="from")
size: int = Field(default=100)
size: int = Field(default=300)
aggregations: Dict[str, Aggregation] = Field(default_factory=dict)
track_total_hits: Optional[bool] = Field(default=True, alias="track_total_hits")
post_filter: Optional[Query] = Field(default=None, alias="post_filter")
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.4
2.3.0
24 changes: 23 additions & 1 deletion tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pytest

from pyatlan.client.asset import LOGGER
from pyatlan.client.asset import LOGGER, IndexSearchResults
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, Table
from pyatlan.model.assets.atlas_glossary_term import AtlasGlossaryTerm
Expand Down Expand Up @@ -249,6 +249,28 @@ def test_search_pagination(mock_logger, client: AtlanClient):
assert "Bulk search option is enabled." in mock_logger.call_args_list[0][0][0]
mock_logger.reset_mock()

# Test search(): when the number of results exceeds the predefined threshold,
# the SDK automatically switches to a `bulk` search option using timestamp-based pagination.
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", 1):
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.asset.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, expected_sorts, size, TOTAL_ASSETS)
assert mock_logger.call_count < TOTAL_ASSETS
assert (
"Result size (%s) exceeds threshold (%s)."
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()


def test_search_iter(client: AtlanClient):
size = 15
Expand Down
49 changes: 42 additions & 7 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,8 @@ def test_index_search_pagination(
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception suggesting the user switch to bulk search
# when the number of results exceeds the predefined threshold
# Test search(): when the number of results exceeds the predefined threshold
# it will automatically convert to a `bulk` search.
TEST_THRESHOLD = 1
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
Expand All @@ -1477,17 +1477,52 @@ def test_index_search_pagination(
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, index_search_paging_json, expected_sorts)
assert mock_api_caller._call_api.call_count == 3
assert mock_logger.call_count == 1
assert (
"Result size (%s) exceeds threshold (%s)"
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(bulk=False): Raise an exception when the number of results exceeds
# the predefined threshold and there are any user-defined sorting options present
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
index_search_paging_json,
]
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
# With some sort options
.sort(Asset.NAME.order(SortOrder.ASCENDING))
).to_request()

with pytest.raises(
InvalidRequestError,
match=(
"ATLAN-PYTHON-400-063 Number of results exceeds the predefined threshold "
f"{TEST_THRESHOLD}. Please execute the search again with `bulk=True`."
"ATLAN-PYTHON-400-063 Unable to execute "
"bulk search with user-defined sorting options. "
"Suggestion: Please ensure that no sorting options are "
"included in your search request when performing a bulk search."
),
):
results = client.search(criteria=request)
client.search(criteria=request)
assert mock_api_caller._call_api.call_count == 1
mock_api_caller.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception when
# bulk search is attempted with any user-defined sorting options
# Test search(bulk=True): Raise an exception when bulk search is enabled
# and there are any user-defined sorting options present
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_search_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def test_dsl():
)
assert (
dsl.json(by_alias=True, exclude_none=True)
== '{"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
== '{"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": '
'{"__typeName.keyword": {"value": "Schema"}}}, "sort": []}'
Expand All @@ -283,7 +283,7 @@ def test_index_search_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"__guid": {"order": "asc"}}]}, "relationAttributes": [], '
Expand All @@ -300,7 +300,7 @@ def test_audit_search_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"entityId": {"order": "asc"}}]}}'
Expand All @@ -316,7 +316,7 @@ def test_search_log_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"entityGuidsAll": {"order": "asc"}}]}}'
Expand Down
Loading