Fieldop #292

Merged: 10 commits, Dec 13, 2023
hsds/basenode.py: 2 changes (1 addition, 1 deletion)
@@ -33,7 +33,7 @@
from .util.k8sClient import getDnLabelSelector, getPodIps
from . import hsds_logger as log

HSDS_VERSION = "0.8.4"
HSDS_VERSION = "0.8.5"


def getVersion():
hsds/chunk_crawl.py: 129 changes (81 additions, 48 deletions)
@@ -84,7 +84,7 @@ async def write_chunk_hyperslab(
if not bucket:
bucket = config.get("bucket_name")

msg = f"write_chunk_hyperslab, chunk_id:{chunk_id}, slices:{slices}, "
msg = f"write_chunk_hyperslab, chunk_id: {chunk_id}, slices: {slices}, "
msg += f"bucket: {bucket}"
log.info(msg)
if "layout" not in dset_json:
@@ -95,11 +95,20 @@
log.debug(f"using partition_chunk_id: {partition_chunk_id}")
chunk_id = partition_chunk_id # replace the chunk_id

params = {}

if "type" not in dset_json:
log.error(f"No type found in dset_json: {dset_json}")
raise HTTPInternalServerError()
type_json = dset_json["type"]
dset_dtype = createDataType(type_json)

if len(arr.dtype) < len(dset_dtype):
# field selection, pass in the field names
fields_param = ":".join(arr.dtype.names)
log.debug(f"setting fields_param to: {fields_param}")
params["fields"] = fields_param

params = {}
layout = getChunkLayout(dset_json)
log.debug(f"getChunkCoverage({chunk_id}, {slices}, {layout})")
chunk_sel = getChunkCoverage(chunk_id, slices, layout)
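
For context, a minimal sketch (not part of this diff) of how the fields param is derived on the write path. It assumes numpy structured dtypes; len() of a structured dtype is its field count:

import numpy as np

# The dataset dtype has three fields, but the caller writes only two.
dset_dtype = np.dtype([("a", "i4"), ("b", "f8"), ("c", "i8")])
arr = np.zeros((4,), dtype=[("a", "i4"), ("c", "i8")])

params = {}
if len(arr.dtype) < len(dset_dtype):
    # field selection, pass in the field names
    params["fields"] = ":".join(arr.dtype.names)  # -> "a:c"
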
@@ -154,6 +163,7 @@ async def read_chunk_hyperslab(
chunk_id,
dset_json,
np_arr,
select_dtype=None,
query=None,
query_update=None,
limit=0,
@@ -178,6 +188,10 @@
log.error("expected chunk_map to be set")
return

if np_arr is None and select_dtype is None:
log.error("expected np_arr to be set")
return

msg = f"read_chunk_hyperslab, chunk_id: {chunk_id},"
msg += f" bucket: {bucket}"
if query is not None:
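
The guard above reflects the two calling modes this change enables; a hedged illustration (argument values are hypothetical):

# Hyperslab read: a destination array is supplied up front.
# await read_chunk_hyperslab(app, chunk_id, dset_json, np_arr, chunk_map=chunk_map)

# Query: the number of matching rows is unknown, so np_arr is None and only
# the selected dtype is passed.
# await read_chunk_hyperslab(app, chunk_id, dset_json, None,
#                            select_dtype=select_dtype, query="a > 5",
#                            chunk_map=chunk_map)
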
@@ -187,7 +201,18 @@
log.warn(f"expected to find {chunk_id} in chunk_map")
return
chunk_info = chunk_map[chunk_id]
log.debug(f"using chunk_map entry for {chunk_id}: {chunk_info}")
log.debug(f"using chunk_map entry for {chunk_id}")
if "points" in chunk_info:
points = chunk_info["points"]
log.debug(f"chunkinfo {len(points)} points")
elif "chunk_sel" in chunk_info:
chunk_sel = chunk_info["chunk_sel"]
log.debug(f"chunkinfo - chunk_sel: {chunk_sel}")
elif "data_sel" in chunk_info:
data_sel = chunk_info["data_sel"]
log.debug(f"chunkinfo - data_sel: {data_sel}")
else:
log.warn(f"unexpected chunkinfo: {chunk_info}")

partition_chunk_id = getChunkIdForPartition(chunk_id, dset_json)
if partition_chunk_id != chunk_id:
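
The branches above mirror the shapes a chunk_map entry can take. Representative entries (values are hypothetical; per the code below, point-selection entries also carry "indices", and hyperslab entries typically hold both "chunk_sel" and "data_sel"):

chunk_info = {"points": [(0, 2), (1, 5)], "indices": [10, 42]}  # point selection
chunk_info = {"chunk_sel": (slice(0, 8),), "data_sel": (slice(96, 104),)}  # hyperslab
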
@@ -197,6 +222,8 @@
if "type" not in dset_json:
log.error(f"No type found in dset_json: {dset_json}")
raise HTTPInternalServerError()
type_json = dset_json["type"]
dset_dt = createDataType(type_json)

chunk_shape = None # expected return array shape
chunk_sel = None # for hyperslab
@@ -224,17 +251,16 @@
raise HTTPInternalServerError()
point_index = chunk_info["indices"]
method = "POST"
chunk_shape = [
len(point_list),
]
chunk_shape = [len(point_list), ]
log.debug(f"point selection - chunk_shape: {chunk_shape}")

type_json = dset_json["type"]
dt = createDataType(type_json)
if select_dtype is None and np_arr is not None:
select_dtype = np_arr.dtype

if query is None and query_update is None:
query_dtype = None
else:
query_dtype = getQueryDtype(dt)
query_dtype = getQueryDtype(select_dtype)

chunk_arr = None
array_data = None
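
With a field selection in play, select_dtype is a narrowed view of the dataset dtype, and buffers must be sized against it rather than against dset_dt. An illustration (not hsds code):

import numpy as np

dset_dt = np.dtype([("a", "i4"), ("b", "f8"), ("c", "i8")])
select_dtype = np.dtype([(name, dset_dt[name]) for name in ("a", "c")])
assert len(select_dtype) < len(dset_dt)
assert select_dtype.itemsize == 12  # 4 + 8 bytes, vs. 20 for the full dtype
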
@@ -265,6 +291,13 @@
# convert to colon separated string
hyper_dims = ":".join(map(str, hyper_dims))
params["hyper_dims"] = hyper_dims
if len(select_dtype) < len(dset_dt):
# field selection, pass in the field names
fields_param = ":".join(select_dtype.names)
log.debug(f"setting fields param to: {fields_param}")
params["fields"] = fields_param
else:
log.debug("no fields param")

# set query-based params
if query is not None:
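
Both hyper_dims and fields travel to the DN as colon-separated query params. A round-trip sketch (the DN-side decode is assumed here, not shown in this diff):

# SN side: encode
params = {"hyper_dims": ":".join(map(str, [10, 20]))}  # "10:20"
params["fields"] = ":".join(("a", "c"))                # "a:c"

# DN side (assumed): decode
hyper_dims = [int(d) for d in params["hyper_dims"].split(":")]  # [10, 20]
field_names = params["fields"].split(":")                       # ["a", "c"]
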
@@ -323,14 +356,10 @@
array_data = await http_get(app, req, params=params, client=client)
log.debug(f"http_get {req}, returned {len(array_data)} bytes")
elif method == "PUT":
array_data = await http_put(
app, req, data=body, params=params, client=client
)
array_data = await http_put(app, req, data=body, params=params, client=client)
log.debug(f"http_put {req}, returned {len(array_data)} bytes")
else: # POST
array_data = await http_post(
app, req, data=body, params=params, client=client
)
array_data = await http_post(app, req, data=body, params=params, client=client)
log.debug(f"http_post {req}, returned {len(array_data)} bytes")
except HTTPNotFound:
if query is None and "s3path" in params:
@@ -364,7 +393,9 @@
else:
# convert binary data to numpy array
try:
chunk_arr = bytesToArray(array_data, dt, chunk_shape)
log.debug(f"np_arr.dtype: {np_arr.dtype}")
log.debug(f"chunk_shape: {chunk_shape}")
chunk_arr = bytesToArray(array_data, np_arr.dtype, chunk_shape)
except ValueError as ve:
log.warn(f"bytesToArray ValueError: {ve}")
raise HTTPBadRequest()
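
A rough sketch of what bytesToArray is assumed to do here (the real helper lives elsewhere in hsds and also handles variable-length types):

import numpy as np

def bytes_to_array_sketch(data: bytes, dt: np.dtype, shape) -> np.ndarray:
    # np.frombuffer raises ValueError when len(data) is not a multiple of
    # dt.itemsize, which is why the caller traps ValueError above.
    arr = np.frombuffer(data, dtype=dt)
    return arr.reshape(shape)
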
@@ -377,9 +408,9 @@
raise HTTPInternalServerError()
chunk_arr = chunk_arr.reshape(chunk_shape)

log.info(f"chunk_arr shape: {chunk_arr.shape}")
log.info(f"data_sel: {data_sel}")
log.info(f"np_arr shape: {np_arr.shape}")
log.debug(f"chunk_arr shape: {chunk_arr.shape}, dtype: {chunk_arr.dtype}")
log.debug(f"data_sel: {data_sel}")
log.debug(f"np_arr shape: {np_arr.shape}")

if point_list is not None:
# point selection
@@ -593,6 +624,7 @@ def __init__(
bucket=None,
slices=None,
arr=None,
select_dtype=None,
query=None,
query_update=None,
limit=0,
@@ -603,14 +635,18 @@
max_tasks_per_node = config.get("max_tasks_per_node_per_request", default=16)
client_pool_count = config.get("client_pool_count", default=10)
log.info(f"ChunkCrawler.__init__ {len(chunk_ids)} chunks, action={action}")
log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids}")
if len(chunk_ids) < 10:
log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids}")
else:
log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids[:10]} ...")

self._app = app
self._slices = slices
self._chunk_ids = chunk_ids
self._chunk_map = chunk_map
self._dset_json = dset_json
self._arr = arr
self._select_dtype = select_dtype
self._points = points
self._query = query
self._query_update = query_update
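
A hypothetical caller sketch showing how the new select_dtype argument threads through (the real service-node caller is not part of this diff; all values are placeholders):

crawler = ChunkCrawler(
    app,
    chunk_ids,
    dset_json=dset_json,
    chunk_map=chunk_map,
    bucket=bucket,
    arr=None,                   # no destination array for a query
    select_dtype=select_dtype,  # dtype narrowed to the requested fields
    query="a > 5",              # hypothetical query expression
    action="read_chunk_hyperslab",
)
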
@@ -630,6 +666,7 @@
self._max_tasks = max_tasks
else:
self._max_tasks = len(chunk_ids)
log.debug(f"ChunkCrawler max_tasks: {max_tasks}")

if self._max_tasks >= client_pool_count:
self._client_pool = 1
@@ -683,7 +720,8 @@ async def work(self):
this_task = asyncio.current_task()
task_name = this_task.get_name()
log.info(f"ChunkCrawler - work method for task: {task_name}")
client_name = f"{task_name}.{random.randrange(0,self._client_pool)}"
task_suffix = random.randrange(0, self._client_pool)
client_name = f"{task_name}.{task_suffix}"
log.info(f"ChunkCrawler - client_name: {client_name}")
while True:
try:
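
The client name computed above is assumed to key into a small pool of cached HTTP sessions, so one task's requests are spread over client_pool sessions instead of serializing on a single one. The naming scheme, in isolation:

import random

def make_client_name(task_name: str, client_pool: int) -> str:
    # e.g. "Task-3" with a pool of 4 yields "Task-3.0" .. "Task-3.3"
    return f"{task_name}.{random.randrange(0, client_pool)}"
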
@@ -739,16 +777,16 @@ async def do_work(self, chunk_id, client=None):
chunk_id,
self._dset_json,
self._arr,
select_dtype=self._select_dtype,
query=self._query,
query_update=self._query_update,
limit=self._limit,
chunk_map=self._chunk_map,
bucket=self._bucket,
client=client,
)
log.debug(
f"read_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}"
)
msg = f"read_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}"
log.debug(msg)
status_code = 200
elif self._action == "write_chunk_hyperslab":
await write_chunk_hyperslab(
Expand All @@ -760,19 +798,19 @@ async def do_work(self, chunk_id, client=None):
bucket=self._bucket,
client=client,
)
log.debug(
f"write_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}"
)

msg = f"write_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}"
log.debug(msg)
status_code = 200
elif self._action == "read_point_sel":
if not isinstance(self._points, dict):
log.error("ChunkCrawler - expected dict for points")
status_code = 500
break
if chunk_id not in self._points:
log.error(
f"ChunkCrawler - read_point_sel, no entry for chunk: {chunk_id}"
)
msg = "ChunkCrawler - read_point_sel, "
msg += f"no entry for chunk: {chunk_id}"
log.error(msg)
status_code = 500
break
item = self._points[chunk_id]
Expand All @@ -790,19 +828,18 @@ async def do_work(self, chunk_id, client=None):
bucket=self._bucket,
client=client,
)
log.debug(
f"read_point_sel - got 200 status for chunk_id: {chunk_id}"
)
msg = f"read_point_sel - got 200 status for chunk_id: {chunk_id}"
log.debug(msg)
status_code = 200
elif self._action == "write_point_sel":
if not isinstance(self._points, dict):
log.error("ChunkCrawler - expected dict for points")
status_code = 500
break
if chunk_id not in self._points:
log.error(
f"ChunkCrawler - read_point_sel, no entry for chunk: {chunk_id}"
)
msg = "ChunkCrawler - read_point_sel, "
msg += f"no entry for chunk: {chunk_id}"
log.error(msg)
status_code = 500
break
item = self._points[chunk_id]
Expand All @@ -819,9 +856,8 @@ async def do_work(self, chunk_id, client=None):
bucket=self._bucket,
client=client,
)
log.debug(
f"read_point_sel - got 200 status for chunk_id: {chunk_id}"
)
msg = f"read_point_sel - got 200 status for chunk_id: {chunk_id}"
log.debug(msg)
status_code = 200
else:
log.error(f"ChunkCrawler - unexpected action: {self._action}")
Expand All @@ -830,9 +866,8 @@ async def do_work(self, chunk_id, client=None):

except ClientError as ce:
status_code = 500
log.warn(
f"ClientError {type(ce)} for {self._action}({chunk_id}): {ce} "
)
msg = f"ClientError {type(ce)} for {self._action}({chunk_id}): {ce} "
log.warn(msg)
except CancelledError as cle:
status_code = 503
log.warn(f"CancelledError for {self._action}({chunk_id}): {cle}")
Expand All @@ -845,9 +880,8 @@ async def do_work(self, chunk_id, client=None):
break
except HTTPInternalServerError as ise:
status_code = 500
log.warn(
f"HTTPInternalServerError for {self._action}({chunk_id}): {ise}"
)
msg = f"HTTPInternalServerError for {self._action}({chunk_id}): {ise}"
log.warn(msg)
except HTTPServiceUnavailable as sue:
status_code = 503
msg = f"HTTPServiceUnavailable for {self._action}({chunk_id}): {sue}"
@@ -876,6 +910,5 @@
if "query_rsp" in item:
query_rsp = item["query_rsp"]
self._hits += len(query_rsp)
log.info(
f"ChunkCrawler - worker status for chunk {chunk_id}: {self._status_map[chunk_id]}"
)
msg = f"ChunkCrawler - worker status for chunk {chunk_id}: {self._status_map[chunk_id]}"
log.info(msg)