diff --git a/django_opensearch_dsl/documents.py b/django_opensearch_dsl/documents.py index 3a6f4f4..bd62fc5 100644 --- a/django_opensearch_dsl/documents.py +++ b/django_opensearch_dsl/documents.py @@ -1,3 +1,4 @@ +import copy import io import sys import time @@ -6,7 +7,7 @@ from typing import Iterable, Optional from django.db import models -from django.db.models import Q, QuerySet +from django.db.models import Max, Min, Q, QuerySet from opensearchpy.helpers import bulk, parallel_bulk from opensearchpy.helpers.document import Document as DSLDocument @@ -63,9 +64,11 @@ def search(cls, using=None, index=None): using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls], model=cls.django.model ) - def get_queryset(self, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None) -> QuerySet: + def get_queryset( + self, db_alias: str = None, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None + ) -> QuerySet: """Return the queryset that should be indexed by this doc type.""" - qs = self.django.model.objects.all() + qs = self.django.model.objects.using(db_alias).all() if filter_: qs = qs.filter(filter_) @@ -88,38 +91,57 @@ def _eta(self, start, done, total): # pragma: no cover def get_indexing_queryset( self, + db_alias: str = None, verbose: bool = False, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None, action: OpensearchAction = OpensearchAction.INDEX, stdout: io.FileIO = sys.stdout, + batch_size: int = None, + batch_type: str = "offset", ) -> Iterable: """Divide the queryset into chunks.""" - chunk_size = self.django.queryset_pagination - qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count) - qs = qs.order_by("pk") if not qs.query.is_sliced else qs + chunk_size = batch_size or self.django.queryset_pagination + qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count) + qs = qs.order_by("pk") count = qs.count() model = 
self.django.model.__name__ action = action.present_participle.title() - - i = 0 - done = 0 start = time.time() + done = 0 if verbose: stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r") - while done < count: - if verbose: - stdout.write(f"{action} {model}: {round(i / count * 100)}% ({self._eta(start, done, count)})\r") - - for obj in qs[i : i + chunk_size]: - done += 1 - yield obj - i = min(i + chunk_size, count) - - if verbose: - stdout.write(f"{action} {count} {model}: OK \n") + if batch_type == "pk_filters": + pks = qs.aggregate(min=Min("pk"), max=Max("pk")) + total_batches = (pks["max"] - pks["min"]) // chunk_size + for batch_number, offset in enumerate(range(pks["min"], pks["max"] + 1, chunk_size), start=1): + batch_qs = list(copy.deepcopy(qs.filter(pk__gte=offset, pk__lt=offset + chunk_size))) + stdout.write(f"Processing batch {batch_number}/{total_batches}: \n") + for obj in batch_qs: + done += 1 + if done % chunk_size == 0: + stdout.write( + f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r" + ) + yield obj + if len(batch_qs) > 0: + stdout.write(f"Max primary key in the current batch: {batch_qs[-1].pk}\n") + else: + total_batches = (count + chunk_size - 1) // chunk_size + for batch_number, offset in enumerate(range(0, count, chunk_size), start=1): + batch_qs = list(copy.deepcopy(qs[offset : offset + chunk_size].all())) + stdout.write(f"Processing batch {batch_number}/{total_batches}: \n") + for obj in batch_qs: + done += 1 + if done % chunk_size == 0: + stdout.write( + f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r" + ) + yield obj + if len(batch_qs) > 0: + stdout.write(f"Max primary key in the current batch: {batch_qs[-1].pk}\n") def init_prepare(self): """Initialise the data model preparers once here. 
diff --git a/django_opensearch_dsl/fields.py b/django_opensearch_dsl/fields.py index 41b98bc..cb77f7e 100644 --- a/django_opensearch_dsl/fields.py +++ b/django_opensearch_dsl/fields.py @@ -218,6 +218,10 @@ class IpField(DODField, fields.Ip): """Allow indexing of IPv4 and IPv6 addresses.""" +class JoinField(DODField, fields.Join): + """Allow indexing of Join fields (with parent/child relation).""" + + class LongField(DODField, fields.Long): """Allow indexing of long. diff --git a/django_opensearch_dsl/management/commands/opensearch.py b/django_opensearch_dsl/management/commands/opensearch.py index 59e912b..d77a68f 100644 --- a/django_opensearch_dsl/management/commands/opensearch.py +++ b/django_opensearch_dsl/management/commands/opensearch.py @@ -1,21 +1,18 @@ import argparse import functools import operator -import os import sys from argparse import ArgumentParser from collections import defaultdict from typing import Any, Callable -import opensearchpy -from django.conf import settings from django.core.exceptions import FieldError from django.core.management import BaseCommand -from django.core.management.base import OutputWrapper from django.db.models import Q from django_opensearch_dsl.registries import registry +from ...utils import manage_document, manage_index from ..enums import OpensearchAction from ..types import parse @@ -66,163 +63,48 @@ def __list_index(self, **options): # noqa pragma: no cover def _manage_index(self, action, indices, force, verbosity, ignore_error, **options): # noqa """Manage the creation and deletion of indices.""" - action = OpensearchAction(action) - known = registry.get_indices() - - # Filter indices - if indices: - # Ensure every given indices exists - known_name = [i._name for i in known] # noqa - unknown = set(indices) - set(known_name) - if unknown: - self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") - exit(1) - - # Only keep given indices - indices = list(filter(lambda i: i._name in indices, 
known)) # noqa - else: - indices = known - - # Display expected action - if verbosity or not force: - self.stdout.write(f"The following indices will be {action.past}:") - for index in indices: - self.stdout.write(f"\t- {index._name}.") # noqa - self.stdout.write("") - - # Ask for confirmation to continue - if not force: # pragma: no cover - while True: - p = input("Continue ? [y]es [n]o : ") - if p.lower() in ["yes", "y"]: - self.stdout.write("") - break - elif p.lower() in ["no", "n"]: - exit(1) - - pp = action.present_participle.title() - for index in indices: - if verbosity: - self.stdout.write( - f"{pp} index '{index._name}'...\r", - ending="", - ) # noqa - self.stdout.flush() - try: - if action == OpensearchAction.CREATE: - index.create() - elif action == OpensearchAction.DELETE: - index.delete() - elif action == OpensearchAction.UPDATE: - index.put_mapping(body=index.to_dict()["mappings"]) - else: - try: - index.delete() - except opensearchpy.exceptions.NotFoundError: - pass - index.create() - except opensearchpy.exceptions.TransportError as e: - if verbosity or not ignore_error: - error = self.style.ERROR(f"Error: {e.error} - {e.info}") - self.stderr.write(f"{pp} index '{index._name}'...\n{error}") # noqa - if not ignore_error: - self.stderr.write("exiting...") - exit(1) - else: - if verbosity: - self.stdout.write(f"{pp} index '{index._name}'... 
{self.style.SUCCESS('OK')}") # noqa + manage_index( + action, indices, force, ignore_error, verbosity, stderr=self.stderr, stdout=self.stdout, style=self.style + ) def _manage_document( - self, action, indices, force, filters, excludes, verbosity, parallel, count, refresh, missing, **options + self, + action, + indices, + objects, + force, + filters, + excludes, + verbosity, + parallel, + count, + refresh, + missing, + database, + batch_size, + batch_type, + **options, ): # noqa """Manage the creation and deletion of indices.""" - action = OpensearchAction(action) - known = registry.get_indices() - filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None - exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None - - # Filter indices - if indices: - # Ensure every given indices exists - known_name = [i._name for i in known] # noqa - unknown = set(indices) - set(known_name) - if unknown: - self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") - exit(1) - - # Only keep given indices - indices = list(filter(lambda i: i._name in indices, known)) # noqa - else: - indices = known - - # Ensure every indices needed are created - not_created = [i._name for i in indices if not i.exists()] # noqa - if not_created: - self.stderr.write(f"The following indices are not created : {not_created}") - self.stderr.write("Use 'python3 manage.py opensearch list' to list indices' state.") - exit(1) - - # Check field, preparing to display expected actions - s = f"The following documents will be {action.past}:" - kwargs_list = [] - for index in indices: - # Handle --missing - exclude_ = exclude - if missing and action == OpensearchAction.INDEX: - q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()]) - exclude_ = exclude_ & q if exclude_ is not None else q - - document = index._doc_types[0]() # noqa - try: - kwargs_list.append({"filter_": filter_, 
"exclude": exclude_, "count": count}) - qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count() - except FieldError as e: - model = index._doc_types[0].django.model.__name__ # noqa - self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa - exit(1) - else: - s += f"\n\t- {qs} {document.django.model.__name__}." - - # Display expected actions - if verbosity or not force: - self.stdout.write(s + "\n\n") - - # Ask for confirmation to continue - if not force: # pragma: no cover - while True: - p = input("Continue ? [y]es [n]o : ") - if p.lower() in ["yes", "y"]: - self.stdout.write("\n") - break - elif p.lower() in ["no", "n"]: - exit(1) - - result = "\n" - for index, kwargs in zip(indices, kwargs_list): - document = index._doc_types[0]() # noqa - qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs) - success, errors = document.update( - qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False - ) - - success_str = self.style.SUCCESS(success) if success else success - errors_str = self.style.ERROR(len(errors)) if errors else len(errors) - model = document.django.model.__name__ - - if verbosity == 1: - result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" - reasons = defaultdict(int) - for e in errors: # Count occurrence of each error - error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") - reasons[error] += 1 - for reasons, total in reasons.items(): - result += f" - {reasons} : {total}\n" - - if verbosity > 1: - result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" - - if verbosity: - self.stdout.write(result + "\n") + manage_document( + action=action, + indices=indices, + objects=objects, + filters=filters, + excludes=excludes, + force=force, + parallel=parallel, + count=count, + refresh=refresh, + missing=missing, + 
database=database, + batch_size=batch_size, + batch_type=batch_type, + verbosity=verbosity, + stderr=self.stderr, + stdout=self.stdout, + style=self.style, + ) def add_arguments(self, parser): """Add arguments to parser.""" @@ -237,7 +119,7 @@ def add_arguments(self, parser): ) subparser.set_defaults(func=self.__list_index) - # 'manage' subcommand + # 'index' subcommand subparser = subparsers.add_parser( "index", help="Manage the creation an deletion of indices.", @@ -288,6 +170,13 @@ def add_arguments(self, parser): OpensearchAction.UPDATE.value, ], ) + subparser.add_argument( + "-d", + "--database", + type=str, + default=None, + help="Nominates a database to use as source.", + ) subparser.add_argument( "-f", "--filters", @@ -321,6 +210,7 @@ def add_arguments(self, parser): subparser.add_argument( "-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices." ) + subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.") subparser.add_argument( "-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)." 
) @@ -345,6 +235,20 @@ def add_arguments(self, parser): default=False, help="When used with 'index' action, only index documents not indexed yet.", ) + subparser.add_argument( + "-b", + "--batch-size", + type=int, + default=None, + help="Specify the batch size for processing documents.", + ) + subparser.add_argument( + "-t", + "--batch-type", + type=str, + default="offset", + help="Specify the batch type for processing documents (pk_filters | offset).", + ) self.usage = parser.format_usage() diff --git a/django_opensearch_dsl/registries.py b/django_opensearch_dsl/registries.py index ca3678b..f61a023 100644 --- a/django_opensearch_dsl/registries.py +++ b/django_opensearch_dsl/registries.py @@ -50,6 +50,7 @@ def register_document(self, document): "ignore_signals": getattr(django_meta, "ignore_signals", False), "auto_refresh": getattr(django_meta, "auto_refresh", DODConfig.auto_refresh_enabled()), "related_models": getattr(django_meta, "related_models", []), + "order_indexing_queryset": getattr(django_meta, "order_indexing_queryset", True), } ) if not django_attr.model: # pragma: no cover @@ -180,5 +181,9 @@ def __contains__(self, obj): f"'in <{type(self).__name__}>' requires a Model subclass as left operand, not {type(dict).__name__}" ) + def get_indices_raw(self): + """Return the raw index mapping exactly as it is stored in the registry.""" + return self._indices + registry = DocumentRegistry() diff --git a/django_opensearch_dsl/utils/__init__.py b/django_opensearch_dsl/utils/__init__.py new file mode 100644 index 0000000..a52973d --- /dev/null +++ b/django_opensearch_dsl/utils/__init__.py @@ -0,0 +1,2 @@ +from .documen_management import manage_document +from .index_management import manage_index diff --git a/django_opensearch_dsl/utils/documen_management.py b/django_opensearch_dsl/utils/documen_management.py new file mode 100644 index 0000000..78f00e7 --- /dev/null +++ b/django_opensearch_dsl/utils/documen_management.py @@ -0,0 +1,205 @@ +import 
functools +import operator +import sys +from collections import defaultdict +from typing import Any, List, Tuple + +from django.core.exceptions import FieldError +from django.core.management.base import OutputWrapper +from django.core.management.color import color_style +from django.db.models import Q + +from django_opensearch_dsl.management.enums import OpensearchAction +from django_opensearch_dsl.registries import registry as default_registry + + +def manage_document( + action, + filters: List[Tuple[str, Any]] = None, + excludes: List[Tuple[str, Any]] = None, + indices: List[str] = None, + objects: List[str] = None, + parallel: bool = False, + refresh: bool = False, + missing: bool = False, + force: bool = False, + database: str = None, + batch_type: str = "offset", + batch_size: int = None, + count: int = None, + verbosity: int = 1, + stderr=OutputWrapper(sys.stderr), + stdout=OutputWrapper(sys.stdout), + style=color_style(), + registry=default_registry, + using: str = None, +): # noqa + """Manage the indexing, updating and deletion of documents.""" + choices = [OpensearchAction.INDEX.value, OpensearchAction.DELETE.value, OpensearchAction.UPDATE.value] + if action not in choices: + raise ValueError(f"Invalid action '{action}'. 
Valid actions are: {', '.join(choices)}") + + action = OpensearchAction(action) + known = registry.get_indices() + filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None + exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None + + # Validate the requested objects against the registered models + valid_models = [] + registered_models = [m.__name__.lower() for m in registry.get_models()] + if objects: + for model in objects: + if model.lower() in registered_models: + valid_models.append(model) + else: + stderr.write(f"Unknown object '{model}', choices are: '{registered_models}'") + exit(1) + + # Filter indices + if indices: + # Ensure every given index exists + known_name = [i._name for i in known] # noqa + unknown = set(indices) - set(known_name) + if unknown: + stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") + exit(1) + + # Only keep given indices + indices = list(filter(lambda i: i._name in indices, known)) # noqa + else: + indices = known + + # Ensure every needed index is created + not_created = [i._name for i in indices if not i.exists(using=using)] # noqa + if not_created: + stderr.write(f"The following indices are not created : {not_created}") + stderr.write("Use 'python3 manage.py opensearch list' to list indices' state.") + exit(1) + + # Check field, preparing to display expected actions + s = f"The following documents will be {action.past}:" + kwargs_list = [] + + if objects: + django_models = [m for m in registry.get_models() if m.__name__.lower() in valid_models] + all_os_models = [] + selected_os_models = [] + indices_raw = registry.get_indices_raw() + + for k, v in indices_raw.items(): + for model in list(v): + all_os_models.append(model) + + for os_model in all_os_models: + if os_model.django.model in django_models and os_model._index._name in list(i._name for i in indices): + selected_os_models.append(os_model) + + # NOTE(review): --missing is not applied in this branch; the exclude filter is used as-is + exclude_ = exclude + for 
model in selected_os_models: + try: + kwargs_list.append({"db_alias": database, "filter_": filter_, "exclude": exclude_, "count": count}) + qs = model().get_queryset(filter_=filter_, exclude=exclude_, count=count, db_alias=database).count() + except FieldError as e: + stderr.write(f"Error while filtering on '{model.django.model.__name__}':\n{e}'") # noqa + exit(1) + else: + s += f"\n\t- {qs} {model.django.model.__name__}." + else: + for index in indices: + # Handle --missing + exclude_ = exclude + if missing and action == OpensearchAction.INDEX: + q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()]) + exclude_ = exclude_ & q if exclude_ is not None else q + + document = index._doc_types[0]() # noqa + try: + kwargs_list.append({"db_alias": database, "filter_": filter_, "exclude": exclude_, "count": count}) + qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count, db_alias=database).count() + except FieldError as e: + model = index._doc_types[0].django.model.__name__ # noqa + stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa + exit(1) + else: + s += f"\n\t- {qs} {document.django.model.__name__}." + + # Display expected actions + if verbosity or not force: + stdout.write(s + "\n\n") + + # Ask for confirmation to continue + if not force: # pragma: no cover + while True: + p = input("Continue ? 
[y]es [n]o : ") + if p.lower() in ["yes", "y"]: + stdout.write("\n") + break + elif p.lower() in ["no", "n"]: + exit(1) + + result = "\n" + if objects: + for model, kwargs in zip(selected_os_models, kwargs_list): + document = model() # noqa + qs = document.get_indexing_queryset( + stdout=stdout._out, + verbose=verbosity, + action=action, + batch_size=batch_size, + batch_type=batch_type, + **kwargs, + ) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False, using=using + ) + + success_str = style.SUCCESS(success) if success else success + errors_str = style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + + else: + for index, kwargs in zip(indices, kwargs_list): + document = index._doc_types[0]() # noqa + qs = document.get_indexing_queryset( + stdout=stdout._out, + verbose=verbosity, + action=action, + batch_size=batch_size, + batch_type=batch_type, + **kwargs, + ) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False, using=using + ) + + success_str = style.SUCCESS(success) if success else success + errors_str = style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, 
{"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + + if verbosity: + stdout.write(result + "\n") diff --git a/django_opensearch_dsl/utils/index_management.py b/django_opensearch_dsl/utils/index_management.py new file mode 100644 index 0000000..900f776 --- /dev/null +++ b/django_opensearch_dsl/utils/index_management.py @@ -0,0 +1,105 @@ +import sys +from typing import List + +import opensearchpy +from django.core.management.base import OutputWrapper +from django.core.management.color import color_style + +from django_opensearch_dsl.management.enums import OpensearchAction +from django_opensearch_dsl.registries import registry as default_registry + + +def manage_index( + action, + indices: List[str] = None, + force: bool = False, + ignore_error: bool = False, + verbosity: int = 1, + stderr=OutputWrapper(sys.stderr), + stdout=OutputWrapper(sys.stdout), + style=color_style(), + registry=default_registry, + using: str = None, +): # noqa + """Manage the creation and deletion of indices.""" + choices = [ + OpensearchAction.CREATE.value, + OpensearchAction.DELETE.value, + OpensearchAction.REBUILD.value, + OpensearchAction.UPDATE.value, + ] + if action not in choices: + raise ValueError(f"Invalid action '{action}'. 
Valid actions are: {', '.join(choices)}") + + action = OpensearchAction(action) + known = registry.get_indices() + + # Filter indices + if indices: + # Ensure every given indices exists + known_name = [i._name for i in known] # noqa + unknown = set(indices) - set(known_name) + if unknown: + stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") + exit(1) + + # Only keep given indices + indices = list(filter(lambda i: i._name in indices, known)) # noqa + else: + indices = known + + # Display expected action + if verbosity or not force: + stdout.write(f"The following indices will be {action.past}:") + for index in indices: + stdout.write(f"\t- {index._name}.") # noqa + stdout.write("") + + # Ask for confirmation to continue + if not force: # pragma: no cover + while True: + p = input("Continue ? [y]es [n]o : ") + if p.lower() in ["yes", "y"]: + stdout.write("") + break + elif p.lower() in ["no", "n"]: + exit(1) + + pp = action.present_participle.title() + for index in indices: + if verbosity: + stdout.write( + f"{pp} index '{index._name}'...\r", + ending="", + ) # noqa + stdout.flush() + try: + # If current index depends on many different models, add them to + # index._doc_types before indexing to make sure all mappings of different models + # are taken into account. 
+ index_models = registry.get_indices_raw().get(index, None) + for model in list(index_models): + index._doc_types.append(model) + + if action == OpensearchAction.CREATE: + index.create(using=using) + elif action == OpensearchAction.DELETE: + index.delete(using=using) + elif action == OpensearchAction.UPDATE: + index.put_mapping(body=index.to_dict()["mappings"], using=using) + else: + try: + index.delete(using=using) + except opensearchpy.exceptions.NotFoundError: + pass + index.create(using=using) + except opensearchpy.exceptions.TransportError as e: + if verbosity or not ignore_error: + error = style.ERROR(f"Error: {e.error} - {e.info}") + stderr.write(f"{pp} index '{index._name}'...\n{error}") # noqa + if not ignore_error: + stderr.write("exiting...") + exit(1) + else: + if verbosity: + stdout.write(f"{pp} index '{index._name}'... {style.SUCCESS('OK')}") # noqa