Skip to content

Incompatibility with data streams #3398

@timo-obrecht

Description

@timo-obrecht

The current class definition of an index is incompatible with data streams. It allows neither initializing the data stream from the class nor using the usual methods and utilities (`init()`, `save()`, `bulk()`, etc.).

Current way of handling data streams:

from elasticsearch import Elasticsearch
from elasticsearch.dsl import Date, Document, Float
from testcontainers.elasticsearch import ElasticSearchContainer

PASSWORD = "test-password-123"
ES_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:9.3.3"


# Minimal document used by the reproduction: one float value plus a timestamp.
# The inner ``Index`` class binds it to the name "foo", which the script below
# reuses as both the index-template pattern and the data stream name.
class Foo(Document):
    data = Float()
    # Intent (per the inline note): expose the attribute as `timestamp` in
    # Python while storing it as `@timestamp` in Elasticsearch.
    # NOTE(review): step 3 of the script reports that to_dict() still emits
    # `timestamp`, so this intent is not honored by the current behavior.
    timestamp = Date(name="@timestamp")  # Python attr -> ES field

    class Index:
        name = "foo"


# Reproduction script: runs Elasticsearch in a container managed by the `with`
# block, then walks through five numbered steps showing where the DSL
# `Document` API and data streams disagree.
# The container is configured as a single node and given ELASTIC_PASSWORD,
# which the client below uses to authenticate as the `elastic` user.
with (
    ElasticSearchContainer(ES_IMAGE, mem_limit="2G")
    .with_env("discovery.type", "single-node")
    .with_env("ELASTIC_PASSWORD", PASSWORD) as es
):
    # Build the HTTP endpoint from the container host IP and the exposed port
    # for 9200.
    url = f"http://{es.get_container_host_ip()}:{es.get_exposed_port(9200)}"

    with Elasticsearch(
        url,
        basic_auth=("elastic", PASSWORD),
        verify_certs=False,
        request_timeout=120,
    ) as client:
        # Wait (up to 30s) for the cluster to report yellow status before
        # sending any further requests.
        client.cluster.health(wait_for_status="yellow", timeout="30s")

        # Step 1: try the regular DSL initialization. The broad `except` is
        # deliberate: the script only reports whichever error is raised.
        print("1) DSL init() does not support data streams")
        try:
            Foo.init(using=client)
            print("UNEXPECTED: Foo.init() succeeded")
        except Exception as exc:
            print(f"Expected failure: {exc!r}")

        # Step 2: bypass the DSL. Register an index template that enables
        # `data_stream` for the pattern "foo" and declares the two fields,
        # then create the data stream with that same name.
        print("\n2) Create the data stream manually")
        client.indices.put_index_template(
            name="foo-template",
            index_patterns=[Foo.Index.name],
            data_stream={},
            template={
                "mappings": {
                    "properties": {
                        "@timestamp": {"type": "date"},
                        "data": {"type": "float"},
                    }
                }
            },
        )
        client.indices.create_data_stream(name=Foo.Index.name)

        # Build one document and keep its serialized form; the dict is printed
        # in step 3 and patched by hand before step 5.
        doc = Foo(timestamp="2024-01-01T00:00:00+00:00", data=0.01)
        payload = doc.to_dict()

        # Step 3: show the serialized keys so the `timestamp` vs `@timestamp`
        # mismatch is visible in the output.
        print("\n3) BUG: DSL serializes `timestamp`, not `@timestamp`")
        print(payload)

        # Step 4: saving through the DSL is expected to fail against the data
        # stream; as in step 1, any exception is reported rather than raised.
        print("\n4) save() fails because the payload is not valid for a data stream")
        try:
            doc.save(using=client)
            print("UNEXPECTED: save() succeeded")
        except Exception as exc:
            print(f"Expected failure: {exc!r}")

        # Manual workaround: rename the key to `@timestamp` and mark the bulk
        # action as a `create` operation.
        payload["@timestamp"] = payload.pop("timestamp")
        payload["_op_type"] = "create"

        # Step 5: send the patched dict through Document.bulk().
        print("\n5) Manual fix works: rename field and use op_type=create")
        Foo.bulk(using=client, actions=[payload])
        print("Bulk indexing succeeded")

I guess simply allowing a `DataStream` inner class in the class definition, and making sure `.to_dict()` correctly sets '@timestamp' as the key, would allow a cleaner syntax like so:

# Proposed syntax (not current behavior): the same document as in the
# reproduction, but bound to a data stream through an inner ``DataStream``
# class instead of ``Index``.
class Foo(Document):
    data = Float()
    # Desired outcome: this attribute is serialized under the `@timestamp` key.
    timestamp = Date(name="@timestamp")  # Python attr -> ES field

    class DataStream:
        name = "foo"

# Desired usage once data streams are supported: same container and client
# setup as the reproduction (reusing ES_IMAGE and PASSWORD), but with no
# manual index template, key renaming, or bulk call.
with (
    ElasticSearchContainer(ES_IMAGE)
    .with_env("discovery.type", "single-node")
    .with_env("ELASTIC_PASSWORD", PASSWORD) as es
):
    # Build the HTTP endpoint from the container host IP and the exposed port
    # for 9200.
    url = f"http://{es.get_container_host_ip()}:{es.get_exposed_port(9200)}"

    with Elasticsearch(
        url,
        basic_auth=("elastic", PASSWORD),
        verify_certs=False,
    ) as client:
        # Wait (up to 30s) for the cluster to report yellow status.
        client.cluster.health(wait_for_status="yellow", timeout="30s")

        # Proposed: init() sets up the data stream from the class definition.
        Foo.init(using=client)
        
        # Proposed: save() writes the document to the data stream directly.
        doc = Foo(timestamp="2024-01-01T00:00:00+00:00", data=0.01)
        doc.save(using=client)

Metadata

Metadata

Labels

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions