Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 73 additions & 30 deletions docs/reference/dsl_how_to_guides.md
Original file line number Diff line number Diff line change
Expand Up @@ -1159,8 +1159,7 @@
:sync: sync
```python
from datetime import datetime
from elasticsearch.dsl import Document, Date, Nested, Boolean, \
analyzer, InnerDoc, Completion, Keyword, Text
from elasticsearch.dsl import Document, Boolean, InnerDoc, Completion, Keyword, Text, analyzer

html_strip = analyzer('html_strip',
tokenizer="standard",
Expand All @@ -1169,24 +1168,23 @@
)

class Comment(InnerDoc):
author = Text(fields={'raw': Keyword()})
content = Text(analyzer='snowball')
created_at = Date()
author: str = Text(fields={'raw': Keyword()})
content: str = Text(analyzer='snowball')
created_at: datetime

def age(self):
return datetime.now() - self.created_at

class Post(Document):
title = Text()
title_suggest = Completion()
created_at = Date()
published = Boolean()
category = Text(
title: str
title_suggest: str = Completion()
created_at: datetime
published: bool
category: str = Text(
analyzer=html_strip,
fields={'raw': Keyword()}
)

comments = Nested(Comment)
comments: Comment

class Index:
name = 'blog'
Expand All @@ -1205,8 +1203,7 @@
:sync: async
```python
from datetime import datetime
from elasticsearch.dsl import AsyncDocument, Date, Nested, Boolean, \
analyzer, InnerDoc, Completion, Keyword, Text
from elasticsearch.dsl import AsyncDocument, Boolean, InnerDoc, Completion, Keyword, Text, analyzer

html_strip = analyzer('html_strip',
tokenizer="standard",
Expand All @@ -1215,24 +1212,23 @@
)

class Comment(InnerDoc):
author = Text(fields={'raw': Keyword()})
content = Text(analyzer='snowball')
created_at = Date()
author: str = Text(fields={'raw': Keyword()})
content: str = Text(analyzer='snowball')
created_at: datetime

def age(self):
return datetime.now() - self.created_at

class Post(AsyncDocument):
title = Text()
title_suggest = Completion()
created_at = Date()
published = Boolean()
category = Text(
title: str
title_suggest: str = Completion()
created_at: datetime
published: bool
category: str = Text(
analyzer=html_strip,
fields={'raw': Keyword()}
)

comments = Nested(Comment)
comments: Comment

class Index:
name = 'blog'
Expand All @@ -1251,9 +1247,9 @@

#### Data types [_data_types]

The `Document` instances use native python types such as `str` and `datetime` for its attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class.
The `Document` instances can use native Python types such as `str` and `datetime` for their attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class.

There are some specific types that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md):
There are also specific type classes that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md):

::::{tab-set}
:group: sync_or_async
Expand Down Expand Up @@ -1351,8 +1347,8 @@

::::

::::{note}
:::::{note}
When using `Field` subclasses such as `Text`, `Date` and `Boolean` to define attributes, these classes must be given as the value assigned to the attribute, not as its type hint.

Check warning on line 1351 in docs/reference/dsl_how_to_guides.md

View workflow job for this annotation

GitHub Actions / build / vale

Elastic.DirectionalLanguage: Don't use directional language. Use 'the label of the element' instead of 'in the right'.

::::{tab-set}
:group: sync_or_async
Expand All @@ -1378,7 +1374,7 @@
::::

Using a `Field` subclass as a Python type hint will result in errors.
::::
:::::

Python types are mapped to their corresponding `Field` types according to the following table:

Expand Down Expand Up @@ -1668,8 +1664,54 @@

::::

In that case any `datetime` object passed in (or parsed from elasticsearch) will be treated as if it were in `UTC` timezone.
In that case any `datetime` object passed in (or parsed from Elasticsearch) will be treated as if it were in `UTC` timezone.


#### Custom field names

By default, the `Document` and `AsyncDocument` classes use the names given to the field attributes as the field names in the Elasticsearch index. But sometimes it is necessary for a field to have different names in Python and Elasticsearch, such as when the Elasticsearch name is not a valid Python identifier.

The following example shows how to define the Elasticsearch field `@timestamp`, used with data streams:

::::{tab-set}
:group: sync_or_async

:::{tab-item} Standard Python
:sync: sync
```python
class MyDoc(Document):
timestamp: datetime = mapped_field(es_name='@timestamp')
```
:::

