Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add remote table materialization #8

Open
wants to merge 5 commits into
base: dap-main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(self, credentials: ClickHouseCredentials):
"distributed_table": {},
"distributed_incremental": {},
"general": {},
"remote_table": {},
}
if (
not credentials.allow_automatic_deduplication
Expand Down
3 changes: 1 addition & 2 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
@available.parse(lambda *a, **k: {})
def get_clickhouse_cluster_name(self):
conn = self.connections.get_if_exists()
if conn.credentials.cluster:
return f'"{conn.credentials.cluster}"'
Copy link
Collaborator

@Magicbeanbuyer Magicbeanbuyer Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ALTER TABLE _admin_portal_aggregation.usage_aggregation_full_period_blue_base
        ON CLUSTER "fal"
        DROP PARTITION tuple(toYYYYMM(toDate('2025-01-12 03:24:00')))

also works :D
code
dag

return conn.credentials.cluster

@available.parse(lambda *a, **k: {})
def get_clickhouse_local_suffix(self):
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ClickHouseRelation(BaseRelation):
quote_character: str = '`'
can_exchange: bool = False
can_on_cluster: bool = False
remote_cluster: Optional[str] = None

def __post_init__(self):
if self.database != self.schema and self.database:
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three variants work

select * from cluster(default, system.one);
select * from cluster('default', system.one);
select * from cluster("default", system.one);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was worried about passing reserved names to the first parameter

join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
group by name, schema, type, db_engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,30 @@

{% endmaterialization %}

{% macro create_distributed_table(relation, local_relation) %}
{%- set cluster = adapter.get_clickhouse_cluster_name() -%}
{% if cluster is none %}
{% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
{% endif %}
{% macro create_distributed_table(relation, local_relation, sql=none) %}
{% if adapter.get_clickhouse_cluster_name() is none %}
{% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
{% endif %}

{%- set cluster = cluster[1:-1] -%}
{%- set sharding = config.get('sharding_key') -%}

create or replace table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }}
ENGINE = Distributed('{{ cluster}}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
{%- if sharding is not none and sharding.strip() != '' -%}
, {{ sharding }}
{%- else %}
, rand()
{% endif -%}
)
{% endmacro %}
{%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%}
{%- set sharding = config.get('sharding_key') -%}
{%- set reference = "as " ~ local_relation -%}
{%- if sql -%}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use case for having an SQL when creating a distributed table?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't rely on an existing local relation for the ddl query. the sql will allow to directly pass the column names and types

{%- set col_list = [] -%}
{%- for col in adapter.get_column_schema_from_query(sql) -%}
{%- do col_list.append(col.name + ' ' + col.data_type) -%}
{%- endfor -%}
{%- set reference = "(" ~ (col_list | join(', ')) ~ ")" -%}
{%- endif -%}
create or replace table {{ relation }} {{ on_cluster_clause(relation) }} {{ reference }}
engine = Distributed('{{ remote_cluster }}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
{%- if sharding is not none and sharding.strip() != '' -%}
, {{ sharding }}
{%- else %}
, rand()
{% endif -%}
)
{% endmacro %}

{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
{%- set sql_header = config.get('sql_header', none) -%}
Expand Down
27 changes: 27 additions & 0 deletions dbt/include/clickhouse/macros/materializations/remote_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%}
{%- set remote_config = config.get('remote_config', {}) -%}
{%- set remote_cluster = remote_config.get('cluster') or adapter.get_clickhouse_cluster_name() -%}
{%- set remote_schema = remote_config.get('schema') or adapter.get_clickhouse_local_db_prefix() + this.schema -%}
{%- set remote_identifier = remote_config.get('identifier') or this.identifier + adapter.get_clickhouse_local_suffix() -%}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remote_config.get('schema') be with leading underscore and remote_config.get('identifier') with _base suffix in case the base table in on the ingestion cluster?


{% set target_relation = this.incorporate(type='table') %}
{% set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% call statement('main') %}
{{ create_distributed_table(target_relation, remote_relation, sql) }}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the sql defined?

{% endcall %}

{% set should_revoke = should_revoke(target_relation, full_refresh_mode) %}
{% set grant_config = config.get('grants') %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
97 changes: 97 additions & 0 deletions tests/integration/adapter/remote_table/test_remote_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pytest
from dbt.tests.util import run_dbt


model_with_defaults = """
{{
config(
materialized='remote_table',
)
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
"""


class TestRemoteTable:
@pytest.fixture(scope="class")
def models(self):
return {
"remote_table.sql": model_with_defaults,
}

def test_with_defaults(self, project):
# initialize a local table on current cluster
project.run_sql(f"""
create table {project.test_schema}.remote_table_local on cluster {project.test_config["cluster"]}
(key1 UInt64, key2 Int64)
engine=ReplicatedMergeTree order by key1
""")

# this `remote_table` run with default configuration will point to previously created local table, taking
# `local_db_prefix` and `local_suffix` settings into account.
run_dbt()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  create or replace table `dbt_clickhouse_3961_test_remote_table_1736978435032`.`remote_table` 
   (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', 'dbt_clickhouse_3961_test_remote_table_1736978435032', 'remote_table_local'
    , rand()
  )

I copied this DDL from test debugging, and I have two questions.

  1. What is the use case for creating a Distributed table on test_shard when the base table is also on test_shard? Wouldn't this simply overwrite the Distributed table created by distributed_incremental materialization?
  2. I didn't dig deeper, but on_cluster_clause returned nothing in this case, but I believe this query should have an on cluster test_shard clause, otherwise the Distributed table will only be available on one shard.


# insert into distributed table
project.run_sql(f"""
insert into {project.test_schema}.remote_table
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
settings insert_quorum=1
""")

_assert_is_distributed_table(project)
_assert_correct_distributed_data(project)


model_with_remote_configuration = """
{{
config(
materialized='remote_table',
remote_config={'cluster': 'test_remote', 'schema': this.schema, 'identifier': 'remote_target_table'},
sharding_key='key1',
)
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
"""


class TestRemoteTableRemoteConfig:
@pytest.fixture(scope="class")
def models(self):
return {
"remote_table.sql": model_with_remote_configuration,
}

def test_with_remote_configuration(self, project):
# initialize a local table on remote cluster 'test_remote_cluster'
project.run_sql(f"create database if not exists {project.test_schema} on cluster `test_remote`")
project.run_sql(f"""
create table {project.test_schema}.remote_target_table on cluster `test_remote`
engine=MergeTree order by key1
as select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
""")

# the created distributed table should point to a local table as defined in the model's `remote_config`
run_dbt()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 create or replace table `dbt_clickhouse_3076_test_remote_table_1736979527642`.`remote_table` 
   (key1 UInt64, key2 Int64)
  engine = Distributed('test_remote', 'dbt_clickhouse_3076_test_remote_table_1736979527642', 'remote_target_table', key1)

Is on cluster test_shard also missing here?


_assert_is_distributed_table(project)
_assert_correct_distributed_data(project)

# assert correct engine parameters
result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one")
assert f"Distributed('test_remote', '{project.test_schema}', 'remote_target_table', key1)" in result[0]


def _assert_is_distributed_table(project):
# check correct table creation on current host
result = project.run_sql(
f"select engine from system.tables where name='remote_table'",
fetch="one"
)
assert result is not None
assert result[0] == "Distributed"


def _assert_correct_distributed_data(project):
# query remote data from distributed table
result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one")
assert result[0] == 10
10 changes: 10 additions & 0 deletions tests/integration/test_config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<replica from_env="REPLICA_NUM" />
<server_index from_env="SERVER_INDEX" />
</macros>
<default_replica_path>/clickhouse/tables/{uuid}/{shard}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
<remote_servers>
<test_shard>
<shard>
Expand Down Expand Up @@ -44,6 +46,14 @@
</replica>
</shard>
</test_replica>
<test_remote>
<shard>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
</test_remote>
</remote_servers>
<keeper_server>
<tcp_port>9181</tcp_port>
Expand Down