Fix: Cannot generate unfiltered JSONL manifest in AnVIL (#7190) #7225
base: develop
@@ -45,6 +45,7 @@
 )
 import time
 from typing import (
+    Callable,
     ClassVar,
     IO,
     Protocol,
@@ -139,9 +140,11 @@
     PFBRelation,
 )
 from azul.service.elasticsearch_service import (
+    ElasticsearchChain,
     ElasticsearchService,
     Pagination,
     PaginationStage,
+    SortKey,
     ToDictStage,
 )
 from azul.service.storage_service import (
@@ -513,7 +516,7 @@ class ManifestPartition:
 
     #: The `sort` value of the first hit of the current page in this partition,
     #: or None if there is no current page.
-    search_after: tuple[str, str] | None = None
+    search_after: SortKey | None = None
 
     @classmethod
     def from_json(cls, partition: JSON) -> Self:
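A note for readers outside the full file: `SortKey` comes from `azul.service.elasticsearch_service` (see the import hunk above), and its definition is not part of this diff. Judging only from its use here, where it replaces `tuple[str, str]` and is produced by unpacking `hit.meta.sort` into two values, it is presumably a tuple alias along these lines:

```python
# Hypothetical definition, for illustration only; the real alias lives in
# azul.service.elasticsearch_service and is not shown in this diff
SortKey = tuple[str, str]
```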
@@ -531,10 +534,10 @@ def first(cls) -> Self:
                    is_last=False)
 
     @property
-    def is_first(self):
+    def is_first(self) -> bool:
         return not (self.index or self.page_index)
 
-    def with_config(self, config: AnyJSON):
+    def with_config(self, config: AnyJSON) -> Self:
         return attrs.evolve(self, config=config)
 
     def with_upload(self, multipart_upload_id) -> Self:
@@ -550,7 +553,7 @@ def first_page(self) -> Self:
 
     def next_page(self,
                   file_name: str | None,
-                  search_after: tuple[str, str]
+                  search_after: SortKey | None
                   ) -> Self:
         assert self.page_index is not None, self
         # If different pages yield different file names, use default file name
@@ -562,7 +565,7 @@ def next_page(self,
                             file_name=file_name,
                             search_after=search_after)
 
-    def last_page(self):
+    def last_page(self) -> Self:
         return attrs.evolve(self, is_last_page=True)
 
     def next(self, part_etag: str) -> Self:
@@ -1047,7 +1050,7 @@ def _create_request(self) -> Search:
         # The response is processed by the generator, not the pipeline
         return request
 
-    def _create_pipeline(self):
+    def _create_pipeline(self) -> ElasticsearchChain:
         if self.included_fields is None:
             document_slice = DocumentSlice()
         else:
@@ -1220,7 +1223,41 @@ def storage(self):
         return self.service.storage_service
 
 
-class PagedManifestGenerator(ManifestGenerator):
+class ClientSidePagingManifestGenerator(ManifestGenerator, metaclass=ABCMeta):
+    """
+    A mixin for manifest generators that use client-side paging to query
+    Elasticsearch.
+    """
+    page_size = 500
+
+    def _create_paged_request(self, search_after: SortKey | None) -> Search:
+        pagination = Pagination(sort='entryId',
+                                order='asc',
+                                size=self.page_size,
+                                search_after=search_after)
+        pipeline = self._create_pipeline()
+        # Only needs this to satisfy the type constraints
+        pipeline = ToDictStage(service=self.service,
+                               catalog=self.catalog,
+                               entity_type=self.entity_type).wrap(pipeline)
+        pipeline = PaginationStage(service=self.service,
+                                   catalog=self.catalog,
+                                   entity_type=self.entity_type,
+                                   pagination=pagination,
+                                   filters=self.filters,
+                                   peek_ahead=False).wrap(pipeline)
+        request = self.service.create_request(catalog=self.catalog,
+                                              entity_type=self.entity_type)
+        # The response is processed by the generator, not the pipeline
+        request = pipeline.prepare_request(request)
+        return request
+
+    def _search_after(self, hit: Hit) -> SortKey:
+        a, b = hit.meta.sort
+        return a, b
+
+
+class PagedManifestGenerator(ClientSidePagingManifestGenerator):
     """
     A manifest generator whose output can be split over multiple concatenable
     IO streams.
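For context on what the new mixin amounts to at the Elasticsearch level: `search_after` paging re-issues the same deterministically sorted query, passing the sort values of the last hit seen, so each request resumes where the previous one stopped. A minimal sketch of the pattern using elasticsearch-dsl directly, with an illustrative sort field; the class above builds its requests through the service's pipeline stages instead:

```python
from elasticsearch_dsl import Search

def iter_hits(search: Search, page_size: int = 500):
    # Stateless client-side paging: sort deterministically, then resume
    # each request from the sort key of the last hit of the previous page
    search_after = None
    while True:
        request = search.sort('entryId.keyword').extra(size=page_size)
        if search_after is not None:
            request = request.extra(search_after=list(search_after))
        hits = list(request.execute().hits)
        if not hits:
            break
        yield from hits
        # hit.meta.sort holds the sort values that ordered this hit
        search_after = tuple(hits[-1].meta.sort)
```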
@@ -1276,52 +1313,23 @@ def write(self,
        with TextIOWrapper(buffer, encoding='utf-8', write_through=True) as text_buffer:
            while True:
                partition = self.write_page_to(partition, output=text_buffer)
                # Manifest lambda has 2 GB of memory
                assert buffer.tell() < 1.5 * 1024 ** 3
                if partition.is_last_page or buffer.tell() > self.part_size:
                    break

        def upload_part():
            if buffer.tell() > 0:
                buffer.seek(0)
                return self.storage.upload_multipart_part(buffer, partition.index + 1, upload)

Review comment on `def upload_part()`: PL please. The commit title calls this a refactoring, but this does not look like a refactoring to me.

        part_etag = self.storage.upload_multipart_part(buffer, partition.index + 1, upload)
        partition = partition.next(part_etag=part_etag)
        if partition.is_last_page:
            if buffer.tell() > 0:
                partition = partition.next(part_etag=upload_part())
            self.storage.complete_multipart_upload(upload, partition.part_etags)
            file_name = self.file_name(manifest_key, base_name=partition.file_name)
            tagging = self.tagging(file_name)
            if tagging is not None:
                self.storage.put_object_tagging(object_key, tagging)
            return partition.last(file_name)
        else:
            return partition.next(part_etag=upload_part())

    page_size = 500

    def _create_paged_request(self, partition: ManifestPartition) -> Search:
        pagination = Pagination(sort='entryId',
                                order='asc',
                                size=self.page_size,
                                search_after=partition.search_after)
        pipeline = self._create_pipeline()
        # Only needs this to satisfy the type constraints
        pipeline = ToDictStage(service=self.service,
                               catalog=self.catalog,
                               entity_type=self.entity_type).wrap(pipeline)
        pipeline = PaginationStage(service=self.service,
                                   catalog=self.catalog,
                                   entity_type=self.entity_type,
                                   pagination=pagination,
                                   filters=self.filters,
                                   peek_ahead=False).wrap(pipeline)
        request = self.service.create_request(catalog=self.catalog,
                                              entity_type=self.entity_type)
        # The response is processed by the generator, not the pipeline
        request = pipeline.prepare_request(request)
        return request

    def _search_after(self, hit: Hit) -> tuple[str, str]:
        a, b = hit.meta.sort
        return a, b
            partition = partition.last(file_name)
            return partition


class FileBasedManifestGenerator(ManifestGenerator):
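The `write` method above follows the standard S3 multipart pattern: accumulate at least a part's worth of output in a buffer, upload it as a numbered part, and complete the upload with the collected ETags once the last page is written. A generic boto3 sketch of that pattern, with illustrative bucket and key names and a hypothetical `iter_parts` helper; note that S3 requires every part except the last to be at least 5 MiB:

```python
import io
import boto3

def iter_parts(stream: io.BufferedReader, part_size: int = 8 * 1024 ** 2):
    # Hypothetical helper: yield chunks of part_size bytes (S3 requires at
    # least 5 MiB for every part except the last)
    while True:
        chunk = stream.read(part_size)
        if not chunk:
            break
        yield chunk

s3 = boto3.client('s3')
bucket, key = 'my-bucket', 'manifest.jsonl'  # illustrative names

upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts = []
with open('manifest.jsonl', 'rb') as f:
    for part_number, chunk in enumerate(iter_parts(f), start=1):
        response = s3.upload_part(Bucket=bucket, Key=key,
                                  UploadId=upload['UploadId'],
                                  PartNumber=part_number,
                                  Body=chunk)
        parts.append({'PartNumber': part_number, 'ETag': response['ETag']})
s3.complete_multipart_upload(Bucket=bucket, Key=key,
                             UploadId=upload['UploadId'],
                             MultipartUpload={'Parts': parts})
```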
@@ -1505,7 +1513,7 @@ def _write(file: JSON, is_related_file: bool = False):
         output.write('\n\n'.join(curl_options))
         output.write('\n\n')
 
-        request = self._create_paged_request(partition)
+        request = self._create_paged_request(partition.search_after)
         response = request.execute()
         if response.hits:
             hit = None
@@ -1649,7 +1657,7 @@ def write_page_to(self,
         if partition.page_index == 0:
             writer.writeheader()
 
-        request = self._create_paged_request(partition)
+        request = self._create_paged_request(partition.search_after)
         response = request.execute()
         if response.hits:
             project_short_names = set()
@@ -2006,7 +2014,8 @@ def qualify(qualifier, column_name, index=None):
             bundle_tsv_writer.writerow(row)
 
 
-class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta):
+class VerbatimManifestGenerator(ClientSidePagingManifestGenerator,
+                                metaclass=ABCMeta):
 
     @property
     def entity_type(self) -> str:
@@ -2044,7 +2053,20 @@ def include_orphans(self) -> bool:
             for field_name, field_path in plugin.field_mapping.items()
             if field_path[0] == 'contents' and field_path[1] == plugin.root_entity_type
         }
-        return self.filters.explicit.keys() < root_entity_fields
+        # For both HCA and AnVIL, these root entities are bijective with the
+        # sources used for indexing, and filtering by a specific project
+        # or dataset entity should produce the same results as filtering by
+        # that entity's source.
+        #
+        # The verbatim JSONL manifest generator temporarily inserts a source ID
+        # match into its provided filters in order to partition the manifest. If
+        # the source ID field were not included here, that insertion could
+        # cause orphans to be incorrectly dropped from the manifest.
+        source_fields = {
+            plugin.special_fields.source_id,
+            plugin.special_fields.source_spec
+        }
+        return self.filters.explicit.keys() < (root_entity_fields | source_fields)
 
     @attrs.frozen(kw_only=True)
     class ReplicaKeys:
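The `<` in the new return statement is a proper-subset test on a dict keys view: orphans are included only when every explicitly filtered field is a root-entity or source field, and not all of those fields are filtered on. A small illustration with hypothetical field names:

```python
# dict.keys() supports set operators; `<` means proper subset
root_entity_fields = {'datasets.dataset_id'}            # hypothetical names
source_fields = {'sources.id', 'sources.spec'}
allowed = root_entity_fields | source_fields

explicit = {'sources.id': {'is': ['3b07b6d0']}}
assert explicit.keys() < allowed        # orphans may be included

explicit = {'sources.id': {'is': ['3b07b6d0']},
            'files.file_format': {'is': ['bam']}}
assert not explicit.keys() < allowed    # a non-root field is filtered on
```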
@@ -2060,9 +2082,24 @@ class ReplicaKeys:
         hub_id: str
         replica_ids: list[str]
 
-    def _replica_keys(self) -> Iterable[ReplicaKeys]:
-        request = self._create_request()
-        for hit in request.scan():
+    def _get_all_hits(self,
+                      request_callback: Callable[[SortKey | None], Search]
+                      ) -> Iterable[Hit]:
+        search_after = None
+        while True:
+            request = request_callback(search_after)
+            response = request.execute()
+            if response.hits:
+                hit = None
+                for hit in response.hits:
+                    yield hit
+                assert hit is not None
+                search_after = self._search_after(hit)
+            else:
+                break
+
+    def _list_replica_keys(self) -> Iterable[ReplicaKeys]:
+        for hit in self._get_all_hits(self._create_paged_request):
             document_ids = [
                 document_id
                 for entity_type in self.hot_entity_types
@@ -2075,10 +2112,9 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]:
             yield self.ReplicaKeys(hub_id=hit['entity_id'],
                                    replica_ids=document_ids)
 
-    def _all_replicas(self) -> Iterable[JSON]:
+    def _list_replicas(self) -> Iterable[JSON]:
         emitted_replica_ids = set()
-        page_size = 100
-        for page in chunked(self._replica_keys(), page_size):
+        for page in chunked(self._list_replica_keys(), self.page_size):
             num_replicas = 0
             num_new_replicas = 0
             for replica in self._join_replicas(page):
@@ -2095,21 +2131,34 @@
                      num_replicas, num_replicas - num_new_replicas, len(page))
 
     def _join_replicas(self, keys: Iterable[ReplicaKeys]) -> Iterable[Hit]:
-        request = self.service.create_request(catalog=self.catalog,
-                                              entity_type='replica',
-                                              doc_type=DocumentType.replica)
         hub_ids, replica_ids = set(), set()
         for key in keys:
             hub_ids.add(key.hub_id)
             replica_ids.update(key.replica_ids)
+
+        request = self.service.create_request(catalog=self.catalog,
+                                              entity_type='replica',
+                                              doc_type=DocumentType.replica)
         request = request.query(Q('bool', should=[
             {'terms': {'hub_ids.keyword': list(hub_ids)}},
             {'terms': {'entity_id.keyword': list(replica_ids)}}
         ]))
-        return request.scan()
+        request = request.extra(size=self.page_size)
+        # For unknown reasons, this combination of sort fields appears to be the
+        # only one that consistently produces the correct behavior. If entity_id
+        # alone is used, the manifest is generated correctly in production, but
+        # unit tests fail. If _doc alone is used, the tests pass, but replicas
+        # are missing from actual manifests.
+        request = request.sort('entity_id.keyword', '_doc')
+
+        def search(search_after: SortKey | None) -> Search:
+            return request.extra(search_after=search_after)
+
+        return self._get_all_hits(search)
 
 
-class JSONLVerbatimManifestGenerator(VerbatimManifestGenerator):
+class JSONLVerbatimManifestGenerator(PagedManifestGenerator,
+                                     VerbatimManifestGenerator):
 
     @property
     def content_type(self) -> str:
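For context, the request that the rewritten `_join_replicas` issues should look roughly like the following raw body, with made-up values. The two `should` clauses match replicas either through the hubs that reference them or by their own IDs, and the `sort` plus `search_after` pair is what `_get_all_hits` advances between pages:

```python
# Illustrative request body; the field names come from the code above,
# the values are made up
body = {
    'query': {
        'bool': {
            'should': [
                {'terms': {'hub_ids.keyword': ['hub-id-1', 'hub-id-2']}},
                {'terms': {'entity_id.keyword': ['replica-id-1']}}
            ]
        }
    },
    'size': 500,
    'sort': ['entity_id.keyword', '_doc'],
    'search_after': ['replica-id-0', 1234]  # omitted on the first request
}
```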
@@ -2123,21 +2172,62 @@ def file_name_extension(cls) -> str:
     def format(cls) -> ManifestFormat:
         return ManifestFormat.verbatim_jsonl
 
-    def create_file(self) -> tuple[str, str | None]:
-        fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
-        os.close(fd)
-        with open(path, 'w') as f:
-            for replica in self._all_replicas():
-                entry = {
-                    'value': replica['contents'],
-                    'type': replica['replica_type']
-                }
-                json.dump(entry, f)
-                f.write('\n')
-        return path, None
+    @property
+    def source_id_field(self) -> str:
+        return self.metadata_plugin.special_fields.source_id
+
+    def source_ids(self) -> list[str]:
+        # Currently, we process each source that might be included in the
+        # manifest. This can be very inefficient since many partitions may be
+        # empty for small manifests. A potential optimization is to use a terms
+        # aggregation to query for the set of nonempty sources before
+        # processing any hits.
+
+        # It's safe to assume that the explicit sources are a subset of the
+        # accessible sources. If they're not, an exception will be raised when
+        # the filters are applied.
+        sources = self.filters.explicit.get(self.source_id_field,
                                             self.filters.source_ids)
+        return sorted(sources)
+
+    def write_page_to(self,
+                      partition: ManifestPartition,
+                      output: IO[str]
+                      ) -> ManifestPartition:
+        # All replicas from each source must be held in memory simultaneously to
+        # avoid emitting duplicates. Therefore, each "page" of this manifest
+        # must retrieve every replica from a given source, using multiple paged
+        # requests to ElasticSearch if necessary.
+        source_ids = self.source_ids()
+        if partition.page_index < len(source_ids):
+            source_id = source_ids[partition.page_index]
+            log.info('Listing replicas from source %r for manifest page %d',
+                     source_id, partition.page_index)
+            source_partition_filters = self.filters.update({
+                self.source_id_field: {'is': [source_id]}
+            })
+            original_filters = self.filters
+            try:
+                self.filters = source_partition_filters
+                replicas = self._list_replicas()
+                for replica in replicas:
+                    entry = {
+                        'value': replica['contents'],
+                        'type': replica['replica_type']
+                    }
+                    json.dump(entry, output)
+                    output.write('\n')
+            finally:
+                self.filters = original_filters
+
+        if partition.page_index + 1 < len(source_ids):
+            return partition.next_page(file_name=None, search_after=None)
+        else:
+            return partition.last_page()
 
 
-class PFBVerbatimManifestGenerator(VerbatimManifestGenerator):
+class PFBVerbatimManifestGenerator(FileBasedManifestGenerator,
+                                   VerbatimManifestGenerator):
 
     @property
     def content_type(self) -> str:
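The comment in `source_ids` above suggests a possible optimization: ask Elasticsearch which sources are actually nonempty before paging through every accessible source. This is not part of the PR, but a sketch with elasticsearch-dsl could look like this, with an illustrative aggregation name and bucket size:

```python
from elasticsearch_dsl import Search

def nonempty_source_ids(search: Search, source_id_field: str) -> list[str]:
    # size=0: we only want the aggregation, not the hits themselves
    search = search.extra(size=0)
    search.aggs.bucket('sources', 'terms', field=source_id_field, size=1000)
    response = search.execute()
    return sorted(bucket.key for bucket in response.aggregations.sources.buckets)
```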
@@ -2189,7 +2279,7 @@ def _include_relations(self, replica: JSON) -> bool:
         )
 
     def create_file(self) -> tuple[str, str | None]:
-        replicas = list(self._all_replicas())
+        replicas = list(self._list_replicas())
         plugin = self.metadata_plugin
         replica_schemas = plugin.verbatim_pfb_schema(replicas)
         # Ensure field order is consistent for unit tests
@@ -329,7 +329,8 @@ def _handle_overwrite(self,
                           object_key: str
                           ):
         error = exception.response['Error']
-        code, condition = error['Code'], error['Condition']
+        # `Condition` is only present when using conditional writes
+        code, condition = error['Code'], error.get('Condition')

Review comment: PL, commit title misleading.

Reply: Added a comment to explain when the KeyError occurs. Agreed at PL that this is sufficient.

         if code == 'PreconditionFailed' and condition == 'If-None-Match':
             raise StorageObjectExists(object_key)
         else:
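For context on the fix: S3 reports a `Condition` entry in the error only when the failed request was a conditional write, so subscripting with `error['Condition']` raised a KeyError for every other kind of error. A sketch of the scenario the handler targets, assuming boto3 and illustrative bucket and key names:

```python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
try:
    # IfNoneMatch='*' makes the PUT conditional on the key not existing yet
    s3.put_object(Bucket='my-bucket', Key='manifests/result.jsonl',
                  Body=b'...', IfNoneMatch='*')
except ClientError as e:
    error = e.response['Error']
    # 'Condition' is present only for failed conditional writes,
    # hence error.get() rather than error['Condition']
    if error['Code'] == 'PreconditionFailed' and error.get('Condition') == 'If-None-Match':
        print('object already exists')
    else:
        raise
```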
Review comment: And then describe how a page of ES results (as defined by the superclass) may or may not translate to a manifest page.