Chunkinfo (HDFGroup#230)
* add hdf5 to Docker image

* added IndexIterator

* added chunklocator.py

* update SingleObject design

* add test case for chunk init

* chunkinitializer support

* fix flake8 error

* filter out stray log messages from chunk initializer

* updates based on pr review
jreadey authored Jun 2, 2023
1 parent 30ea4e6 commit 865aba2
Showing 17 changed files with 813 additions and 73 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -1,4 +1,5 @@
FROM python:3.9-slim AS hsds-base
#FROM python:3.10 AS hsds-base
FROM hdfgroup/hdf5lib:1.14.0 as hsds-base

# Install Curl
RUN apt-get update; apt-get -y install curl
21 changes: 20 additions & 1 deletion docs/design/single_object/SingleObject.md
@@ -115,12 +115,31 @@ Example:
```
"layout": {
"class": "H5D_CHUNKED_REF_INDIRECT",
"dims": [40, 80],
"dims": [100, 100],
"file_uri": "s3://mybucket/mylocation/myfile.h5",
"chunk_table": "d-7fbe2e27-87c5e1d8-f736-a6af0f-4d6950"
}
```

By default, the chunk_table needs to be initialized by an external program (e.g. `hsload --init`) in
order for data in the HDF5 file to be returned correctly. It is also possible for HSDS to initialize
the chunk table itself, by using the following layout for the chunk_table dataset:

```
"layout": {
"class": "H5D_CHUNKED",
"dims": [10, 10],
"chunk_initializer": "hsds-chunklocator",
"initializer_args": ["s3://mybucket/mylocation/myfile.h5"]
}
```

In this example, the chunk table dataset has shape [10, 10] and is stored as a single 10 x 10 chunk.
When the chunk table dataset is first accessed, the application specified in chunk_initializer will be
used to initialize the chunk.
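
As a rough illustration of the kind of work such an initializer performs, the sketch below walks the
chunks of a dataset in the source HDF5 file with h5py and records each chunk's byte offset and size,
which is the information an H5D_CHUNKED_REF_INDIRECT chunk table needs. This is only a sketch under
assumed field names and paths; it is not the actual hsds-chunklocator implementation or its
invocation protocol.

```
# Illustrative sketch only -- not the actual hsds-chunklocator code.
# Assumes the source HDF5 file is locally readable and the dataset is chunked.
import h5py
import numpy as np

def locate_chunks(file_path, h5path):
    """Return an (offset, size) table with one entry per chunk of a dataset."""
    with h5py.File(file_path, "r") as f:
        dset = f[h5path]
        chunk_shape = dset.chunks  # e.g. (10, 10); None if not chunked
        if chunk_shape is None:
            raise ValueError(f"{h5path} is not chunked")
        # one table entry per chunk along each dimension (ceiling division)
        table_shape = tuple(-(-extent // c) for extent, c in zip(dset.shape, chunk_shape))
        # the compound field names below are an assumption for this sketch
        table = np.zeros(table_shape, dtype=[("offset", "i8"), ("size", "i8")])
        for i in range(dset.id.get_num_chunks()):
            info = dset.id.get_chunk_info(i)  # has chunk_offset, byte_offset, size
            index = tuple(o // c for o, c in zip(info.chunk_offset, chunk_shape))
            table[index] = (info.byte_offset, info.size)
    return table

# e.g. table = locate_chunks("myfile.h5", "/mydataset")
```

The chunk_dn.py changes later in this diff have the data nodes create, and lazily write back, chunks
for any dataset that declares a chunk_initializer.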





## HSDS Service changes
45 changes: 34 additions & 11 deletions hsds/chunk_dn.py
@@ -25,7 +25,7 @@
from .util.storUtil import isStorObj, deleteStorObj
from .util.hdf5dtype import createDataType
from .util.dsetUtil import getSelectionList, getChunkLayout
from .util.dsetUtil import getSelectionShape
from .util.dsetUtil import getSelectionShape, getChunkInitializer
from .util.chunkUtil import getChunkIndex, getDatasetId, chunkQuery
from .util.chunkUtil import chunkWriteSelection, chunkReadSelection
from .util.chunkUtil import chunkWritePoints, chunkReadPoints
@@ -77,11 +77,6 @@ async def PUT_Chunk(request):
log.warn(msg)
raise HTTPInternalServerError(reason=msg)

if query:
chunk_init = False # don't initialize new chunks on query update
else:
chunk_init = True

try:
validateInPartition(app, chunk_id)
except KeyError:
@@ -138,6 +133,14 @@ async def PUT_Chunk(request):
num_elements = 1
for extent in mshape:
num_elements *= extent

if getChunkInitializer(dset_json):
chunk_init = True
elif query:
chunk_init = False # don't initialize new chunks on query update
else:
chunk_init = True

kwargs = {"bucket": bucket, "chunk_init": chunk_init}
chunk_arr = await get_chunk(app, chunk_id, dset_json, **kwargs)
is_dirty = False
@@ -289,7 +292,6 @@ async def GET_Chunk(request):
msg = f"invalid partition for obj id: {chunk_id}"
log.error(msg)
raise HTTPInternalServerError()
log.debug(f"GET_Chunk - request params: {params.keys()}")

if "s3path" in params:
s3path = params["s3path"]
@@ -328,7 +330,6 @@

dset_json = await get_metadata_obj(app, dset_id, bucket=bucket)
dims = getChunkLayout(dset_json)
log.debug(f"GET_Chunk - dset_json: {dset_json}")
log.debug(f"GET_Chunk - got dims: {dims}")

# get chunk selection from query params
@@ -345,19 +346,30 @@
raise HTTPInternalServerError()
log.debug(f"GET_Chunk - got selection: {selection}")

kwargs = {"chunk_init": False}
if getChunkInitializer(dset_json):
chunk_init = True
else:
chunk_init = False

kwargs = {}
if s3path:
kwargs["s3path"] = s3path
kwargs["s3offset"] = s3offset
kwargs["s3size"] = s3size
else:
kwargs["bucket"] = bucket

kwargs["chunk_init"] = chunk_init

chunk_arr = await get_chunk(app, chunk_id, dset_json, **kwargs)
if chunk_arr is None:
msg = f"chunk {chunk_id} not found"
log.warn(msg)
raise HTTPNotFound()

if chunk_init:
save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=bucket)

if query:

try:
@@ -521,13 +533,21 @@ async def POST_Chunk(request):
dset_id = getDatasetId(chunk_id)

dset_json = await get_metadata_obj(app, dset_id, bucket=bucket)
log.debug(f"get_metadata_obj for {dset_id} returned {dset_json}")
dims = getChunkLayout(dset_json)
rank = len(dims)

type_json = dset_json["type"]
dset_dtype = createDataType(type_json)
output_arr = None
chunk_init = False # will be set to True for setting points

if getChunkInitializer(dset_json):
chunk_init = True
elif put_points:
chunk_init = True
else:
# don't need for getting points
chunk_init = False

if content_type == "binary":
# create a numpy array for incoming points
@@ -549,7 +569,6 @@ async def POST_Chunk(request):
type_fields = [("coord", np.dtype(coord_type_str)), ("value", dset_dtype)]
point_dt = np.dtype(type_fields)
point_shape = (num_points,)
chunk_init = True
else:
point_dt = np.dtype("uint64")
point_shape = (num_points, rank)
@@ -576,6 +595,10 @@ async def POST_Chunk(request):
log.warn(f"chunk {chunk_id} not found")
raise HTTPNotFound()

if chunk_init and not put_points:
# lazily write chunk to storage
save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=bucket)

if put_points:
# writing point data
try:
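Taken together, the three handlers above now gate chunk creation on whether the dataset declares a
chunk initializer. A condensed, standalone restatement of those decisions is sketched below;
getChunkInitializer is the helper imported from dsetUtil in this commit, while the function names
and the absolute import path are just labels and assumptions for the sketch, not HSDS API.

```
# Sketch: condensed restatement of the chunk_init decisions made in chunk_dn.py above.
from hsds.util.dsetUtil import getChunkInitializer

def put_chunk_init(dset_json, query):
    # PUT_Chunk: initializer-backed datasets always materialize the chunk;
    # otherwise, query updates skip chunks that don't exist yet.
    if getChunkInitializer(dset_json):
        return True
    return not query

def get_chunk_init(dset_json):
    # GET_Chunk: only initializer-backed datasets create a chunk on read
    # (the freshly created chunk is then written back with save_chunk).
    return bool(getChunkInitializer(dset_json))

def post_chunk_init(dset_json, put_points):
    # POST_Chunk: initializer-backed datasets and point writes need a chunk;
    # point reads do not.
    return bool(getChunkInitializer(dset_json)) or put_points
```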
1 change: 1 addition & 0 deletions hsds/chunk_sn.py
@@ -297,6 +297,7 @@ def getChunkItem(chunkid):
log.debug(f"got chunktable data: {point_data}")
if "file_uri" in layout:
s3_layout_path = layout["file_uri"]
log.debug(f"got s3_layout_path: {s3_layout_path}")
else:
s3_layout_path = None

