Skip to content

Commit

Permalink
Feature/#20 improve selection (#22)
Browse files Browse the repository at this point in the history
* feat: algo base funcs

* refactor: add filter

* refactor: cosmetic

* docs: add cosmetic

* feat: allow selection options to be passed multiple times

* fix: remove unused package

* feat: support multiple selections and apply wildcard

* feat: support OR logic

* feat: Support AND, OR logic and fulfill tests

* docs: add selection rules guide and deprecate the legacy option
  • Loading branch information
datnguye authored Apr 22, 2023
1 parent a34b848 commit 7ee2467
Show file tree
Hide file tree
Showing 22 changed files with 676 additions and 342 deletions.
1 change: 1 addition & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ currently being supported with security updates.

| Version | Supported |
| ------- | ------------------ |
| 1.1.x | :white_check_mark: |
| 1.0.x | :white_check_mark: |
| < 1.0 | :white_check_mark: |

Expand Down
142 changes: 142 additions & 0 deletions dbterd/adapters/algos/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import copy

from dbterd.adapters.algos.meta import Column, Table


def get_tables(manifest, catalog):
    """Collect every table-like resource found in the dbt artifacts

    Manifest nodes are kept when their key starts with `model.`, `seed.` or
    `snapshot.`; manifest sources are kept when their key starts with
    `source`. Each kept entry is paired with the catalog entry stored under
    the same key (or None when the catalog has none).

    Args:
        manifest (dict): dbt manifest json
        catalog (dict): dbt catalog json
    Returns:
        List[Table]: All tables parsed from dbt artifacts
    """
    node_prefixes = ("model.", "seed.", "snapshot.")
    parsed = []

    if hasattr(manifest, "nodes"):
        for unique_id, manifest_node in manifest.nodes.items():
            if not unique_id.startswith(node_prefixes):
                continue
            parsed.append(
                get_table(
                    table_name=unique_id,
                    manifest_node=manifest_node,
                    catalog_node=catalog.nodes.get(unique_id),
                )
            )

    if hasattr(manifest, "sources"):
        for unique_id, manifest_source in manifest.sources.items():
            if not unique_id.startswith("source"):
                continue
            parsed.append(
                get_table(
                    table_name=unique_id,
                    manifest_node=manifest_source,
                    catalog_node=catalog.sources.get(unique_id),
                )
            )

    return parsed


def enrich_tables_from_relationships(tables, relationships):
    """Fulfill columns in Table due to `select *`

    A relationship may reference a column that was never listed on its table.
    For every relationship, both endpoints (index 0 and index 1 of
    `table_map` / `column_map`) are checked against a deep copy of `tables`,
    and a missing column is appended with the name given by the relationship.
    The input list is left untouched.

    Column presence is compared case-insensitively. The set of known names is
    refreshed after each append, so a relationship whose two endpoints point
    at the same table and the same column adds that column only once.

    Args:
        tables (List[Table]): List of Tables
        relationships (List[Ref]): List of Relationships between Tables
    Returns:
        List[Table]: Enriched tables
    """
    copied_tables = copy.deepcopy(tables)
    for relationship in relationships:
        for table in copied_tables:
            known_columns = {column.name.lower() for column in table.columns}
            for endpoint in (0, 1):
                if table.name != relationship.table_map[endpoint]:
                    continue
                column_name = relationship.column_map[endpoint]
                if column_name.lower() in known_columns:
                    continue
                table.columns.append(Column(name=column_name))
                # Keep the lookup in sync so the second endpoint cannot
                # re-add the column that was just appended.
                known_columns.add(column_name.lower())
    return copied_tables


def get_table(table_name, manifest_node, catalog_node=None):
    """Build one Table from a manifest node and its optional catalog entry

    Columns come from the catalog entry first (name and type lowercased).
    Manifest columns are then added only when their name, with surrounding
    double quotes stripped and compared case-insensitively, is not already
    present. A table that ends up without any column gets one default
    `Column()`.

    Args:
        table_name (str): Table name; the text before the first dot is used
            as the resource type
        manifest_node (dict): Manifest node
        catalog_node (dict, optional): Catalog node. Defaults to None.
    Returns:
        Table: Parsed table
    """
    parsed = Table(
        name=table_name,
        raw_sql=get_compiled_sql(manifest_node),
        database=manifest_node.database.lower(),
        schema=manifest_node.schema_.lower(),
        columns=[],
        resource_type=table_name.split(".")[0],
    )

    if catalog_node:
        parsed.columns.extend(
            [
                Column(
                    name=str(catalog_name).lower(),
                    data_type=str(catalog_meta.type).lower(),
                )
                for catalog_name, catalog_meta in catalog_node.columns.items()
            ]
        )

    # Lowercased names already on the table; manifest columns never override them.
    seen_names = {existing.name.lower() for existing in parsed.columns}
    for raw_name, manifest_meta in manifest_node.columns.items():
        lowered = raw_name.strip('"').lower()
        if lowered in seen_names:
            continue
        parsed.columns.append(
            Column(
                name=lowered,
                data_type=str(manifest_meta.data_type or "unknown").lower(),
            )
        )
        seen_names.add(lowered)

    if not parsed.columns:
        parsed.columns.append(Column())

    return parsed


