Skip to content

Commit c70a429

Browse files
committed
dt: add test for exceptional code paths
1 parent 30d6443 commit c70a429

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright 2024 Redpanda Data, Inc.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the file licenses/BSL.md
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0
9+
10+
from rptest.services.catalog_service import CatalogType
11+
from rptest.services.cluster import cluster
12+
from rptest.services.redpanda import PandaproxyConfig, SISettings, SchemaRegistryConfig
13+
from rptest.tests.datalake.datalake_services import DatalakeServices
14+
from rptest.tests.datalake.query_engine_base import QueryEngineType
15+
from rptest.tests.datalake.utils import supported_storage_types
16+
from rptest.tests.redpanda_test import RedpandaTest
17+
from ducktape.mark import matrix
18+
19+
20+
class DatalakeTranslationInterruptionsTest(RedpandaTest):
    """Exercise datalake translation under scheduler-induced interruptions.

    Each test tightens one or more datalake scheduler settings before the
    cluster is started, then produces a fixed workload to an Iceberg-enabled
    topic and waits until every produced row has been translated. The tests
    pass when translation completes despite the interruptions.
    """
    def __init__(self, test_ctx, *args, **kwargs):
        """Configure a single-broker cluster with Iceberg enabled.

        :param test_ctx: ducktape test context; also kept on the instance
            because DatalakeServices needs it later.
        """
        super().__init__(test_ctx,
                         num_brokers=1,
                         si_settings=SISettings(test_context=test_ctx),
                         extra_rp_conf={
                             "iceberg_enabled": "true",
                             "iceberg_catalog_commit_interval_ms": 1000,
                         },
                         schema_registry_config=SchemaRegistryConfig(),
                         pandaproxy_config=PandaproxyConfig(),
                         *args,
                         **kwargs)
        self.test_ctx = test_ctx
        self.topic_name = "test"

    def setUp(self):
        # redpanda will be started by DatalakeServices
        pass

    def _run_translation_workload(self,
                                  query_engine,
                                  catalog_type,
                                  partitions,
                                  row_count=100000,
                                  msg_size=1024):
        """Produce `row_count` messages and wait for them to be translated.

        Shared body of the tests in this class. The name deliberately does
        not start or end with "test" so that it is not collected as a test.

        :param query_engine: query engine to start alongside the cluster.
        :param catalog_type: Iceberg catalog type to run against.
        :param partitions: partition count of the Iceberg-enabled topic.
        :param row_count: number of messages to produce and to wait for.
        :param msg_size: second positional argument of `produce_to_topic`
            (presumably the message size in bytes -- confirm against
            DatalakeServices).
        """
        with DatalakeServices(self.test_ctx,
                              redpanda=self.redpanda,
                              include_query_engines=[query_engine],
                              catalog_type=catalog_type) as dl:
            dl.create_iceberg_enabled_topic(self.topic_name,
                                            partitions=partitions)
            dl.produce_to_topic(self.topic_name, msg_size, row_count)
            dl.wait_for_translation(self.topic_name, msg_count=row_count)

    @cluster(num_nodes=4)
    @matrix(cloud_storage_type=supported_storage_types(),
            query_engine=[QueryEngineType.SPARK],
            catalog_type=[CatalogType.REST_HADOOP])
    def test_scheduler_time_slice_interruptions(self, cloud_storage_type,
                                                query_engine, catalog_type):
        """This test verifies the error paths triggered due to scheduler
        time slice violation. Particularly exceptional paths in multiplexer/writers
        triggered by exceptions from abort source"""

        # A low scheduler time slice guarantees that translation is frequently interrupted
        # triggering exceptional paths.
        self.redpanda.add_extra_rp_conf(
            {"datalake_scheduler_time_slice_ms": "1000"})
        self._run_translation_workload(query_engine,
                                       catalog_type,
                                       partitions=10)

    @cluster(num_nodes=4)
    @matrix(cloud_storage_type=supported_storage_types(),
            query_engine=[QueryEngineType.SPARK],
            catalog_type=[CatalogType.REST_HADOOP])
    def test_oom_interruptions(self, cloud_storage_type, query_engine,
                               catalog_type):
        """This test verifies the error paths triggered due to scheduler
        oom interruptions. Particularly exceptional paths in multiplexer/writers
        triggered by exceptions from abort source"""

        # Override the scheduler's block size and cap concurrent translations
        # at two; the intent is to make translations run out of their memory
        # budget and get interrupted, triggering exceptional paths.
        # NOTE(review): confirm these values are tight enough to force OOM
        # interruptions for this workload.
        self.redpanda.add_extra_rp_conf({
            "datalake_scheduler_block_size_bytes":
            64 * 1024 * 1024,
            "datalake_scheduler_max_concurrent_translations":
            2
        })
        self._run_translation_workload(query_engine,
                                       catalog_type,
                                       partitions=4)

0 commit comments

Comments
 (0)