|
6 | 6 | from typing import Iterable, Optional
|
7 | 7 |
|
8 | 8 | from django.db import models
|
9 |
| -from django.db.models import Q, QuerySet |
| 9 | +from django.db.models import Max, Min, Q, QuerySet |
10 | 10 | from opensearchpy.helpers import bulk, parallel_bulk
|
11 | 11 | from opensearchpy.helpers.document import Document as DSLDocument
|
12 | 12 |
|
@@ -101,29 +101,38 @@ def get_indexing_queryset(
|
101 | 101 | """Divide the queryset into chunks."""
|
102 | 102 | chunk_size = self.django.queryset_pagination
|
103 | 103 | qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count)
|
104 |
| - qs = qs.order_by("pk") if not qs.query.is_sliced else qs |
105 | 104 | count = qs.count()
|
106 | 105 | model = self.django.model.__name__
|
107 | 106 | action = action.present_participle.title()
|
108 | 107 |
|
109 |
| - i = 0 |
| 108 | + if self.django.order_indexing_queryset and not qs.query.is_ordered: |
| 109 | + qs = qs.order_by("pk") |
| 110 | + |
| 111 | + # To avoid loading large querysets into memory or |
| 112 | + # materializing them in temporary tables in the database, |
| 113 | + # the queryset can be divided into batches of chunk_size rows. |
| 114 | + result = qs.aggregate(min_pk=Min("pk"), max_pk=Max("pk")) |
| 115 | + min_value = result["min_pk"] |
| 116 | + max_value = result["max_pk"] + 1 |
| 117 | + |
110 | 118 | done = 0
|
| 119 | + current_batch = 0 |
| 120 | + total_batches = (max_value - min_value + chunk_size - 1) // chunk_size |
111 | 121 | start = time.time()
|
112 | 122 | if verbose:
|
113 | 123 | stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r")
|
114 |
| - while done < count: |
115 |
| - if verbose: |
116 |
| - stdout.write(f"{action} {model}: {round(i / count * 100)}% ({self._eta(start, done, count)})\r") |
117 | 124 |
|
118 |
| - for obj in qs[i : i + chunk_size]: |
| 125 | + for pk_offset in range(min_value, max_value, chunk_size): |
| 126 | + current_batch += 1 |
| 127 | + max_pk = min(pk_offset + self.django.queryset_pagination, max_value) |
| 128 | + batch_qs = qs.filter(pk__gte=pk_offset, pk__lt=max_pk) |
| 129 | + stdout.write(f"Processing batch {current_batch}/{total_batches} with pk from {pk_offset} to {max_pk - 1}\n") |
| 130 | + for obj in batch_qs: |
119 | 131 | done += 1
|
| 132 | + if done % chunk_size == 0: |
| 133 | + stdout.write(f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r") |
120 | 134 | yield obj
|
121 | 135 |
|
122 |
| - i = min(i + chunk_size, count) |
123 |
| - |
124 |
| - if verbose: |
125 |
| - stdout.write(f"{action} {count} {model}: OK \n") |
126 |
| - |
127 | 136 | def init_prepare(self):
|
128 | 137 | """Initialise the data model preparers once here.
|
129 | 138 |
|
|
0 commit comments