I’m migrating from the load_assets_from_dbt_manifest
to dbt_assets
. How do I ensure that I get parity with my usage of the load_assets_from_dbt_manifest
function?
#18065
-
Hi. I am currently running dagster v1.3.7 and use dbt_assets = load_assets_from_dbt_manifest(
manifest=DBT_MANIFEST,
io_manager_key="warehouse_io_manager",
dagster_dbt_translator=CustomDagsterDbtTranslator(),
# key_prefix=[DBT_KEY_PREFIX], # deprecated
# use_build_command=True, # deprecated
runtime_metadata_fn=get_dbt_model_metadata, #deprecated
node_info_to_auto_materialize_policy_fn=auto_materialization_fn #deprecated
) These As part of upgrading to 1.4.x and then 1.5.x, I noticed a lot of the arguments within As part of switching to using the @dbt_assets(
manifest=DBT_MANIFEST,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
io_manager_key="warehouse_io_manager",
)
def dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""A special asset that will load all dbt assets"""
yield from dbt.cli(["build"], context=context).stream() where I have defined class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
return super().get_asset_key(dbt_resource_props).with_prefix(DBT_KEY_PREFIX)
@classmethod
def get_auto_materialize_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
return AutoMaterializePolicy.lazy()
@classmethod
def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
...metadata code... The question was originally asked in Dagster Slack. |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
In the migration guide to Everything that you have is correct except the usage of If you want to customize the metadata at run time, which is associated with your asset materializations, you should modify the compute function of |
Beta Was this translation helpful? Give feedback.
-
Thank you for your response @rexledesma In my case, the metadata function is currently something like this: @classmethod
def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
project_id = dbt_resource_props['database']
dataset = dbt_resource_props['schema']
tablename = dbt_resource_props['alias']
return {'project': project, 'dataset': dataset, 'table': tablename} From your example what object would replace def dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
dbt_cli_invocation = dbt.cli(["run"], context=context)
for dbt_event in dbt_cli_invocation.stream_raw_events():
project_id = dbt_event['database']
dataset = dbt_event['schema']
tablename = dbt_event['alias']
for dagster_event in dbt_event.to_default_asset_events(manifest=dbt_cli_invocation.manifest, context=context):
if isinstance(dagster_event, Output):
context.add_output_metadata(
metadata={
"project": project_id,
"dataset": dataset,
"table": tablename,
},
output_name=dagster_event.output_name,
)
yield dagster_event Does it matter than I'm using ["run"] vs. ["build"] here? |
Beta Was this translation helpful? Give feedback.
In the migration guide to
1.4.0
, when these newdagster-dbt
APIs were introduced, we include the mapping of arguments fromload_assets_from_dbt_manifest
to@dbt_assets
.Everything that you have is correct except the usage of
get_metadata
in yourCustomDagsterDbtTranslator
.get_metadata
is for customizing metadata at definition time.If you want to customize the metadata at run time, which is associated with your asset materializations, you should modify the compute function of
@dbt_assets
. My previous answer here gives an example: #17753 (comment).