:::{tab-item} Async Python
:sync: async
```python
class MyDoc(AsyncDocument):
timestamp: datetime = mapped_field(es_name='@timestamp')
```
:::

::::

If using `Field` subclasses to define attribute types, the `_es_name` argument can be passed with the desired Elasticsearch name:

::::{tab-set}
:group: sync_or_async

:::{tab-item} Standard Python
:sync: sync
```python
class MyDoc(Document):
timestamp = Date(_es_name='@timestamp')
```
:::

:::{tab-item} Async Python
:sync: async
```python
class MyDoc(AsyncDocument):
timestamp = Date(_es_name='@timestamp')
```
:::

::::

The conversion between the Python and Elasticsearch names happens automatically during serialization and deserialization of `Document` and `AsyncDocument` instances. The Elasticsearch field names must be used when sending requests outside of the document classes. Likewise, responses from Elasticsearch that are not deserialized to a document class will reference the Elasticsearch field names.

#### Document life cycle [life-cycle]

Expand Down Expand Up @@ -2341,6 +2383,7 @@
* `settings`: dictionary containing any settings for the `Index` object like `number_of_shards`.
* `analyzers`: additional list of analyzers that should be defined on an index (see `analysis` for details).
* `aliases`: dictionary with any aliases definitions
* `data_stream`: set to `True` to configure a data stream instead of an index.


