Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions macros/materializations/custom_dictionary.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{%- materialization custom_dictionary -%}
{#
Steamlines a dictionary creating process by reducing the amount of boilerplate code.
{#
Streamlines a dictionary creating process by reducing the amount of boilerplate code.
Creates a dictionary with a "direct" layout for "dev" or "staging" schemas; otherwise, keeps the settings unchanged.
This materialization requires "--depends_on:{{ ref('<source_table_name>') }}" clause for the "clickhouse" source.
Arguments:
Expand Down Expand Up @@ -36,7 +36,11 @@
target_relation = get_or_create_relation(none, target.schema, this.identifier, none) -%}
{%- set backup_relation = make_backup_relation(target_relation, none, suffix='__dbt_backup') -%}

-- log settings
{%- set silence_mode = config.get('silence_mode', default=false) -%}

-- materialization settings
{%- set use_yaml_types = config.get('use_yaml_types', default=false) -%}
{%- set source = config.get('source', default='clickhouse') -%}
{%- set primary_key_config = config.require('primary_key') -%}
{%- set columns_to_include = config.get('columns_to_include', default='*') -%}
Expand Down Expand Up @@ -77,12 +81,27 @@
{%- do exceptions.raise_compiler_error('Invalid source settings') -%}
{%- endif -%}

-- source columns redefinition
{%- set source_columns_override = namespace(value=[]) -%}

{%- if use_yaml_types -%}
{%- for model in graph.nodes.values() -%}
{%- if model.name == this.name -%}
-- unpacking the "columns" dictionary to fit the "source_columns" structure
{%- for column in model.columns.values() -%}
{%- do source_columns_override.value.append(column) -%}
{%- endfor -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}

{%- set source_columns = source_columns_override.value -%}

-- logic ---------------------------------------------------------------------------------------------------------------
-- in order to use "loop.last" and set a comma on the last line correctly "source_columns" is filtered
{%- set filtered_columns = namespace(value=[]) -%}

{{- log('Filtering columns', info=true) -}}
{{- diu.mcr_log_colored('Filtering columns', silence_mode) -}}
{%- if columns_to_include == '*' and columns_to_exclude -%}
{%- for column in source_columns -%}
{%- if column.name not in columns_to_exclude -%}
Expand All @@ -108,7 +127,7 @@
-- generating the dictionary ddl
{%- set dictionary_ddl = namespace(value=sql) -%}

{{- log('Generating the dictionary DDL', info=true) -}}
{{- diu.mcr_log_colored('Generating the dictionary DDL', silence_mode) -}}
{%- for column in source_columns -%}
{%- if loop.first -%}
{%- set dictionary_ddl.value = dictionary_ddl.value ~ create_clause ~ backup_relation ~ '\n(' -%}
Expand Down Expand Up @@ -136,18 +155,19 @@
{%- set sql = sql ~ '\n' ~ layout_pattern.format(dev_schema_layout) -%}
{%- endif -%}

{{- log('Executing generated SQL:\n' ~ sql, info=true) -}}
{{- diu.mcr_log_colored('Executing generated SQL:\n' ~ sql, silence_mode) -}}
{%- do run_query(sql) -%}

{{- log('Checking if the dictionary is queryable', info=true) -}}
{{- diu.mcr_log_colored('Checking if the dictionary is queryable', silence_mode) -}}
{%- set query_result = run_query('select 1 from ' ~ backup_relation ~ ' limit 1') -%}

{%- if query_result is none -%}
{%- do exceptions.raise_compiler_error('Dictionary is not queryable. Aborting') -%}
{%- do exceptions.raise_compiler_error(
diu.mcr_log_colored('Dictionary is not queryable. Aborting', silence_mode, 'red')) -%}
{%- endif -%}

{{- log('Dictionary is queryable', info=true) -}}
{{- log('Replacing the backup relation with the target relation', info=true) -}}
{{- diu.mcr_log_colored('Dictionary is queryable', silence_mode, 'green') -}}
{{- diu.mcr_log_colored('Replacing the backup relation with the target relation', silence_mode) -}}

-- if the target relation exists, exchange the tables; rename otherwise
{%- if target_relation_exists -%}
Expand All @@ -157,7 +177,7 @@
{%- do adapter.rename_relation(backup_relation, target_relation) -%}
{%- endif -%}

{{- log('Dictionary ' ~ target_relation ~ ' is ready', info=true) -}}
{{- diu.mcr_log_colored('Dictionary ' ~ target_relation ~ ' is ready', silence_mode) -}}

-- dropping the backup relation
{%- do diu.mcr_drop_relation_if_exists(backup_relation) -%}
Expand Down
76 changes: 26 additions & 50 deletions macros/materializations/microbatch.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@

-- log settings
{%- set debug_mode = config.get('debug_mode', default=false) -%}
{%- set silence_mode = config.get('silence_mode', default=true) -%}
{%- set silence_mode = config.get('silence_mode', default=false) -%}

-- logic ---------------------------------------------------------------------------------------------------------------
{%- if not microbatch_settings -%}
{%- do exceptions.raise_compiler_error(
diu.log_colored(
diu.mcr_log_colored(
'No microbatch settings found\n' ~
'Please add microbatch settings to the model input section', silence_mode, color='red')) -%}
{%- endif -%}
Expand All @@ -61,7 +61,7 @@
{%- do input_lookforward_windows_list.append((setting[2] | int) if setting[2] else 0) -%}
{%- endfor -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Input models:\n\t' ~ input_models_list | join('\n\t') ~
'\nFinal is set for:\n\t' ~
zip(input_models_list, final_settings_list) | selectattr(1) | map(attribute=0) | join('\n\t') ~
Expand All @@ -75,13 +75,13 @@
{%- set sql = re.sub('`(.+)`\.`(.+)`\s+final', '`\\1`.`\\2`', sql, flags=re.IGNORECASE) -%}

{%- if target_schema == production_schema -%}
{{- diu.log_colored('Starting to build to production schema: ' ~ target_schema, silence_mode) -}}
{{- diu.mcr_log_colored('Starting to build to production schema: ' ~ target_schema, silence_mode) -}}

{%- else -%}
{%- set materialization_start_date =
dt.datetime.combine(dt.datetime.today() - dt.timedelta(days=dev_days_offset), dt.time.min) -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Starting to build to dev schema:\n\t' ~ target_schema ~
'\nDev schema materialization start date:\n\t' ~ materialization_start_date ~
'\nOriginal materialization start date:\n\t' ~ config.get('materialization_start_date'), silence_mode) -}}
Expand All @@ -101,14 +101,14 @@
{%- if is_schema_changed -%}
{%- if on_schema_change == 'fail' and not full_refresh -%}
{%- do exceptions.raise_compiler_error(
diu.log_colored(
diu.mcr_log_colored(
'Schema change detected. Materialization will be stopped\n' ~
'Please revise the schema changes or set "on_schema_change" to "full_refresh"\n' ~
'If you want to force materialization - run with "full-refresh" flag', silence_mode, color='red')) -%}

{%- elif on_schema_change == 'full_refresh' -%}
{%- set full_refresh = true -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Full refresh will be done since "on_schema_change" is set to "full_refresh"', silence_mode, color='red') -}}

{%- endif -%}
Expand Down Expand Up @@ -138,14 +138,14 @@

{%- set start_time = last_record_datetime - diu.get_unit_interval(value=overwrite_size, unit=time_unit_name) -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Target relation exists' ~
'\nLast record datetime:\n\t' ~ last_record_datetime ~
'\nLast record datetime with overwrite size:\n\t' ~ start_time, debug_mode) -}}

{%- else -%}
{%- set start_time = materialization_start_date -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Target relation doesn\'t exist' ~
'\nDefault datetime:\n\t' ~ start_time, debug_mode) -}}

