diff --git a/pychunkedgraph/graph/meta.py b/pychunkedgraph/graph/meta.py
index f3914e2b1..83d670ffe 100644
--- a/pychunkedgraph/graph/meta.py
+++ b/pychunkedgraph/graph/meta.py
@@ -1,3 +1,4 @@
+import json
 from datetime import timedelta
 from typing import Dict
 from typing import List
@@ -10,6 +11,9 @@
 from .utils.generic import compute_bitmasks
 from .chunks.utils import get_chunks_boundary
 
+from ..utils.redis import keys as r_keys
+from ..utils.redis import get_rq_queue
+from ..utils.redis import get_redis_connection
 
 _datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP")
 
@@ -80,7 +84,20 @@ def custom_data(self):
     def ws_cv(self):
         if self._ws_cv:
             return self._ws_cv
-        self._ws_cv = CloudVolume(self._data_source.WATERSHED)
+
+        cache_key = f"{self.graph_config.ID}:ws_cv_info_cached"
+        try:
+            # try reading a cached info file for distributed workers
+            # useful to avoid md5 errors on high gcs load
+            redis = get_redis_connection()
+            cached_info = json.loads(redis.get(cache_key))
+            self._ws_cv = CloudVolume(self._data_source.WATERSHED, info=cached_info)
+        except Exception:
+            self._ws_cv = CloudVolume(self._data_source.WATERSHED)
+            try:
+                redis.set(cache_key, json.dumps(self._ws_cv.info))
+            except Exception:
+                ...
         return self._ws_cv
 
     @property
diff --git a/pychunkedgraph/ingest/cli.py b/pychunkedgraph/ingest/cli.py
index 7061cd0f7..7668e8f24 100644
--- a/pychunkedgraph/ingest/cli.py
+++ b/pychunkedgraph/ingest/cli.py
@@ -2,15 +2,21 @@
 cli for running ingest
 """
 
+from os import environ
+from time import sleep
+
 import click
 import yaml
 from flask.cli import AppGroup
+from rq import Queue
 
 from .manager import IngestionManager
 from .utils import bootstrap
+from .cluster import randomize_grid_points
 from ..graph.chunkedgraph import ChunkedGraph
 from ..utils.redis import get_redis_connection
 from ..utils.redis import keys as r_keys
+from ..utils.general import chunked
 
 ingest_cli = AppGroup("ingest")
 
@@ -96,18 +102,40 @@ def queue_layer(parent_layer):
         chunk_coords = [(0, 0, 0)]
     else:
         bounds = imanager.cg_meta.layer_chunk_bounds[parent_layer]
-        chunk_coords = list(product(*[range(r) for r in bounds]))
-        np.random.shuffle(chunk_coords)
-
-    for coords in chunk_coords:
-        task_q = imanager.get_task_queue(f"l{parent_layer}")
-        task_q.enqueue(
-            create_parent_chunk,
-            job_id=chunk_id_str(parent_layer, coords),
-            job_timeout=f"{int(parent_layer * parent_layer)}m",
-            result_ttl=0,
-            args=(parent_layer, coords),
-        )
+        chunk_coords = randomize_grid_points(*bounds)
+
+    def get_chunks_not_done(coords: list) -> list:
+        """check for set membership in redis in batches"""
+        coords_strs = ["_".join(map(str, coord)) for coord in coords]
+        try:
+            completed = imanager.redis.smismember(f"{parent_layer}c", coords_strs)
+        except Exception:
+            return coords
+        return [coord for coord, c in zip(coords, completed) if not c]
+
+    batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
+    batches = chunked(chunk_coords, batch_size)
+    q = imanager.get_task_queue(f"l{parent_layer}")
+
+    for batch in batches:
+        _coords = get_chunks_not_done(batch)
+        # buffer for optimal use of redis memory
+        if len(q) > int(environ.get("QUEUE_SIZE", 100000)):
+            interval = int(environ.get("QUEUE_INTERVAL", 300))
+            sleep(interval)
+
+        job_datas = []
+        for chunk_coord in _coords:
+            job_datas.append(
+                Queue.prepare_data(
+                    create_parent_chunk,
+                    args=(parent_layer, chunk_coord),
+                    result_ttl=0,
+                    job_id=chunk_id_str(parent_layer, chunk_coord),
+                    timeout=f"{int(parent_layer * parent_layer)}m",
+                )
+            )
+        q.enqueue_many(job_datas)
 
 
 @ingest_cli.command("status")
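
Note on the `meta.py` change: the watershed CloudVolume's `info` manifest is cached in Redis so that a fleet of distributed workers does not all fetch the same file from GCS (which surfaced as md5 errors under high load). Below is a minimal standalone sketch of that pattern, assuming a reachable Redis instance; `REDIS_URL` and `WATERSHED_PATH` are illustrative placeholders, not values from this repo:

```python
import json

import redis
from cloudvolume import CloudVolume

REDIS_URL = "redis://localhost:6379/0"  # placeholder
WATERSHED_PATH = "gs://my-bucket/ws"    # placeholder


def get_ws_cv(cache_key: str) -> CloudVolume:
    r = redis.Redis.from_url(REDIS_URL)
    try:
        # cache hit: build the volume from the cached info dict,
        # skipping the round trip to the bucket for the info file
        info = json.loads(r.get(cache_key))
        return CloudVolume(WATERSHED_PATH, info=info)
    except Exception:
        # cache miss (or Redis unavailable): fetch info from the bucket,
        # then try to populate the cache for the other workers
        cv = CloudVolume(WATERSHED_PATH)
        try:
            r.set(cache_key, json.dumps(cv.info))
        except Exception:
            pass  # caching is best-effort; never block volume creation
        return cv
```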
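Note on `get_chunks_not_done`: it uses Redis `SMISMEMBER` to test a whole batch of chunk coordinates against the per-layer completion set in one round trip, and conservatively treats every coordinate as "not done" if the call fails. A small sketch under those assumptions (redis-py against Redis >= 6.2, which introduced `SMISMEMBER`); the key name mirrors the `f"{parent_layer}c"` convention in the diff and the coordinates are made up:

```python
import redis

r = redis.Redis()
layer = 2
coords = [(0, 0, 0), (0, 0, 1), (1, 2, 3)]

# workers add "x_y_z" members to the per-layer set as chunks complete
r.sadd(f"{layer}c", "0_0_1")

members = ["_".join(map(str, c)) for c in coords]
flags = r.smismember(f"{layer}c", members)  # one round trip for the batch
todo = [c for c, done in zip(coords, flags) if not done]
print(todo)  # [(0, 0, 0), (1, 2, 3)]
```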
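Note on the enqueueing change in `queue_layer`: instead of one `enqueue` round trip per chunk, jobs are built with `Queue.prepare_data` and submitted in batches via `enqueue_many` (available in RQ >= 1.5), with a sleep when the queue is long so queued jobs do not balloon Redis memory. A simplified sketch of that pattern; `work`, the queue name, and the thresholds are placeholders:

```python
from time import sleep

from redis import Redis
from rq import Queue


def work(coord):
    """Placeholder for the real task (create_parent_chunk in the diff)."""
    return coord


q = Queue("l2", connection=Redis())
coords = [(x, 0, 0) for x in range(100)]
BATCH, MAX_QUEUED = 25, 50

for start in range(0, len(coords), BATCH):
    batch = coords[start : start + BATCH]
    if len(q) > MAX_QUEUED:
        sleep(1)  # crude backpressure: give workers time to drain the queue
    # one pipelined submission instead of one round trip per job
    q.enqueue_many(
        [Queue.prepare_data(work, args=(c,), result_ttl=0) for c in batch]
    )
```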