#### Document Inheritance [_document_inheritance]
Expand Down Expand Up @@ -2550,7 +2593,7 @@
The DSL module also exposes an option to manage [index templates](docs-content://manage-data/data-store/templates.md) in Elasticsearch using the `ComposableIndexTemplate` and `IndexTemplate` classes, which have a similar API to `Index`.

::::{note}
Composable index templates should be always be preferred over the legacy index templates, since the latter are deprecated.
Composable index templates should always be preferred over the legacy index templates.

::::

Expand Down
12 changes: 6 additions & 6 deletions elasticsearch/dsl/_async/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ def __new__(

@classmethod
def construct_index(
cls, opts: Dict[str, Any], bases: Tuple[type, ...]
cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...]
) -> AsyncIndex:
if opts is None:
for b in bases:
if hasattr(b, "_index"):
return b._index
return cast(AsyncIndex, b._index)

# Set None as Index name so it will set _all while making the query
return AsyncIndex(name=None)
return AsyncIndex(name=None) # type: ignore[arg-type]

i = AsyncIndex(
getattr(opts, "name", "*"), using=getattr(opts, "using", "default")
Expand All @@ -83,6 +83,8 @@ def construct_index(
i.aliases(**getattr(opts, "aliases", {}))
for a in getattr(opts, "analyzers", ()):
i.analyzer(a)
if getattr(opts, "data_stream", None) is not None:
i.data_stream(True)
return i


Expand Down Expand Up @@ -517,11 +519,9 @@ async def __anext__(self) -> Dict[str, Any]:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
return action

return await async_bulk(es, Generate(actions), **kwargs)
return await async_bulk(es, Generate(actions), index=i, **kwargs)

@classmethod
async def esql_execute(
Expand Down
21 changes: 21 additions & 0 deletions elasticsearch/dsl/_async/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def to_dict(self) -> Dict[str, Any]:
d["index_patterns"] = [self._index._name]
if self.priority is not None:
d["priority"] = self.priority
if self._index._data_stream:
d["data_stream"] = {}
return d

async def save(
Expand Down Expand Up @@ -187,6 +189,7 @@ def clone(
i._doc_types = self._doc_types[:]
if self._mapping is not None:
i._mapping = self._mapping._clone()
i._data_stream = self._data_stream
return i

def search(self, using: Optional[AsyncUsingType] = None) -> AsyncSearch:
Expand Down Expand Up @@ -224,6 +227,10 @@ async def create(
Any additional keyword arguments will be passed to
``Elasticsearch.indices.create`` unchanged.
"""
if self._data_stream:
return await self._get_connection(using).indices.create_data_stream(
name=self._name, **kwargs
)
return await self._get_connection(using).indices.create(
index=self._name, body=self.to_dict(), **kwargs
)
Expand All @@ -241,13 +248,23 @@ async def save(
Sync the index definition with elasticsearch, creating the index if it
doesn't exist and updating its settings and mappings if it does.

If the index is marked as a data stream, then a template is created with
the name "{name}-template".

Note some settings and mapping changes cannot be done on an open
index (or at all on an existing index) and for those this method will
fail with the underlying exception.
"""
if self._data_stream:
template = self.as_composable_template(f"{self._name}-template", self._name)
await template.save(using=using)

if not await self.exists(using=using):
return await self.create(using=using)

if self._data_stream:
return None # the data stream's index template is already updated

body = self.to_dict()
settings = body.pop("settings", {})
analysis = settings.pop("analysis", None)
Expand Down Expand Up @@ -378,6 +395,10 @@ async def delete(
Any additional keyword arguments will be passed to
``Elasticsearch.indices.delete`` unchanged.
"""
if self._data_stream:
return await self._get_connection(using).indices.delete_data_stream(
name=self._name, **kwargs
)
return await self._get_connection(using).indices.delete(
index=self._name, **kwargs
)
Expand Down
14 changes: 8 additions & 6 deletions elasticsearch/dsl/_sync/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,24 @@ def __new__(
return cast(IndexMeta, new_cls)

@classmethod
def construct_index(cls, opts: Dict[str, Any], bases: Tuple[type, ...]) -> Index:
def construct_index(
cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...]
) -> Index:
if opts is None:
for b in bases:
if hasattr(b, "_index"):
return b._index
return cast(Index, b._index)

# Set None as Index name so it will set _all while making the query
return Index(name=None)
return Index(name=None) # type: ignore[arg-type]

i = Index(getattr(opts, "name", "*"), using=getattr(opts, "using", "default"))
i.settings(**getattr(opts, "settings", {}))
i.aliases(**getattr(opts, "aliases", {}))
for a in getattr(opts, "analyzers", ()):
i.analyzer(a)
if getattr(opts, "data_stream", None) is not None:
i.data_stream(True)
return i


Expand Down Expand Up @@ -509,11 +513,9 @@ def __next__(self) -> Dict[str, Any]:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
return action

return bulk(es, Generate(actions), **kwargs)
return bulk(es, Generate(actions), index=i, **kwargs)

@classmethod
def esql_execute(
Expand Down
21 changes: 21 additions & 0 deletions elasticsearch/dsl/_sync/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def to_dict(self) -> Dict[str, Any]:
d["index_patterns"] = [self._index._name]
if self.priority is not None:
d["priority"] = self.priority
if self._index._data_stream:
d["data_stream"] = {}
return d

def save(self, using: Optional[UsingType] = None) -> "ObjectApiResponse[Any]":
Expand Down Expand Up @@ -177,6 +179,7 @@ def clone(
i._doc_types = self._doc_types[:]
if self._mapping is not None:
i._mapping = self._mapping._clone()
i._data_stream = self._data_stream
return i

def search(self, using: Optional[UsingType] = None) -> Search:
Expand Down Expand Up @@ -212,6 +215,10 @@ def create(
Any additional keyword arguments will be passed to
``Elasticsearch.indices.create`` unchanged.
"""
if self._data_stream:
return self._get_connection(using).indices.create_data_stream(
name=self._name, **kwargs
)
return self._get_connection(using).indices.create(
index=self._name, body=self.to_dict(), **kwargs
)
Expand All @@ -229,13 +236,23 @@ def save(
Sync the index definition with elasticsearch, creating the index if it
doesn't exist and updating its settings and mappings if it does.

If the index is marked as a data stream, then a template is created with
the name "{name}-template".

Note some settings and mapping changes cannot be done on an open
index (or at all on an existing index) and for those this method will
fail with the underlying exception.
"""
if self._data_stream:
template = self.as_composable_template(f"{self._name}-template", self._name)
template.save(using=using)

if not self.exists(using=using):
return self.create(using=using)

if self._data_stream:
return None # the data stream's index template is already updated

body = self.to_dict()
settings = body.pop("settings", {})
analysis = settings.pop("analysis", None)
Expand Down Expand Up @@ -356,6 +373,10 @@ def delete(
Any additional keyword arguments will be passed to
``Elasticsearch.indices.delete`` unchanged.
"""
if self._data_stream:
return self._get_connection(using).indices.delete_data_stream(
name=self._name, **kwargs
)
return self._get_connection(using).indices.delete(index=self._name, **kwargs)

def exists(self, using: Optional[UsingType] = None, **kwargs: Any) -> bool:
Expand Down
Loading
Loading