Expand All @@ -158,9 +158,9 @@
enddate=fixed_now,
unit=time_unit_name) -%}

{{- diu.log_colored('Calculating interval parts', silence_mode) -}}
{{- diu.mcr_log_colored('Calculating interval parts', silence_mode) -}}
{%- set parts_count = [(interval_range / batch_size) | round(0, 'ceil') | int, 2] | max -%}
{{- diu.log_colored('Interval parts count:\n\t' ~ parts_count, silence_mode) -}}
{{- diu.mcr_log_colored('Interval parts count:\n\t' ~ parts_count, silence_mode) -}}

-- temporary table creation
{%- set tmp_relation_exists, tmp_relation =
Expand All @@ -176,7 +176,7 @@
{%- set partition_id = diu.get_partition_id(start_time, partition_by_format) -%}

{%- if not full_refresh -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Copying partition "' ~ partition_id ~ '" from ' ~ target_relation ~ ' to ' ~ tmp_relation, silence_mode) -}}
{%- do diu.copy_partition(target_relation, tmp_relation, partition_id) -%}

Expand Down Expand Up @@ -213,14 +213,14 @@

{%- if execute -%}
{%- if loop.first -%}
{{- diu.log_colored('Inserting into: ' ~ tmp_relation, silence_mode) -}}
{{- diu.mcr_log_colored('Inserting into: ' ~ tmp_relation, silence_mode) -}}
{%- endif -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Inserting batch: ' ~ (i + 1) ~ ' out of ' ~ parts_count ~
'\nDate range: from ' ~ having_conditions[0] ~ ' to ' ~ having_conditions[1], silence_mode) -}}

{{- diu.log_colored(insert_query[250:1100] ~ '\n\n...\n\n' ~ insert_query[-200:], debug_mode) -}}
{{- diu.mcr_log_colored(insert_query[250:1100] ~ '\n\n...\n\n' ~ insert_query[-200:], debug_mode) -}}

-- insert query execution
{%- call statement('inserting_new_data_to_temporary_table') -%}
Expand All @@ -232,11 +232,11 @@

