diff --git a/pyatlan/model/enums.py b/pyatlan/model/enums.py index 4ba6fa0e..48afc28f 100644 --- a/pyatlan/model/enums.py +++ b/pyatlan/model/enums.py @@ -2197,6 +2197,28 @@ class WorkflowPackage(str, Enum): TRINO = "atlan-trino" +class CustomWorkflowPackage(str, Enum): + ASSET_IMPORT = "csa-asset-import" + ASSET_EXPORT = "csa-asset-export" + RELATIONAL_ASSETS_BUILDER = "csa-relational-assets-builder" + + +class AssetInputHandling(str, Enum): + UPSERT = "upsert" + PARTIAL = "partial" + UPDATE = "update" + + +class AssetDeltaHandling(str, Enum): + FULL_REPLACEMENT = "full" + INCREMENTAL = "delta" + + +class AssetRemovalType(str, Enum): + ARCHIVE = "archive" + PURGE = "purge" + + class UTMTags(str, Enum): # PAGE_ entries indicate where the action was taken. # Search was made from the home page. diff --git a/pyatlan/model/packages/__init__.py b/pyatlan/model/packages/__init__.py index ff5cbd74..d8b11539 100644 --- a/pyatlan/model/packages/__init__.py +++ b/pyatlan/model/packages/__init__.py @@ -1,4 +1,6 @@ # flake8: noqa +from .asset_export_basic import AssetExportBasic +from .asset_import import AssetImport from .big_query_crawler import BigQueryCrawler from .confluent_kafka_crawler import ConfluentKafkaCrawler from .connection_delete import ConnectionDelete @@ -12,3 +14,21 @@ from .snowflake_crawler import SnowflakeCrawler from .snowflake_miner import SnowflakeMiner from .tableau_crawler import TableauCrawler + +__all__ = [ + "BigQueryCrawler", + "ConfluentKafkaCrawler", + "ConnectionDelete", + "DbtCrawler", + "DynamoDBCrawler", + "GlueCrawler", + "PostgresCrawler", + "PowerBICrawler", + "SQLServerCrawler", + "SigmaCrawler", + "SnowflakeCrawler", + "SnowflakeMiner", + "TableauCrawler", + "AssetImport", + "AssetExportBasic", +] diff --git a/pyatlan/model/packages/asset_export_basic.py b/pyatlan/model/packages/asset_export_basic.py new file mode 100644 index 00000000..c00cd57e --- /dev/null +++ b/pyatlan/model/packages/asset_export_basic.py @@ -0,0 +1,325 @@ +from 
__future__ import annotations + +from typing import List, Optional + +from pyatlan.model.enums import CustomWorkflowPackage +from pyatlan.model.packages.base.custom_package import AbstractCustomPackage +from pyatlan.model.workflow import WorkflowMetadata + + +class AssetExportBasic(AbstractCustomPackage): + """ + Base configuration for the Asset Export package. + """ + + _NAME = "asset-export" + _PACKAGE_NAME = f"@csa/{_NAME}" + _PACKAGE_PREFIX = CustomWorkflowPackage.ASSET_EXPORT.value + _PACKAGE_ICON = "http://assets.atlan.com/assets/ph-cloud-arrow-down-light.svg" + _PACKAGE_LOGO = "http://assets.atlan.com/assets/ph-cloud-arrow-down-light.svg" + + def __init__( + self, + ): + super().__init__() + self._email_addresses = None + self._delivery_type = None + self._export_scope = None + self._parameters = [] + + def glossaries_only( + self, include_archived: Optional[bool] = None + ) -> AssetExportBasic: + """ + Set up the package to export only glossaries. + + :param include_archived: Whether to include archived assets in the export (true) or only active assets (false). + + :returns: package, set up to export only glossaries + """ + self._export_scope = "GLOSSARIES_ONLY" + self._parameters.append({"name": "export_scope", "value": self._export_scope}) + self._parameters.append( + { + "name": "include_archived", + "value": include_archived, + } + ) + return self + + def enriched_only( + self, + prefix: str, + include_description: Optional[str] = None, + include_glossaries: Optional[bool] = None, + include_data_products: Optional[bool] = None, + include_archived: Optional[bool] = None, + ) -> AssetExportBasic: + """ + Set up the package to export only enriched assets. + + :param prefix: Starting value for a qualifiedName that will determine which assets to export. + :param include_description: Whether to extract only user-entered description (false), or to also include + system-level description (true). 
+ :param include_glossaries: Whether glossaries (and their terms and + categories) should be exported, too. + :param include_data_products: Whether data products (and their domains) + should be exported, too. + :param include_archived: Whether to include archived assets in the export (true) or + only active assets (false). + + :returns: package, set up to export only enriched assets + """ + self._export_scope = "ENRICHED_ONLY" + self._parameters.append({"name": "export_scope", "value": self._export_scope}) + params = { + "qn_prefix": prefix, + "include_description": include_description, + "include_glossaries": include_glossaries, + "include_data_products": include_data_products, + "include_archived": include_archived, + } + self._add_optional_params(params) + return self + + def products_only( + self, include_archived: Optional[bool] = None + ) -> AssetExportBasic: + """ + Set up the package to export only data products. + + :param include_archived: Whether to include archived assets in the export (true) or only active assets (false). + + :returns: package, set up to export only data products + """ + self._export_scope = "PRODUCTS_ONLY" + self._parameters.append({"name": "export_scope", "value": self._export_scope}) + self._parameters.append( + { + "name": "include_archived", + "value": include_archived, + } + ) + return self + + def all_assets( + self, + prefix: str, + include_description: Optional[str] = None, + include_glossaries: Optional[bool] = None, + include_data_products: Optional[bool] = None, + include_archived: Optional[bool] = None, + ) -> AssetExportBasic: + """ + Set up the package to export all assets. + + :param prefix: Starting value for a qualifiedName that will determine which assets to export. + :param include_description: Whether to extract only user-entered description (false), or to also include + system-level description (true). + :param include_glossaries: Whether glossaries (and their terms and + categories) should be exported, too. 
+ :param include_data_products: Whether data products (and their domains) + should be exported, too. + :param include_archived: Whether to include archived assets in the export (true) or + only active assets (false). + + :returns: package, set up to export all assets + """ + self._export_scope = "ALL" + self._parameters.append({"name": "export_scope", "value": self._export_scope}) + params = { + "qn_prefix": prefix, + "include_description": include_description, + "include_glossaries": include_glossaries, + "include_data_products": include_data_products, + "include_archived": include_archived, + } + self._add_optional_params(params) + + return self + + def direct(self) -> AssetExportBasic: + """ + Set up the package to deliver the export via direct download. + + :returns: package, set up to deliver the export via direct download + """ + self._delivery_type = "DIRECT" + self._add_delivery_parameters() + return self + + def email(self, email_addresses: List[str]) -> AssetExportBasic: + """ + Set up the package to deliver the export via email. + + :param email_addresses: List of email addresses to send the export to. + + :returns: package, set up to deliver the export via email + """ + self._delivery_type = "EMAIL" + self._email_addresses = email_addresses + self._add_delivery_parameters() + + return self + + def object_store(self, prefix: Optional[str] = None) -> AssetExportBasic: + """ + Set up the package to export to an object storage location. + + :param prefix: The directory (path) within the object store to upload the exported file. 
+ + :returns: package, set up to export metadata to an object store + """ + self._delivery_type = "CLOUD" + self._add_delivery_parameters() + self._parameters.append({"name": "target_prefix", "value": prefix}) + self._parameters.append({"name": "cloud_target", "value": "{{credentialGuid}}"}) + return self + + def s3( + self, access_key: str, secret_key: str, region: str, bucket: str + ) -> AssetExportBasic: + """ + Set up package to export to S3. + + :param access_key: AWS access key + :param secret_key: AWS secret key + :param region: AWS region + :param bucket: S3 bucket to upload the export file to + + :returns: package, set up to export metadata to S3 + """ + self._credentials_body.update( + { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "s3", + "username": access_key, + "password": secret_key, + "extra": { + "region": region, + "s3_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + ) + return self + + def gcs( + self, project_id: str, service_account_json: str, bucket: str + ) -> AssetExportBasic: + """ + Set up package to export to Google Cloud Storage. + + :param project_id: ID of GCP project + :param service_account_json: service account credentials in JSON format + :param bucket: bucket to upload the export file to + + :returns: package, set up to export metadata to GCS + """ + self._credentials_body.update( + { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "gcs", + "username": project_id, + "password": service_account_json, + "extra": { + "gcs_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + ) + return self + + def adls( + self, + client_id: str, + client_secret: str, + tenant_id: str, + account_name: str, + container: str, + ) -> AssetExportBasic: + """ + Set up package to export to Azure Data Lake Storage. 
+ + :param client_id: unique application (client) ID assigned by Azure AD when the app was registered + :param client_secret: client secret for authentication + :param tenant_id: unique ID of the Azure Active Directory instance + :param account_name: name of the storage account + :param container: container to upload the export file to + + :returns: package, set up to export metadata to ADLS + """ + self._credentials_body.update( + { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "adls", + "username": client_id, + "password": client_secret, + "extra": { + "azure_tenant_id": tenant_id, + "storage_account_name": account_name, + "adls_container": container, + }, + "connector_config_name": "csa-connectors-objectstore", + } + ) + return self + + def _add_delivery_parameters(self): + """ + Add delivery parameters to the parameters list. + """ + self._parameters.append( + { + "name": "delivery_type", + "value": self._delivery_type, + } + ) + if self._delivery_type == "EMAIL" and self._email_addresses: + self._parameters.append( + { + "name": "email_addresses", + "value": ",".join( + self._email_addresses + ), # Join the email addresses if they are in a list + } + ) + + def _get_metadata(self) -> WorkflowMetadata: + return WorkflowMetadata( + labels={ + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/preview": "true", + "orchestration.atlan.com/source": self._NAME, + "orchestration.atlan.com/sourceCategory": "utility", + "orchestration.atlan.com/type": "custom", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": f"a-t-rcsas-l-a-s-h{self._NAME}", + "package.argoproj.io/registry": "httpsc-o-l-o-ns-l-a-s-hs-l-a-s-hpackages.atlan.com", + "orchestration.atlan.com/atlan-ui": "true", + }, + annotations={ + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "kotlin,utility", + "orchestration.atlan.com/dependentPackage": "", + 
"orchestration.atlan.com/docsUrl": f"https://solutions.atlan.com/{self._NAME}", + "orchestration.atlan.com/emoji": "🚀", + "orchestration.atlan.com/icon": self._PACKAGE_ICON, + "orchestration.atlan.com/logo": self._PACKAGE_LOGO, # noqa + "orchestration.atlan.com/name": "Asset Export (Basic)", + "package.argoproj.io/author": "Atlan CSA", + "package.argoproj.io/description": "Export assets with all enrichment that could be made against them " + "via the Atlan UI.", + "package.argoproj.io/homepage": f"https://packages.atlan.com/-/web/detail/@csa/{self._PACKAGE_NAME}", + "package.argoproj.io/keywords": "[\"kotlin\",\"utility\"]", # fmt: skip + "package.argoproj.io/name": self._PACKAGE_NAME, + "package.argoproj.io/parent": ".", + "package.argoproj.io/registry": "https://packages.atlan.com", + "package.argoproj.io/repository": "git+https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": f"csa-{self._NAME}-{self._epoch}", + }, + name=f"csa-{self._NAME}-{self._epoch}", + namespace="default", + ) diff --git a/pyatlan/model/packages/asset_import.py b/pyatlan/model/packages/asset_import.py new file mode 100644 index 00000000..db809d19 --- /dev/null +++ b/pyatlan/model/packages/asset_import.py @@ -0,0 +1,387 @@ +from __future__ import annotations + +from json import dumps +from typing import List, Optional, Union + +from pyatlan.model.enums import AssetInputHandling, CustomWorkflowPackage +from pyatlan.model.fields.atlan_fields import AtlanField +from pyatlan.model.packages.base.custom_package import AbstractCustomPackage +from pyatlan.model.workflow import WorkflowMetadata + + +class AssetImport(AbstractCustomPackage): + """ + Base configuration for a new Asset Import package. 
+ """ + + _NAME = "asset-import" + _PACKAGE_NAME = f"@csa/{_NAME}" + _PACKAGE_PREFIX = CustomWorkflowPackage.ASSET_IMPORT.value + _PACKAGE_ICON = "http://assets.atlan.com/assets/ph-cloud-arrow-up-light.svg" + _PACKAGE_LOGO = "http://assets.atlan.com/assets/ph-cloud-arrow-up-light.svg" + + def __init__( + self, + ): + self._assets_advanced = False + self._glossaries_advanced = False + self._data_product_advanced = False + super().__init__() + + def object_store(self) -> AssetImport: + """ + Set up the package to import + metadata directly from the object store. + """ + self._parameters.append({"name": "import_type", "value": "CLOUD"}) + self._parameters.append({"name": "cloud_source", "value": "{{credentialGuid}}"}) + return self + + def s3( + self, + access_key: str, + secret_key: str, + region: str, + bucket: str, + ) -> AssetImport: + """ + Set up package to import metadata from S3. + + :param access_key: AWS access key + :param secret_key: AWS secret key + :param region: AWS region + :param bucket: bucket to retrieve object store object from + + :returns: package, set up to import metadata from S3 + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "s3", + "username": access_key, + "password": secret_key, + "extra": { + "region": region, + "s3_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def gcs( + self, project_id: str, service_account_json: str, bucket: str + ) -> AssetImport: + """ + Set up package to import metadata from GCS. 
+ + :param project_id: ID of GCP project + :param service_account_json: service account credentials in JSON format + :param bucket: bucket to retrieve object store object from + + :returns: Package set up to import metadata from GCS + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "gcs", + "username": project_id, + "password": service_account_json, + "extra": { + "gcs_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def adls( + self, + client_id: str, + client_secret: str, + tenant_id: str, + account_name: str, + container: str, + ) -> AssetImport: + """ + Set up package to import metadata from ADLS. + + :param client_id: unique application (client) ID assigned by Azure AD when the app was registered + :param client_secret: client secret for authentication + :param tenant_id: unique ID of the Azure Active Directory instance + :param account_name: name of the storage account + :param container: container to retrieve object store objects from + + :returns: package, set up to import metadata from ADLS + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "adls", + "username": client_id, + "password": client_secret, + "extra": { + "azure_tenant_id": tenant_id, + "storage_account_name": account_name, + "adls_container": container, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def assets( + self, + prefix: str, + object_key: str, + input_handling: AssetInputHandling = AssetInputHandling.UPDATE, + ) -> AssetImport: + """ + Set up package to import assets. 
+ + :param prefix: directory (path) within the object store from + which to retrieve the file containing asset metadata + :param object_key: object key (filename), + including its extension, within the object store and prefix + :param input_handling: specifies whether to allow the creation + of new assets from the input CSV (full or partial assets) + or only update existing assets in Atlan + + :returns: package, configured to import assets + """ + self._parameters.append({"name": "assets_prefix", "value": prefix}) + self._parameters.append({"name": "assets_key", "value": object_key}) + self._parameters.append( + {"name": "assets_upsert_semantic", "value": input_handling} + ) + return self + + def assets_advanced( + self, + remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, + fail_on_errors: Optional[bool] = None, + case_sensitive_match: Optional[bool] = None, + is_table_view_agnostic: Optional[bool] = None, + field_separator: Optional[str] = None, + batch_size: Optional[int] = None, + ) -> AssetImport: + """ + Set up package to import assets with advanced configuration. + + :param remove_attributes: list of attributes to clear (remove) + from assets if their value is blank in the provided file. + :param fail_on_errors: specifies whether an invalid value + in a field should cause the import to fail (`True`) or + log a warning, skip that value, and proceed (`False`). + :param case_sensitive_match: indicates whether to use + case-sensitive matching when running in update-only mode (`True`) + or to try case-insensitive matching (`False`). + :param is_table_view_agnostic: specifies whether to treat + tables, views, and materialized views as interchangeable (`True`) + or to strictly adhere to specified types in the input (`False`). + :param field_separator: character used to separate + fields in the input file (e.g., ',' or ';'). + :param batch_size: maximum number of rows + to process at a time (per API request). 
+ + :returns: package, configured to import + assets with advanced configuration. + """ + if isinstance(remove_attributes, list) and all( + isinstance(field, AtlanField) for field in remove_attributes + ): + remove_attributes = [field.atlan_field_name for field in remove_attributes] # type: ignore + params = { + "assets_attr_to_overwrite": dumps(remove_attributes), + "assets_fail_on_errors": fail_on_errors, + "assets_case_sensitive": case_sensitive_match, + "assets_table_view_agnostic": is_table_view_agnostic, + "assets_field_separator": field_separator, + "assets_batch_size": batch_size, + } + self._add_optional_params(params) + self._assets_advanced = True + return self + + def glossaries( + self, + prefix: str, + object_key: str, + input_handling: AssetInputHandling = AssetInputHandling.UPDATE, + ) -> AssetImport: + """ + Set up package to import glossaries. + + :param prefix: directory (path) within the object store from + which to retrieve the file containing glossaries, categories and terms + :param object_key: object key (filename), + including its extension, within the object store and prefix + :param input_handling: specifies whether to allow the creation of new glossaries, + categories and terms from the input CSV, or ensure these are only updated + if they already exist in Atlan. + + :returns: package, configured to import glossaries, categories and terms. + """ + self._parameters.append({"name": "glossaries_prefix", "value": prefix}) + self._parameters.append({"name": "glossaries_key", "value": object_key}) + self._parameters.append( + {"name": "glossaries_upsert_semantic", "value": input_handling} + ) + return self + + def glossaries_advanced( + self, + remove_attributes: Optional[List[str]] = None, + fail_on_errors: Optional[bool] = None, + field_separator: Optional[str] = None, + batch_size: Optional[int] = None, + ) -> AssetImport: + """ + Set up package to import glossaries with advanced configuration. 
+ + :param remove_attributes: list of attributes to clear (remove) + from assets if their value is blank in the provided file. + :param fail_on_errors: specifies whether an invalid value + in a field should cause the import to fail (`True`) or + log a warning, skip that value, and proceed (`False`). + :param field_separator: character used to separate + fields in the input file (e.g., ',' or ';'). + :param batch_size: maximum number of rows + to process at a time (per API request). + + :returns: package, configured to import + glossaries with advanced configuration. + """ + if isinstance(remove_attributes, list) and all( + isinstance(field, AtlanField) for field in remove_attributes + ): + remove_attributes = [field.atlan_field_name for field in remove_attributes] # type: ignore + params = { + "glossaries_attr_to_overwrite": str(remove_attributes), + "glossaries_fail_on_errors": fail_on_errors, + "glossaries_field_separator": field_separator, + "glossaries_batch_size": batch_size, + } + self._add_optional_params(params) + self._glossaries_advanced = True + return self + + def data_products( + self, + prefix: str, + object_key: str, + input_handling: AssetInputHandling = AssetInputHandling.UPDATE, + ) -> AssetImport: + """ + Set up package to import data products. + + :param prefix: directory (path) within the object store from + which to retrieve the file containing data domains, and data products + :param object_key: object key (filename), + including its extension, within the object store and prefix + :param input_handling: specifies whether to allow the creation of new glossaries, + categories and terms from the input CSV, or ensure these are only updated + if they already exist in Atlan. 
+ :returns: package, configured to import data domain and data products + """ + self._parameters.append({"name": "data_products_prefix", "value": prefix}) + self._parameters.append({"name": "data_products_key", "value": object_key}) + self._parameters.append( + {"name": "data_products_upsert_semantic", "value": input_handling} + ) + return self + + def data_product_advanced( + self, + remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, + fail_on_errors: Optional[bool] = None, + field_separator: Optional[str] = None, + batch_size: Optional[int] = None, + ) -> AssetImport: + """ + Set up package to import data domain + and data products with advanced configuration. + + :param remove_attributes: list of attributes to clear (remove) + from assets if their value is blank in the provided file. + :param fail_on_errors: specifies whether an invalid value + in a field should cause the import to fail (`True`) or + log a warning, skip that value, and proceed (`False`). + :param field_separator: character used to separate + fields in the input file (e.g., ',' or ';'). + :param batch_size: maximum number of rows + to process at a time (per API request). + + :returns: package, configured to import + data domain and data products with advanced configuration. 
+ """ + if isinstance(remove_attributes, list) and all( + isinstance(field, AtlanField) for field in remove_attributes + ): + remove_attributes = [field.atlan_field_name for field in remove_attributes] # type: ignore + params = { + "data_product_attr_to_overwrite": str(remove_attributes), + "data_product_fail_on_errors": fail_on_errors, + "data_product_field_separator": field_separator, + "data_product_batch_size": batch_size, + } + self._add_optional_params(params) + self._data_product_advanced = True + return self + + def _set_required_metadata_params(self): + self._parameters.append( + dict( + name="assets_config", + value="advanced" if self._assets_advanced else "default", + ) + ) + self._parameters.append( + dict( + name="glossaries_config", + value="advanced" if self._glossaries_advanced else "default", + ) + ) + self._parameters.append( + dict( + name="data_products_config", + value="advanced" if self._data_product_advanced else "default", + ) + ) + + def _get_metadata(self) -> WorkflowMetadata: + self._set_required_metadata_params() + return WorkflowMetadata( + labels={ + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/source": self._NAME, + "orchestration.atlan.com/sourceCategory": "utility", + "orchestration.atlan.com/type": "custom", + "orchestration.atlan.com/preview": "true", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": f"a-t-ratlans-l-a-s-h{self._NAME}", + "package.argoproj.io/registry": "httpsc-o-l-o-ns-l-a-s-hs-l-a-s-hpackages.atlan.com", + "orchestration.atlan.com/atlan-ui": "true", + }, + annotations={ + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "kotlin,utility", + "orchestration.atlan.com/dependentPackage": "", + "orchestration.atlan.com/docsUrl": f"https://solutions.atlan.com/{self._NAME}", + "orchestration.atlan.com/emoji": "\U0001f680", + "orchestration.atlan.com/icon": self._PACKAGE_ICON, + 
"orchestration.atlan.com/logo": self._PACKAGE_LOGO, # noqa + "orchestration.atlan.com/name": "Asset Import", + "package.argoproj.io/author": "Atlan CSA", + "package.argoproj.io/description": "Import assets from a CSV file.", + "package.argoproj.io/homepage": f"https://packages.atlan.com/-/web/detail/{self._PACKAGE_NAME}", + "package.argoproj.io/keywords": "[\"kotlin\",\"utility\"]", # fmt: skip + "package.argoproj.io/name": self._PACKAGE_NAME, + "package.argoproj.io/parent": ".", + "package.argoproj.io/registry": "https://packages.atlan.com", + "package.argoproj.io/repository": "git+https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": f"csa-{self._NAME}-{self._epoch}", + }, + name=f"csa-{self._NAME}-{self._epoch}", + namespace="default", + ) diff --git a/pyatlan/model/packages/base/custom_package.py b/pyatlan/model/packages/base/custom_package.py new file mode 100644 index 00000000..2e2a8481 --- /dev/null +++ b/pyatlan/model/packages/base/custom_package.py @@ -0,0 +1,26 @@ +from typing import Any, Dict, Optional + +from pyatlan import utils +from pyatlan.model.packages.base.package import AbstractPackage + + +class AbstractCustomPackage(AbstractPackage): + """ + Abstract class for custom packages + """ + + def __init__( + self, + ): + super().__init__() + self._epoch = int(utils.get_epoch_timestamp()) + + def _add_optional_params(self, params: Dict[str, Optional[Any]]) -> None: + """ + Helper method to add non-None params to `self._parameters`. + + :param params: dict of param names and values. 
+ """ + for name, value in params.items(): + if value is not None: + self._parameters.append({"name": name, "value": value}) diff --git a/pyatlan/model/packages/relational_assets_builder.py b/pyatlan/model/packages/relational_assets_builder.py new file mode 100644 index 00000000..eb156780 --- /dev/null +++ b/pyatlan/model/packages/relational_assets_builder.py @@ -0,0 +1,253 @@ +from __future__ import annotations + +from typing import List, Optional + +from pyatlan.model.enums import ( + AssetDeltaHandling, + AssetInputHandling, + AssetRemovalType, + CustomWorkflowPackage, +) +from pyatlan.model.fields.atlan_fields import AtlanField +from pyatlan.model.packages.base.custom_package import AbstractCustomPackage +from pyatlan.model.workflow import WorkflowMetadata + + +class RelationalAssetsBuilder(AbstractCustomPackage): + """ + Base configuration for the Relational Assets Builder package. + """ + + _NAME = "relational-assets-builder" + _PACKAGE_NAME = f"@csa/{_NAME}" + _PACKAGE_PREFIX = CustomWorkflowPackage.RELATIONAL_ASSETS_BUILDER.value + _PACKAGE_ICON = "http://assets.atlan.com/assets/ph-database-light.svg" + _PACKAGE_LOGO = "http://assets.atlan.com/assets/ph-database-light.svg" + + def __init__( + self, + ): + super().__init__() + + def direct(self) -> RelationalAssetsBuilder: + """ + Set up package to directly upload the file. + """ + self._parameters.append({"name": "import_type", "value": "DIRECT"}) + return self + + def object_store( + self, prefix: Optional[str] = None, object_key: Optional[str] = None + ) -> RelationalAssetsBuilder: + """ + Set up the package to import + metadata directly from the object store. + + :param prefix: Enter the directory (path) within the bucket/container from which to retrieve the object(s). + :param object_key: Enter the object key (filename), including its extension, within the bucket/container and + prefix. 
+ + :returns: package, set up to import metadata from object store + """ + self._parameters.append({"name": "assets_prefix", "value": prefix}) + self._parameters.append({"name": "assets_key", "value": object_key}) + self._parameters.append({"name": "import_type", "value": "CLOUD"}) + self._parameters.append({"name": "cloud_source", "value": "{{credentialGuid}}"}) + return self + + def s3( + self, + access_key: str, + secret_key: str, + region: str, + bucket: str, + ) -> RelationalAssetsBuilder: + """ + Set up package to import metadata from S3. + + :param access_key: AWS access key + :param secret_key: AWS secret key + :param region: AWS region + :param bucket: Enter the bucket from which to retrieve the object store object(s). + + :returns: package, set up to import metadata from S3 + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "s3", + "username": access_key, + "password": secret_key, + "extra": { + "region": region, + "s3_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def gcs( + self, project_id: str, service_account_json: str, bucket: str + ) -> RelationalAssetsBuilder: + """ + Set up package to import metadata from GCS. 
+ + :param project_id: ID of GCP project + :param service_account_json: service account credentials in JSON format + :param bucket: the bucket from which to retrieve the object store object(s) + + :returns: Package set up to import metadata from GCS + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "gcs", + "username": project_id, + "password": service_account_json, + "extra": { + "gcs_bucket": bucket, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def adls( + self, + client_id: str, + client_secret: str, + tenant_id: str, + account_name: str, + container: str, + ) -> RelationalAssetsBuilder: + """ + Set up package to import metadata from ADLS. + + :param client_id: unique application (client) ID assigned by Azure AD when the app was registered + :param client_secret: client secret for authentication + :param tenant_id: unique ID of the Azure Active Directory instance + :param account_name: name of the storage account + :param container: container to retrieve object store objects from + + :returns: package, set up to import metadata from ADLS + """ + local_creds = { + "name": f"csa-{self._NAME}-{self._epoch}-0", + "auth_type": "adls", + "username": client_id, + "password": client_secret, + "extra": { + "azure_tenant_id": tenant_id, + "storage_account_name": account_name, + "adls_container": container, + }, + "connector_config_name": "csa-connectors-objectstore", + } + self._credentials_body.update(local_creds) + return self + + def assets_semantics( + self, + input_handling: AssetInputHandling = AssetInputHandling.UPSERT, + delta_handling: AssetDeltaHandling = AssetDeltaHandling.INCREMENTAL, + removal_type: AssetRemovalType = AssetRemovalType.ARCHIVE, + ) -> RelationalAssetsBuilder: + """ + Set up the package to import metadata with semantics. 
+ + :param input_handling: Whether to allow the creation of new (full or partial) assets from the input CSV, + or ensure assets are only updated if they already exist in Atlan. + :param delta_handling: Whether to treat the input file as an initial load, full replacement (deleting any + existing assets not in the file) or only incremental (no deletion of existing assets). + :param removal_type: If `delta_handling` is set to `FULL_REPLACEMENT`, this parameter specifies whether to + delete any assets not found in the latest file by archive (recoverable) or purge (non-recoverable). + If `delta_handling` is set to `INCREMENTAL`, this parameter is ignored and assets are archived. + + :returns: package, set up to import metadata with semantics + """ + self._parameters.append( + {"name": "assets_upsert_semantic", "value": input_handling} + ) + self._parameters.append({"name": "delta_semantic", "value": delta_handling}) + if delta_handling == AssetDeltaHandling.FULL_REPLACEMENT: + self._parameters.append( + {"name": "delta_removal_type", "value": removal_type} + ) + else: + self._parameters.append( + {"name": "delta_removal_type", "value": AssetRemovalType.ARCHIVE} + ) + return self + + def options( + self, + remove_attributes: Optional[List[str]] = None, + fail_on_errors: Optional[bool] = None, + field_separator: Optional[str] = None, + batch_size: Optional[int] = None, + ) -> RelationalAssetsBuilder: + """ + Set up package to import assets with advanced configuration. + + :param remove_attributes: list of attributes to clear (remove) + from assets if their value is blank in the provided file. + :param fail_on_errors: specifies whether an invalid value + in a field should cause the import to fail (`True`) or + log a warning, skip that value, and proceed (`False`). + :param field_separator: character used to separate + fields in the input file (e.g., ',' or ';'). + :param batch_size: maximum number of rows + to process at a time (per API request). 
+ + :returns: package, set up to import assets with advanced configuration + """ + + if isinstance(remove_attributes, list) and all( + isinstance(field, AtlanField) for field in remove_attributes + ): + remove_attributes = [field.atlan_field_name for field in remove_attributes] # type: ignore + params = { + "assets_attr_to_overwrite": str(remove_attributes), + "assets_fail_on_errors": fail_on_errors, + "assets_field_separator": field_separator, + "assets_batch_size": batch_size, + } + self._add_optional_params(params) + return self + + def _get_metadata(self) -> WorkflowMetadata: + return WorkflowMetadata( + labels={ + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/source": self._NAME, + "orchestration.atlan.com/sourceCategory": "utility", + "orchestration.atlan.com/type": "custom", + "orchestration.atlan.com/preview": "true", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": f"a-t-ratlans-l-a-s-h{self._NAME}", + "package.argoproj.io/registry": "httpsc-o-l-o-ns-l-a-s-hs-l-a-s-hpackages.atlan.com", + "orchestration.atlan.com/atlan-ui": "true", + }, + annotations={ + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "kotlin,utility", + "orchestration.atlan.com/dependentPackage": "", + "orchestration.atlan.com/docsUrl": f"https://solutions.atlan.com/{self._NAME}", + "orchestration.atlan.com/emoji": "\U0001f680", + "orchestration.atlan.com/icon": self._PACKAGE_ICON, + "orchestration.atlan.com/logo": self._PACKAGE_LOGO, # noqa + "orchestration.atlan.com/name": "Relational Assets Builder", + "package.argoproj.io/author": "Atlan CSA", + "package.argoproj.io/description": "Build (and update) relational assets managed through a CSV file.", + "package.argoproj.io/homepage": f"https://packages.atlan.com/-/web/detail/{self._PACKAGE_NAME}", + "package.argoproj.io/keywords": "[\"kotlin\",\"utility\"]", # fmt: skip + "package.argoproj.io/name": 
self._PACKAGE_NAME, + "package.argoproj.io/parent": ".", + "package.argoproj.io/registry": "https://packages.atlan.com", + "package.argoproj.io/repository": "git+https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": f"csa-{self._NAME}-{self._epoch}", + }, + name=f"csa-{self._NAME}-{self._epoch}", + namespace="default", + ) diff --git a/pyatlan/model/packages/tableau_crawler.py b/pyatlan/model/packages/tableau_crawler.py index 8594536b..196cd9d3 100644 --- a/pyatlan/model/packages/tableau_crawler.py +++ b/pyatlan/model/packages/tableau_crawler.py @@ -52,6 +52,26 @@ def __init__( source_logo=self._PACKAGE_LOGO, ) + def offline(self, s3_bucket: str, s3_prefix: str, s3_region: str) -> TableauCrawler: + """ + Set up the crawler to fetch metadata directly from the S3 bucket. + + :param s3_bucket: name of the S3 bucket containing the extracted metadata files + :param s3_prefix: prefix within the S3 bucket where the extracted metadata files are located + :param s3_region: region where the S3 bucket is located + :returns: crawler, configured to fetch metadata directly from the S3 bucket + """ + self._parameters.append(dict(name="extraction-method", value="s3")) + self._parameters.append(dict(name="metadata-s3-bucket", value=s3_bucket)) + self._parameters.append(dict(name="metadata-s3-prefix", value=s3_prefix)) + self._parameters.append(dict(name="metadata-s3-region", value=s3_region)) + # Advanced configuration + self.exclude(projects=[]) + self.include(projects=[]) + self.crawl_unpublished(enabled=True) + self.crawl_hidden_fields(enabled=True) + return self + def direct( self, hostname: str, @@ -80,6 +100,9 @@ def direct( } self._credentials_body.update(local_creds) self._parameters.append({"name": "extraction-method", "value": "direct"}) + self._parameters.append( + {"name": "credential-guid", "value": "{{credentialGuid}}"} + ) return self def basic_auth(self, username: str, 
password: str) -> TableauCrawler: @@ -187,9 +210,6 @@ def alternate_host(self, hostname: str) -> TableauCrawler: return self def _set_required_metadata_params(self): - self._parameters.append( - {"name": "credential-guid", "value": "{{credentialGuid}}"} - ) self._parameters.append( { "name": "connection", diff --git a/tests/unit/data/package_requests/tableau_access_token.json b/tests/unit/data/package_requests/tableau_access_token.json index b6ab6405..01fad95a 100644 --- a/tests/unit/data/package_requests/tableau_access_token.json +++ b/tests/unit/data/package_requests/tableau_access_token.json @@ -51,6 +51,10 @@ "name": "extraction-method", "value": "direct" }, + { + "name": "credential-guid", + "value": "{{credentialGuid}}" + }, { "name": "include-filter", "value": "{\"test-project-guid-1\": {}, \"test-project-guid-2\": {}}" @@ -67,10 +71,6 @@ "name": "crawl-hidden-datasource-fields", "value": "false" }, - { - "name": "credential-guid", - "value": "{{credentialGuid}}" - }, { "name": "connection", "value": "{\"typeName\": \"Connection\", \"attributes\": {\"qualifiedName\": \"default/tableau/123456\", \"name\": \"test-tableau-access-token-conn\", \"adminUsers\": [], \"adminGroups\": [], \"connectorName\": \"tableau\", \"isDiscoverable\": true, \"isEditable\": false, \"adminRoles\": [\"admin-guid-1234\"], \"category\": \"bi\", \"allowQuery\": false, \"allowQueryPreview\": false, \"rowLimit\": 0, \"defaultCredentialGuid\": \"{{credentialGuid}}\", \"sourceLogo\": \"https://img.icons8.com/color/480/000000/tableau-software.png\"}, \"guid\": \"-1234567890000000000000000\"}" diff --git a/tests/unit/data/package_requests/tableau_basic.json b/tests/unit/data/package_requests/tableau_basic.json index a42e0656..481a8c73 100644 --- a/tests/unit/data/package_requests/tableau_basic.json +++ b/tests/unit/data/package_requests/tableau_basic.json @@ -51,6 +51,10 @@ "name": "extraction-method", "value": "direct" }, + { + "name": "credential-guid", + "value": "{{credentialGuid}}" + 
}, { "name": "include-filter", "value": "{\"test-project-guid-1\": {}, \"test-project-guid-2\": {}}" @@ -67,10 +71,6 @@ "name": "crawl-hidden-datasource-fields", "value": "false" }, - { - "name": "credential-guid", - "value": "{{credentialGuid}}" - }, { "name": "connection", "value": "{\"typeName\": \"Connection\", \"attributes\": {\"qualifiedName\": \"default/tableau/123456\", \"name\": \"test-tableau-basic-conn\", \"adminUsers\": [], \"adminGroups\": [], \"connectorName\": \"tableau\", \"isDiscoverable\": true, \"isEditable\": false, \"adminRoles\": [\"admin-guid-1234\"], \"category\": \"bi\", \"allowQuery\": false, \"allowQueryPreview\": false, \"rowLimit\": 0, \"defaultCredentialGuid\": \"{{credentialGuid}}\", \"sourceLogo\": \"https://img.icons8.com/color/480/000000/tableau-software.png\"}, \"guid\": \"-1234567890000000000000000\"}" diff --git a/tests/unit/data/package_requests/tableau_offline.json b/tests/unit/data/package_requests/tableau_offline.json new file mode 100644 index 00000000..61a85310 --- /dev/null +++ b/tests/unit/data/package_requests/tableau_offline.json @@ -0,0 +1,113 @@ +{ + "metadata": { + "annotations": { + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "tableau,crawler", + "orchestration.atlan.com/dependentPackage": "", + "orchestration.atlan.com/docsUrl": "https://ask.atlan.com/hc/en-us/articles/6332449996689", + "orchestration.atlan.com/emoji": "🚀", + "orchestration.atlan.com/icon": "https://img.icons8.com/color/480/000000/tableau-software.png", + "orchestration.atlan.com/logo": "https://img.icons8.com/color/480/000000/tableau-software.png", + "orchestration.atlan.com/marketplaceLink": "https://packages.atlan.com/-/web/detail/@atlan/tableau", + "orchestration.atlan.com/name": "tableau Assets", + "package.argoproj.io/author": "Atlan", + "package.argoproj.io/description": "Package to crawl Tableau assets and publish to Atlan for discovery.", + "package.argoproj.io/homepage": 
"https://packages.atlan.com/-/web/detail/@atlan/tableau", + "package.argoproj.io/keywords": "[\"tableau\",\"bi\",\"connector\",\"crawler\"]", + "package.argoproj.io/name": "@atlan/tableau", + "package.argoproj.io/parent": ".", + "package.argoproj.io/registry": "https://packages.atlan.com", + "package.argoproj.io/repository": "git+https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": "atlan-tableau-default-tableau-123456" + }, + "labels": { + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/source": "tableau", + "orchestration.atlan.com/sourceCategory": "bi", + "orchestration.atlan.com/type": "connector", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": "a-t-ratlans-l-a-s-htableau", + "package.argoproj.io/registry": "httpsc-o-l-o-ns-l-a-s-hs-l-a-s-hpackages.atlan.com", + "orchestration.atlan.com/default-tableau-123456": "true", + "orchestration.atlan.com/atlan-ui": "true" + }, + "name": "atlan-tableau-123456", + "namespace": "default" + }, + "spec": { + "entrypoint": "main", + "templates": [ + { + "name": "main", + "dag": { + "tasks": [ + { + "name": "run", + "arguments": { + "parameters": [ + { + "name": "extraction-method", + "value": "s3" + }, + { + "name": "metadata-s3-bucket", + "value": "test-bucket" + }, + { + "name": "metadata-s3-prefix", + "value": "test-prefix" + }, + { + "name": "metadata-s3-region", + "value": "test-region" + }, + { + "name": "exclude-filter", + "value": "{}" + }, + { + "name": "include-filter", + "value": "{}" + }, + { + "name": "crawl-unpublished-worksheets-dashboard", + "value": "true" + }, + { + "name": "crawl-hidden-datasource-fields", + "value": "true" + }, + { + "name": "connection", + "value": "{\"typeName\": \"Connection\", \"attributes\": {\"qualifiedName\": \"default/tableau/123456\", \"name\": \"test-tableau-offline-conn\", \"adminUsers\": 
[], \"adminGroups\": [], \"connectorName\": \"tableau\", \"isDiscoverable\": true, \"isEditable\": false, \"adminRoles\": [\"admin-guid-1234\"], \"category\": \"bi\", \"allowQuery\": false, \"allowQueryPreview\": false, \"rowLimit\": 0, \"defaultCredentialGuid\": \"{{credentialGuid}}\", \"sourceLogo\": \"https://img.icons8.com/color/480/000000/tableau-software.png\"}, \"guid\": \"-1234567890000000000000000\"}" + }, + { + "name": "atlas-auth-type", + "value": "internal" + }, + { + "name": "publish-mode", + "value": "production" + } + ] + }, + "templateRef": { + "name": "atlan-tableau", + "template": "main", + "clusterScope": true + } + } + ] + } + } + ], + "workflowMetadata": { + "annotations": { + "package.argoproj.io/name": "@atlan/tableau" + } + } + }, + "payload": [] +} diff --git a/tests/unit/test_packages.py b/tests/unit/test_packages.py index c4f8a15e..1b6cfba7 100644 --- a/tests/unit/test_packages.py +++ b/tests/unit/test_packages.py @@ -30,6 +30,7 @@ GLUE_IAM_USER = "glue_iam_user.json" TABLEAU_BASIC = "tableau_basic.json" TABLEAU_ACCESS_TOKEN = "tableau_access_token.json" +TABLEAU_OFFLINE = "tableau_offline.json" POWERBI_DELEGATED_USER = "powerbi_delegated_user.json" POWEBI_SERVICE_PRINCIPAL = "powerbi_service_principal.json" CONFLUENT_KAFKA_DIRECT = "confluent_kafka_direct.json" @@ -232,6 +233,23 @@ def test_tableau_package(mock_package_env): ) assert request_json == load_json(TABLEAU_ACCESS_TOKEN) + tableau_offline = ( + TableauCrawler( + connection_name="test-tableau-offline-conn", + admin_roles=["admin-guid-1234"], + admin_groups=None, + admin_users=None, + ) + .offline( + s3_bucket="test-bucket", + s3_prefix="test-prefix", + s3_region="test-region", + ) + .to_workflow() + ) + request_json = loads(tableau_offline.json(by_alias=True, exclude_none=True)) + assert request_json == load_json(TABLEAU_OFFLINE) + def test_powerbi_package(mock_package_env): powerbi_delegated_user = (