Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#20 improve selection #22

Merged
merged 10 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ currently being supported with security updates.

| Version | Supported |
| ------- | ------------------ |
| 1.1.x | :white_check_mark: |
| 1.0.x | :white_check_mark: |
| < 1.0 | :white_check_mark: |

Expand Down
142 changes: 142 additions & 0 deletions dbterd/adapters/algos/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import copy

from dbterd.adapters.algos.meta import Column, Table


def get_tables(manifest, catalog):
    """Collect every table described by the dbt artifacts.

    Manifest nodes whose id starts with `model.`, `seed.` or `snapshot.`, and
    manifest sources whose id starts with `source`, are each turned into a
    Table through `get_table`, paired with the catalog entry of the same id
    (or None when the catalog has no such entry).

    Args:
        manifest (dict): dbt manifest json; `nodes` / `sources` are read as
            attributes and skipped when the attribute is missing
        catalog (dict): dbt catalog json; `nodes` / `sources` are read as
            attributes

    Returns:
        List[Table]: All tables parsed from dbt artifacts
    """
    parsed = []
    node_prefixes = ("model.", "seed.", "snapshot.")

    if hasattr(manifest, "nodes"):
        parsed.extend(
            get_table(
                table_name=unique_id,
                manifest_node=node,
                catalog_node=catalog.nodes.get(unique_id),
            )
            for unique_id, node in manifest.nodes.items()
            if unique_id.startswith(node_prefixes)
        )

    if hasattr(manifest, "sources"):
        parsed.extend(
            get_table(
                table_name=unique_id,
                manifest_node=source,
                catalog_node=catalog.sources.get(unique_id),
            )
            for unique_id, source in manifest.sources.items()
            if unique_id.startswith("source")
        )

    return parsed


def enrich_tables_from_relationships(tables, relationships):
    """Add columns that relationships reference but their tables do not list.

    Tables can lack explicit columns (e.g. due to `select *`), so each
    relationship end whose column is not yet present on its table gets a
    placeholder Column appended. The input tables are never modified; a deep
    copy is enriched and returned.

    Args:
        tables (List[Table]): List of Tables
        relationships (List[Ref]): List of Relationships between Tables

    Returns:
        List[Table]: Enriched tables
    """
    copied_tables = copy.deepcopy(tables)

    # One lower-cased name set per table, built once and kept up to date on
    # every append. Keeping it current also prevents a duplicate column when
    # both ends of a relationship name the same missing column of one table.
    known_columns = [
        {column.name.lower() for column in table.columns}
        for table in copied_tables
    ]

    for relationship in relationships:
        for table, known in zip(copied_tables, known_columns):
            # Index 0 and 1 are the two ends of the relationship.
            for end in (0, 1):
                if table.name != relationship.table_map[end]:
                    continue
                column_name = relationship.column_map[end]
                lowered = column_name.lower()
                if lowered in known:
                    continue
                table.columns.append(Column(name=column_name))
                known.add(lowered)

    return copied_tables


def get_table(table_name, manifest_node, catalog_node=None):
    """Build one Table from a manifest node and its optional catalog entry.

    Columns come from the catalog entry first; manifest columns not already
    present (compared case-insensitively, with surrounding double quotes
    stripped) are added afterwards. A table that ends up with no columns gets
    a single default Column.

    Args:
        table_name (str): Table name; the part before the first `.` becomes
            the resource type
        manifest_node (dict): Manifest node
        catalog_node (dict, optional): Catalog node. Defaults to None.

    Returns:
        Table: Parsed table
    """
    table = Table(
        name=table_name,
        raw_sql=get_compiled_sql(manifest_node),
        database=manifest_node.database.lower(),
        schema=manifest_node.schema_.lower(),
        columns=[],
        resource_type=table_name.split(".")[0],
    )

    if catalog_node:
        table.columns.extend(
            Column(
                name=str(catalog_name).lower(),
                data_type=str(catalog_meta.type).lower(),
            )
            for catalog_name, catalog_meta in catalog_node.columns.items()
        )

    # Lower-cased names already on the table, so manifest columns are only
    # added when the catalog did not provide them.
    seen = {existing.name.lower() for existing in table.columns}
    for quoted_name, manifest_column in manifest_node.columns.items():
        lowered = quoted_name.strip('"').lower()
        if lowered in seen:
            continue
        table.columns.append(
            Column(
                name=lowered,
                data_type=str(manifest_column.data_type or "unknown").lower(),
            )
        )
        seen.add(lowered)

    if not table.columns:
        table.columns.append(Column())

    return table


def get_compiled_sql(manifest_node):
    """Retrieve compiled SQL from manifest node

    Lookup order: `compiled_sql`, then `compiled_code`, then a synthetic
    `select <columns> from <database>.<schema>.undefined` statement for nodes
    that only carry a column list, and finally `raw_sql`.

    Args:
        manifest_node (dict): Manifest node

    Returns:
        str: Compiled SQL
    """
    if hasattr(manifest_node, "compiled_sql"):  # up to v6
        return manifest_node.compiled_sql

    if hasattr(manifest_node, "compiled_code"):  # from v7
        return manifest_node.compiled_code

    if hasattr(manifest_node, "columns"):
        # nodes having no compiled but just list of columns
        # `get_table` reads the schema name from `schema_`, so prefer that
        # attribute here too; `schema` stays as a fallback for node objects
        # that only expose the plain name.
        schema = getattr(manifest_node, "schema_", None)
        if schema is None:
            schema = manifest_node.schema
        return "select\n{columns}\nfrom {table}".format(
            columns=",\n".join(f"{x}" for x in manifest_node.columns),
            table=f"{manifest_node.database}.{schema}.undefined",
        )

    return manifest_node.raw_sql  # fallback to raw dbt code
75 changes: 75 additions & 0 deletions dbterd/adapters/algos/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import sys
from fnmatch import fnmatch
from typing import List

from dbterd.adapters.algos.meta import Table


def is_selected_table(
    table: Table,
    select_rules: List[str] = [],
    exclude_rules: List[str] = [],
    resource_types: List[str] = ["model"],
):
    """Decide whether a Table passes the selection criteria.

    A table is selected when it matches at least one select rule (or no
    select rules are given) and its resource type is listed in
    `resource_types` (or `resource_types` is empty). It is then rejected if
    it matches any exclude rule.

    Args:
        table (Table): Table object
        select_rules (List[str]): Selection rules. Defaults to [].
        exclude_rules (List[str], optional): Exclusion rules. Defaults to [].
        resource_types (List[str], optional): Selected resource types.
            Defaults to ["model"].

    Returns:
        bool: True if Table is selected. False if Table is excluded
    """

    def matches_any(rules):
        # Every rule is evaluated (no short-circuit) before combining.
        return any([evaluate_rule(table=table, rule=rule) for rule in rules])

    # Selection
    selected = matches_any(select_rules) if select_rules else True
    if selected and resource_types:
        selected = table.resource_type in resource_types

    # Exclusion
    excluded = matches_any(exclude_rules) if exclude_rules else False

    return selected and not excluded


def evaluate_rule(table: Table, rule: str):
    """Evaluate a single selection rule against a table.

    A rule is a comma-separated list of conditions that must all hold. Each
    condition is lower-cased and read as `<type>:<value>`; a condition
    without `:` is treated as a `name` condition. The check is delegated to
    this module's `__is_satisfied_by_<type>` function.

    Args:
        table (Table): Table object
        rule (str): Rule text, e.g. `schema:abc,wildcard:*xyz*`

    Returns:
        bool: True if every condition of the rule is satisfied
    """
    this_module = sys.modules[__name__]
    verdicts = []
    for condition in rule.split(","):
        pieces = condition.lower().split(":")
        if len(pieces) > 1:
            # Anything after a second `:` is ignored.
            kind, value = pieces[0], pieces[1]
        else:
            kind, value = "name", pieces[0]
        checker = getattr(this_module, f"__is_satisfied_by_{kind}")
        verdicts.append(checker(table=table, rule=value))
    return all(verdicts)


def __is_satisfied_by_name(table: Table, rule: str = ""):
    """Check a `name` condition: the table name must start with `rule`.

    An empty rule is always satisfied.
    """
    return table.name.startswith(rule) if rule else True


def __is_satisfied_by_schema(table: Table, rule: str = ""):
    """Check a `schema` condition given as `[database.]schema`.

    The last dot-separated part of the rule is the schema and the first part
    (when there is more than one) is the database; without a database the
    table's own database is used. The table's `database.schema` must start
    with the requested `database.schema`. An empty rule is always satisfied.
    """
    if not rule:
        return True

    *leading, wanted_schema = rule.split(".")
    wanted_database = leading[0] if leading else table.database
    qualified_schema = f"{table.database}.{table.schema}"
    return qualified_schema.startswith(f"{wanted_database}.{wanted_schema}")


def __is_satisfied_by_wildcard(table: Table, rule: str = "*"):
    """Check a `wildcard` condition: the table name must match the pattern.

    Matching is done with `fnmatch`. An empty rule is always satisfied.
    """
    return not rule or fnmatch(table.name, rule)
13 changes: 3 additions & 10 deletions dbterd/adapters/algos/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@

@dataclass
class Column:
"""
Sample DBML: id varchar [primary key]
"""
"""Parsed Column object"""

name: str = "unknown"
data_type: str = "unknown"


@dataclass
class Table:
"""
Sample DBML:
Table posts {
id varchar [primary key]
}
"""
"""Parsed Table object"""

name: str
database: str
Expand All @@ -31,7 +24,7 @@ class Table:

@dataclass
class Ref:
"""Sample DBML: Ref: posts.user_id > users.id"""
"""Parsed Relationship object"""

name: str
table_map: Tuple[str, str]
Expand Down
Loading