Fix: Cannot generate unfiltered JSONL manifest in AnVIL (#7190) #7225

Open
wants to merge 15 commits into develop

234 changes: 162 additions & 72 deletions src/azul/service/manifest_service.py
@@ -45,6 +45,7 @@
)
import time
from typing import (
Callable,
ClassVar,
IO,
Protocol,
@@ -139,9 +140,11 @@
PFBRelation,
)
from azul.service.elasticsearch_service import (
ElasticsearchChain,
ElasticsearchService,
Pagination,
PaginationStage,
SortKey,
ToDictStage,
)
from azul.service.storage_service import (
@@ -513,7 +516,7 @@ class ManifestPartition:

#: The `sort` value of the first hit of the current page in this partition,
#: or None if there is no current page.
search_after: tuple[str, str] | None = None
search_after: SortKey | None = None

@classmethod
def from_json(cls, partition: JSON) -> Self:
@@ -531,10 +534,10 @@ def first(cls) -> Self:
is_last=False)

@property
def is_first(self):
def is_first(self) -> bool:
return not (self.index or self.page_index)

def with_config(self, config: AnyJSON):
def with_config(self, config: AnyJSON) -> Self:
return attrs.evolve(self, config=config)

def with_upload(self, multipart_upload_id) -> Self:
@@ -550,7 +553,7 @@ def first_page(self) -> Self:

def next_page(self,
file_name: str | None,
search_after: tuple[str, str]
search_after: SortKey | None
) -> Self:
assert self.page_index is not None, self
# If different pages yield different file names, use default file name
@@ -562,7 +565,7 @@ def next_page(self,
file_name=file_name,
search_after=search_after)

def last_page(self):
def last_page(self) -> Self:
return attrs.evolve(self, is_last_page=True)

def next(self, part_etag: str) -> Self:
@@ -1047,7 +1050,7 @@ def _create_request(self) -> Search:
# The response is processed by the generator, not the pipeline
return request

def _create_pipeline(self):
def _create_pipeline(self) -> ElasticsearchChain:
if self.included_fields is None:
document_slice = DocumentSlice()
else:
@@ -1220,7 +1223,41 @@ def storage(self):
return self.service.storage_service


class PagedManifestGenerator(ManifestGenerator):
class ClientSidePagingManifestGenerator(ManifestGenerator, metaclass=ABCMeta):
"""
A mixin for manifest generators that use client-side paging to query
Elasticsearch.
"""
page_size = 500

def _create_paged_request(self, search_after: SortKey | None) -> Search:
pagination = Pagination(sort='entryId',
order='asc',
size=self.page_size,
search_after=search_after)
pipeline = self._create_pipeline()
# Only needs this to satisfy the type constraints
pipeline = ToDictStage(service=self.service,
catalog=self.catalog,
entity_type=self.entity_type).wrap(pipeline)
pipeline = PaginationStage(service=self.service,
catalog=self.catalog,
entity_type=self.entity_type,
pagination=pagination,
filters=self.filters,
peek_ahead=False).wrap(pipeline)
request = self.service.create_request(catalog=self.catalog,
entity_type=self.entity_type)
# The response is processed by the generator, not the pipeline
request = pipeline.prepare_request(request)
return request

def _search_after(self, hit: Hit) -> SortKey:
a, b = hit.meta.sort
return a, b


class PagedManifestGenerator(ClientSidePagingManifestGenerator):
"""
A manifest generator whose output can be split over multiple concatenable
IO streams.
Comment on lines 1262 to 1263
Member

Suggested change
A manifest generator whose output can be split over multiple concatenable
IO streams.
A manifest generator whose output is split over several concatenable
segments, also known as pages.

And then describe how a page of ES results (as defined by the superclass) may or may not translate to a manifest page.

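For context on the paging the new `ClientSidePagingManifestGenerator` mixin performs, here is a minimal, self-contained sketch of a client-side `search_after` loop, using the `elasticsearch_dsl` `Search` API directly; the index name, sort fields and page size are illustrative, not values taken from this PR:

```python
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search


def scan_with_search_after(client: Elasticsearch, index: str, page_size: int = 500):
    # A stable, totally ordered sort key is required so that `search_after`
    # can resume exactly where the previous page left off.
    search_after = None
    while True:
        request = Search(using=client, index=index) \
            .sort('entryId.keyword', '_doc') \
            .extra(size=page_size)
        if search_after is not None:
            request = request.extra(search_after=list(search_after))
        response = request.execute()
        if not response.hits:
            break
        for hit in response.hits:
            yield hit
        # The sort values of the last hit become the cursor for the next page
        search_after = tuple(response.hits[-1].meta.sort)
```

Note that an Elasticsearch page and a manifest page need not be one-to-one: the existing TSV generators consume exactly one Elasticsearch page per call to `write_page_to`, while the JSONL verbatim generator introduced below drains every Elasticsearch page for one source within a single manifest page.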
@@ -1276,52 +1313,23 @@ def write(self,
with TextIOWrapper(buffer, encoding='utf-8', write_through=True) as text_buffer:
while True:
partition = self.write_page_to(partition, output=text_buffer)
# Manifest lambda has 2 GB of memory
assert buffer.tell() < 1.5 * 1024 ** 3
if partition.is_last_page or buffer.tell() > self.part_size:
break

def upload_part():
Member

PL please. The commit title calls this a refactoring, but this does not look like a refactoring to me.

if buffer.tell() > 0:
buffer.seek(0)
return self.storage.upload_multipart_part(buffer, partition.index + 1, upload)

part_etag = self.storage.upload_multipart_part(buffer, partition.index + 1, upload)
partition = partition.next(part_etag=part_etag)
if partition.is_last_page:
if buffer.tell() > 0:
partition = partition.next(part_etag=upload_part())
self.storage.complete_multipart_upload(upload, partition.part_etags)
file_name = self.file_name(manifest_key, base_name=partition.file_name)
tagging = self.tagging(file_name)
if tagging is not None:
self.storage.put_object_tagging(object_key, tagging)
return partition.last(file_name)
else:
return partition.next(part_etag=upload_part())

page_size = 500

def _create_paged_request(self, partition: ManifestPartition) -> Search:
pagination = Pagination(sort='entryId',
order='asc',
size=self.page_size,
search_after=partition.search_after)
pipeline = self._create_pipeline()
# Only needs this to satisfy the type constraints
pipeline = ToDictStage(service=self.service,
catalog=self.catalog,
entity_type=self.entity_type).wrap(pipeline)
pipeline = PaginationStage(service=self.service,
catalog=self.catalog,
entity_type=self.entity_type,
pagination=pagination,
filters=self.filters,
peek_ahead=False).wrap(pipeline)
request = self.service.create_request(catalog=self.catalog,
entity_type=self.entity_type)
# The response is processed by the generator, not the pipeline
request = pipeline.prepare_request(request)
return request

def _search_after(self, hit: Hit) -> tuple[str, str]:
a, b = hit.meta.sort
return a, b
partition = partition.last(file_name)
return partition


class FileBasedManifestGenerator(ManifestGenerator):
@@ -1505,7 +1513,7 @@ def _write(file: JSON, is_related_file: bool = False):
output.write('\n\n'.join(curl_options))
output.write('\n\n')

request = self._create_paged_request(partition)
request = self._create_paged_request(partition.search_after)
response = request.execute()
if response.hits:
hit = None
@@ -1649,7 +1657,7 @@ def write_page_to(self,
if partition.page_index == 0:
writer.writeheader()

request = self._create_paged_request(partition)
request = self._create_paged_request(partition.search_after)
response = request.execute()
if response.hits:
project_short_names = set()
@@ -2006,7 +2014,8 @@ def qualify(qualifier, column_name, index=None):
bundle_tsv_writer.writerow(row)


class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta):
class VerbatimManifestGenerator(ClientSidePagingManifestGenerator,
metaclass=ABCMeta):

@property
def entity_type(self) -> str:
@@ -2044,7 +2053,20 @@ def include_orphans(self) -> bool:
for field_name, field_path in plugin.field_mapping.items()
if field_path[0] == 'contents' and field_path[1] == plugin.root_entity_type
}
return self.filters.explicit.keys() < root_entity_fields
# For both HCA and AnVIL, these root entities are bijective with the
# sources used for indexing, and filtering by a specific project
# or dataset entity should produce the same results as filtering by
# that entity's source.
#
# The verbatim JSONL manifest generator temporarily inserts a source ID
# match into its provided filters in order to partition the manifest. If
# the source ID field were not included here, that insertion could
# cause orphans to be incorrectly dropped from the manifest.
source_fields = {
plugin.special_fields.source_id,
plugin.special_fields.source_spec
}
return self.filters.explicit.keys() < (root_entity_fields | source_fields)

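As an illustration of the subset check above, a filter that only names a root entity and the temporarily injected source ID still qualifies for orphans, while any other field does not. The field names here are hypothetical stand-ins, not the actual ones from the metadata plugins:

```python
# Hypothetical field names standing in for plugin.field_mapping and
# plugin.special_fields
root_entity_fields = {'datasets.dataset_id'}
source_fields = {'sourceId', 'sourceSpec'}
allowed = root_entity_fields | source_fields

# Filters as injected by the JSONL generator while partitioning by source
explicit = {
    'datasets.dataset_id': {'is': ['some dataset']},
    'sourceId': {'is': ['some source']}
}
assert explicit.keys() < allowed  # orphans are still included

# Any field outside the allowed set means orphans can no longer be matched
explicit = {'files.file_format': {'is': ['fastq']}}
assert not explicit.keys() < allowed
```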
@attrs.frozen(kw_only=True)
class ReplicaKeys:
@@ -2060,9 +2082,24 @@ class ReplicaKeys:
hub_id: str
replica_ids: list[str]

def _replica_keys(self) -> Iterable[ReplicaKeys]:
request = self._create_request()
for hit in request.scan():
def _get_all_hits(self,
request_callback: Callable[[SortKey | None], Search]
) -> Iterable[Hit]:
search_after = None
while True:
request = request_callback(search_after)
response = request.execute()
if response.hits:
hit = None
for hit in response.hits:
yield hit
assert hit is not None
search_after = self._search_after(hit)
else:
break

def _list_replica_keys(self) -> Iterable[ReplicaKeys]:
for hit in self._get_all_hits(self._create_paged_request):
document_ids = [
document_id
for entity_type in self.hot_entity_types
@@ -2075,10 +2112,9 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]:
yield self.ReplicaKeys(hub_id=hit['entity_id'],
replica_ids=document_ids)

def _all_replicas(self) -> Iterable[JSON]:
def _list_replicas(self) -> Iterable[JSON]:
emitted_replica_ids = set()
page_size = 100
for page in chunked(self._replica_keys(), page_size):
for page in chunked(self._list_replica_keys(), self.page_size):
num_replicas = 0
num_new_replicas = 0
for replica in self._join_replicas(page):
@@ -2095,21 +2131,34 @@ def _join_replicas(self, keys: Iterable[ReplicaKeys]) -> Iterable[Hit]:
num_replicas, num_replicas - num_new_replicas, len(page))

def _join_replicas(self, keys: Iterable[ReplicaKeys]) -> Iterable[Hit]:
request = self.service.create_request(catalog=self.catalog,
entity_type='replica',
doc_type=DocumentType.replica)
hub_ids, replica_ids = set(), set()
for key in keys:
hub_ids.add(key.hub_id)
replica_ids.update(key.replica_ids)

request = self.service.create_request(catalog=self.catalog,
entity_type='replica',
doc_type=DocumentType.replica)
request = request.query(Q('bool', should=[
{'terms': {'hub_ids.keyword': list(hub_ids)}},
{'terms': {'entity_id.keyword': list(replica_ids)}}
]))
return request.scan()
request = request.extra(size=self.page_size)
# For unknown reasons, this combination of sort fields appears to be the
# only one that consistently produces the correct behavior. If entity_id
# alone is used, the manifest is generated correctly in production, but
# unit tests fail. If _doc alone is used, the tests pass, but replicas
# are missing from actual manifests.
request = request.sort('entity_id.keyword', '_doc')

def search(search_after: SortKey | None) -> Search:
return request.extra(search_after=search_after)

return self._get_all_hits(search)

class JSONLVerbatimManifestGenerator(VerbatimManifestGenerator):

class JSONLVerbatimManifestGenerator(PagedManifestGenerator,
VerbatimManifestGenerator):

@property
def content_type(self) -> str:
@@ -2123,21 +2172,62 @@ def file_name_extension(cls) -> str:
def format(cls) -> ManifestFormat:
return ManifestFormat.verbatim_jsonl

def create_file(self) -> tuple[str, str | None]:
fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
os.close(fd)
with open(path, 'w') as f:
for replica in self._all_replicas():
entry = {
'value': replica['contents'],
'type': replica['replica_type']
}
json.dump(entry, f)
f.write('\n')
return path, None
@property
def source_id_field(self) -> str:
return self.metadata_plugin.special_fields.source_id

def source_ids(self) -> list[str]:
# Currently, we process each source that might be included in the
# manifest. This can be very inefficient since many partitions may be
# empty for small manifests. A potential optimization is to use a terms
# aggregation to query for the set of nonempty sources before
# processing any hits.

# It's safe to assume that the explicit sources are a subset of the
# accessible sources. If they're not, an exception will be raised when
# the filters are applied.
sources = self.filters.explicit.get(self.source_id_field,
self.filters.source_ids)
return sorted(sources)

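A rough sketch of the terms-aggregation optimization mentioned in the comment above, again using `elasticsearch_dsl` directly; the aggregation field and index name are assumptions, not values from this code base:

```python
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search


def nonempty_source_ids(client: Elasticsearch,
                        index: str,
                        candidate_source_ids: set[str]
                        ) -> list[str]:
    # Ask Elasticsearch which candidate sources actually contain hits, so
    # that empty per-source partitions can be skipped without issuing a
    # full paged request for each of them.
    request = Search(using=client, index=index).extra(size=0)
    request.aggs.bucket('sources',
                        'terms',
                        field='sources.id.keyword',
                        size=len(candidate_source_ids))
    response = request.execute()
    found = {bucket.key for bucket in response.aggregations.sources.buckets}
    return sorted(found & candidate_source_ids)
```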
def write_page_to(self,
partition: ManifestPartition,
output: IO[str]
) -> ManifestPartition:
# All replicas from each source must be held in memory simultaneously to
# avoid emitting duplicates. Therefore, each "page" of this manifest
# must retrieve every replica from a given source, using multiple paged
# requests to Elasticsearch if necessary.
source_ids = self.source_ids()
if partition.page_index < len(source_ids):
source_id = source_ids[partition.page_index]
log.info('Listing replicas from source %r for manifest page %d',
source_id, partition.page_index)
source_partition_filters = self.filters.update({
self.source_id_field: {'is': [source_id]}
})
original_filters = self.filters
try:
self.filters = source_partition_filters
replicas = self._list_replicas()
for replica in replicas:
entry = {
'value': replica['contents'],
'type': replica['replica_type']
}
json.dump(entry, output)
output.write('\n')
finally:
self.filters = original_filters

if partition.page_index + 1 < len(source_ids):
return partition.next_page(file_name=None, search_after=None)
else:
return partition.last_page()

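A minimal sketch of why the deduplication described above forces one manifest page per source; `chunked` is the `more_itertools` helper the real code uses, and the replica records are made up:

```python
from more_itertools import chunked


def emit_replicas_for_source(replicas_for_source, page_size=500):
    # The dedup set only lives for the duration of a single manifest page.
    # If one source were split across pages, a replica referenced by hubs on
    # both sides of the split would be written twice; emitting a whole source
    # per page guarantees the duplicate is seen while its ID is still in the set.
    emitted_ids = set()
    for batch in chunked(replicas_for_source, page_size):
        for replica in batch:
            if replica['entity_id'] not in emitted_ids:
                emitted_ids.add(replica['entity_id'])
                yield replica


# Two hubs in the same source sharing replica 'r2' yield it only once
replicas = [{'entity_id': 'r1'}, {'entity_id': 'r2'}, {'entity_id': 'r2'}]
assert [r['entity_id'] for r in emit_replicas_for_source(replicas)] == ['r1', 'r2']
```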

class PFBVerbatimManifestGenerator(VerbatimManifestGenerator):
class PFBVerbatimManifestGenerator(FileBasedManifestGenerator,
VerbatimManifestGenerator):

@property
def content_type(self) -> str:
@@ -2189,7 +2279,7 @@ def _include_relations(self, replica: JSON) -> bool:
)

def create_file(self) -> tuple[str, str | None]:
replicas = list(self._all_replicas())
replicas = list(self._list_replicas())
plugin = self.metadata_plugin
replica_schemas = plugin.verbatim_pfb_schema(replicas)
# Ensure field order is consistent for unit tests
3 changes: 2 additions & 1 deletion src/azul/service/storage_service.py
@@ -329,7 +329,8 @@ def _handle_overwrite(self,
object_key: str
):
error = exception.response['Error']
code, condition = error['Code'], error['Condition']
# `Condition` is only present when using conditional writes
code, condition = error['Code'], error.get('Condition')
Member

PL, commit title misleading

Contributor Author

Added comment to explain when the KeyError occurs. Agreed at PL that this is sufficient.
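For illustration, a minimal reproduction of the `KeyError` this change avoids; the error payload is a hand-written stand-in for what botocore attaches to a failed, non-conditional `put_object` call:

```python
# A plain (non-conditional) PutObject failure carries no 'Condition' key
error = {'Code': 'AccessDenied', 'Message': 'Access Denied'}

try:
    code, condition = error['Code'], error['Condition']  # old code
except KeyError:
    print('KeyError: the write was not conditional')

# With .get(), condition is simply None and neither precondition branch fires
code, condition = error['Code'], error.get('Condition')
assert not (code == 'PreconditionFailed' and condition == 'If-None-Match')
```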

if code == 'PreconditionFailed' and condition == 'If-None-Match':
raise StorageObjectExists(object_key)
else: