diff --git a/docs/reference/dsl_how_to_guides.md b/docs/reference/dsl_how_to_guides.md index 6dc03c822..dbe739874 100644 --- a/docs/reference/dsl_how_to_guides.md +++ b/docs/reference/dsl_how_to_guides.md @@ -1159,8 +1159,7 @@ If you want to create a model-like wrapper around your documents, use the `Docum :sync: sync ```python from datetime import datetime -from elasticsearch.dsl import Document, Date, Nested, Boolean, \ - analyzer, InnerDoc, Completion, Keyword, Text +from elasticsearch.dsl import Document, Boolean, InnerDoc, Completion, Keyword, Text, analyzer html_strip = analyzer('html_strip', tokenizer="standard", @@ -1169,24 +1168,23 @@ html_strip = analyzer('html_strip', ) class Comment(InnerDoc): - author = Text(fields={'raw': Keyword()}) - content = Text(analyzer='snowball') - created_at = Date() + author: str = Text(fields={'raw': Keyword()}) + content: str = Text(analyzer='snowball') + created_at: datetime def age(self): return datetime.now() - self.created_at class Post(Document): - title = Text() - title_suggest = Completion() - created_at = Date() - published = Boolean() - category = Text( + title: str + title_suggest: str = Completion() + created_at: datetime + published: bool + category: str = Text( analyzer=html_strip, fields={'raw': Keyword()} ) - - comments = Nested(Comment) + comments: Comment class Index: name = 'blog' @@ -1205,8 +1203,7 @@ class Post(Document): :sync: async ```python from datetime import datetime -from elasticsearch.dsl import AsyncDocument, Date, Nested, Boolean, \ - analyzer, InnerDoc, Completion, Keyword, Text +from elasticsearch.dsl import AsyncDocument, Boolean, InnerDoc, Completion, Keyword, Text, analyzer html_strip = analyzer('html_strip', tokenizer="standard", @@ -1215,24 +1212,23 @@ html_strip = analyzer('html_strip', ) class Comment(InnerDoc): - author = Text(fields={'raw': Keyword()}) - content = Text(analyzer='snowball') - created_at = Date() + author: str = Text(fields={'raw': Keyword()}) + content: str = 
Text(analyzer='snowball') + created_at: datetime def age(self): return datetime.now() - self.created_at class Post(AsyncDocument): - title = Text() - title_suggest = Completion() - created_at = Date() - published = Boolean() - category = Text( + title: str + title_suggest: str = Completion() + created_at: datetime + published: bool + category: str = Text( analyzer=html_strip, fields={'raw': Keyword()} ) - - comments = Nested(Comment) + comments: Comment class Index: name = 'blog' @@ -1251,9 +1247,9 @@ class Post(AsyncDocument): #### Data types [_data_types] -The `Document` instances use native python types such as `str` and `datetime` for its attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class. +The `Document` instances can use native Python types such as `str` and `datetime` for its attributes. In case of `Object` or `Nested` fields an instance of the `InnerDoc` subclass is used, as in the `add_comment` method in the above example, where we are creating an instance of the `Comment` class. -There are some specific types that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md): +There are also specific type classes that were created to make working with some field types easier, for example the `Range` object used in any of the [range fields](elasticsearch://reference/elasticsearch/mapping-reference/range.md): ::::{tab-set} :group: sync_or_async @@ -1351,7 +1347,7 @@ class Post(AsyncDocument): :::: -::::{note} +:::::{note} When using `Field` subclasses such as `Text`, `Date` and `Boolean` to define attributes, these classes must be given in the right-hand side. 
::::{tab-set} @@ -1378,7 +1374,7 @@ class Post(AsyncDocument): :::: Using a `Field` subclass as a Python type hint will result in errors. -:::: +::::: Python types are mapped to their corresponding `Field` types according to the following table: @@ -1668,8 +1664,61 @@ class Post(AsyncDocument): :::: -In that case any `datetime` object passed in (or parsed from elasticsearch) will be treated as if it were in `UTC` timezone. +In that case any `datetime` object passed in (or parsed from Elasticsearch) will be treated as if it were in `UTC` timezone. + + +#### Custom field names + +By default, the `Document` and `AsyncDocument` classes use the names given to the field attributes as the field names in the Elasticsearch index. But sometimes it is necessary for the names of a field in Python and Elasticsearch to be different, such as when the Elasticsearch name is not a valid Python identifier. + +The following example shows how to define the Elasticsearch field `@timestamp`, used with data streams: + +::::{tab-set} +:group: sync_or_async + +:::{tab-item} Standard Python +:sync: sync +```python +class MyDoc(Document): + timestamp: datetime = mapped_field(es_name='@timestamp') +``` +::: + +:::{tab-item} Async Python +:sync: async +```python +class MyDoc(AsyncDocument): + timestamp: datetime = mapped_field(es_name='@timestamp') +``` +::: + +:::: + +If using `Field` subclasses to define attribute types, the `_es_name` argument can be passed with the desired Elasticsearch name: + +::::{tab-set} +:group: sync_or_async + +:::{tab-item} Standard Python +:sync: sync +```python +class MyDoc(Document): + timestamp = Date(_es_name='@timestamp') +``` +::: + +:::{tab-item} Async Python +:sync: async +```python +class MyDoc(AsyncDocument): + timestamp = Date(_es_name='@timestamp') +``` +::: + +:::: + +The conversion between the Python and Elasticsearch names happens automatically during serialization and deserialization of `Document` and `AsyncDocument` instances. The Elasticsearch field names must be used when sending requests outside of the document classes.
Likewise, responses from Elasticsearch that are not deserialized to a document class will reference the Elasticsearch field names. #### Document life cycle [life-cycle] @@ -2341,6 +2383,7 @@ This section of the `Document` definition can contain any information about the * `settings`: dictionary containing any settings for the `Index` object like `number_of_shards`. * `analyzers`: additional list of analyzers that should be defined on an index (see `analysis` for details). * `aliases`: dictionary with any aliases definitions +* `data_stream`: set to `True` to configure a data stream instead of an index. #### Document Inheritance [_document_inheritance] @@ -2550,7 +2593,7 @@ dev_blogs.setting(number_of_shards=1) The DSL module also exposes an option to manage [index templates](docs-content://manage-data/data-store/templates.md) in elasticsearch using the `ComposableIndexTemplate` and `IndexTemplate` classes, which have a similar API to `Index`. ::::{note} -Composable index templates should be always be preferred over the legacy index templates, since the latter are deprecated. +Composable index templates should always be preferred over the legacy index templates. :::: diff --git a/elasticsearch/dsl/_async/document.py b/elasticsearch/dsl/_async/document.py index 9815fb20e..00ce66508 100644 --- a/elasticsearch/dsl/_async/document.py +++ b/elasticsearch/dsl/_async/document.py @@ -66,15 +66,15 @@ def __new__( @classmethod def construct_index( - cls, opts: Dict[str, Any], bases: Tuple[type, ...] + cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...] 
) -> AsyncIndex: if opts is None: for b in bases: if hasattr(b, "_index"): - return b._index + return cast(AsyncIndex, b._index) # Set None as Index name so it will set _all while making the query - return AsyncIndex(name=None) + return AsyncIndex(name=None) # type: ignore[arg-type] i = AsyncIndex( getattr(opts, "name", "*"), using=getattr(opts, "using", "default") @@ -83,6 +83,8 @@ def construct_index( i.aliases(**getattr(opts, "aliases", {})) for a in getattr(opts, "analyzers", ()): i.analyzer(a) + if getattr(opts, "data_stream", False): + i.data_stream(True) return i @@ -517,11 +519,9 @@ async def __anext__(self) -> Dict[str, Any]: if validate: # pragma: no cover doc.full_clean() action = doc.to_dict(include_meta=True, skip_empty=skip_empty) - if "_index" not in action: - action["_index"] = i return action - return await async_bulk(es, Generate(actions), **kwargs) + return await async_bulk(es, Generate(actions), index=i, **kwargs) @classmethod async def esql_execute( diff --git a/elasticsearch/dsl/_async/index.py b/elasticsearch/dsl/_async/index.py index 0795d47d2..fe7fc5331 100644 --- a/elasticsearch/dsl/_async/index.py +++ b/elasticsearch/dsl/_async/index.py @@ -104,6 +104,8 @@ def to_dict(self) -> Dict[str, Any]: d["index_patterns"] = [self._index._name] if self.priority is not None: d["priority"] = self.priority + if self._index._data_stream: + d["data_stream"] = {} return d async def save( @@ -187,6 +189,7 @@ def clone( i._doc_types = self._doc_types[:] if self._mapping is not None: i._mapping = self._mapping._clone() + i._data_stream = self._data_stream return i def search(self, using: Optional[AsyncUsingType] = None) -> AsyncSearch: @@ -224,6 +227,10 @@ async def create( Any additional keyword arguments will be passed to ``Elasticsearch.indices.create`` unchanged. 
""" + if self._data_stream: + return await self._get_connection(using).indices.create_data_stream( + name=self._name, **kwargs + ) return await self._get_connection(using).indices.create( index=self._name, body=self.to_dict(), **kwargs ) @@ -241,13 +248,23 @@ async def save( Sync the index definition with elasticsearch, creating the index if it doesn't exist and updating its settings and mappings if it does. + If the index is marked as a data stream, then a template is created with + the name "{name}-template". + Note some settings and mapping changes cannot be done on an open index (or at all on an existing index) and for those this method will fail with the underlying exception. """ + if self._data_stream: + template = self.as_composable_template(f"{self._name}-template", self._name) + await template.save(using=using) + if not await self.exists(using=using): return await self.create(using=using) + if self._data_stream: + return None # the data stream's index template is already updated + body = self.to_dict() settings = body.pop("settings", {}) analysis = settings.pop("analysis", None) @@ -378,6 +395,10 @@ async def delete( Any additional keyword arguments will be passed to ``Elasticsearch.indices.delete`` unchanged. """ + if self._data_stream: + return await self._get_connection(using).indices.delete_data_stream( + name=self._name, **kwargs + ) return await self._get_connection(using).indices.delete( index=self._name, **kwargs ) diff --git a/elasticsearch/dsl/_sync/document.py b/elasticsearch/dsl/_sync/document.py index 8b62531c9..be135338e 100644 --- a/elasticsearch/dsl/_sync/document.py +++ b/elasticsearch/dsl/_sync/document.py @@ -65,20 +65,24 @@ def __new__( return cast(IndexMeta, new_cls) @classmethod - def construct_index(cls, opts: Dict[str, Any], bases: Tuple[type, ...]) -> Index: + def construct_index( + cls, opts: Optional[Dict[str, Any]], bases: Tuple[type, ...] 
+ ) -> Index: if opts is None: for b in bases: if hasattr(b, "_index"): - return b._index + return cast(Index, b._index) # Set None as Index name so it will set _all while making the query - return Index(name=None) + return Index(name=None) # type: ignore[arg-type] i = Index(getattr(opts, "name", "*"), using=getattr(opts, "using", "default")) i.settings(**getattr(opts, "settings", {})) i.aliases(**getattr(opts, "aliases", {})) for a in getattr(opts, "analyzers", ()): i.analyzer(a) + if getattr(opts, "data_stream", False): + i.data_stream(True) return i @@ -509,11 +513,9 @@ def __next__(self) -> Dict[str, Any]: if validate: # pragma: no cover doc.full_clean() action = doc.to_dict(include_meta=True, skip_empty=skip_empty) - if "_index" not in action: - action["_index"] = i return action - return bulk(es, Generate(actions), **kwargs) + return bulk(es, Generate(actions), index=i, **kwargs) @classmethod def esql_execute( diff --git a/elasticsearch/dsl/_sync/index.py b/elasticsearch/dsl/_sync/index.py index 9f9f1e0d1..24ed83efa 100644 --- a/elasticsearch/dsl/_sync/index.py +++ b/elasticsearch/dsl/_sync/index.py @@ -100,6 +100,8 @@ def to_dict(self) -> Dict[str, Any]: d["index_patterns"] = [self._index._name] if self.priority is not None: d["priority"] = self.priority + if self._index._data_stream: + d["data_stream"] = {} return d def save(self, using: Optional[UsingType] = None) -> "ObjectApiResponse[Any]": @@ -177,6 +179,7 @@ def clone( i._doc_types = self._doc_types[:] if self._mapping is not None: i._mapping = self._mapping._clone() + i._data_stream = self._data_stream return i def search(self, using: Optional[UsingType] = None) -> Search: @@ -212,6 +215,10 @@ def create( Any additional keyword arguments will be passed to ``Elasticsearch.indices.create`` unchanged. 
""" + if self._data_stream: + return self._get_connection(using).indices.create_data_stream( + name=self._name, **kwargs + ) return self._get_connection(using).indices.create( index=self._name, body=self.to_dict(), **kwargs ) @@ -229,13 +236,23 @@ def save( Sync the index definition with elasticsearch, creating the index if it doesn't exist and updating its settings and mappings if it does. + If the index is marked as a data stream, then a template is created with + the name "{name}-template". + Note some settings and mapping changes cannot be done on an open index (or at all on an existing index) and for those this method will fail with the underlying exception. """ + if self._data_stream: + template = self.as_composable_template(f"{self._name}-template", self._name) + template.save(using=using) + if not self.exists(using=using): return self.create(using=using) + if self._data_stream: + return None # the data stream's index template is already updated + body = self.to_dict() settings = body.pop("settings", {}) analysis = settings.pop("analysis", None) @@ -356,6 +373,10 @@ def delete( Any additional keyword arguments will be passed to ``Elasticsearch.indices.delete`` unchanged. 
""" + if self._data_stream: + return self._get_connection(using).indices.delete_data_stream( + name=self._name, **kwargs + ) return self._get_connection(using).indices.delete(index=self._name, **kwargs) def exists(self, using: Optional[UsingType] = None, **kwargs: Any) -> bool: diff --git a/elasticsearch/dsl/document_base.py b/elasticsearch/dsl/document_base.py index 626179747..c34ac9fa6 100644 --- a/elasticsearch/dsl/document_base.py +++ b/elasticsearch/dsl/document_base.py @@ -237,6 +237,9 @@ class MyDocument(Document): """ def __init__(self, name: str, field: Optional[Field]): + if field and hasattr(field, "_es_name") and field._es_name: + parts = name.split(".") + name = ".".join(parts[:-1] + [field._es_name]) super().__init__(name) self._field = field @@ -424,6 +427,7 @@ def __init__(self, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]): } value = field(*field_args, **field_kwargs) + attr_es_name = None if name in attrs: # this field has a right-side value, which can be field # instance on its own or wrapped with mapped_field() @@ -436,6 +440,7 @@ def __init__(self, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]): # skip this field continue attr_value = attrs[name].get("_field") + attr_es_name = attrs[name].get("_es_name") default_value = attrs[name].get("default") or attrs[name].get( "default_factory" ) @@ -450,7 +455,8 @@ def __init__(self, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]): if value is None: raise TypeError(f"Cannot map field {name}") - + if attr_es_name: + value._es_name = attr_es_name self.mapping.field(name, value) if name in attrs: del attrs[name] @@ -529,6 +535,7 @@ class _FieldMetadataDict(dict[str, Any]): def mapped_field( field: Optional[Field] = None, *, + es_name: Optional[str] = None, init: bool = True, default: Any = None, default_factory: Optional[Callable[[], Any]] = None, @@ -543,6 +550,8 @@ def mapped_field( :param field: The instance of ``Field`` to use for this field. 
If not provided, an instance that is appropriate for the type given to the field is used. + :param es_name: A name to use for this field when serializing to Elasticsearch. + If this is omitted, the attribute name is used. :param init: a value of ``True`` adds this field to the constructor, and a value of ``False`` omits it from it. The default is ``True``. :param default: a default value to use for this field when one is not provided @@ -555,6 +564,7 @@ def mapped_field( """ return _FieldMetadataDict( _field=field, + _es_name=es_name, init=init, default=default, default_factory=default_factory, @@ -589,7 +599,11 @@ class DocumentBase(ObjectBase): def _matches(cls, hit: Dict[str, Any]) -> bool: if cls._index._name is None: return True - return fnmatch(hit.get("_index", ""), cls._index._name) + if cls._index._data_stream: + pattern = f".ds-{cls._index._name}-*" + else: + pattern = cls._index._name + return fnmatch(hit.get("_index", ""), pattern) @classmethod def _default_index(cls, index: Optional[str] = None) -> str: diff --git a/elasticsearch/dsl/field.py b/elasticsearch/dsl/field.py index 636ebcf8e..18989ad3e 100644 --- a/elasticsearch/dsl/field.py +++ b/elasticsearch/dsl/field.py @@ -106,14 +106,23 @@ class Field(DslBase): _coerce = False def __init__( - self, multi: bool = False, required: bool = False, *args: Any, **kwargs: Any + self, + multi: bool = False, + required: bool = False, + _es_name: Optional[str] = None, + *args: Any, + **kwargs: Any, ): """ - :arg bool multi: specifies whether field can contain array of values - :arg bool required: specifies whether field is required + :arg bool multi: specifies whether field can contain array of values + :arg bool required: specifies whether field is required + :arg str _es_name: a name to use for this field when serializing to Elasticsearch. + + If this is omitted, the attribute name is used. 
+ """ self._multi = multi self._required = required + self._es_name = _es_name super().__init__(*args, **kwargs) def __getitem__(self, subfield: str) -> "Field": diff --git a/elasticsearch/dsl/index_base.py b/elasticsearch/dsl/index_base.py index 71ff50339..3705bd86a 100644 --- a/elasticsearch/dsl/index_base.py +++ b/elasticsearch/dsl/index_base.py @@ -42,6 +42,7 @@ def __init__(self, name: str, mapping_class: type, using: AnyUsingType = "defaul self._analysis: Dict[str, Any] = {} self._mapping_class = mapping_class self._mapping: Optional["MappingBase"] = None + self._data_stream: bool = False def resolve_nested( self, field_path: str @@ -158,6 +159,9 @@ def analyzer(self, *args: Any, **kwargs: Any) -> None: # merge the definition merge(self._analysis, d, True) + def data_stream(self, data_stream: bool) -> None: + self._data_stream = data_stream + def to_dict(self) -> Dict[str, Any]: out = {} if self._settings: diff --git a/elasticsearch/dsl/mapping_base.py b/elasticsearch/dsl/mapping_base.py index cb8110fd1..53fc2e386 100644 --- a/elasticsearch/dsl/mapping_base.py +++ b/elasticsearch/dsl/mapping_base.py @@ -56,7 +56,12 @@ def __contains__(self, name: str) -> bool: return name in self.properties def to_dict(self) -> Dict[str, Any]: - return cast(Dict[str, Field], super().to_dict()["properties"]) + props = {} + for pname, field in self.properties.items(): + if hasattr(field, "_es_name") and field._es_name: + pname = field._es_name + props[pname] = field.to_dict() + return {"properties": props} if props else {} def field(self, name: str, *args: Any, **kwargs: Any) -> Self: self.properties[name] = construct_field(*args, **kwargs) diff --git a/elasticsearch/dsl/utils.py b/elasticsearch/dsl/utils.py index 58bcb84a3..1f30f1f6c 100644 --- a/elasticsearch/dsl/utils.py +++ b/elasticsearch/dsl/utils.py @@ -564,6 +564,13 @@ def __get_field(cls, name: str) -> Optional["Field"]: pass return None + @classmethod + def __get_renamed_field(cls, name: str) -> Optional[Tuple[str, 
"Field"]]: + for k, v, _ in cls.__list_fields(): + if hasattr(v, "_es_name") and v._es_name == name: + return k, v + return None + @classmethod def from_es(cls, hit: Union[Dict[str, Any], "ObjectApiResponse[Any]"]) -> Self: meta = hit.copy() @@ -575,8 +582,14 @@ def from_es(cls, hit: Union[Dict[str, Any], "ObjectApiResponse[Any]"]) -> Self: def _from_dict(self, data: Dict[str, Any]) -> None: for k, v in data.items(): f = self.__get_field(k) + if f is None: + r = self.__get_renamed_field(k) + if r: + k, f = r if f and f._coerce: v = f.deserialize(v) + if hasattr(f, "_es_name") and f._es_name == k: + f = f setattr(self, k, v) def __getstate__(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: # type: ignore[override] @@ -612,6 +625,9 @@ def to_dict(self, skip_empty: bool = True) -> Dict[str, Any]: for k, v in self._d_.items(): # if this is a mapped field, f = self.__get_field(k) + name = k + if f is not None and hasattr(f, "_es_name") and f._es_name: + name = f._es_name if f and f._coerce: v = f.serialize(v, skip_empty=skip_empty) @@ -634,7 +650,7 @@ def to_dict(self, skip_empty: bool = True) -> Dict[str, Any]: except TypeError: pass - out[k] = v + out[name] = v return out def clean_fields(self, validate: bool = True) -> None: diff --git a/examples/dsl/async/data_stream.py b/examples/dsl/async/data_stream.py new file mode 100644 index 000000000..88ab0efac --- /dev/null +++ b/examples/dsl/async/data_stream.py @@ -0,0 +1,86 @@ +import asyncio +import os +from datetime import datetime +from typing import Any, AsyncGenerator, Dict + +from elasticsearch import dsl +from elasticsearch.dsl import types + + +class Log(dsl.AsyncDocument): + timestamp: dsl.M[datetime] = dsl.mapped_field( + dsl.Date(default_timezone="UTC"), es_name="@timestamp" + ) + level: dsl.M[str] = dsl.mapped_field(dsl.Keyword()) + message: dsl.M[str] + + class Index: + name = "logs" + data_stream = True + # pass a desired index lifecycle policy as follows: + # settings = {'index': {'lifecycle': {'name': 
'my-ilm'}}} + + +async def main() -> None: + # initiate the default connection to elasticsearch + client = dsl.async_connections.create_connection( + hosts=[os.environ["ELASTICSEARCH_URL"]] + ) + + # delete a previous instance of the data stream if one exists + if await Log._index.exists(): + await Log._index.delete() + + # create the data stream + await Log.init() + + # write an individual entry + message = Log( + timestamp=datetime(2026, 1, 1, 0, 30, 22), + level="error", + message="test error (individual)", + ) + await message.save() + + # write multiple entries using the bulk API + async def get_next_log() -> AsyncGenerator[Dict[str, Any]]: + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 45), + level="warning", + message="test warning", + ), + "_op_type": "create", + } + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 46), + level="critical", + message="test error (bulk)", + ), + "_op_type": "create", + } + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 50), + level="info", + message="test info message", + ), + "_op_type": "create", + } + + await Log.bulk(get_next_log(), refresh=True) + + # search the data stream + match_query = Log.search().query( + dsl.query.Match("message", types.MatchQuery(query="error")) + ) + async for log in match_query: + print(f"{log.meta.id} {log.timestamp.isoformat()} [{log.level}] {log.message}") + + # close the connection + await dsl.async_connections.get_connection().close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/dsl/data_stream.py b/examples/dsl/data_stream.py new file mode 100644 index 000000000..1fa7800c6 --- /dev/null +++ b/examples/dsl/data_stream.py @@ -0,0 +1,83 @@ +import os +from datetime import datetime +from typing import Any, Dict, Generator + +from elasticsearch import dsl +from elasticsearch.dsl import types + + +class Log(dsl.Document): + timestamp: dsl.M[datetime] = dsl.mapped_field( + dsl.Date(default_timezone="UTC"), 
es_name="@timestamp" + ) + level: dsl.M[str] = dsl.mapped_field(dsl.Keyword()) + message: dsl.M[str] + + class Index: + name = "logs" + data_stream = True + # pass a desired index lifecycle policy as follows: + # settings = {'index': {'lifecycle': {'name': 'my-ilm'}}} + + +def main() -> None: + # initiate the default connection to elasticsearch + client = dsl.connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]]) + + # delete a previous instance of the data stream if one exists + if Log._index.exists(): + Log._index.delete() + + # create the data stream + Log.init() + + # write an individual entry + message = Log( + timestamp=datetime(2026, 1, 1, 0, 30, 22), + level="error", + message="test error (individual)", + ) + message.save() + + # write multiple entries using the bulk API + def get_next_log() -> Generator[Dict[str, Any]]: + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 45), + level="warning", + message="test warning", + ), + "_op_type": "create", + } + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 46), + level="critical", + message="test error (bulk)", + ), + "_op_type": "create", + } + yield { + "_source": Log( + timestamp=datetime(2026, 1, 1, 10, 30, 50), + level="info", + message="test info message", + ), + "_op_type": "create", + } + + Log.bulk(get_next_log(), refresh=True) + + # search the data stream + match_query = Log.search().query( + dsl.query.Match("message", types.MatchQuery(query="error")) + ) + for log in match_query: + print(f"{log.meta.id} {log.timestamp.isoformat()} [{log.level}] {log.message}") + + # close the connection + dsl.connections.get_connection().close() + + +if __name__ == "__main__": + main() diff --git a/test_elasticsearch/test_dsl/_async/test_document.py b/test_elasticsearch/test_dsl/_async/test_document.py index ff5a36799..f7e5863d2 100644 --- a/test_elasticsearch/test_dsl/_async/test_document.py +++ b/test_elasticsearch/test_dsl/_async/test_document.py @@ -1075,3 
+1075,54 @@ class User(AsyncBaseESModel): assert u3.to_doc().name == "Unknown" assert u3.to_doc().address.location.country == "Unknown" assert u3.to_doc().phones == [] + + +def test_renamed_fields() -> None: + class SubDoc(InnerDoc): + name: str + timestamp: datetime = mapped_field(es_name="timestamp_two") + this = field.Integer(_es_name="that_two") + + class Doc(AsyncDocument): + name: str + timestamp: datetime = mapped_field(es_name="@timestamp") + this = field.Integer(_es_name="that") + sub1: SubDoc + sub2: SubDoc = mapped_field(es_name="sub_two") + + # instrumented field names + assert str(Doc.timestamp) == "@timestamp" + assert str(Doc.this) == "that" + assert str(Doc.sub2) == "sub_two" + assert str(Doc.sub1.name) == "sub1.name" + assert str(Doc.sub1.timestamp) == "sub1.timestamp_two" + assert str(Doc.sub1.this) == "sub1.that_two" + assert str(Doc.sub2.name) == "sub_two.name" + assert str(Doc.sub2.timestamp) == "sub_two.timestamp_two" + assert str(Doc.sub2.this) == "sub_two.that_two" + + # document to dict + doc = Doc(name="foo", timestamp=datetime(2026, 1, 1, 0, 0, 0), this=42) + serialized = doc.to_dict() + assert serialized["name"] == "foo" + assert serialized["@timestamp"].isoformat() == "2026-01-01T00:00:00" + assert serialized["that"] == 42 + assert len(serialized.keys()) == 3 + + # dict to document using Python names + doc = Doc.from_es( + {"_source": {"name": "foo", "timestamp": "2026-01-01T00:00:00", "this": 42}} + ) + assert doc.name == "foo" + assert doc.timestamp.isoformat() == "2026-01-01T00:00:00" + assert doc.this == 42 + assert doc.to_dict() == serialized + + # dict to document using ES names + doc = Doc.from_es( + {"_source": {"name": "foo", "@timestamp": "2026-01-01T00:00:00", "that": 42}} + ) + assert doc.name == "foo" + assert doc.timestamp.isoformat() == "2026-01-01T00:00:00" + assert doc.this == 42 + assert doc.to_dict() == serialized diff --git a/test_elasticsearch/test_dsl/_sync/test_document.py 
b/test_elasticsearch/test_dsl/_sync/test_document.py index 2475d3267..18e28bf80 100644 --- a/test_elasticsearch/test_dsl/_sync/test_document.py +++ b/test_elasticsearch/test_dsl/_sync/test_document.py @@ -1075,3 +1075,54 @@ class User(BaseESModel): assert u3.to_doc().name == "Unknown" assert u3.to_doc().address.location.country == "Unknown" assert u3.to_doc().phones == [] + + +def test_renamed_fields() -> None: + class SubDoc(InnerDoc): + name: str + timestamp: datetime = mapped_field(es_name="timestamp_two") + this = field.Integer(_es_name="that_two") + + class Doc(Document): + name: str + timestamp: datetime = mapped_field(es_name="@timestamp") + this = field.Integer(_es_name="that") + sub1: SubDoc + sub2: SubDoc = mapped_field(es_name="sub_two") + + # instrumented field names + assert str(Doc.timestamp) == "@timestamp" + assert str(Doc.this) == "that" + assert str(Doc.sub2) == "sub_two" + assert str(Doc.sub1.name) == "sub1.name" + assert str(Doc.sub1.timestamp) == "sub1.timestamp_two" + assert str(Doc.sub1.this) == "sub1.that_two" + assert str(Doc.sub2.name) == "sub_two.name" + assert str(Doc.sub2.timestamp) == "sub_two.timestamp_two" + assert str(Doc.sub2.this) == "sub_two.that_two" + + # document to dict + doc = Doc(name="foo", timestamp=datetime(2026, 1, 1, 0, 0, 0), this=42) + serialized = doc.to_dict() + assert serialized["name"] == "foo" + assert serialized["@timestamp"].isoformat() == "2026-01-01T00:00:00" + assert serialized["that"] == 42 + assert len(serialized.keys()) == 3 + + # dict to document using Python names + doc = Doc.from_es( + {"_source": {"name": "foo", "timestamp": "2026-01-01T00:00:00", "this": 42}} + ) + assert doc.name == "foo" + assert doc.timestamp.isoformat() == "2026-01-01T00:00:00" + assert doc.this == 42 + assert doc.to_dict() == serialized + + # dict to document using ES names + doc = Doc.from_es( + {"_source": {"name": "foo", "@timestamp": "2026-01-01T00:00:00", "that": 42}} + ) + assert doc.name == "foo" + assert 
doc.timestamp.isoformat() == "2026-01-01T00:00:00" + assert doc.this == 42 + assert doc.to_dict() == serialized diff --git a/utils/templates/field.py.tpl b/utils/templates/field.py.tpl index 29c1005c4..472c54b6f 100644 --- a/utils/templates/field.py.tpl +++ b/utils/templates/field.py.tpl @@ -106,14 +106,19 @@ class Field(DslBase): _coerce = False def __init__( - self, multi: bool = False, required: bool = False, *args: Any, **kwargs: Any + self, multi: bool = False, required: bool = False, _es_name: Optional[str] = None, + *args: Any, **kwargs: Any ): """ :arg bool multi: specifies whether field can contain array of values :arg bool required: specifies whether field is required + :arg str _es_name: a name to use for this field when serializing to Elasticsearch. + If this is omitted, the attribute name is used. + """ self._multi = multi self._required = required + self._es_name = _es_name super().__init__(*args, **kwargs) def __getitem__(self, subfield: str) -> "Field":