Skip to content

feat: Add JoinField and update indexation command to select on which … #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
62 changes: 42 additions & 20 deletions django_opensearch_dsl/documents.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import io
import sys
import time
Expand All @@ -6,7 +7,7 @@
from typing import Iterable, Optional

from django.db import models
from django.db.models import Q, QuerySet
from django.db.models import Max, Min, Q, QuerySet
from opensearchpy.helpers import bulk, parallel_bulk
from opensearchpy.helpers.document import Document as DSLDocument

Expand Down Expand Up @@ -63,9 +64,11 @@ def search(cls, using=None, index=None):
using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls], model=cls.django.model
)

def get_queryset(self, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None) -> QuerySet:
def get_queryset(
self, db_alias: str = None, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None
) -> QuerySet:
"""Return the queryset that should be indexed by this doc type."""
qs = self.django.model.objects.all()
qs = self.django.model.objects.using(db_alias).all()

if filter_:
qs = qs.filter(filter_)
Expand All @@ -88,38 +91,57 @@ def _eta(self, start, done, total): # pragma: no cover

def get_indexing_queryset(
self,
db_alias: str = None,
verbose: bool = False,
filter_: Optional[Q] = None,
exclude: Optional[Q] = None,
count: int = None,
action: OpensearchAction = OpensearchAction.INDEX,
stdout: io.FileIO = sys.stdout,
batch_size: int = None,
batch_type: str = "offset",
) -> Iterable:
"""Divide the queryset into chunks."""
chunk_size = self.django.queryset_pagination
qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count)
qs = qs.order_by("pk") if not qs.query.is_sliced else qs
chunk_size = batch_size or self.django.queryset_pagination
qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count)
qs = qs.order_by("pk")
count = qs.count()
model = self.django.model.__name__
action = action.present_participle.title()

i = 0
done = 0
start = time.time()
done = 0
if verbose:
stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r")
while done < count:
if verbose:
stdout.write(f"{action} {model}: {round(i / count * 100)}% ({self._eta(start, done, count)})\r")

for obj in qs[i : i + chunk_size]:
done += 1
yield obj

i = min(i + chunk_size, count)

if verbose:
stdout.write(f"{action} {count} {model}: OK \n")
if batch_type == "pk_filters":
pks = qs.aggregate(min=Min("pk"), max=Max("pk"))
total_batches = (pks["max"] - pks["min"]) // chunk_size
for batch_number, offset in enumerate(range(pks["min"], pks["max"] + 1, chunk_size), start=1):
batch_qs = list(copy.deepcopy(qs.filter(pk__gte=offset, pk__lt=offset + chunk_size)))
stdout.write(f"Processing batch {batch_number}/{total_batches}: \n")
for obj in batch_qs:
done += 1
if done % chunk_size == 0:
stdout.write(
f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r"
)
yield obj
if len(batch_qs) > 0:
stdout.write(f"Max primary key in the current batch: {batch_qs[-1].pk}\n")
else:
total_batches = (count + chunk_size - 1) // chunk_size
for batch_number, offset in enumerate(range(0, count, chunk_size), start=1):
batch_qs = list(copy.deepcopy(qs[offset : offset + chunk_size].all()))
stdout.write(f"Processing batch {batch_number}/{total_batches}: \n")
for obj in batch_qs:
done += 1
if done % chunk_size == 0:
stdout.write(
f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r"
)
yield obj
if len(batch_qs) > 0:
stdout.write(f"Max primary key in the current batch: {batch_qs[-1].pk}\n")

def init_prepare(self):
"""Initialise the data model preparers once here.
Expand Down
4 changes: 4 additions & 0 deletions django_opensearch_dsl/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class IpField(DODField, fields.Ip):
"""Allow indexing of IPv4 and IPv6 addresses."""


class JoinField(DODField, fields.Join):
    """Allow indexing of Join fields (with parent/child relation).

    NOTE(review): appears to wrap the opensearch-py ``Join`` field type so a
    document can declare parent/child relations within one index — confirm
    against the ``fields.Join`` implementation this mixes in.
    """


class LongField(DODField, fields.Long):
"""Allow indexing of long.

Expand Down
220 changes: 62 additions & 158 deletions django_opensearch_dsl/management/commands/opensearch.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import argparse
import functools
import operator
import os
import sys
from argparse import ArgumentParser
from collections import defaultdict
from typing import Any, Callable

import opensearchpy
from django.conf import settings
from django.core.exceptions import FieldError
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from django.db.models import Q

from django_opensearch_dsl.registries import registry

from ...utils import manage_document, manage_index
from ..enums import OpensearchAction
from ..types import parse

Expand Down Expand Up @@ -66,163 +63,48 @@ def __list_index(self, **options): # noqa pragma: no cover

def _manage_index(self, action, indices, force, verbosity, ignore_error, **options): # noqa
"""Manage the creation and deletion of indices."""
action = OpensearchAction(action)
known = registry.get_indices()

# Filter indices
if indices:
# Ensure every given indices exists
known_name = [i._name for i in known] # noqa
unknown = set(indices) - set(known_name)
if unknown:
self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'")
exit(1)

# Only keep given indices
indices = list(filter(lambda i: i._name in indices, known)) # noqa
else:
indices = known

# Display expected action
if verbosity or not force:
self.stdout.write(f"The following indices will be {action.past}:")
for index in indices:
self.stdout.write(f"\t- {index._name}.") # noqa
self.stdout.write("")

