Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add remote table materialization #8

Open
wants to merge 5 commits into
base: dap-main
Choose a base branch
from

Conversation

canbekley
Copy link
Collaborator

Materialization strategy for creating distributed engine table with arbitrary base table target

@@ -117,8 +117,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
@available.parse(lambda *a, **k: {})
def get_clickhouse_cluster_name(self):
conn = self.connections.get_if_exists()
if conn.credentials.cluster:
return f'"{conn.credentials.cluster}"'
Copy link
Collaborator

@Magicbeanbuyer Magicbeanbuyer Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ALTER TABLE _admin_portal_aggregation.usage_aggregation_full_period_blue_base
        ON CLUSTER "fal"
        DROP PARTITION tuple(toYYYYMM(toDate('2025-01-12 03:24:00')))

also works :D
code
dag

@@ -32,7 +32,7 @@
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three variants work

select * from cluster(default, system.one);
select * from cluster('default', system.one);
select * from cluster("default", system.one);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was worried about passing reserved names to the first parameter


# this `remote_table` run with default configuration will point to previously created local table, taking
# `local_db_prefix` and `local_suffix` settings into account.
run_dbt()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  create or replace table `dbt_clickhouse_3961_test_remote_table_1736978435032`.`remote_table` 
   (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', 'dbt_clickhouse_3961_test_remote_table_1736978435032', 'remote_table_local'
    , rand()
  )

I copied this DDL from test debugging, and I have two questions.

  1. What is the use case for creating a Distributed table on test_shard when the base table is also on test_shard? Wouldn't this simply overwrite the Distributed table created by distributed_incremental materialization?
  2. I didn't dig deeper, but on_cluster_clause returned nothing in this case, but I believe this query should have an on cluster test_shard clause, otherwise the Distributed table will only be available on one shard.

{%- set remote_config = config.get('remote_config', {}) -%}
{%- set remote_cluster = remote_config.get('cluster') or adapter.get_clickhouse_cluster_name() -%}
{%- set remote_schema = remote_config.get('schema') or adapter.get_clickhouse_local_db_prefix() + this.schema -%}
{%- set remote_identifier = remote_config.get('identifier') or this.identifier + adapter.get_clickhouse_local_suffix() -%}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remote_config.get('schema') be with leading underscore and remote_config.get('identifier') with _base suffix in case the base table in on the ingestion cluster?

""")

# the created distributed table should point to a local table as defined in the model's `remote_config`
run_dbt()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 create or replace table `dbt_clickhouse_3076_test_remote_table_1736979527642`.`remote_table` 
   (key1 UInt64, key2 Int64)
  engine = Distributed('test_remote', 'dbt_clickhouse_3076_test_remote_table_1736979527642', 'remote_target_table', key1)

Is on cluster test_shard also missing here?

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% call statement('main') %}
{{ create_distributed_table(target_relation, remote_relation, sql) }}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the sql defined?

{%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%}
{%- set sharding = config.get('sharding_key') -%}
{%- set reference = "as " ~ local_relation -%}
{%- if sql -%}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use case for having an SQL when creating a distributed table?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't rely on an existing local relation for the ddl query. the sql will allow to directly pass the column names and types

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants