Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added lineage in cli e2e #19216

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ingestion/tests/cli_e2e/base/config_builders/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ def build(self) -> dict:
return self.config


class LineageConfigBuilder(BaseBuilder):
"""Builder class for the Lineage config
"""

# pylint: disable=invalid-name
def __init__(self, config: dict, config_args: dict) -> None:
super().__init__(config, config_args)
self.resultLimit = self.config_args.get("resultLimit", 1000)
self.queryLogDuration = self.config_args.get("queryLogDuration", 1)

# pylint: enable=invalid-name
def build(self) -> dict:
"""build lineage config"""
self.config["source"]["type"] = self.config_args["source"]
self.config["source"]["sourceConfig"] = {
"config": {
"type": "DatabaseLineage",
"queryLogDuration": 1,
"resultLimit": 10000,
"processQueryLineage": False,
"processStoredProcedureLineage": False,
akashverma0786 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return self.config


class AutoClassificationConfigBuilder(BaseBuilder):
"""Builder class for the AutoClassification config"""

Expand Down Expand Up @@ -206,6 +232,7 @@ def builder_factory(builder, config: dict, config_args: dict):
"""Factory method to return the builder class"""
builder_classes = {
E2EType.PROFILER.value: ProfilerConfigBuilder,
E2EType.LINEAGE.value: LineageConfigBuilder,
E2EType.DATA_QUALITY.value: DataQualityConfigBuilder,
E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder,
E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder,
Expand Down
1 change: 1 addition & 0 deletions ingestion/tests/cli_e2e/base/e2e_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class E2EType(Enum):

INGEST = "ingest"
PROFILER = "profiler"
LINEAGE = "lineage"
PROFILER_PROCESSOR = "profiler-processor"
AUTO_CLASSIFICATION = "auto-classification"
DATA_QUALITY = "test"
Expand Down
21 changes: 21 additions & 0 deletions ingestion/tests/cli_e2e/base/test_cli_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ def test_lineage(self) -> None:

This test will need to be implemented on the database specific test classes
"""
self.delete_table_and_view()
self.create_table_and_view()
self.build_config_file(
E2EType.INGEST_DB_FILTER_SCHEMA,
{"includes": self.get_includes_schemas()},
)
self.run_command()
self.build_config_file(
E2EType.LINEAGE,
{"source": "mysql-lineage"},
)
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_test_lineage(source_status, sink_status)

@pytest.mark.order(12)
def test_profiler_with_time_partition(self) -> None:
Expand Down Expand Up @@ -325,6 +339,13 @@ def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status
) -> None:
raise NotImplementedError()

@abstractmethod
def assert_for_test_lineage(
self, source_status: Status, sink_status: Status
) -> None:
raise NotImplementedError()


@abstractmethod
def assert_for_table_with_profiler(
Expand Down
20 changes: 20 additions & 0 deletions ingestion/tests/cli_e2e/common/test_cli_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ def assert_for_table_with_profiler(
# of https://github.com/open-metadata/OpenMetadata/pull/18558
# we need to introduce Lineage E2E base and add view lineage check there.


def assert_for_test_lineage(
self, source_status: Status, sink_status: Status
):
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(source_status.warnings), 0)
self.assertEqual(len(sink_status.failures), 0)
self.assertEqual(len(sink_status.warnings), 0)
self.assertGreaterEqual(len(sink_status.records), 1)
lineage_data = self.retrieve_lineage(self.fqn_created_table())
retrieved_view_column_lineage_count = len(lineage_data['downstreamEdges'][0]['lineageDetails']['columnsLineage'])
self.assertEqual(retrieved_view_column_lineage_count, self.view_column_lineage_count())

retrieved_lineage_node = lineage_data['nodes'][0]['fullyQualifiedName']
self.assertEqual(retrieved_lineage_node, self.expected_lineage_node())

def assert_auto_classification_sample_data(
self, source_status: Status, sink_status: Status
):
Expand Down Expand Up @@ -205,6 +221,10 @@ def inserted_rows_count(self) -> int:
@abstractmethod
def view_column_lineage_count(self) -> int:
raise NotImplementedError()

@abstractmethod
def expected_lineage_node(self) -> str:
raise NotImplementedError()

@staticmethod
@abstractmethod
Expand Down
3 changes: 3 additions & 0 deletions ingestion/tests/cli_e2e/test_cli_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def inserted_rows_count(self) -> int:

def view_column_lineage_count(self) -> int:
return 22

def expected_lineage_node(self) -> str:
return 'local_mysql.default.openmetadata_db.view_persons'

@staticmethod
def fqn_created_table() -> str:
Expand Down
24 changes: 24 additions & 0 deletions ingestion/tests/cli_e2e/test_cli_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ def view_column_lineage_count(self) -> int:
which does not propagate column lineage
"""
return 12

def expected_lineage_node(self) -> str:
return 'e2e_oracle.default.admin.admin_emp_view'

@staticmethod
def fqn_created_table() -> str:
Expand Down Expand Up @@ -262,6 +265,27 @@ def test_table_filter_excludes(self) -> None:
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_tables_excludes(source_status, sink_status)

@pytest.mark.order(11)
def test_lineage(self) -> None:
"""10. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage

This test will need to be implemented on the database specific test classes
"""
self.delete_table_and_view()
self.create_table_and_view()
self.build_config_file(
E2EType.INGEST_DB_FILTER_SCHEMA,
{"includes": self.get_includes_schemas()},
)
self.run_command()
self.build_config_file(
E2EType.LINEAGE,
{"source": "oracle-lineage"},
)
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_test_lineage(source_status, sink_status)

def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status
) -> None:
Expand Down
Loading