Add bulk-update command and support for TIMDEX parquet dataset #359

Merged · 4 commits · Jan 23, 2025
Changes from 2 commits
11 changes: 10 additions & 1 deletion Makefile
@@ -90,4 +90,13 @@ dist-stage:
publish-stage:
docker login -u AWS -p $$(aws ecr get-login-password --region us-east-1) $(ECR_URL_STAGE)
docker push $(ECR_URL_STAGE):latest
docker push $(ECR_URL_STAGE):`git describe --always`
docker push $(ECR_URL_STAGE):`git describe --always`

##############################
# Local Opensearch commands
##############################

local-opensearch: # Run a local instance of Opensearch via Docker Compose
docker pull opensearchproject/opensearch:latest
docker pull opensearchproject/opensearch-dashboards:latest
docker compose --env-file .env up
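
The new target reads `OPENSEARCH_INITIAL_ADMIN_PASSWORD` from `.env` via Docker Compose. A minimal sketch of local usage, assuming a placeholder password value:

```shell
# Required by OpenSearch >= 2.12.0; the value below is only a placeholder
echo 'OPENSEARCH_INITIAL_ADMIN_PASSWORD=Local-0nly-Passw0rd!' >> .env

# Pull the images and start OpenSearch and OpenSearch Dashboards
make local-opensearch
```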
1 change: 1 addition & 0 deletions Pipfile
@@ -21,6 +21,7 @@ mypy = "*"
pre-commit = "*"
pytest = "*"
ruff = "*"
timdex-dataset-api = { git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}
vcrpy = "*"

[requires]
1,707 changes: 968 additions & 739 deletions Pipfile.lock

Large diffs are not rendered by default.

37 changes: 13 additions & 24 deletions README.md
@@ -9,6 +9,7 @@ TIMDEX! Index Manager (TIM) is a Python CLI application for managing TIMDEX indi
- To update dependencies: `make update`
- To run unit tests: `make test`
- To lint the repo: `make lint`
- To run local OpenSearch with Docker: `make local-opensearch`
- To run the app: `pipenv run tim --help`

**Important note:** The sections that follow provide instructions for running OpenSearch **locally with Docker**. These instructions are useful for testing. Please make sure the environment variable `TIMDEX_OPENSEARCH_ENDPOINT` is **not** set before proceeding.
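
For example, a quick check in the shell before starting the local instance:

```shell
# Should print nothing; if an endpoint is printed, unset it for local testing
echo "${TIMDEX_OPENSEARCH_ENDPOINT}"
unset TIMDEX_OPENSEARCH_ENDPOINT
```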
@@ -92,34 +93,21 @@ For a more detailed example with test data, please refer to the Confluence docum

### Required ENV

```
# Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
WORKSPACE=dev
```shell
WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
```

## Optional ENV

```
# Only needed if AWS region changes from the default of us-east-1.
AWS_REGION=

# Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 104857600 (100 * 1024 * 1024) if not set.
OPENSEARCH_BULK_MAX_CHUNK_BYTES=

# Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 50 if not set.
OPENSEARCH_BULK_MAX_RETRIES=

# Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
OPENSEARCH_REQUEST_TIMEOUT=

# The ingest process logs the # of records indexed every nth record. Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging. Defaults to 1000 if not set.
STATUS_UPDATE_INTERVAL=

# If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint without the http scheme (e.g., "search-timdex-env-1234567890.us-east-1.es.amazonaws.com"). Can also be passed directly to the CLI via the `--url` option.
TIMDEX_OPENSEARCH_ENDPOINT=

# If set to a valid Sentry DSN, enables Sentry exception monitoring This is not needed for local development.
SENTRY_DSN=
```shell
Contributor (review comment): Much cleaner, great change!

AWS_REGION=### Only needed if AWS region changes from the default of us-east-1.
OPENSEARCH_BULK_MAX_CHUNK_BYTES=### Chunk size limit for sending requests to the bulk indexing endpoint, in bytes. Defaults to 104857600 (100 * 1024 * 1024) if not set.
OPENSEARCH_BULK_MAX_RETRIES=### Maximum number of retries when sending requests to the bulk indexing endpoint. Defaults to 50 if not set.
OPENSEARCH_INITIAL_ADMIN_PASSWORD=### If using a local Docker OpenSearch instance, this must be set (for versions >= 2.12.0).
OPENSEARCH_REQUEST_TIMEOUT=### Only used for OpenSearch requests that tend to take longer than the default timeout of 10 seconds, such as bulk or index refresh requests. Defaults to 120 seconds if not set.
STATUS_UPDATE_INTERVAL=### The ingest process logs the # of records indexed every nth record. Set this env variable to any integer to change the frequency of logging status updates. Can be useful for development/debugging. Defaults to 1000 if not set.
TIMDEX_OPENSEARCH_ENDPOINT=### If using a local Docker OpenSearch instance, this isn't needed. Otherwise set to OpenSearch instance endpoint without the http scheme (e.g., "search-timdex-env-1234567890.us-east-1.es.amazonaws.com"). Can also be passed directly to the CLI via the `--url` option.
SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
```

## CLI commands
@@ -153,6 +141,7 @@ Usage: tim [OPTIONS] COMMAND [ARGS]...
╭─ Bulk record processing commands ───────────────────────────────────────────────────────────────────────────────────────────────────╮
│ bulk-index Bulk index records into an index. │
│ bulk-delete Bulk delete records from an index. │
│ bulk-update Bulk update records from an index. │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```
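
For example, the new `bulk-update` command can be run against a TIMDEX parquet dataset location, filtered to a single pipeline run (the values below mirror the test fixtures and are illustrative):

```shell
pipenv run tim bulk-update --source alma --run-date 2024-12-01 --run-id abc123 s3://test-timdex-bucket/dataset
```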

2 changes: 2 additions & 0 deletions compose.yaml
@@ -10,6 +10,7 @@ services:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
volumes:
- opensearch-local-data:/usr/share/opensearch/data
networks:
@@ -21,6 +22,7 @@ services:
environment:
- "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
- 'OPENSEARCH_HOSTS=["http://opensearch:9200"]'
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
networks:
- opensearch-local-net
volumes:
5 changes: 2 additions & 3 deletions pyproject.toml
@@ -7,7 +7,7 @@ disallow_untyped_defs = true
exclude = ["tests/"]

[[tool.mypy.overrides]]
module = ["ijson", "smart_open"]
module = ["ijson", "smart_open", "timdex_dataset_api"]
ignore_missing_imports = true

[tool.pytest.ini_options]
@@ -27,8 +27,6 @@ select = ["ALL", "PT"]

ignore = [
# default
"ANN101",
"ANN102",
"COM812",
"D107",
"N812",
@@ -41,6 +39,7 @@ ignore = [
"D102",
"D103",
"D104",
"G004",
"PLR0912",
"PLR0913",
"PLR0915",
97 changes: 97 additions & 0 deletions tests/test_cli.py
@@ -1,8 +1,11 @@
import json
import re
from unittest.mock import MagicMock, patch

from freezegun import freeze_time

from tim.cli import main
from tim.errors import BulkIndexingError

from .conftest import EXIT_CODES, my_vcr

@@ -256,3 +259,97 @@ def test_bulk_delete_with_source_success(caplog, runner):
"from index 'alma-2022-09-01t00-00-00'" in caplog.text
)
assert "Bulk deletion complete!" in caplog.text


@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_success(
mock_bulk_index,
mock_bulk_delete,
mock_validate_bulk_cli_options,
mock_timdex_dataset,
caplog,
monkeypatch,
runner,
):
monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
mock_bulk_index.return_value = {
"created": 1000,
"updated": 0,
"errors": 0,
"total": 1000,
}
mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
mock_validate_bulk_cli_options.return_value = "alma"
mock_timdex_dataset.return_value = MagicMock()

result = runner.invoke(
main,
[
"bulk-update",
"--source",
"alma",
"--run-date",
"2024-12-01",
"--run-id",
"abc123",
"s3://test-timdex-bucket/dataset",
],
)
assert result.exit_code == EXIT_CODES["success"]
assert (
"Bulk update complete: "
f'{{"index": {json.dumps(mock_bulk_index())}, '
f'"delete": {json.dumps(mock_bulk_delete())}}}' in caplog.text
)


@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_delete")
@patch("tim.opensearch.bulk_index")
def test_bulk_update_with_source_raise_bulk_indexing_error(
mock_bulk_index,
mock_bulk_delete,
mock_validate_bulk_cli_options,
mock_timdex_dataset,
caplog,
monkeypatch,
runner,
):
monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
mock_bulk_index.side_effect = BulkIndexingError(
record="alma:0", index="index", error="exception"
)
mock_bulk_delete.return_value = {"deleted": 0, "errors": 0, "total": 0}
mock_validate_bulk_cli_options.return_value = "alma"
mock_timdex_dataset.return_value = MagicMock()

index_results_default = {
"created": 0,
"updated": 0,
"errors": 0,
"total": 0,
}

result = runner.invoke(
main,
[
"bulk-update",
"--source",
"alma",
"--run-date",
"2024-12-01",
"--run-id",
"abc123",
"s3://test-timdex-bucket/dataset",
],
)
assert result.exit_code == EXIT_CODES["success"]
assert (
"Bulk update complete: "
f'{{"index": {json.dumps(index_results_default)}, '
f'"delete": {json.dumps(mock_bulk_delete())}}}' in caplog.text
)
65 changes: 64 additions & 1 deletion tim/cli.py
@@ -1,13 +1,16 @@
# ruff: noqa: TRY003, EM101
import json
import logging
from datetime import timedelta
from time import perf_counter

import rich_click as click
from timdex_dataset_api import TIMDEXDataset

from tim import errors, helpers
from tim import opensearch as tim_os
from tim.config import PRIMARY_ALIAS, VALID_SOURCES, configure_logger, configure_sentry
from tim.errors import BulkIndexingError

logger = logging.getLogger(__name__)

@@ -23,7 +26,7 @@
},
{
"name": "Bulk record processing commands",
"commands": ["bulk-index", "bulk-delete"],
"commands": ["bulk-index", "bulk-delete", "bulk-update"],
},
]
}
@@ -252,6 +255,7 @@ def promote(ctx: click.Context, index: str, alias: list[str]) -> None:
# Bulk record processing commands


# NOTE: FEATURE FLAG: 'bulk_index' supports ETL v1
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
@@ -295,6 +299,7 @@ def bulk_index(ctx: click.Context, index: str, source: str, filepath: str) -> No
)


# NOTE: FEATURE FLAG: 'bulk_delete' supports ETL v1
@main.command()
@click.option("-i", "--index", help="Name of the index to bulk delete records from.")
@click.option(
@@ -334,3 +339,61 @@ def bulk_delete(ctx: click.Context, index: str, source: str, filepath: str) -> N
results["deleted"],
results["total"],
)


@main.command()
@click.option("-i", "--index", help="Name of the index to bulk index records into.")
@click.option(
"-s",
"--source",
type=click.Choice(VALID_SOURCES),
help="Source whose primary-aliased index to bulk index records into.",
)
@click.option("-d", "--run-date", help="Run date, formatted as YYYY-MM-DD.")
@click.option("-rid", "--run-id", help="Run ID.")
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update(
ctx: click.Context,
index: str,
source: str,
run_date: str,
run_id: str,
dataset_path: str,
) -> None:
"""Bulk update records for an index.

Must provide either the name of an existing index in the cluster or a valid source.
If source is provided, it will perform indexing and/or deletion of records for
the primary-aliased index for the source.

The method will read transformed records from a TIMDEXDataset
located at dataset_path using the 'timdex-dataset-api' library. The dataset
is filtered by run date and run ID.

Logs an error and aborts if the provided index doesn't exist in the cluster.
"""
client = ctx.obj["CLIENT"]
index = helpers.validate_bulk_cli_options(index, source, client)

logger.info(f"Bulk updating records from dataset '{dataset_path}' into '{index}'")

index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
delete_results = {"deleted": 0, "errors": 0, "total": 0}

td = TIMDEXDataset(location=dataset_path)
td.load(run_date=run_date, run_id=run_id)

# bulk index records
records_to_index = td.read_transformed_records_iter(action="index")
try:
index_results.update(tim_os.bulk_index(client, index, records_to_index))
except BulkIndexingError as exception:
logger.info(f"Bulk indexing failed: {exception}")

# bulk delete records
records_to_delete = td.read_dicts_iter(columns=["timdex_record_id"], action="delete")
delete_results.update(tim_os.bulk_delete(client, index, records_to_delete))

summary_results = {"index": index_results, "delete": delete_results}
logger.info(f"Bulk update complete: {json.dumps(summary_results)}")