{%- if full_refresh -%}
-- exchanging tmp table and target table
{{- diu.log_colored('Exchanging ' ~ tmp_relation ~ ' with ' ~ target_relation, silence_mode) -}}
{{- diu.mcr_log_colored('Exchanging ' ~ tmp_relation ~ ' with ' ~ target_relation, silence_mode) -}}
{{- diu.exchange_tables(tmp_relation, target_relation) -}}
{%- else -%}
-- replacing partitions
{{- diu.log_colored('Replacing partitions from ' ~ tmp_relation ~ ' to ' ~ target_relation, silence_mode) -}}
{{- diu.mcr_log_colored('Replacing partitions from ' ~ tmp_relation ~ ' to ' ~ target_relation, silence_mode) -}}
{{- diu.insert_overwrite_partitions(target_relation, tmp_relation) -}}

-- checking for duplicate parts and dropping if needed
Expand All @@ -250,30 +250,6 @@
{%- endmaterialization -%}


{%- macro log_colored(message, silence_mode=false, color='yellow') -%}
{#
Makes log message colored.
Arguments:
message(string): The log message to be colored
silence_mode(bool): Should the log message be printed
color(string): The color of the log message
Returns:
The colored log message
#}
{%- set color_code_start = '\n\033[0;' -%}

{%- if color == 'green' -%}
{%- set color_code = '32m' -%}
{%- elif color == 'red' -%}
{%- set color_code = '31m' -%}
{%- elif color == 'yellow' -%}
{%- set color_code = '33m' -%}
{%- endif -%}

{{- log(this.identifier ~ ' log:' ~ color_code_start ~ color_code ~ message ~ '\033[00m', silence_mode) -}}
{%- endmacro -%}


{%- macro check_schema_changes(database, schema, identifier, type, sql, debug_mode, silence_mode) -%}
{#
Compares history table columns names and types consistency with the query being executed
Expand All @@ -298,7 +274,7 @@
adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}

{%- if relation and execute -%}
{{- diu.log_colored('Checking existing table columns names and types consistency', debug_mode) -}}
{{- diu.mcr_log_colored('Checking existing table columns names and types consistency', debug_mode) -}}

{%- set check_relation_exists, check_relation =
diu.get_or_create_dataset(
Expand All @@ -320,11 +296,11 @@
-- columns number comparison
{%- if (columns_old | length) != (columns_new | length) -%}
{%- set is_schema_changed.value = true -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Number of columns doesn\'t match', silence_mode, color='red') -}}

{%- else -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Number of columns match\nChecking for name and type consistency', silence_mode) -}}

{%- for i in range(columns_new | length) -%}
Expand All @@ -333,15 +309,15 @@

{%- if column_old.data_type != column_new.data_type or column_old.name != column_new.name -%}
{%- set is_schema_changed.value = true -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Column name/type mismatch:' ~
'\n\told: ' ~ column_old.name ~ ' ' ~ column_old.data_type ~
'\n\tnew: ' ~ column_new.name ~ ' ' ~ column_new.data_type, silence_mode, color='red') -}}
{%- endif -%}
{%- endfor -%}
{%- endif -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Checking for name and type consistency is done',
silence_mode,
color='red' if is_schema_changed.value else 'green') -}}
Expand Down Expand Up @@ -370,7 +346,7 @@
#}
{%- set diu = dbt_improvado_utils -%}

{{- diu.log_colored('Checking if relation exists:\n\t' ~ identifier, debug_mode) -}}
{{- diu.mcr_log_colored('Checking if relation exists:\n\t' ~ identifier, debug_mode) -}}

{%- set relation_exists, relation =
get_or_create_relation(
Expand All @@ -381,7 +357,7 @@

-- creating relation if it doesn't exist
{%- if not relation_exists and execute -%}
{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Creating non-existing relation:\n\t' ~ identifier, debug_mode) -}}

{%- if type == 'table' -%}
Expand All @@ -390,7 +366,7 @@
{%- do create_view_as(relation, sql) -%}
{%- endif -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Relation has been created:\n\t' ~ identifier, debug_mode) -}}
{%- endif -%}

Expand Down Expand Up @@ -706,7 +682,7 @@
{%- set part_names = dup_group['part_names'] -%}
{%- set parts_to_delete = part_names[1:] -%}

{{- diu.log_colored(
{{- diu.mcr_log_colored(
'Detected ' ~ (part_names | length) ~ ' duplicate parts in partition '~
dup_partition_id ~ ' with hash ' ~ dup_hash ~ ' - deleting duplicates',
silence_mode, color='red') -}}
Expand Down
22 changes: 22 additions & 0 deletions macros/utils/mcr_log_colored.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{%- macro mcr_log_colored(message, silence_mode=false, color='yellow') -%}
{#
Makes log message colored.
Arguments:
message(string): The log message to be colored
silence_mode(bool): Should silence mode be used
color(string): The color of the log message
Returns:
The colored log message
#}
{%- set color_code_start = '\n\033[0;' -%}

{%- if color == 'green' -%}
{%- set color_code = '32m' -%}
{%- elif color == 'red' -%}
{%- set color_code = '31m' -%}
{%- elif color == 'yellow' -%}
{%- set color_code = '33m' -%}
{%- endif -%}

{{- log(this.identifier ~ ' log:' ~ color_code_start ~ color_code ~ message ~ '\033[00m', not silence_mode) -}}
{%- endmacro -%}