|
1 | | -from datetime import datetime, timedelta |
| 1 | +from datetime import timedelta |
2 | 2 | from os import environ |
3 | 3 |
|
4 | 4 | import clickhouse_connect |
|
8 | 8 | from tests.pytest.constants import ( |
9 | 9 | MICROBATCH_INPUT_MODEL, |
10 | 10 | MICROBATCH_TEST_MODEL, |
| 11 | + QUERY_COUNT_ROWS, |
| 12 | + QUERY_TIMESTAMP, |
11 | 13 | ) |
12 | 14 |
|
13 | 15 |
|
14 | | -@pytest.fixture(scope="session") |
15 | | -def ch_client(): |
16 | | - client = clickhouse_connect.get_client( |
17 | | - host=environ['CLICKHOUSE_HOST'], |
18 | | - port=environ['CLICKHOUSE_PORT'], |
19 | | - user=environ['CLICKHOUSE_USER'], |
20 | | - password=environ['CLICKHOUSE_PASSWORD'], |
21 | | - database=environ['CLICKHOUSE_DATABASE'], |
22 | | - ) |
23 | | - return client |
class TestMicrobatch:
    """Integration tests for the microbatch model with 1h, 8h and 24h batches.

    Every test fully refreshes ``MICROBATCH_TEST_MODEL`` through dbt with a
    ``materialization_start_date`` variable and then checks that its row count
    equals the number of rows of ``MICROBATCH_INPUT_MODEL`` selected for the
    same period.
    """

    @pytest.fixture(scope="class")
    def ch_client(self):
        """Yield a ClickHouse client configured from ``CLICKHOUSE_*`` env vars.

        The client is shared by all tests of the class and closed afterwards.

        Raises:
            KeyError: if one of the required environment variables is missing.
            ValueError: if ``CLICKHOUSE_PORT`` is not an integer.
        """
        client = clickhouse_connect.get_client(
            host=environ['CLICKHOUSE_HOST'],
            # Environment values are strings; get_client documents port as int.
            port=int(environ['CLICKHOUSE_PORT']),
            user=environ['CLICKHOUSE_USER'],
            password=environ['CLICKHOUSE_PASSWORD'],
            database=environ['CLICKHOUSE_DATABASE'],
        )
        yield client
        # Teardown: release the connection once the class is finished.
        client.close()

    @pytest.fixture(scope="class")
    def setup_test_environment(self, ch_client):
        """Build the input model and return the timestamp bounds used by tests.

        Returns:
            dict with keys ``'min_timestamp'`` and ``'max_timestamp'`` taken
            from the first row of the ``QUERY_TIMESTAMP`` result.
        """
        run_dbt(['run', '--select', f'+{MICROBATCH_INPUT_MODEL}'])

        # NOTE(review): the bounds are read from MICROBATCH_TEST_MODEL although
        # only the input model (and its parents) was built above, and the
        # expected counts are computed on MICROBATCH_INPUT_MODEL -- confirm
        # that the test model is the intended table here.
        timestamps = ch_client.query_df(
            QUERY_TIMESTAMP.format(
                table_name=MICROBATCH_TEST_MODEL, timestamp_column='event_datetime'
            )
        )
        return {
            'min_timestamp': timestamps['min_timestamp'][0],
            'max_timestamp': timestamps['max_timestamp'][0],
        }

    @staticmethod
    def _refresh_test_model(start_timestamp, batch_size=None):
        """Run a full refresh of the test model from the date of *start_timestamp*.

        ``batch_size`` is passed to dbt only when given, so the model's own
        default applies otherwise.
        """
        import json  # local import: the block does not rely on file-level imports

        dbt_vars = {
            'materialization_start_date': start_timestamp.strftime('%Y-%m-%d'),
        }
        if batch_size is not None:
            dbt_vars['batch_size'] = batch_size

        run_dbt(
            [
                'run',
                '--select',
                f'{MICROBATCH_TEST_MODEL}',
                '--vars',
                json.dumps(dbt_vars),
                '--full-refresh',
            ]
        )

    @staticmethod
    def _count_rows(con, table_name, condition=''):
        """Return ``rows_count`` of ``QUERY_COUNT_ROWS`` for *table_name*.

        *condition* is an optional SQL tail (e.g. a ``where`` clause); it is
        appended after an explicit space so the statement stays valid even if
        the query template does not end with whitespace.
        """
        query = QUERY_COUNT_ROWS.format(table_name=table_name)
        if condition:
            query = f'{query} {condition}'
        return con.query_df(query)['rows_count'][0]

    def _assert_recent_window(self, con, max_timestamp, offset_hours, batch_size):
        """Refresh the last *offset_hours* hours and compare with the input model."""
        self._refresh_test_model(
            max_timestamp - timedelta(hours=offset_hours), batch_size=batch_size
        )

        actual = self._count_rows(con, MICROBATCH_TEST_MODEL)
        # Same cut-off as the dbt variable: the date of (max - offset hours).
        expected = self._count_rows(
            con,
            MICROBATCH_INPUT_MODEL,
            f"where event_datetime >= toDate('{max_timestamp}' - interval {offset_hours} hour)",
        )

        assert expected == actual

    def test_batching_1h(self, ch_client, setup_test_environment):
        """
        Microbatch test with 1h batch size
        """
        self._assert_recent_window(
            ch_client,
            setup_test_environment['max_timestamp'],
            offset_hours=50,
            batch_size=1,
        )

    def test_batching_8h(self, ch_client, setup_test_environment):
        """
        Microbatch test with 8h batch size
        """
        self._assert_recent_window(
            ch_client,
            setup_test_environment['max_timestamp'],
            offset_hours=100,
            batch_size=8,
        )

    def test_batching_24h(self, ch_client, setup_test_environment):
        """
        Microbatch test with 24h batch size
        """
        # No batch_size variable is passed: the model's default is used.
        self._refresh_test_model(setup_test_environment['min_timestamp'])

        actual = self._count_rows(ch_client, MICROBATCH_TEST_MODEL)
        expected = self._count_rows(ch_client, MICROBATCH_INPUT_MODEL)

        assert expected == actual
0 commit comments