def get_compiled_sql(manifest_node):
    """Retrieve compiled SQL from manifest node

    Lookup order:
      1. `compiled_sql` attribute (manifest up to v6)
      2. `compiled_code` attribute (manifest from v7)
      3. a synthetic `select <columns> from <database>.<schema>.undefined`
         statement for nodes that only expose a list of columns
      4. `raw_sql` as the last resort

    For the synthetic statement the schema is read from `schema_` when the
    node has that attribute, matching how `get_table` reads it, and from
    `schema` otherwise.
    NOTE(review): on pydantic-based nodes `schema` is presumably the model's
    schema method rather than the field value - confirm against the parser.

    Args:
        manifest_node (dict): Manifest node
    Returns:
        str: Compiled SQL
    """
    if hasattr(manifest_node, "compiled_sql"):  # up to v6
        return manifest_node.compiled_sql

    if hasattr(manifest_node, "compiled_code"):  # from v7
        return manifest_node.compiled_code

    if hasattr(manifest_node, "columns"):
        # nodes having no compiled code but just a list of columns
        schema = (
            manifest_node.schema_
            if hasattr(manifest_node, "schema_")
            else manifest_node.schema
        )
        columns = ",\n".join(f"{x}" for x in manifest_node.columns)
        table = f"{manifest_node.database}.{schema}.undefined"
        return f"select\n{columns}\nfrom {table}"

    return manifest_node.raw_sql  # fallback to raw dbt code
75 changes: 75 additions & 0 deletions dbterd/adapters/algos/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import sys
from fnmatch import fnmatch
from typing import List

from dbterd.adapters.algos.meta import Table


def is_selected_table(
    table: Table,
    select_rules: List[str] = [],
    exclude_rules: List[str] = [],
    resource_types: List[str] = ["model"],
):
    """Decide whether a Table passes the selection criteria

    A table is kept when it matches at least one selection rule (or no
    selection rule is given), its resource type is listed in
    `resource_types` (or that list is empty), and it matches none of the
    exclusion rules. Every rule in both lists is evaluated.

    Args:
        table (Table): Table object
        select_rules (List[str]): Selection rules. Defaults to [].
        exclude_rules (List[str], optional): Exclusion rules. Defaults to [].
        resource_types (List[str], optional): Selected resource types.
            Defaults to ["model"].
    Returns:
        bool: True if Table is selected. False if Table is excluded
    """
    # Selection: rules are OR-ed together
    picked = not select_rules or any(
        [evaluate_rule(table=table, rule=select_rule) for select_rule in select_rules]
    )
    if picked and resource_types:
        picked = table.resource_type in resource_types

    # Exclusion: any matching rule drops the table
    dropped = bool(exclude_rules) and any(
        [evaluate_rule(table=table, rule=exclude_rule) for exclude_rule in exclude_rules]
    )

    return picked and not dropped


def evaluate_rule(table: Table, rule: str):
    """Evaluate a single selection rule against a Table

    The rule is split on `,` into clauses that must all hold (AND logic).
    Each clause is lowercased and split on `:` into `<method>:<pattern>`;
    a clause without `:` uses the `name` method, and anything after a second
    `:` is ignored. The clause is checked by the module-level function named
    `__is_satisfied_by_<method>`; every clause is evaluated.

    Args:
        table (Table): Table object
        rule (str): Selection rule, e.g. `schema:public,wildcard:*orders`
    Returns:
        bool: True if every clause of the rule is satisfied
    Raises:
        AttributeError: If no `__is_satisfied_by_<method>` function exists
    """
    this_module = sys.modules[__name__]
    outcomes = []
    for clause in rule.split(","):
        pieces = clause.lower().split(":")
        if len(pieces) > 1:
            method, pattern = pieces[0], pieces[1]
        else:
            method, pattern = "name", pieces[0]
        checker = getattr(this_module, f"__is_satisfied_by_{method}")
        outcomes.append(checker(table=table, rule=pattern))
    return all(outcomes)


def __is_satisfied_by_name(table: Table, rule: str = ""):
    """Check a Table by name prefix

    Args:
        table (Table): Table object
        rule (str, optional): Expected prefix of the table name. Defaults to "".
    Returns:
        bool: True if the rule is empty or the table name starts with it
    """
    return not rule or table.name.startswith(rule)


def __is_satisfied_by_schema(table: Table, rule: str = ""):
    """Check a Table by schema, optionally qualified with a database

    The rule is split on `.`: the last part is the schema and, when more
    than one part is given, the first part is the database. Without a
    database part the table's own database is used. The match is a prefix
    match on `<database>.<schema>`.

    Args:
        table (Table): Table object
        rule (str, optional): `<schema>` or `<database>.<schema>`. Defaults to "".
    Returns:
        bool: True if the rule is empty or the table's database and schema
            start with the requested ones
    """
    if not rule:
        return True

    *leading_parts, wanted_schema = rule.split(".")
    wanted_database = leading_parts[0] if leading_parts else table.database
    qualified_schema = f"{table.database}.{table.schema}"
    return qualified_schema.startswith(f"{wanted_database}.{wanted_schema}")


def __is_satisfied_by_wildcard(table: Table, rule: str = "*"):
    """Check a Table by matching its name against a shell-style pattern

    Args:
        table (Table): Table object
        rule (str, optional): Pattern passed to `fnmatch`. Defaults to "*".
    Returns:
        bool: True if the rule is empty or the table name matches the pattern
    """
    return True if not rule else fnmatch(table.name, rule)
13 changes: 3 additions & 10 deletions dbterd/adapters/algos/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@

@dataclass
class Column:
"""
Sample DBML: id varchar [primary key]
"""
"""Parsed Column object"""

name: str = "unknown"
data_type: str = "unknown"


@dataclass
class Table:
"""
Sample DBML:
Table posts {
id varchar [primary key]
}
"""
"""Parsed Table object"""

name: str
database: str
Expand All @@ -31,7 +24,7 @@ class Table:

@dataclass
class Ref:
"""Sample DBML: Ref: posts.user_id > users.id"""
"""Parsed Relationship object"""

name: str
table_map: Tuple[str, str]
Expand Down
Loading

0 comments on commit 7ee2467

Please sign in to comment.