Current class definition of an index is incompatible with data streams. It does not allow initializing the data stream from the class, nor accessing the usual methods and utilities (`init()`, `save()`, `bulk()`, etc.).
Current way of handling data streams:
from elasticsearch import Elasticsearch
from elasticsearch.dsl import Date, Document, Float
from testcontainers.elasticsearch import ElasticSearchContainer
PASSWORD = "test-password-123"
ES_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:9.3.3"
class Foo(Document):
    """Example DSL document targeting the ``foo`` index.

    Note: ``class Index`` declares a plain index; there is currently no
    way to declare a data stream here — that is the subject of this issue.
    """

    data = Float()
    # The ES field is "@timestamp" (required by data streams), exposed on
    # the Python side as the attribute `timestamp`.
    timestamp = Date(name="@timestamp")  # Python attr -> ES field

    class Index:
        name = "foo"
# Reproduction: start a single-node Elasticsearch in a container and walk
# through the failure modes of using a DSL Document against a data stream.
with (
    ElasticSearchContainer(ES_IMAGE, mem_limit="2G")
    .with_env("discovery.type", "single-node")
    .with_env("ELASTIC_PASSWORD", PASSWORD) as es
):
    url = f"http://{es.get_container_host_ip()}:{es.get_exposed_port(9200)}"
    with Elasticsearch(
        url,
        basic_auth=("elastic", PASSWORD),
        verify_certs=False,
        request_timeout=120,
    ) as client:
        # Block until the cluster is usable before issuing any API call.
        client.cluster.health(wait_for_status="yellow", timeout="30s")

        print("1) DSL init() does not support data streams")
        try:
            # init() issues a create-index call, which fails (or creates a
            # plain index) instead of setting up a data stream.
            Foo.init(using=client)
            print("UNEXPECTED: Foo.init() succeeded")
        except Exception as exc:
            print(f"Expected failure: {exc!r}")

        print("\n2) Create the data stream manually")
        # A data stream requires a matching index template declaring
        # `data_stream={}`; the DSL class cannot generate this today, so
        # the mapping is duplicated by hand here.
        client.indices.put_index_template(
            name="foo-template",
            index_patterns=[Foo.Index.name],
            data_stream={},
            template={
                "mappings": {
                    "properties": {
                        "@timestamp": {"type": "date"},
                        "data": {"type": "float"},
                    }
                }
            },
        )
        client.indices.create_data_stream(name=Foo.Index.name)

        doc = Foo(timestamp="2024-01-01T00:00:00+00:00", data=0.01)
        payload = doc.to_dict()
        print("\n3) BUG: DSL serializes `timestamp`, not `@timestamp`")
        # to_dict() emits the Python attribute name, ignoring the
        # `name="@timestamp"` mapping on the Date field.
        print(payload)

        print("\n4) save() fails because the payload is not valid for a data stream")
        try:
            doc.save(using=client)
            print("UNEXPECTED: save() succeeded")
        except Exception as exc:
            print(f"Expected failure: {exc!r}")

        # Manual workaround: rename the key to the real ES field name and
        # force op_type=create (data streams only accept `create`).
        payload["@timestamp"] = payload.pop("timestamp")
        payload["_op_type"] = "create"
        print("\n5) Manual fix works: rename field and use op_type=create")
        Foo.bulk(using=client, actions=[payload])
        print("Bulk indexing succeeded")
I guess simply allowing `DataStream` in the class definition, and making sure `.to_dict()` correctly sets `"@timestamp"` as the key, would allow a cleaner syntax like this:
class Foo(Document):
    """Proposed syntax: declare the target data stream on the class."""

    data = Float()
    timestamp = Date(name="@timestamp")  # Python attr -> ES field

    # Proposed replacement for `class Index`: with this, `init()` would
    # create the index template + data stream, and `save()`/`bulk()`
    # would use op_type=create automatically.
    class DataStream:
        name = "foo"
# Desired usage: init() and save() work transparently against the data
# stream declared on the class — no manual template or payload fix-ups.
with (
    ElasticSearchContainer(ES_IMAGE)
    .with_env("discovery.type", "single-node")
    .with_env("ELASTIC_PASSWORD", PASSWORD) as es
):
    url = f"http://{es.get_container_host_ip()}:{es.get_exposed_port(9200)}"
    with Elasticsearch(
        url,
        basic_auth=("elastic", PASSWORD),
        verify_certs=False,
    ) as client:
        client.cluster.health(wait_for_status="yellow", timeout="30s")
        # Would create the index template (with data_stream={}) and the
        # data stream itself.
        Foo.init(using=client)
        doc = Foo(timestamp="2024-01-01T00:00:00+00:00", data=0.01)
        # Would serialize `timestamp` as "@timestamp" and index with
        # op_type=create.
        doc.save(using=client)
Current class definition of an index is incompatible with data streams. It does not allow initializing the data stream from the class, nor accessing the usual methods and utilities (`init()`, `save()`, `bulk()`, etc.).
Current way of handling data streams:
I guess simply allowing `DataStream` in the class definition, and making sure `.to_dict()` correctly sets `"@timestamp"` as the key, would allow a cleaner syntax like this: