Skip to content

Commit bac7b09

Browse files
DSL data stream support (#3413)
* ability to rename document fields * lint fixes * dict to obj support * lint fixes * lint fixes * use es_name, and also expose this option in mapped_field * support for data stream templates and bulk upload * type hint fixes * save implementation for data streams * doc updates and example app * vale suggestion * minor fixes * handling of more data stream operations
1 parent 00735e0 commit bac7b09

15 files changed

Lines changed: 461 additions & 50 deletions

File tree

docs/reference/dsl_how_to_guides.md

Lines changed: 73 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,8 +1159,7 @@ If you want to create a model-like wrapper around your documents, use the `Docum
11591159
:sync: sync
11601160
```python
11611161
from datetime import datetime
1162-
from elasticsearch.dsl import Document, Date, Nested, Boolean, \
1163-
analyzer, InnerDoc, Completion, Keyword, Text
1162+
from elasticsearch.dsl import AsyncDocument, Boolean, InnerDoc, Completion, Keyword, Text, analyzer
11641163

11651164
html_strip = analyzer('html_strip',
11661165
tokenizer="standard",
@@ -1169,24 +1168,23 @@ html_strip = analyzer('html_strip',
11691168
)
11701169

11711170
class Comment(InnerDoc):
1172-
author = Text(fields={'raw': Keyword()})
1173-
content = Text(analyzer='snowball')
1174-
created_at = Date()
1171+
author: str = Text(fields={'raw': Keyword()})
1172+
content: str = Text(analyzer='snowball')
1173+
created_at: datetime
11751174

11761175
def age(self):
11771176
return datetime.now() - self.created_at
11781177

11791178
class Post(Document):
1180-
title = Text()
1181-
title_suggest = Completion()
1182-
created_at = Date()
1183-
published = Boolean()
1184-
category = Text(
1179+
title: str
1180+
title_suggest: str = Completion()
1181+
created_at: datetime
1182+
published: bool
1183+
category: str = Text(
11851184
analyzer=html_strip,
11861185
fields={'raw': Keyword()}
11871186
)
1188-
1189-
comments = Nested(Comment)
1187+
comments: Comment
11901188

11911189
class Index:
11921190
name = 'blog'
@@ -1205,8 +1203,7 @@ class Post(Document):
12051203
:sync: async
12061204
```python
12071205
from datetime import datetime
1208-
from elasticsearch.dsl import AsyncDocument, Date, Nested, Boolean, \
1209-
analyzer, InnerDoc, Completion, Keyword, Text
1206+
from elasticsearch.dsl import AsyncDocument, Boolean, InnerDoc, Completion, Keyword, Text, analyzer
12101207

12111208
html_strip = analyzer('html_strip',
12121209
tokenizer="standard",
@@ -1215,24 +1212,23 @@ html_strip = analyzer('html_strip',
12151212
)
12161213

12171214
class Comment(InnerDoc):
1218-
author = Text(fields={'raw': Keyword()})
1219-
content = Text(analyzer='snowball')
1220-
created_at = Date()
1215+
author: str = Text(fields={'raw': Keyword()})
1216+
content: str = Text(analyzer='snowball')
1217+
created_at: datetime
12211218

12221219
def age(self):
12231220
return datetime.now() - self.created_at
12241221

12251222
class Post(AsyncDocument):
1226-
title = Text()
1227-
title_suggest = Completion()
1228-
created_at = Date()
1229-
published = Boolean()
1230-
category = Text(
1223+
title: str
1224+
title_suggest: str = Completion()
1225+
created_at: datetime
1226+
published: bool
1227+
category: str = Text(
12311228
analyzer=html_strip,
12321229
fields={'raw': Keyword()}
12331230
)
1234-
1235-
comments = Nested(Comment)
1231+
comments: Comment
12361232

12371233
class Index:
12381234
name = 'blog'
@@ -1251,9 +1247,9 @@ class Post(AsyncDocument):
12511247

12521248
#### Data types [_data_types]
12531249

1254-
The `Document` instances use native python types such as `str` and `datetime` for its attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class.
1250+
The `Document` instances can use native Python types such as `str` and `datetime` for its attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class.
12551251

1256-
There are some specific types that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md):
1252+
There are also specific type classes that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md):
12571253

12581254
::::{tab-set}
12591255
:group: sync_or_async
@@ -1351,7 +1347,7 @@ class Post(AsyncDocument):
13511347

13521348
::::
13531349

1354-
::::{note}
1350+
:::::{note}
13551351
When using `Field` subclasses such as `Text`, `Date` and `Boolean` to define attributes, these classes must be given in the right-hand side.
13561352

13571353
::::{tab-set}
@@ -1378,7 +1374,7 @@ class Post(AsyncDocument):
13781374
::::
13791375

13801376
Using a `Field` subclass as a Python type hint will result in errors.
1381-
::::
1377+
:::::
13821378

13831379
Python types are mapped to their corresponding `Field` types according to the following table:
13841380

@@ -1668,8 +1664,54 @@ class Post(AsyncDocument):
16681664

16691665
::::
16701666

1671-
In that case any `datetime` object passed in (or parsed from elasticsearch) will be treated as if it were in `UTC` timezone.
1667+
In that case any `datetime` object passed in (or parsed from Elasticsearch) will be treated as if it were in `UTC` timezone.
1668+
1669+
1670+
#### Custom field names
1671+
1672+
By default, the `Document` and `AsyncDocument` classes use the names given to the field attributes as the field names in the Elasticsearch index. But sometimes it is necessary for the names of a field in Python and Elasticsearch to be different, such as when the Elasticsearch name is not a valid Python identifier.
1673+
1674+
The following example shows how to define the Elasticsearch field `@timestamp`, used with data streams:
1675+
1676+
:::{tab-item} Standard Python
1677+
:sync: sync
1678+
```python
1679+
class MyDoc(Document):
1680+
timestamp: datetime = mapped_field(es_name='@timestamp')
1681+
```
1682+
:::
1683+
1684+
:::{tab-item} Async Python
1685+
:sync: async
1686+
```python
1687+
class MyDoc(AsyncDocument):
1688+
timestamp: datetime = mapped_field(es_name='@timestamp')
1689+
```
1690+
:::
1691+
1692+
::::
1693+
1694+
If using `Field` subclasses to define attribute types, the `_es_name` argument can be passed with the desired Elasticsearch name:
1695+
1696+
:::{tab-item} Standard Python
1697+
:sync: sync
1698+
```python
1699+
class MyDoc(Document):
1700+
timestamp = Date(_es_name='@timestamp')
1701+
```
1702+
:::
1703+
1704+
:::{tab-item} Async Python
1705+
:sync: async
1706+
```python
1707+
class MyDoc(AsyncDocument):
1708+
timestamp = Date(_es_name='@timestamp')
1709+
```
1710+
:::
1711+
1712+
::::
16721713

1714+
The conversion between the Python and Elasticsearch names happens automatically during serialization and deserialization of `Document` and `AsyncDocument` instances. The Elasticsearch field names must be used when sending requests outside of the document classes. Likewise, responses from Elasticsearch that are not deserialized to a document class will reference the Elasticsearch field names.
16731715

16741716
#### Document life cycle [life-cycle]
16751717

@@ -2341,6 +2383,7 @@ This section of the `Document` definition can contain any information about the
23412383
* `settings`: dictionary containing any settings for the `Index` object like `number_of_shards`.
23422384
* `analyzers`: additional list of analyzers that should be defined on an index (see `analysis` for details).
23432385
* `aliases`: dictionary with any aliases definitions
2386+
* `data_stream`: set to `True` to configure a data stream instead of an index.
23442387

23452388

23462389
#### Document Inheritance [_document_inheritance]
@@ -2550,7 +2593,7 @@ dev_blogs.setting(number_of_shards=1)
25502593
The DSL module also exposes an option to manage [index templates](docs-content://manage-data/data-store/templates.md) in elasticsearch using the `ComposableIndexTemplate` and `IndexTemplate` classes, which have a similar API to `Index`.
25512594

25522595
::::{note}
2553-
Composable index templates should be always be preferred over the legacy index templates, since the latter are deprecated.
2596+
Composable index templates should always be preferred over the legacy index templates.
25542597

25552598
::::
25562599

elasticsearch/dsl/_async/document.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ def __new__(
6666

6767
@classmethod
6868
def construct_index(
69-
cls, opts: Dict[str, Any], bases: Tuple[type, ...]
69+
cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...]
7070
) -> AsyncIndex:
7171
if opts is None:
7272
for b in bases:
7373
if hasattr(b, "_index"):
74-
return b._index
74+
return cast(AsyncIndex, b._index)
7575

7676
# Set None as Index name so it will set _all while making the query
77-
return AsyncIndex(name=None)
77+
return AsyncIndex(name=None) # type: ignore[arg-type]
7878

7979
i = AsyncIndex(
8080
getattr(opts, "name", "*"), using=getattr(opts, "using", "default")
@@ -83,6 +83,8 @@ def construct_index(
8383
i.aliases(**getattr(opts, "aliases", {}))
8484
for a in getattr(opts, "analyzers", ()):
8585
i.analyzer(a)
86+
if getattr(opts, "data_stream", None) is not None:
87+
i.data_stream(True)
8688
return i
8789

8890

@@ -517,11 +519,9 @@ async def __anext__(self) -> Dict[str, Any]:
517519
if validate: # pragma: no cover
518520
doc.full_clean()
519521
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
520-
if "_index" not in action:
521-
action["_index"] = i
522522
return action
523523

524-
return await async_bulk(es, Generate(actions), **kwargs)
524+
return await async_bulk(es, Generate(actions), index=i, **kwargs)
525525

526526
@classmethod
527527
async def esql_execute(

elasticsearch/dsl/_async/index.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ def to_dict(self) -> Dict[str, Any]:
104104
d["index_patterns"] = [self._index._name]
105105
if self.priority is not None:
106106
d["priority"] = self.priority
107+
if self._index._data_stream:
108+
d["data_stream"] = {}
107109
return d
108110

109111
async def save(
@@ -187,6 +189,7 @@ def clone(
187189
i._doc_types = self._doc_types[:]
188190
if self._mapping is not None:
189191
i._mapping = self._mapping._clone()
192+
i._data_stream = self._data_stream
190193
return i
191194

192195
def search(self, using: Optional[AsyncUsingType] = None) -> AsyncSearch:
@@ -224,6 +227,10 @@ async def create(
224227
Any additional keyword arguments will be passed to
225228
``Elasticsearch.indices.create`` unchanged.
226229
"""
230+
if self._data_stream:
231+
return await self._get_connection(using).indices.create_data_stream(
232+
name=self._name, **kwargs
233+
)
227234
return await self._get_connection(using).indices.create(
228235
index=self._name, body=self.to_dict(), **kwargs
229236
)
@@ -241,13 +248,23 @@ async def save(
241248
Sync the index definition with elasticsearch, creating the index if it
242249
doesn't exist and updating its settings and mappings if it does.
243250
251+
If the index is marked as a data stream, then a template is created with
252+
the name "{name}-template".
253+
244254
Note some settings and mapping changes cannot be done on an open
245255
index (or at all on an existing index) and for those this method will
246256
fail with the underlying exception.
247257
"""
258+
if self._data_stream:
259+
template = self.as_composable_template(f"{self._name}-template", self._name)
260+
await template.save(using=using)
261+
248262
if not await self.exists(using=using):
249263
return await self.create(using=using)
250264

265+
if self._data_stream:
266+
return None # the data stream's index template is already updated
267+
251268
body = self.to_dict()
252269
settings = body.pop("settings", {})
253270
analysis = settings.pop("analysis", None)
@@ -378,6 +395,10 @@ async def delete(
378395
Any additional keyword arguments will be passed to
379396
``Elasticsearch.indices.delete`` unchanged.
380397
"""
398+
if self._data_stream:
399+
return await self._get_connection(using).indices.delete_data_stream(
400+
name=self._name, **kwargs
401+
)
381402
return await self._get_connection(using).indices.delete(
382403
index=self._name, **kwargs
383404
)

elasticsearch/dsl/_sync/document.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,24 @@ def __new__(
6565
return cast(IndexMeta, new_cls)
6666

6767
@classmethod
68-
def construct_index(cls, opts: Dict[str, Any], bases: Tuple[type, ...]) -> Index:
68+
def construct_index(
69+
cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...]
70+
) -> Index:
6971
if opts is None:
7072
for b in bases:
7173
if hasattr(b, "_index"):
72-
return b._index
74+
return cast(Index, b._index)
7375

7476
# Set None as Index name so it will set _all while making the query
75-
return Index(name=None)
77+
return Index(name=None) # type: ignore[arg-type]
7678

7779
i = Index(getattr(opts, "name", "*"), using=getattr(opts, "using", "default"))
7880
i.settings(**getattr(opts, "settings", {}))
7981
i.aliases(**getattr(opts, "aliases", {}))
8082
for a in getattr(opts, "analyzers", ()):
8183
i.analyzer(a)
84+
if getattr(opts, "data_stream", None) is not None:
85+
i.data_stream(True)
8286
return i
8387

8488

@@ -509,11 +513,9 @@ def __next__(self) -> Dict[str, Any]:
509513
if validate: # pragma: no cover
510514
doc.full_clean()
511515
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
512-
if "_index" not in action:
513-
action["_index"] = i
514516
return action
515517

516-
return bulk(es, Generate(actions), **kwargs)
518+
return bulk(es, Generate(actions), index=i, **kwargs)
517519

518520
@classmethod
519521
def esql_execute(

elasticsearch/dsl/_sync/index.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def to_dict(self) -> Dict[str, Any]:
100100
d["index_patterns"] = [self._index._name]
101101
if self.priority is not None:
102102
d["priority"] = self.priority
103+
if self._index._data_stream:
104+
d["data_stream"] = {}
103105
return d
104106

105107
def save(self, using: Optional[UsingType] = None) -> "ObjectApiResponse[Any]":
@@ -177,6 +179,7 @@ def clone(
177179
i._doc_types = self._doc_types[:]
178180
if self._mapping is not None:
179181
i._mapping = self._mapping._clone()
182+
i._data_stream = self._data_stream
180183
return i
181184

182185
def search(self, using: Optional[UsingType] = None) -> Search:
@@ -212,6 +215,10 @@ def create(
212215
Any additional keyword arguments will be passed to
213216
``Elasticsearch.indices.create`` unchanged.
214217
"""
218+
if self._data_stream:
219+
return self._get_connection(using).indices.create_data_stream(
220+
name=self._name, **kwargs
221+
)
215222
return self._get_connection(using).indices.create(
216223
index=self._name, body=self.to_dict(), **kwargs
217224
)
@@ -229,13 +236,23 @@ def save(
229236
Sync the index definition with elasticsearch, creating the index if it
230237
doesn't exist and updating its settings and mappings if it does.
231238
239+
If the index is marked as a data stream, then a template is created with
240+
the name "{name}-template".
241+
232242
Note some settings and mapping changes cannot be done on an open
233243
index (or at all on an existing index) and for those this method will
234244
fail with the underlying exception.
235245
"""
246+
if self._data_stream:
247+
template = self.as_composable_template(f"{self._name}-template", self._name)
248+
template.save(using=using)
249+
236250
if not self.exists(using=using):
237251
return self.create(using=using)
238252

253+
if self._data_stream:
254+
return None # the data stream's index template is already updated
255+
239256
body = self.to_dict()
240257
settings = body.pop("settings", {})
241258
analysis = settings.pop("analysis", None)
@@ -356,6 +373,10 @@ def delete(
356373
Any additional keyword arguments will be passed to
357374
``Elasticsearch.indices.delete`` unchanged.
358375
"""
376+
if self._data_stream:
377+
return self._get_connection(using).indices.delete_data_stream(
378+
name=self._name, **kwargs
379+
)
359380
return self._get_connection(using).indices.delete(index=self._name, **kwargs)
360381

361382
def exists(self, using: Optional[UsingType] = None, **kwargs: Any) -> bool:

0 commit comments

Comments
 (0)