# Ask for confirmation to continue
if not force: # pragma: no cover
while True:
p = input("Continue ? [y]es [n]o : ")
if p.lower() in ["yes", "y"]:
self.stdout.write("")
break
elif p.lower() in ["no", "n"]:
exit(1)

pp = action.present_participle.title()
for index in indices:
if verbosity:
self.stdout.write(
f"{pp} index '{index._name}'...\r",
ending="",
) # noqa
self.stdout.flush()
try:
if action == OpensearchAction.CREATE:
index.create()
elif action == OpensearchAction.DELETE:
index.delete()
elif action == OpensearchAction.UPDATE:
index.put_mapping(body=index.to_dict()["mappings"])
else:
try:
index.delete()
except opensearchpy.exceptions.NotFoundError:
pass
index.create()
except opensearchpy.exceptions.TransportError as e:
if verbosity or not ignore_error:
error = self.style.ERROR(f"Error: {e.error} - {e.info}")
self.stderr.write(f"{pp} index '{index._name}'...\n{error}") # noqa
if not ignore_error:
self.stderr.write("exiting...")
exit(1)
else:
if verbosity:
self.stdout.write(f"{pp} index '{index._name}'... {self.style.SUCCESS('OK')}") # noqa
manage_index(
action, indices, force, ignore_error, verbosity, stderr=self.stderr, stdout=self.stdout, style=self.style
)

def _manage_document(
self, action, indices, force, filters, excludes, verbosity, parallel, count, refresh, missing, **options
self,
action,
indices,
objects,
force,
filters,
excludes,
verbosity,
parallel,
count,
refresh,
missing,
database,
batch_size,
batch_type,
**options,
): # noqa
"""Manage the creation and deletion of indices."""
action = OpensearchAction(action)
known = registry.get_indices()
filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None
exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None

# Filter indices
if indices:
# Ensure every given indices exists
known_name = [i._name for i in known] # noqa
unknown = set(indices) - set(known_name)
if unknown:
self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'")
exit(1)

# Only keep given indices
indices = list(filter(lambda i: i._name in indices, known)) # noqa
else:
indices = known

# Ensure every indices needed are created
not_created = [i._name for i in indices if not i.exists()] # noqa
if not_created:
self.stderr.write(f"The following indices are not created : {not_created}")
self.stderr.write("Use 'python3 manage.py opensearch list' to list indices' state.")
exit(1)

# Check field, preparing to display expected actions
s = f"The following documents will be {action.past}:"
kwargs_list = []
for index in indices:
# Handle --missing
exclude_ = exclude
if missing and action == OpensearchAction.INDEX:
q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()])
exclude_ = exclude_ & q if exclude_ is not None else q

document = index._doc_types[0]() # noqa
try:
kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count})
qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
except FieldError as e:
model = index._doc_types[0].django.model.__name__ # noqa
self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa
exit(1)
else:
s += f"\n\t- {qs} {document.django.model.__name__}."

# Display expected actions
if verbosity or not force:
self.stdout.write(s + "\n\n")

# Ask for confirmation to continue
if not force: # pragma: no cover
while True:
p = input("Continue ? [y]es [n]o : ")
if p.lower() in ["yes", "y"]:
self.stdout.write("\n")
break
elif p.lower() in ["no", "n"]:
exit(1)

result = "\n"
for index, kwargs in zip(indices, kwargs_list):
document = index._doc_types[0]() # noqa
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
success, errors = document.update(
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
)

success_str = self.style.SUCCESS(success) if success else success
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
model = document.django.model.__name__

if verbosity == 1:
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
reasons = defaultdict(int)
for e in errors: # Count occurrence of each error
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
reasons[error] += 1
for reasons, total in reasons.items():
result += f" - {reasons} : {total}\n"

if verbosity > 1:
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"

if verbosity:
self.stdout.write(result + "\n")
manage_document(
action=action,
indices=indices,
objects=objects,
filters=filters,
excludes=excludes,
force=force,
parallel=parallel,
count=count,
refresh=refresh,
missing=missing,
database=database,
batch_size=batch_size,
batch_type=batch_type,
verbosity=verbosity,
stderr=self.stderr,
stdout=self.stdout,
style=self.style,
)

def add_arguments(self, parser):
"""Add arguments to parser."""
Expand All @@ -237,7 +119,7 @@ def add_arguments(self, parser):
)
subparser.set_defaults(func=self.__list_index)

# 'manage' subcommand
# 'index' subcommand
subparser = subparsers.add_parser(
"index",
help="Manage the creation an deletion of indices.",
Expand Down Expand Up @@ -288,6 +170,13 @@ def add_arguments(self, parser):
OpensearchAction.UPDATE.value,
],
)
subparser.add_argument(
"-d",
"--database",
type=str,
default=None,
help="Nominates a database to use as source.",
)
subparser.add_argument(
"-f",
"--filters",
Expand Down Expand Up @@ -321,6 +210,7 @@ def add_arguments(self, parser):
subparser.add_argument(
"-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices."
)
subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.")
subparser.add_argument(
"-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)."
)
Expand All @@ -345,6 +235,20 @@ def add_arguments(self, parser):
default=False,
help="When used with 'index' action, only index documents not indexed yet.",
)
subparser.add_argument(
"-b",
"--batch-size",
type=int,
default=None,
help="Specify the batch size for processing documents.",
)
subparser.add_argument(
"-t",
"--batch-type",
type=str,
default="offset",
help="Specify the batch type for processing documents (pk_filters | offset).",
)

self.usage = parser.format_usage()

Expand Down
Loading