Skip to content

Commit 5364218

Browse files
handling of more data stream operations
1 parent 561b50c commit 5364218

4 files changed

Lines changed: 26 additions & 4 deletions

File tree

elasticsearch/dsl/_async/index.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ async def create(
227227
Any additional keyword arguments will be passed to
228228
``Elasticsearch.indices.create`` unchanged.
229229
"""
230+
if self._data_stream:
231+
return await self._get_connection(using).indices.create_data_stream(
232+
name=self._name, **kwargs
233+
)
230234
return await self._get_connection(using).indices.create(
231235
index=self._name, body=self.to_dict(), **kwargs
232236
)
@@ -253,11 +257,14 @@ async def save(
253257
"""
254258
if self._data_stream:
255259
template = self.as_composable_template(f"{self._name}-template", self._name)
256-
return await template.save(using=using)
260+
await template.save(using=using)
257261

258262
if not await self.exists(using=using):
259263
return await self.create(using=using)
260264

265+
if self._data_stream:
266+
return None # the data stream's index template is already updated
267+
261268
body = self.to_dict()
262269
settings = body.pop("settings", {})
263270
analysis = settings.pop("analysis", None)
@@ -388,6 +395,10 @@ async def delete(
388395
Any additional keyword arguments will be passed to
389396
``Elasticsearch.indices.delete`` unchanged.
390397
"""
398+
if self._data_stream:
399+
return await self._get_connection(using).indices.delete_data_stream(
400+
name=self._name, **kwargs
401+
)
391402
return await self._get_connection(using).indices.delete(
392403
index=self._name, **kwargs
393404
)

elasticsearch/dsl/_sync/index.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ def create(
215215
Any additional keyword arguments will be passed to
216216
``Elasticsearch.indices.create`` unchanged.
217217
"""
218+
if self._data_stream:
219+
return self._get_connection(using).indices.create_data_stream(
220+
name=self._name, **kwargs
221+
)
218222
return self._get_connection(using).indices.create(
219223
index=self._name, body=self.to_dict(), **kwargs
220224
)
@@ -241,11 +245,14 @@ def save(
241245
"""
242246
if self._data_stream:
243247
template = self.as_composable_template(f"{self._name}-template", self._name)
244-
return template.save(using=using)
248+
template.save(using=using)
245249

246250
if not self.exists(using=using):
247251
return self.create(using=using)
248252

253+
if self._data_stream:
254+
return None # the data stream's index template is already updated
255+
249256
body = self.to_dict()
250257
settings = body.pop("settings", {})
251258
analysis = settings.pop("analysis", None)
@@ -366,6 +373,10 @@ def delete(
366373
Any additional keyword arguments will be passed to
367374
``Elasticsearch.indices.delete`` unchanged.
368375
"""
376+
if self._data_stream:
377+
return self._get_connection(using).indices.delete_data_stream(
378+
name=self._name, **kwargs
379+
)
369380
return self._get_connection(using).indices.delete(index=self._name, **kwargs)
370381

371382
def exists(self, using: Optional[UsingType] = None, **kwargs: Any) -> bool:

examples/dsl/async/data_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def main() -> None:
2929

3030
# delete a previous instance of the data stream if one exists
3131
if await Log._index.exists():
32-
await client.indices.delete_data_stream(name=Log._index._name)
32+
await Log._index.delete()
3333

3434
# create the data stream
3535
await Log.init()

examples/dsl/data_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def main() -> None:
2626

2727
# delete a previous instance of the data stream if one exists
2828
if Log._index.exists():
29-
client.indices.delete_data_stream(name=Log._index._name)
29+
Log._index.delete()
3030

3131
# create the data stream
3232
Log.init()

0 commit comments

Comments
 (0)