From 2b613d9ae6d500a4dc7820317769d9871e589a2a Mon Sep 17 00:00:00 2001 From: Kajwan Date: Tue, 15 Jul 2025 17:34:45 +0200 Subject: [PATCH 01/18] Initial HEM implementation --- pymrio/tools/iohem.py | 381 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 pymrio/tools/iohem.py diff --git a/pymrio/tools/iohem.py b/pymrio/tools/iohem.py new file mode 100644 index 0000000..cd7b70c --- /dev/null +++ b/pymrio/tools/iohem.py @@ -0,0 +1,381 @@ +""" + +""" +# %% +import json +from pathlib import Path + +import numpy as np +import pandas as pd +import pymrio.tools.iomath as iomath + +# %% +class HEM(): + """Class for Hypothetical Extraction Method (HEM) results. + """ + def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, save_path=None) -> None: + """ + Initialize the HEM class with the IOSystem or core IO data. + + Parameters + ---------- + IOSystem : pymrio.IOSystem, optional + An instance of the pymrio.IOSystem class containing the core IO data. + Y : pd.DataFrame, optional + Final demand matrix. If not provided, the one from the IOSystem will be used. + A : pd.DataFrame, optional + Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. + x : pd.DataFrame, optional + Total output vector as a single column matrix, named IndOut. If not provided, the one from the IOSystem will be used. + L : pd.DataFrame, optional + Leontief inverse matrix. If not provided, it will be calculated from A. + meta : dict, optional + Metadata dictionary containing information about the IOSystem or extraction. + save_path : str or Path, optional + Path to save the extraction results. If not provided, it will be set to None. 
+ """ + if IOSystem is None: + self.Y = Y + self.A = A + self.x = x + self.L = L + self.meta = { + "IO System meta": meta, + } + else: + self.Y = IOSystem.Y + self.A = IOSystem.A + self.x = IOSystem.x + self.L = IOSystem.L + self.meta = { + "IO System meta": IOSystem.meta, + } + + self.save_path = Path(save_path or "./") + + + def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=True, Y=None, A=None, x=None, downstream_allocation_matrix="A12"): + """ + Create a hypothetical extraction of the IOSystem based on the specified regions and sectors. + + Parameters + ---------- + regions : list + List of regions to be extracted. + sectors : list + List of sectors to be extracted. + extraction_type : str, optional + Type of extraction to be performed. Defaults to "1.2". See "https://doi.org/10.1111/jiec.13522" for more information. + multipliers : bool, optional + Whether to calculate multipliers for the extracted sectors. Defaults to True. + Y : pd.DataFrame, optional + Final demand matrix. If not provided, the one from the IOSystem will be used. + A : pd.DataFrame, optional + Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. + x : pd.DataFrame, optional + Total output vector as a single column matrix, named IndOut. If not provided, the one from the IOSystem will be used. + downstream_allocation_matrix : str, optional + The matrix used to allocate downstream production. Defaults to "A12". Can be either "A12" or "L12". + Returns + ------- + None + + Notes + ----- + This method sets the attributes of the HEM class based on the specified parameters. + It calculates the hypothetical extraction of the IOSystem based on the specified regions and sectors. + The extraction type must be one of the following: "1.2", "2a.2", "3a.2". + The method also calculates the downstream allocation matrix based on the specified type. 
+ The method raises a ValueError if either regions or sectors are not specified, or if the extraction type is not implemented. + The method raises a NotImplementedError if the extraction type is not one of the implemented ones. + The method raises a TypeError if the intensities are not a pandas Series or DataFrame. + The method raises a ValueError if the save path is not provided. + + Notes + ----- + The definition of downstream and upstream production changes, if other extraction types are implemented. + Current three implemented extraction types are identical. + See https://doi.org/10.1111/jiec.13522 for more information. + + + """ + self.meta.update({ + "extraction_type": extraction_type, + "downstream_allocation_matrix": downstream_allocation_matrix, + "multipliers": multipliers, + }) + self.extraction_regions = regions + self.extraction_sectors = sectors + + # In case the user does not pass Y and A, use the ones from the IOSystem + if Y is None: + Y = self.Y + if A is None: + A = self.A + if x is None: + x = self.x + + if (regions != [None]) & (sectors != [None]): + index_extraction = pd.MultiIndex.from_product(iterables=[ + regions, sectors + ]) + + elif (regions == [None]) & (sectors != [None]): + index_extraction = pd.MultiIndex.from_product(iterables=[ + self.A.index.get_level_values(0).unique(), sectors + ]) + elif (regions != [None]) & (sectors == [None]): + index_extraction = pd.MultiIndex.from_product(iterables=[ + regions, self.A.index.get_level_values(1).unique() + ]) + else: + raise ValueError( + "Either regions or sectors must be specified, or both." + ) + + index_other = self.A.index.drop(index_extraction) + self.index_extraction = index_extraction + self.index_other = index_other + + + if extraction_type in ["1.2", "2a.2", "3a.2"]: + # TODO: Turn different extraction types into functions that this method can call. 
+ # Extracting blocks + Y1 = Y.loc[Y.index.isin(index_extraction), :] + Y2 = Y.loc[Y.index.isin(index_other), :] + A11 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_extraction)] + A12 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_other)] + A22 = A.loc[A.index.isin(index_other), A.columns.isin(index_other)] + A21 = A.loc[A.index.isin(index_other), A.columns.isin(index_extraction)] + + # Calculating HEM matrices + I11 = pd.DataFrame( + data=np.eye(len(A11)), + index=A11.index, + columns=A11.columns, + ) + + self.L22 = iomath.calc_L(A22) + + self.H = pd.DataFrame( + data=np.linalg.inv( + I11 - A11 - A12.dot(self.L22.dot(A21)) + ), + index=A11.index, + columns=A11.columns + ) + + # Calculating different accounts + self.production_downstream_all = pd.DataFrame( + data=np.diag(v=self.L22.dot(Y2.sum(axis=1))), + index=self.L22.index, + columns=self.L22.index + ) + + # Allocating downstream production + if downstream_allocation_matrix == "A12": + self.downstream_allocation_matrix = A12 + + elif downstream_allocation_matrix == "L12": + if self.L is None: + self.L = iomath.calc_L(A) + + L12 = self.L.loc[index_extraction, index_other] + L12_normalised = L12.div(L12.sum(axis=0), axis=1) + self.downstream_allocation_matrix = L12_normalised + else: + raise ValueError("Downstream allocation matrix must be either 'A12' or 'L12'.") + + self.production_downstream = self.downstream_allocation_matrix.dot(self.production_downstream_all) + + self.demand_final_diagonal = pd.DataFrame( + data=np.diag(v=Y1.sum(axis=1)), + index=Y1.index, + columns=Y1.index + ) + self.demand_intermediate_diagonal = pd.DataFrame( + data=np.diag(v=self.production_downstream.sum(axis=1)), + index=self.production_downstream.index, + columns=self.production_downstream.index + ) + + self.production = self.H.dot(other=(self.demand_final_diagonal+self.demand_intermediate_diagonal)) + self.production_upstream_first_tier = A21.dot(self.production) + self.production_upstream = 
self.L22.dot(self.production_upstream_first_tier) + + if multipliers: + self.M_production = self.production.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) + self.M_production_upstream_first_tier = self.production_upstream_first_tier.div(x.loc[index_extraction, "indout"], axis=1).replace(np.nan, 0) + self.M_upstream = self.production_upstream.div(x.loc[index_extraction, "indout"], axis=1).replace(np.nan, 0) + self.M_downstream = self.production_downstream.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) + + else: + raise NotImplementedError( + "Only extraction types '1.2', '2a.2', '3a.2' are implemented at the moment.\n" + + "Please implement the extraction type you need or use one of the implemented ones.\n" + + "For more information see Table 4 in https://doi.org/10.1111/jiec.13522." + ) + + def calculate_impacts(self, intensities=None): + """ + Calculate the impacts of the hypothetical extraction based on the provided intensities. + + Parameters + ---------- + intensities : pd.Series or pd.DataFrame + Environmental intensities for the extraction sectors and other sectors. + If a Series, it should have the extraction sectors as index. + If a DataFrame, it should have the extraction sectors as columns and other sectors as index. + + Raises + ------ + TypeError + If the intensities are not a pandas Series or DataFrame. 
+ + """ + # Keep details, if intensities are a Series + if type(intensities) is pd.Series: + self.impact_production = self.production.mul(intensities.loc[self.index_extraction], axis=0) + self.impact_upstream_first_tier = self.production_upstream_first_tier.mul(intensities.loc[self.index_other], axis=0) + self.impact_upstream = self.production_upstream.mul(intensities.loc[self.index_other], axis=0) + self.impact_downstream = self.production_downstream.mul(intensities.loc[self.index_other], axis=1) + + if self.meta["multipliers"]: + self.M_impact_production = self.M_production.mul(intensities.loc[self.index_extraction], axis=0) + self.M_impact_upstream_first_tier = self.M_production_upstream_first_tier.mul(intensities.loc[self.index_other], axis=0) + self.M_impact_upstream = self.M_upstream.mul(intensities.loc[self.index_other], axis=0) + self.M_impact_downstream = self.M_downstream.mul(intensities.loc[self.index_other], axis=1) + self.intensities = [intensities.name] + + # Drop details, if intensities are a DataFrame + elif type(intensities) is pd.DataFrame: + self.impact_production = intensities.loc[:, self.index_extraction].dot(self.production) + self.impact_upstream_first_tier = intensities.loc[:, self.index_other].dot(self.production_upstream_first_tier) + self.impact_upstream = intensities.loc[:, self.index_other].dot(self.production_upstream) + self.impact_downstream = self.production_downstream.dot(intensities.loc[:, self.index_other].T).T + if self.meta["multipliers"]: + self.M_impact_production = intensities.loc[:, self.index_extraction].dot(self.M_production) + self.M_impact_upstream_first_tier = intensities.loc[:, self.index_other].dot(self.M_production_upstream_first_tier) + self.M_impact_upstream = intensities.loc[:, self.index_other].dot(self.M_upstream) + self.M_impact_downstream = self.M_downstream.dot(intensities.loc[:, self.index_other].T).T + self.intensities = intensities.index.to_list() + else: + raise TypeError( + "Intensities must be either a 
pandas Series or a pandas DataFrame." + ) + + def save_extraction(self, save_path=None, save_core_IO=False, save_details=False): + """ + Save the extraction results to the specified path. + + Parameters + ---------- + save_path : str or Path, optional + Path to save the extraction results. If not provided, the save path from the IOSystem will be used. + save_core_IO : bool, optional + Whether to save the core IO data (A and Y). Defaults to False. + save_details : bool, optional + Whether to save additional details like all downstream production, final demand diagonal, and intermediate demand diagonal. Defaults to False. + + Raises + ------ + ValueError + If no save path is provided. + + """ + if save_path is None: + save_path = self.save_path + + if save_path is None: + raise ValueError("No save path provided. Please provide a save path.") + + save_path = Path(save_path) + self.save_path = save_path + + # Makes subfolders for individual regions and/or sectors, if it is clearly that a single region and/or sector has been extracted. + # Will make sure that things are not overwritten, if multiple regions and/or sectors are extracted in a loop. 
+ if (len(t1.extraction_regions) == 1) and (len(t1.extraction_sectors) == 1): + extraction_save_path = save_path / f"{self.extraction_regions[0]}_{self.extraction_sectors[0]}" + + elif (len(t1.extraction_regions) == 1): + extraction_save_path = save_path / f"{self.extraction_regions[0]}" + + elif (len(t1.extraction_sectors) == 1): + extraction_save_path = save_path / f"{self.extraction_sectors[0]}" + + else: + extraction_save_path = save_path + + self.extraction_save_path = extraction_save_path + extraction_save_path.mkdir(parents=True, exist_ok=True) + + self.index_extraction.to_frame().to_csv(extraction_save_path / "index_extraction.txt", sep="\t", index=False, header=False) + self.index_other.to_frame().to_csv(extraction_save_path / "index_other.txt", sep="\t", index=False, header=False) + self.L22.to_csv(extraction_save_path / "L22.txt", sep="\t") + self.H.to_csv(extraction_save_path / "H.txt", sep="\t") + self.downstream_allocation_matrix.to_csv(extraction_save_path / f"{self.meta["downstream_allocation_matrix"]}.txt", sep="\t") + + self.production.to_csv(extraction_save_path / "production.txt", sep="\t") + self.production_upstream_first_tier.to_csv(extraction_save_path / "production_upstream_first_tier.txt", sep="\t") + self.production_upstream.to_csv(extraction_save_path / "production_upstream.txt", sep="\t") + self.production_downstream.to_csv(extraction_save_path / "production_downstream.txt", sep="\t") + + if self.meta["multipliers"]: + self.M_production.to_csv(extraction_save_path / "M_production.txt", sep="\t") + self.M_production_upstream_first_tier.to_csv(extraction_save_path / "M_production_upstream_first_tier.txt", sep="\t") + self.M_upstream.to_csv(extraction_save_path / "M_upstream.txt", sep="\t") + self.M_downstream.to_csv(extraction_save_path / "M_downstream.txt", sep="\t") + + if save_core_IO: + self.A.to_csv(extraction_save_path / "A.txt", sep="\t") + self.Y.to_csv(extraction_save_path / "Y.txt", sep="\t") + + if save_details: + 
self.production_downstream_all.to_csv(extraction_save_path / "production_downstream_all.txt", sep="\t") + self.demand_final_diagonal.to_csv(extraction_save_path / "demand_final_diagonal.txt", sep="\t") + self.demand_intermediate_diagonal.to_csv(extraction_save_path / "demand_intermediate_diagonal.txt", sep="\t") + + with open(extraction_save_path / "meta.json", 'w') as json_file: + json.dump(self.meta, json_file, indent=4) + + + def save_impacts(self, impact_account=None): + """ + Save the impacts of the hypothetical extraction to the specified path. + + Parameters + ---------- + impact_account : str, optional + Account name for the impacts. If not provided, the impacts will be saved in a general "impacts" folder. + save_path : str or Path, optional + Path to save the impacts. If not provided, the save path from the IOSystem will be used. + + Raises + ------ + ValueError + If no save path is provided. + + """ + + + if impact_account is None: + save_path = Path(self.extraction_save_path) / "impacts" + else: + save_path = Path(self.extraction_save_path) / "impacts" / impact_account + + save_path.mkdir(parents=True, exist_ok=True) + pd.DataFrame(self.intensities).to_csv(save_path / "extensions.txt", sep="\t", index=False, header=False) + self.impact_production.to_csv(save_path / "impact_production.txt", sep="\t") + self.impact_upstream_first_tier.to_csv(save_path / "impact_upstream_first_tier.txt", sep="\t") + self.impact_upstream.to_csv(save_path / "impact_upstream.txt", sep="\t") + self.impact_downstream.to_csv(save_path / "impact_downstream.txt", sep="\t") + + if self.meta["multipliers"]: + self.M_impact_production.to_csv(save_path / "M_impact_production.txt", sep="\t") + self.M_impact_upstream_first_tier.to_csv(save_path / "M_impact_upstream_first_tier.txt", sep="\t") + self.M_impact_upstream.to_csv(save_path / "M_impact_upstream.txt", sep="\t") + self.M_impact_downstream.to_csv(save_path / "M_impact_downstream.txt", sep="\t") + + +# %% + From 
e43f49050026622ad8e9973b3c6892d4c3898845 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Tue, 15 Jul 2025 17:47:28 +0200 Subject: [PATCH 02/18] tidy --- pymrio/tools/iohem.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pymrio/tools/iohem.py b/pymrio/tools/iohem.py index cd7b70c..ca53642 100644 --- a/pymrio/tools/iohem.py +++ b/pymrio/tools/iohem.py @@ -26,7 +26,7 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav A : pd.DataFrame, optional Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. x : pd.DataFrame, optional - Total output vector as a single column matrix, named IndOut. If not provided, the one from the IOSystem will be used. + Total output vector as a single column matrix, named indout. If not provided, the one from the IOSystem will be used. L : pd.DataFrame, optional Leontief inverse matrix. If not provided, it will be calculated from A. meta : dict, optional @@ -73,7 +73,7 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T A : pd.DataFrame, optional Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. x : pd.DataFrame, optional - Total output vector as a single column matrix, named IndOut. If not provided, the one from the IOSystem will be used. + Total output vector as a single column matrix, named indout. If not provided, the one from the IOSystem will be used. downstream_allocation_matrix : str, optional The matrix used to allocate downstream production. Defaults to "A12". Can be either "A12" or "L12". 
Returns @@ -357,7 +357,6 @@ def save_impacts(self, impact_account=None): """ - if impact_account is None: save_path = Path(self.extraction_save_path) / "impacts" else: From d9b535367632e6077b37967a6a063800b16667cf Mon Sep 17 00:00:00 2001 From: Kajwan Date: Tue, 15 Jul 2025 17:47:34 +0200 Subject: [PATCH 03/18] basic testing --- tests/test_hem.py | 249 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 tests/test_hem.py diff --git a/tests/test_hem.py b/tests/test_hem.py new file mode 100644 index 0000000..750d146 --- /dev/null +++ b/tests/test_hem.py @@ -0,0 +1,249 @@ +"""Test cases for HEM calculations.""" + + +import os +import sys + +import numpy as np +import numpy.testing as npt +import pandas as pd +import pandas.testing as pdt +import pytest + +TESTPATH = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(TESTPATH, "..")) + + +# the function which should be tested here +from pymrio.tools.iohem import HEM + +def td_small_MRIO(): + """Small MRIO with three sectors and two regions. + + The testdata here just consists of pandas DataFrames, the functionality + with numpy arrays gets tested with td_IO_Data_Miller. 
+ """ + + class IO_Data: + _sectors = ["sector1", "sector2", "sector3"] + _regions = ["reg1", "reg2"] + _Z_multiindex = pd.MultiIndex.from_product([_regions, _sectors], names=["region", "sector"]) + + Z = pd.DataFrame( + data=[ + [10, 5, 1, 6, 5, 7], + [0, 2, 0, 0, 5, 3], + [10, 3, 20, 4, 2, 0], + [5, 0, 0, 1, 10, 9], + [0, 10, 1, 0, 20, 1], + [5, 0, 0, 1, 10, 10], + ], + index=_Z_multiindex, + columns=_Z_multiindex, + dtype=("float64"), + ) + + _categories = ["final demand"] + _Y_multiindex = pd.MultiIndex.from_product([_regions, _categories], names=["region", "category"]) + Y = pd.DataFrame( + data=[[14, 3], [2.5, 2.5], [13, 6], [5, 20], [10, 10], [3, 10]], + index=_Z_multiindex, + columns=_Y_multiindex, + dtype=("float64"), + ) + + F = pd.DataFrame( + data=[[20, 1, 42, 4, 20, 5], [5, 4, 11, 8, 2, 10]], + index=["ext_type_1", "ext_type_2"], + columns=_Z_multiindex, + dtype=("float64"), + ) + + F_Y = pd.DataFrame( + data=[[50, 10], [100, 20]], + index=["ext_type_1", "ext_type_2"], + columns=_Y_multiindex, + dtype=("float64"), + ) + + S_Y = pd.DataFrame( + data=[ + [1.0526315789473684, 0.1941747572815534], + [2.1052631578947367, 0.3883495145631068], + ], + index=["ext_type_1", "ext_type_2"], + columns=_Y_multiindex, + dtype=("float64"), + ) + + A = pd.DataFrame( + data=[ + [ + 0.19607843137254902, + 0.3333333333333333, + 0.017241379310344827, + 0.12, + 0.09615384615384616, + 0.1794871794871795, + ], # noqa + [ + 0.0, + 0.13333333333333333, + 0.0, + 0.0, + 0.09615384615384616, + 0.07692307692307693, + ], # noqa + [ + 0.19607843137254902, + 0.2, + 0.3448275862068966, + 0.08, + 0.038461538461538464, + 0.0, + ], # noqa + [ + 0.09803921568627451, + 0.0, + 0.0, + 0.02, + 0.19230769230769232, + 0.23076923076923075, + ], # noqa + [ + 0.0, + 0.6666666666666666, + 0.017241379310344827, + 0.0, + 0.38461538461538464, + 0.02564102564102564, + ], # noqa + [ + 0.09803921568627451, + 0.0, + 0.0, + 0.02, + 0.19230769230769232, + 0.2564102564102564, + ], # noqa + ], + 
index=_Z_multiindex, + columns=_Z_multiindex, + ) + + L = pd.DataFrame( + data=[ + [ + 1.3387146304736708, + 0.9689762471208287, + 0.05036622549592462, + 0.17820960407435948, + 0.5752019383714646, + 0.4985179148178926, + ], # noqa + [ + 0.02200779585580331, + 1.3716472861392823, + 0.0076800357678581885, + 0.006557415453762468, + 0.2698335633228079, + 0.15854643902810828, + ], # noqa + [ + 0.43290422861412026, + 0.8627066565439678, + 1.5492942759220427, + 0.18491657196329184, + 0.44027825642348534, + 0.26630955082840885, + ], # noqa + [ + 0.18799498787612925, + 0.5244084722329316, + 0.020254008037620782, + 1.0542007368783255, + 0.5816573175534603, + 0.44685014763069275, + ], # noqa + [ + 0.04400982046095892, + 1.5325472495862535, + 0.05259311578831879, + 0.014602513642445088, + 1.9545285794951548, + 0.2410917825607805, + ], # noqa + [ + 0.19294222439918532, + 0.5382086951864299, + 0.020787008249137116, + 0.05562707205933412, + 0.596964089068025, + 1.4849251515157111, + ], # noqa + ], + index=_Z_multiindex, + columns=_Z_multiindex, + ) + + + x = pd.DataFrame( + data=[ + [51], + [15], + [58], + [50], + [52], + [39], + ], + columns=["indout"], + index=_Z_multiindex, + dtype=("float64"), + ) + S = pd.DataFrame( + data=[ + [ + 0.39215686274509803, + 0.06666666666666667, + 0.7241379310344828, + 0.08, + 0.38461538461538464, + 0.1282051282051282, + ], # noqa + [ + 0.09803921568627451, + 0.26666666666666666, + 0.1896551724137931, + 0.16, + 0.038461538461538464, + 0.2564102564102564, + ], # noqa + ], + index=["ext_type_1", "ext_type_2"], + columns=_Z_multiindex, + ) + + return IO_Data + + +def test_hem_extraction(td_small_mrio, regions=["reg1"], sectors=["sector1", "sector2"]): + """Test the extraction of HEM data from a small MRIO.""" + IO_Data = td_small_MRIO.A + HEM_object = HEM(IOSystem=None, Y=td_small_MRIO.Y, A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, meta=None, save_path=None) + HEM_object.make_extraction(regions=["reg1"], sectors=["sector1", "sector2"], 
extraction_type="1.2", multipliers=True) + pdt.assert_frame_equal( + left=IO_Data.x.loc[HEM_object.index_extraction, "indout"], + right=HEM_object.production.sum(axis=1) + ) + +def test_hem_extraction_impacts(td_small_mrio, regions=["reg1"], sectors=["sector1", "sector2"]): + """Test the extraction of HEM data from a small MRIO.""" + IO_Data = td_small_MRIO.A + HEM_object = HEM(IOSystem=None, Y=td_small_MRIO.Y, A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, meta=None, save_path=None) + HEM_object.make_extraction(regions=["reg1"], sectors=["sector1", "sector2"], extraction_type="1.2", multipliers=True) + HEM_object.calculate_impacts(IO_Data.S) + + pdt.assert_frame_equal( + left=IO_Data.F.loc[:,HEM_object.index_extraction].sum(axis=1), + right=HEM_object.impact_production.sum(axis=1) + ) From 2de813a2de8be11f72e5c0273a205b1a51ff26ea Mon Sep 17 00:00:00 2001 From: Kajwan Date: Tue, 15 Jul 2025 17:48:25 +0200 Subject: [PATCH 04/18] bug fix --- tests/test_hem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_hem.py b/tests/test_hem.py index 750d146..3fd5c82 100644 --- a/tests/test_hem.py +++ b/tests/test_hem.py @@ -17,6 +17,7 @@ # the function which should be tested here from pymrio.tools.iohem import HEM +@pytest.fixture() def td_small_MRIO(): """Small MRIO with three sectors and two regions. 
From 3c31ba78ac7f093891c1636034c0b30922a8276b Mon Sep 17 00:00:00 2001 From: Kajwan Date: Tue, 15 Jul 2025 18:51:39 +0200 Subject: [PATCH 05/18] Implemented HEM in mriosystem - not tested yet --- pymrio/core/mriosystem.py | 67 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/pymrio/core/mriosystem.py b/pymrio/core/mriosystem.py index 4929dc2..378b68a 100644 --- a/pymrio/core/mriosystem.py +++ b/pymrio/core/mriosystem.py @@ -48,6 +48,7 @@ calc_Z, recalc_M, ) +from pymrio.tools.iohem import HEM from pymrio.tools.iometadata import MRIOMetaData # internal functions @@ -3356,6 +3357,72 @@ def extension_concate(self, new_extension_name): """ return extension_concate(*list(self.get_extensions(data=True)), new_extension_name=new_extension_name) + + def apply_HEM( + self, + regions, + sectors, + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path="./test_extraction", + calculate_impacts=True, + impact_account = "all", + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=False + ): + # First, make sure that all necessary matrices are available in IOSystem. 
+ if (self.x is None) or (self.Y is None) or (self.A is None): + self.calc_system() + elif (downstream_allocation_matrix == "L12") and (self.L is None): + self.L = self.calc_L(self.A) + + HEM_object = HEM(IOSystem=None, save_path=save_path) + + HEM_object.make_extraction( + regions=regions, + sectors=sectors, + extraction_type=extraction_type, + multipliers=multipliers, + downstream_allocation_matrix=downstream_allocation_matrix + ) + + if save_extraction: + HEM_object.save_extraction(save_core_IO=save_core_IO, save_details=save_details) + + HEM_results = [] + + if calculate_impacts: + if impact_account == "all": + for impact in self.get_extensions(): + impact_extension = getattr(self, impact) + + if impact_extension.S is None: + impact_extension.S = calc_S(impact_extension.F, self.x) + + HEM_object.calculate_impacts(intensities=impact_extension.S) + if save_impacts: + HEM_object.save_impacts(impact_name=impact) + else: + HEM_results.append(HEM_object) + else: + impact_extension = getattr(self, impact_account) + if impact_extension.S is None: + impact_extension.S = calc_S(impact_extension.F, self.x) + + HEM_object.calculate_impacts(intensities=impact_extension.S) + if save_impacts: + HEM_object.save_impacts(impact_name=impact_account) + if return_results: + HEM_results.append(HEM_object) + return HEM_results + elif return_results: + HEM_results.append(HEM_object) + return HEM_results + def extension_characterize( From 37792387bc25e6c4eed1fa0c5ac7059cf1b47e7c Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 10:58:04 +0200 Subject: [PATCH 06/18] minor changes - to be tested. 
--- pymrio/core/mriosystem.py | 20 +++++++++++++++----- pymrio/tools/iohem.py | 15 ++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/pymrio/core/mriosystem.py b/pymrio/core/mriosystem.py index 378b68a..b5adbd8 100644 --- a/pymrio/core/mriosystem.py +++ b/pymrio/core/mriosystem.py @@ -3360,20 +3360,26 @@ def extension_concate(self, new_extension_name): def apply_HEM( self, - regions, - sectors, + regions=None, + sectors=None, extraction_type="1.2", multipliers=True, downstream_allocation_matrix="A12", save_extraction=True, save_path="./test_extraction", calculate_impacts=True, - impact_account = "all", + impact_account="all", + specific_impact=None, save_impacts=True, save_core_IO=True, save_details=True, - return_results=False + return_results=False, ): + # TODO: Option to whether or not add results as an attribute in PyMRIO object. + + if (regions is None) & (sectors is None): + raise ValueError("At least one of regions or sectors must be specified.") + # First, make sure that all necessary matrices are available in IOSystem. if (self.x is None) or (self.Y is None) or (self.A is None): self.calc_system() @@ -3403,7 +3409,11 @@ def apply_HEM( if impact_extension.S is None: impact_extension.S = calc_S(impact_extension.F, self.x) - HEM_object.calculate_impacts(intensities=impact_extension.S) + if specific_impact is None: + HEM_object.calculate_impacts(intensities=impact_extension.S) + else: + HEM_object.calculate_impacts(intensities=impact_extension.S[impact_extension.S.index.isin(specific_impact)]) + if save_impacts: HEM_object.save_impacts(impact_name=impact) else: diff --git a/pymrio/tools/iohem.py b/pymrio/tools/iohem.py index ca53642..3453097 100644 --- a/pymrio/tools/iohem.py +++ b/pymrio/tools/iohem.py @@ -294,13 +294,13 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False # Makes subfolders for individual regions and/or sectors, if it is clearly that a single region and/or sector has been extracted. 
# Will make sure that things are not overwritten, if multiple regions and/or sectors are extracted in a loop. - if (len(t1.extraction_regions) == 1) and (len(t1.extraction_sectors) == 1): + if (len(self.extraction_regions) == 1) and (len(self.extraction_sectors) == 1): extraction_save_path = save_path / f"{self.extraction_regions[0]}_{self.extraction_sectors[0]}" - elif (len(t1.extraction_regions) == 1): + elif (len(self.extraction_regions) == 1): extraction_save_path = save_path / f"{self.extraction_regions[0]}" - elif (len(t1.extraction_sectors) == 1): + elif (len(self.extraction_sectors) == 1): extraction_save_path = save_path / f"{self.extraction_sectors[0]}" else: @@ -339,7 +339,7 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False json.dump(self.meta, json_file, indent=4) - def save_impacts(self, impact_account=None): + def save_impacts(self, impact_account=None, specific_impact=None): """ Save the impacts of the hypothetical extraction to the specified path. @@ -356,9 +356,10 @@ def save_impacts(self, impact_account=None): If no save path is provided. 
""" - - if impact_account is None: - save_path = Path(self.extraction_save_path) / "impacts" + if (impact_account is None) & (specific_impact is None): + save_path = Path(self.extraction_save_path) / "impacts" + elif (impact_account is None) & (specific_impact is not None): + save_path = Path(self.extraction_save_path) / "impacts" / specific_impact else: save_path = Path(self.extraction_save_path) / "impacts" / impact_account From 5b3ccbf42aef9bd7e947f66683e265c3bdc5aa19 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 15:10:26 +0200 Subject: [PATCH 07/18] bug fix and formatting --- pymrio/core/mriosystem.py | 74 +++++------ pymrio/tools/iohem.py | 253 ++++++++++++++++++++++---------------- 2 files changed, 185 insertions(+), 142 deletions(-) diff --git a/pymrio/core/mriosystem.py b/pymrio/core/mriosystem.py index b5adbd8..74a23d1 100644 --- a/pymrio/core/mriosystem.py +++ b/pymrio/core/mriosystem.py @@ -30,6 +30,7 @@ MISSING_AGG_ENTRY, STORAGE_FORMAT, ) +from pymrio.tools.iohem import HEM from pymrio.tools.iomath import ( calc_A, calc_accounts, @@ -48,7 +49,6 @@ calc_Z, recalc_M, ) -from pymrio.tools.iohem import HEM from pymrio.tools.iometadata import MRIOMetaData # internal functions @@ -3357,24 +3357,25 @@ def extension_concate(self, new_extension_name): """ return extension_concate(*list(self.get_extensions(data=True)), new_extension_name=new_extension_name) - + def apply_HEM( - self, - regions=None, - sectors=None, - extraction_type="1.2", - multipliers=True, - downstream_allocation_matrix="A12", - save_extraction=True, - save_path="./test_extraction", - calculate_impacts=True, - impact_account="all", - specific_impact=None, - save_impacts=True, - save_core_IO=True, - save_details=True, - return_results=False, - ): + self, + regions=None, + sectors=None, + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path="./test_extraction", + calculate_impacts=True, + impact_account="all", + 
specific_impact=None, # If specific impact is provided, details on other sectors are kept. + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=False, + ): + """Apply the HEM method to the IOSystem.""" # TODO: Option to whether or not add results as an attribute in PyMRIO object. if (regions is None) & (sectors is None): @@ -3385,37 +3386,35 @@ def apply_HEM( self.calc_system() elif (downstream_allocation_matrix == "L12") and (self.L is None): self.L = self.calc_L(self.A) - - HEM_object = HEM(IOSystem=None, save_path=save_path) + + HEM_object = HEM(IOSystem=self, save_path=save_path) HEM_object.make_extraction( - regions=regions, - sectors=sectors, - extraction_type=extraction_type, - multipliers=multipliers, - downstream_allocation_matrix=downstream_allocation_matrix + regions=regions, + sectors=sectors, + extraction_type=extraction_type, + multipliers=multipliers, + downstream_allocation_matrix=downstream_allocation_matrix, ) if save_extraction: HEM_object.save_extraction(save_core_IO=save_core_IO, save_details=save_details) - + HEM_results = [] if calculate_impacts: if impact_account == "all": + if specific_impact is not None: + raise ValueError("If specific_impact is given, impact_account must not be 'all'.") for impact in self.get_extensions(): impact_extension = getattr(self, impact) if impact_extension.S is None: impact_extension.S = calc_S(impact_extension.F, self.x) - - if specific_impact is None: - HEM_object.calculate_impacts(intensities=impact_extension.S) - else: - HEM_object.calculate_impacts(intensities=impact_extension.S[impact_extension.S.index.isin(specific_impact)]) - + + HEM_object.calculate_impacts(intensities=impact_extension.S) if save_impacts: - HEM_object.save_impacts(impact_name=impact) + HEM_object.save_impacts(impact_account=impact) else: HEM_results.append(HEM_object) else: @@ -3423,9 +3422,13 @@ def apply_HEM( if impact_extension.S is None: impact_extension.S = calc_S(impact_extension.F, self.x) - 
HEM_object.calculate_impacts(intensities=impact_extension.S) + if specific_impact is None: + HEM_object.calculate_impacts(intensities=impact_extension.S) + else: + HEM_object.calculate_impacts(intensities=impact_extension.S.loc[specific_impact, :]) + if save_impacts: - HEM_object.save_impacts(impact_name=impact_account) + HEM_object.save_impacts(impact_account=impact_account, specific_impact=specific_impact) if return_results: HEM_results.append(HEM_object) return HEM_results @@ -3434,7 +3437,6 @@ def apply_HEM( return HEM_results - def extension_characterize( *extensions, factors, diff --git a/pymrio/tools/iohem.py b/pymrio/tools/iohem.py index 3453097..8ff752b 100644 --- a/pymrio/tools/iohem.py +++ b/pymrio/tools/iohem.py @@ -1,20 +1,21 @@ -""" +"""Object for applying the Hypothetical Extraction Method (HEM).""" -""" -# %% +# %% import json from pathlib import Path import numpy as np import pandas as pd + import pymrio.tools.iomath as iomath + # %% -class HEM(): - """Class for Hypothetical Extraction Method (HEM) results. - """ - def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, save_path=None) -> None: - """ +class HEM: + """Class for Hypothetical Extraction Method (HEM) results.""" + + def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, save_path=None) -> None: + """ Initialize the HEM class with the IOSystem or core IO data. Parameters @@ -26,7 +27,8 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav A : pd.DataFrame, optional Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. x : pd.DataFrame, optional - Total output vector as a single column matrix, named indout. If not provided, the one from the IOSystem will be used. + Total output vector as a single column matrix, named indout. + If not provided, the one from the IOSystem will be used. L : pd.DataFrame, optional Leontief inverse matrix. If not provided, it will be calculated from A. 
meta : dict, optional @@ -48,24 +50,34 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav self.x = IOSystem.x self.L = IOSystem.L self.meta = { - "IO System meta": IOSystem.meta, + "IO System meta": repr(IOSystem.meta), } - - self.save_path = Path(save_path or "./") + self.save_path = Path(save_path or "./") - def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=True, Y=None, A=None, x=None, downstream_allocation_matrix="A12"): + def make_extraction( + self, + regions: list = None, + sectors: list = None, + extraction_type="1.2", + multipliers=True, + Y=None, + A=None, + x=None, + downstream_allocation_matrix="A12", + ): """ Create a hypothetical extraction of the IOSystem based on the specified regions and sectors. - + Parameters ---------- regions : list List of regions to be extracted. sectors : list - List of sectors to be extracted. + List of sectors to be extracted. extraction_type : str, optional - Type of extraction to be performed. Defaults to "1.2". See "https://doi.org/10.1111/jiec.13522" for more information. + Type of extraction to be performed. Defaults to "1.2". + See "https://doi.org/10.1111/jiec.13522" for more information. multipliers : bool, optional Whether to calculate multipliers for the extracted sectors. Defaults to True. Y : pd.DataFrame, optional @@ -73,10 +85,12 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T A : pd.DataFrame, optional Input-output coefficients matrix. If not provided, the one from the IOSystem will be used. x : pd.DataFrame, optional - Total output vector as a single column matrix, named indout. If not provided, the one from the IOSystem will be used. + Total output vector as a single column matrix, named indout. + If not provided, the one from the IOSystem will be used. downstream_allocation_matrix : str, optional The matrix used to allocate downstream production. Defaults to "A12". Can be either "A12" or "L12". 
- Returns + + Returns ------- None @@ -86,11 +100,12 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T It calculates the hypothetical extraction of the IOSystem based on the specified regions and sectors. The extraction type must be one of the following: "1.2", "2a.2", "3a.2". The method also calculates the downstream allocation matrix based on the specified type. - The method raises a ValueError if either regions or sectors are not specified, or if the extraction type is not implemented. + The method raises a ValueError if either regions or sectors are not specified, + or if the extraction type is not implemented. The method raises a NotImplementedError if the extraction type is not one of the implemented ones. The method raises a TypeError if the intensities are not a pandas Series or DataFrame. The method raises a ValueError if the save path is not provided. - + Notes ----- The definition of downstream and upstream production changes, if other extraction types are implemented. 
@@ -99,11 +114,13 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T """ - self.meta.update({ - "extraction_type": extraction_type, - "downstream_allocation_matrix": downstream_allocation_matrix, - "multipliers": multipliers, - }) + self.meta.update( + { + "extraction_type": extraction_type, + "downstream_allocation_matrix": downstream_allocation_matrix, + "multipliers": multipliers, + } + ) self.extraction_regions = regions self.extraction_sectors = sectors @@ -115,29 +132,24 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T if x is None: x = self.x - if (regions != [None]) & (sectors != [None]): - index_extraction = pd.MultiIndex.from_product(iterables=[ - regions, sectors - ]) - - elif (regions == [None]) & (sectors != [None]): - index_extraction = pd.MultiIndex.from_product(iterables=[ - self.A.index.get_level_values(0).unique(), sectors - ]) - elif (regions != [None]) & (sectors == [None]): - index_extraction = pd.MultiIndex.from_product(iterables=[ - regions, self.A.index.get_level_values(1).unique() - ]) - else: - raise ValueError( - "Either regions or sectors must be specified, or both." + if (regions is not None) & (sectors is not None): + index_extraction = pd.MultiIndex.from_product(iterables=[regions, sectors]) + + elif (regions is None) & (sectors is not None): + index_extraction = pd.MultiIndex.from_product( + iterables=[self.A.index.get_level_values(0).unique(), sectors] + ) + elif (regions is not None) & (sectors is None): + index_extraction = pd.MultiIndex.from_product( + iterables=[regions, self.A.index.get_level_values(1).unique()] ) - + else: + raise ValueError("Either regions or sectors must be specified, or both.") + index_other = self.A.index.drop(index_extraction) self.index_extraction = index_extraction self.index_other = index_other - if extraction_type in ["1.2", "2a.2", "3a.2"]: # TODO: Turn different extraction types into functions that this method can call. 
# Extracting blocks @@ -158,18 +170,12 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T self.L22 = iomath.calc_L(A22) self.H = pd.DataFrame( - data=np.linalg.inv( - I11 - A11 - A12.dot(self.L22.dot(A21)) - ), - index=A11.index, - columns=A11.columns + data=np.linalg.inv(I11 - A11 - A12.dot(self.L22.dot(A21))), index=A11.index, columns=A11.columns ) # Calculating different accounts self.production_downstream_all = pd.DataFrame( - data=np.diag(v=self.L22.dot(Y2.sum(axis=1))), - index=self.L22.index, - columns=self.L22.index + data=np.diag(v=self.L22.dot(Y2.sum(axis=1))), index=self.L22.index, columns=self.L22.index ) # Allocating downstream production @@ -181,45 +187,48 @@ def make_extraction(self, regions, sectors, extraction_type="1.2", multipliers=T self.L = iomath.calc_L(A) L12 = self.L.loc[index_extraction, index_other] - L12_normalised = L12.div(L12.sum(axis=0), axis=1) + L12_normalised = L12.div(L12.sum(axis=0), axis=1) self.downstream_allocation_matrix = L12_normalised else: raise ValueError("Downstream allocation matrix must be either 'A12' or 'L12'.") - + self.production_downstream = self.downstream_allocation_matrix.dot(self.production_downstream_all) - - self.demand_final_diagonal = pd.DataFrame( - data=np.diag(v=Y1.sum(axis=1)), - index=Y1.index, - columns=Y1.index - ) + + self.demand_final_diagonal = pd.DataFrame(data=np.diag(v=Y1.sum(axis=1)), index=Y1.index, columns=Y1.index) self.demand_intermediate_diagonal = pd.DataFrame( data=np.diag(v=self.production_downstream.sum(axis=1)), index=self.production_downstream.index, - columns=self.production_downstream.index + columns=self.production_downstream.index, ) - self.production = self.H.dot(other=(self.demand_final_diagonal+self.demand_intermediate_diagonal)) + self.production = self.H.dot(other=(self.demand_final_diagonal + self.demand_intermediate_diagonal)) self.production_upstream_first_tier = A21.dot(self.production) self.production_upstream = 
self.L22.dot(self.production_upstream_first_tier) if multipliers: self.M_production = self.production.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) - self.M_production_upstream_first_tier = self.production_upstream_first_tier.div(x.loc[index_extraction, "indout"], axis=1).replace(np.nan, 0) - self.M_upstream = self.production_upstream.div(x.loc[index_extraction, "indout"], axis=1).replace(np.nan, 0) - self.M_downstream = self.production_downstream.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) + self.M_production_upstream_first_tier = self.production_upstream_first_tier.div( + x.loc[index_extraction, "indout"], axis=1 + ).replace(np.nan, 0) + self.M_upstream = self.production_upstream.div(x.loc[index_extraction, "indout"], axis=1).replace( + np.nan, 0 + ) + self.M_downstream = self.production_downstream.div(x.loc[index_extraction, "indout"], axis=0).replace( + np.nan, 0 + ) else: raise NotImplementedError( "Only extraction types '1.2', '2a.2', '3a.2' are implemented at the moment.\n" - + "Please implement the extraction type you need or use one of the implemented ones.\n" - + "For more information see Table 4 in https://doi.org/10.1111/jiec.13522." - ) + "Please implement the extraction type you need or use one of the implemented ones.\n" + "For more information see Table 4 in https://doi.org/10.1111/jiec.13522." + ) + return self def calculate_impacts(self, intensities=None): """ Calculate the impacts of the hypothetical extraction based on the provided intensities. 
- + Parameters ---------- intensities : pd.Series or pd.DataFrame @@ -235,39 +244,48 @@ def calculate_impacts(self, intensities=None): """ # Keep details, if intensities are a Series if type(intensities) is pd.Series: + self.intensities = [intensities.name] self.impact_production = self.production.mul(intensities.loc[self.index_extraction], axis=0) - self.impact_upstream_first_tier = self.production_upstream_first_tier.mul(intensities.loc[self.index_other], axis=0) + self.impact_upstream_first_tier = self.production_upstream_first_tier.mul( + intensities.loc[self.index_other], axis=0 + ) self.impact_upstream = self.production_upstream.mul(intensities.loc[self.index_other], axis=0) self.impact_downstream = self.production_downstream.mul(intensities.loc[self.index_other], axis=1) if self.meta["multipliers"]: self.M_impact_production = self.M_production.mul(intensities.loc[self.index_extraction], axis=0) - self.M_impact_upstream_first_tier = self.M_production_upstream_first_tier.mul(intensities.loc[self.index_other], axis=0) + self.M_impact_upstream_first_tier = self.M_production_upstream_first_tier.mul( + intensities.loc[self.index_other], axis=0 + ) self.M_impact_upstream = self.M_upstream.mul(intensities.loc[self.index_other], axis=0) self.M_impact_downstream = self.M_downstream.mul(intensities.loc[self.index_other], axis=1) - self.intensities = [intensities.name] # Drop details, if intensities are a DataFrame elif type(intensities) is pd.DataFrame: + self.intensities = intensities.index.to_list() self.impact_production = intensities.loc[:, self.index_extraction].dot(self.production) - self.impact_upstream_first_tier = intensities.loc[:, self.index_other].dot(self.production_upstream_first_tier) + self.impact_upstream_first_tier = intensities.loc[:, self.index_other].dot( + self.production_upstream_first_tier + ) self.impact_upstream = intensities.loc[:, self.index_other].dot(self.production_upstream) self.impact_downstream = 
self.production_downstream.dot(intensities.loc[:, self.index_other].T).T + if self.meta["multipliers"]: self.M_impact_production = intensities.loc[:, self.index_extraction].dot(self.M_production) - self.M_impact_upstream_first_tier = intensities.loc[:, self.index_other].dot(self.M_production_upstream_first_tier) + self.M_impact_upstream_first_tier = intensities.loc[:, self.index_other].dot( + self.M_production_upstream_first_tier + ) self.M_impact_upstream = intensities.loc[:, self.index_other].dot(self.M_upstream) self.M_impact_downstream = self.M_downstream.dot(intensities.loc[:, self.index_other].T).T - self.intensities = intensities.index.to_list() + else: - raise TypeError( - "Intensities must be either a pandas Series or a pandas DataFrame." - ) - + raise TypeError("Intensities must be either a pandas Series or a pandas DataFrame.") + return self + def save_extraction(self, save_path=None, save_core_IO=False, save_details=False): - """ + """ Save the extraction results to the specified path. - + Parameters ---------- save_path : str or Path, optional @@ -275,13 +293,13 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False save_core_IO : bool, optional Whether to save the core IO data (A and Y). Defaults to False. save_details : bool, optional - Whether to save additional details like all downstream production, final demand diagonal, and intermediate demand diagonal. Defaults to False. - + Whether to save additional details like all downstream production, + final demand diagonal, and intermediate demand diagonal. Defaults to False. + Raises ------ ValueError If no save path is provided. - """ if save_path is None: save_path = self.save_path @@ -292,37 +310,58 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False save_path = Path(save_path) self.save_path = save_path - # Makes subfolders for individual regions and/or sectors, if it is clearly that a single region and/or sector has been extracted. 
+ # Makes subfolders for individual regions and/or sectors, + # if it is clearly that a single region and/or sector has been extracted. # Will make sure that things are not overwritten, if multiple regions and/or sectors are extracted in a loop. if (len(self.extraction_regions) == 1) and (len(self.extraction_sectors) == 1): - extraction_save_path = save_path / f"{self.extraction_regions[0]}_{self.extraction_sectors[0]}" - - elif (len(self.extraction_regions) == 1): + extraction_save_path = save_path / f"{self.extraction_regions[0]}_{self.extraction_sectors[0]}" + + elif len(self.extraction_regions) == 1: extraction_save_path = save_path / f"{self.extraction_regions[0]}" - - elif (len(self.extraction_sectors) == 1): + + elif len(self.extraction_sectors) == 1: extraction_save_path = save_path / f"{self.extraction_sectors[0]}" - + else: - extraction_save_path = save_path + extraction_save_path = save_path self.extraction_save_path = extraction_save_path extraction_save_path.mkdir(parents=True, exist_ok=True) - - self.index_extraction.to_frame().to_csv(extraction_save_path / "index_extraction.txt", sep="\t", index=False, header=False) - self.index_other.to_frame().to_csv(extraction_save_path / "index_other.txt", sep="\t", index=False, header=False) + + ( + self.index_extraction.to_frame().to_csv( + extraction_save_path / "index_extraction.txt", sep="\t", index=False, header=False + ) + ) + ( + self.index_other.to_frame().to_csv( + extraction_save_path / "index_other.txt", sep="\t", index=False, header=False + ) + ) self.L22.to_csv(extraction_save_path / "L22.txt", sep="\t") self.H.to_csv(extraction_save_path / "H.txt", sep="\t") - self.downstream_allocation_matrix.to_csv(extraction_save_path / f"{self.meta["downstream_allocation_matrix"]}.txt", sep="\t") - + ( + self.downstream_allocation_matrix.to_csv( + extraction_save_path / f"{self.meta['downstream_allocation_matrix']}.txt", sep="\t" + ) + ) + self.production.to_csv(extraction_save_path / "production.txt", 
sep="\t") - self.production_upstream_first_tier.to_csv(extraction_save_path / "production_upstream_first_tier.txt", sep="\t") + ( + self.production_upstream_first_tier.to_csv( + extraction_save_path / "production_upstream_first_tier.txt", sep="\t" + ) + ) self.production_upstream.to_csv(extraction_save_path / "production_upstream.txt", sep="\t") self.production_downstream.to_csv(extraction_save_path / "production_downstream.txt", sep="\t") if self.meta["multipliers"]: self.M_production.to_csv(extraction_save_path / "M_production.txt", sep="\t") - self.M_production_upstream_first_tier.to_csv(extraction_save_path / "M_production_upstream_first_tier.txt", sep="\t") + ( + self.M_production_upstream_first_tier.to_csv( + extraction_save_path / "M_production_upstream_first_tier.txt", sep="\t" + ) + ) self.M_upstream.to_csv(extraction_save_path / "M_upstream.txt", sep="\t") self.M_downstream.to_csv(extraction_save_path / "M_downstream.txt", sep="\t") @@ -333,23 +372,26 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False if save_details: self.production_downstream_all.to_csv(extraction_save_path / "production_downstream_all.txt", sep="\t") self.demand_final_diagonal.to_csv(extraction_save_path / "demand_final_diagonal.txt", sep="\t") - self.demand_intermediate_diagonal.to_csv(extraction_save_path / "demand_intermediate_diagonal.txt", sep="\t") + ( + self.demand_intermediate_diagonal.to_csv( + extraction_save_path / "demand_intermediate_diagonal.txt", sep="\t" + ) + ) - with open(extraction_save_path / "meta.json", 'w') as json_file: + with open(extraction_save_path / "meta.json", "w") as json_file: json.dump(self.meta, json_file, indent=4) - def save_impacts(self, impact_account=None, specific_impact=None): - """ + """ Save the impacts of the hypothetical extraction to the specified path. - + Parameters ---------- impact_account : str, optional Account name for the impacts. 
If not provided, the impacts will be saved in a general "impacts" folder. save_path : str or Path, optional Path to save the impacts. If not provided, the save path from the IOSystem will be used. - + Raises ------ ValueError @@ -362,7 +404,7 @@ def save_impacts(self, impact_account=None, specific_impact=None): save_path = Path(self.extraction_save_path) / "impacts" / specific_impact else: save_path = Path(self.extraction_save_path) / "impacts" / impact_account - + save_path.mkdir(parents=True, exist_ok=True) pd.DataFrame(self.intensities).to_csv(save_path / "extensions.txt", sep="\t", index=False, header=False) self.impact_production.to_csv(save_path / "impact_production.txt", sep="\t") @@ -378,4 +420,3 @@ def save_impacts(self, impact_account=None, specific_impact=None): # %% - From 06785fd1cfe1a877bd60ee6e7f5022d1d9a1323a Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 15:11:12 +0200 Subject: [PATCH 08/18] extensive testing works --- tests/test_hem.py | 705 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 493 insertions(+), 212 deletions(-) diff --git a/tests/test_hem.py b/tests/test_hem.py index 3fd5c82..332b2d8 100644 --- a/tests/test_hem.py +++ b/tests/test_hem.py @@ -1,22 +1,22 @@ """Test cases for HEM calculations.""" - import os +import shutil import sys -import numpy as np -import numpy.testing as npt import pandas as pd import pandas.testing as pdt import pytest +# the function which should be tested here +from pymrio.core.mriosystem import Extension, IOSystem +from pymrio.tools.iohem import HEM +from pymrio.tools.iomath import calc_S + TESTPATH = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(TESTPATH, "..")) -# the function which should be tested here -from pymrio.tools.iohem import HEM - @pytest.fixture() def td_small_MRIO(): """Small MRIO with three sectors and two regions. 
@@ -24,227 +24,508 @@ def td_small_MRIO(): The testdata here just consists of pandas DataFrames, the functionality with numpy arrays gets tested with td_IO_Data_Miller. """ + _sectors = ["sector1", "sector2", "sector3"] + _regions = ["reg1", "reg2"] + _Z_multiindex = pd.MultiIndex.from_product([_regions, _sectors], names=["region", "sector"]) - class IO_Data: - _sectors = ["sector1", "sector2", "sector3"] - _regions = ["reg1", "reg2"] - _Z_multiindex = pd.MultiIndex.from_product([_regions, _sectors], names=["region", "sector"]) - - Z = pd.DataFrame( - data=[ - [10, 5, 1, 6, 5, 7], - [0, 2, 0, 0, 5, 3], - [10, 3, 20, 4, 2, 0], - [5, 0, 0, 1, 10, 9], - [0, 10, 1, 0, 20, 1], - [5, 0, 0, 1, 10, 10], - ], - index=_Z_multiindex, - columns=_Z_multiindex, - dtype=("float64"), - ) + Z = pd.DataFrame( + data=[ + [10, 5, 1, 6, 5, 7], + [0, 2, 0, 0, 5, 3], + [10, 3, 20, 4, 2, 0], + [5, 0, 0, 1, 10, 9], + [0, 10, 1, 0, 20, 1], + [5, 0, 0, 1, 10, 10], + ], + index=_Z_multiindex, + columns=_Z_multiindex, + dtype=("float64"), + ) - _categories = ["final demand"] - _Y_multiindex = pd.MultiIndex.from_product([_regions, _categories], names=["region", "category"]) - Y = pd.DataFrame( - data=[[14, 3], [2.5, 2.5], [13, 6], [5, 20], [10, 10], [3, 10]], - index=_Z_multiindex, - columns=_Y_multiindex, - dtype=("float64"), - ) + _categories = ["final demand"] + _Y_multiindex = pd.MultiIndex.from_product([_regions, _categories], names=["region", "category"]) + Y = pd.DataFrame( + data=[[14, 3], [2.5, 2.5], [13, 6], [5, 20], [10, 10], [3, 10]], + index=_Z_multiindex, + columns=_Y_multiindex, + dtype=("float64"), + ) - F = pd.DataFrame( - data=[[20, 1, 42, 4, 20, 5], [5, 4, 11, 8, 2, 10]], - index=["ext_type_1", "ext_type_2"], - columns=_Z_multiindex, - dtype=("float64"), - ) + A = pd.DataFrame( + data=[ + [ + 0.19607843137254902, + 0.3333333333333333, + 0.017241379310344827, + 0.12, + 0.09615384615384616, + 0.1794871794871795, + ], # noqa + [ + 0.0, + 0.13333333333333333, + 0.0, + 0.0, + 
0.09615384615384616, + 0.07692307692307693, + ], # noqa + [ + 0.19607843137254902, + 0.2, + 0.3448275862068966, + 0.08, + 0.038461538461538464, + 0.0, + ], # noqa + [ + 0.09803921568627451, + 0.0, + 0.0, + 0.02, + 0.19230769230769232, + 0.23076923076923075, + ], # noqa + [ + 0.0, + 0.6666666666666666, + 0.017241379310344827, + 0.0, + 0.38461538461538464, + 0.02564102564102564, + ], # noqa + [ + 0.09803921568627451, + 0.0, + 0.0, + 0.02, + 0.19230769230769232, + 0.2564102564102564, + ], # noqa + ], + index=_Z_multiindex, + columns=_Z_multiindex, + ) - F_Y = pd.DataFrame( - data=[[50, 10], [100, 20]], - index=["ext_type_1", "ext_type_2"], - columns=_Y_multiindex, - dtype=("float64"), - ) + L = pd.DataFrame( + data=[ + [ + 1.3387146304736708, + 0.9689762471208287, + 0.05036622549592462, + 0.17820960407435948, + 0.5752019383714646, + 0.4985179148178926, + ], # noqa + [ + 0.02200779585580331, + 1.3716472861392823, + 0.0076800357678581885, + 0.006557415453762468, + 0.2698335633228079, + 0.15854643902810828, + ], # noqa + [ + 0.43290422861412026, + 0.8627066565439678, + 1.5492942759220427, + 0.18491657196329184, + 0.44027825642348534, + 0.26630955082840885, + ], # noqa + [ + 0.18799498787612925, + 0.5244084722329316, + 0.020254008037620782, + 1.0542007368783255, + 0.5816573175534603, + 0.44685014763069275, + ], # noqa + [ + 0.04400982046095892, + 1.5325472495862535, + 0.05259311578831879, + 0.014602513642445088, + 1.9545285794951548, + 0.2410917825607805, + ], # noqa + [ + 0.19294222439918532, + 0.5382086951864299, + 0.020787008249137116, + 0.05562707205933412, + 0.596964089068025, + 1.4849251515157111, + ], # noqa + ], + index=_Z_multiindex, + columns=_Z_multiindex, + ) - S_Y = pd.DataFrame( - data=[ - [1.0526315789473684, 0.1941747572815534], - [2.1052631578947367, 0.3883495145631068], - ], - index=["ext_type_1", "ext_type_2"], - columns=_Y_multiindex, - dtype=("float64"), - ) + x = pd.DataFrame( + data=[ + [51], + [15], + [58], + [50], + [52], + [39], + ], + 
columns=["indout"], + index=_Z_multiindex, + dtype=("float64"), + ) - A = pd.DataFrame( - data=[ - [ - 0.19607843137254902, - 0.3333333333333333, - 0.017241379310344827, - 0.12, - 0.09615384615384616, - 0.1794871794871795, - ], # noqa - [ - 0.0, - 0.13333333333333333, - 0.0, - 0.0, - 0.09615384615384616, - 0.07692307692307693, - ], # noqa - [ - 0.19607843137254902, - 0.2, - 0.3448275862068966, - 0.08, - 0.038461538461538464, - 0.0, - ], # noqa - [ - 0.09803921568627451, - 0.0, - 0.0, - 0.02, - 0.19230769230769232, - 0.23076923076923075, - ], # noqa - [ - 0.0, - 0.6666666666666666, - 0.017241379310344827, - 0.0, - 0.38461538461538464, - 0.02564102564102564, - ], # noqa - [ - 0.09803921568627451, - 0.0, - 0.0, - 0.02, - 0.19230769230769232, - 0.2564102564102564, - ], # noqa - ], - index=_Z_multiindex, - columns=_Z_multiindex, - ) + IO_object = IOSystem(Z=Z, A=A, Y=Y, x=x, L=L) - L = pd.DataFrame( - data=[ - [ - 1.3387146304736708, - 0.9689762471208287, - 0.05036622549592462, - 0.17820960407435948, - 0.5752019383714646, - 0.4985179148178926, - ], # noqa - [ - 0.02200779585580331, - 1.3716472861392823, - 0.0076800357678581885, - 0.006557415453762468, - 0.2698335633228079, - 0.15854643902810828, - ], # noqa - [ - 0.43290422861412026, - 0.8627066565439678, - 1.5492942759220427, - 0.18491657196329184, - 0.44027825642348534, - 0.26630955082840885, - ], # noqa - [ - 0.18799498787612925, - 0.5244084722329316, - 0.020254008037620782, - 1.0542007368783255, - 0.5816573175534603, - 0.44685014763069275, - ], # noqa - [ - 0.04400982046095892, - 1.5325472495862535, - 0.05259311578831879, - 0.014602513642445088, - 1.9545285794951548, - 0.2410917825607805, - ], # noqa - [ - 0.19294222439918532, - 0.5382086951864299, - 0.020787008249137116, - 0.05562707205933412, - 0.596964089068025, - 1.4849251515157111, - ], # noqa - ], - index=_Z_multiindex, - columns=_Z_multiindex, - ) + F_one = pd.DataFrame( + data=[[20, 1, 42, 4, 20, 5], [5, 4, 11, 8, 2, 10]], + index=["ext_type_11", 
"ext_type_12"], + columns=_Z_multiindex, + dtype=("float64"), + ) + F_Y_one = pd.DataFrame( + data=[[50, 10], [100, 20]], + index=["ext_type_11", "ext_type_12"], + columns=_Y_multiindex, + dtype=("float64"), + ) - x = pd.DataFrame( - data=[ - [51], - [15], - [58], - [50], - [52], - [39], - ], - columns=["indout"], - index=_Z_multiindex, - dtype=("float64"), - ) - S = pd.DataFrame( - data=[ - [ - 0.39215686274509803, - 0.06666666666666667, - 0.7241379310344828, - 0.08, - 0.38461538461538464, - 0.1282051282051282, - ], # noqa - [ - 0.09803921568627451, - 0.26666666666666666, - 0.1896551724137931, - 0.16, - 0.038461538461538464, - 0.2564102564102564, - ], # noqa - ], - index=["ext_type_1", "ext_type_2"], - columns=_Z_multiindex, - ) + F_two = pd.DataFrame( + data=[[20, 1, 42, 4, 20, 5], [5, 4, 11, 8, 2, 10]], + index=["ext_type_21", "ext_type_22"], + columns=_Z_multiindex, + dtype=("float64"), + ) - return IO_Data + F_Y_two = pd.DataFrame( + data=[[50, 10], [100, 20]], + index=["ext_type_21", "ext_type_22"], + columns=_Y_multiindex, + dtype=("float64"), + ) + IO_object.extensions_one = Extension( + name="extensions_one", + F=F_one, + F_Y=F_Y_one, + ) + IO_object.extensions_two = Extension( + name="extensions_two", + F=F_two, + F_Y=F_Y_two, + ) -def test_hem_extraction(td_small_mrio, regions=["reg1"], sectors=["sector1", "sector2"]): + return IO_object + + +def test_hem_extraction(td_small_MRIO): """Test the extraction of HEM data from a small MRIO.""" - IO_Data = td_small_MRIO.A - HEM_object = HEM(IOSystem=None, Y=td_small_MRIO.Y, A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, meta=None, save_path=None) - HEM_object.make_extraction(regions=["reg1"], sectors=["sector1", "sector2"], extraction_type="1.2", multipliers=True) - pdt.assert_frame_equal( - left=IO_Data.x.loc[HEM_object.index_extraction, "indout"], - right=HEM_object.production.sum(axis=1) + HEM_object = HEM( + IOSystem=None, + Y=td_small_MRIO.Y, + A=td_small_MRIO.A, + x=td_small_MRIO.x, + 
L=td_small_MRIO.L, + meta=None, + save_path=None, + ) + HEM_object.make_extraction( + regions=["reg1"], sectors=["sector1", "sector2"], extraction_type="1.2", multipliers=True + ) + pdt.assert_series_equal( + left=td_small_MRIO.x.loc[HEM_object.index_extraction, "indout"], + right=HEM_object.production.sum(axis=1).rename("indout"), ) -def test_hem_extraction_impacts(td_small_mrio, regions=["reg1"], sectors=["sector1", "sector2"]): + +def test_hem_extraction_impacts(td_small_MRIO): """Test the extraction of HEM data from a small MRIO.""" - IO_Data = td_small_MRIO.A - HEM_object = HEM(IOSystem=None, Y=td_small_MRIO.Y, A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, meta=None, save_path=None) - HEM_object.make_extraction(regions=["reg1"], sectors=["sector1", "sector2"], extraction_type="1.2", multipliers=True) - HEM_object.calculate_impacts(IO_Data.S) + HEM_object = HEM( + IOSystem=None, + Y=td_small_MRIO.Y, + A=td_small_MRIO.A, + x=td_small_MRIO.x, + L=td_small_MRIO.L, + meta=None, + save_path=None, + ) + HEM_object.make_extraction( + regions=["reg1"], sectors=["sector1", "sector2"], extraction_type="1.2", multipliers=True + ) + intensities = calc_S(td_small_MRIO.extensions_one.F, td_small_MRIO.x) + HEM_object.calculate_impacts(intensities) + + pdt.assert_series_equal( + left=td_small_MRIO.extensions_one.F.loc[:, HEM_object.index_extraction].sum(axis=1), + right=HEM_object.impact_production.sum(axis=1), + ) + + +def test_hem_io_system_minimum(td_small_MRIO): + """Test the HEM calculation with minimum parameters.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=False, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def 
test_hem_io_system_multipliers(td_small_MRIO): + """Test the HEM calculation with multipliers.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_impact(td_small_MRIO): + """Test the HEM calculation with impact calculation.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_impact_multipliers(td_small_MRIO): + """Test the HEM calculation with impact calculation and multipliers.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_impact_all_specific(td_small_MRIO): + """Test the HEM calculation with impact calculation and specific impact.""" + IO_object = td_small_MRIO + IO_object.calc_system() + with pytest.raises(ValueError, match="If specific_impact is given, impact_account must not be 'all'."): + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], 
+ extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="all", + specific_impact="ext_type_11", + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_impact_account_specific(td_small_MRIO): + """Test the HEM calculation with impact account and specific impact.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="extensions_one", + specific_impact="ext_type_11", + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_impact_account(td_small_MRIO): + """Test the HEM calculation with impact account.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=True, + impact_account="extensions_one", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_L12_allocation(td_small_MRIO): + """Test the HEM calculation with L12 allocation.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="L12", + save_extraction=False, + save_path=None, + calculate_impacts=False, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def 
test_hem_io_system_one_region(td_small_MRIO): + """Test the HEM calculation with one region.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=None, + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=False, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_one_sector(td_small_MRIO): + """Test the HEM calculation with one sector.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=None, + sectors=["sector1"], + extraction_type="1.2", + multipliers=False, + downstream_allocation_matrix="A12", + save_extraction=False, + save_path=None, + calculate_impacts=False, + impact_account="all", + specific_impact=None, + save_impacts=False, + save_core_IO=False, + save_details=False, + return_results=False, + ) + + +def test_hem_io_system_save_all(td_small_MRIO, save_path="./tests/hem_save_test_all", cleanup=True): + """Test the HEM calculation with all save options.""" + IO_object = td_small_MRIO + IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path=save_path, + calculate_impacts=True, + impact_account="all", + specific_impact=None, + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=False, + ) + if cleanup: + shutil.rmtree(save_path) + - pdt.assert_frame_equal( - left=IO_Data.F.loc[:,HEM_object.index_extraction].sum(axis=1), - right=HEM_object.impact_production.sum(axis=1) +def test_hem_io_system_save_specific(td_small_MRIO, save_path="./tests/hem_save_test_specific", cleanup=True): + """Test the HEM calculation with specific save options.""" + IO_object = td_small_MRIO + 
IO_object.calc_system() + IO_object.apply_HEM( + regions=["reg1"], + sectors=["sector1", "sector2"], + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path=save_path, + calculate_impacts=True, + impact_account="extensions_one", + specific_impact="ext_type_11", + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=False, ) + if cleanup: + shutil.rmtree(save_path) From 0f12d749acfeb2c4bc74db9f0801afa88f410989 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 15:27:39 +0200 Subject: [PATCH 09/18] added more documentation --- pymrio/core/mriosystem.py | 66 ++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/pymrio/core/mriosystem.py b/pymrio/core/mriosystem.py index 74a23d1..9b8465f 100644 --- a/pymrio/core/mriosystem.py +++ b/pymrio/core/mriosystem.py @@ -3366,16 +3366,68 @@ def apply_HEM( multipliers=True, downstream_allocation_matrix="A12", save_extraction=True, - save_path="./test_extraction", + save_path="./HEM_results", calculate_impacts=True, - impact_account="all", + extension="all", specific_impact=None, # If specific impact is provided, details on other sectors are kept. save_impacts=True, save_core_IO=True, save_details=True, return_results=False, ): - """Apply the HEM method to the IOSystem.""" + """Apply a specific extraction using the HEM to the IOSystem. + + Parameters + ---------- + regions : list, optional + List of regions to consider in the HEM. If None, all regions are considered. + Default: None + sectors : list, optional + List of sectors to consider in the HEM. If None, all sectors are considered. + Default: None + extraction_type : str, optional + Type of extraction to apply. See "https://doi.org/10.1111/jiec.13522" for more information. + Default: "1.2" + multipliers : bool, optional + Whether to calculate HEM multipliers . 
Default: True + downstream_allocation_matrix : str, optional + The allocation matrix to use for downstream allocation. Can be "A12" or "L12". + Default: "A12" + save_extraction : bool, optional + Whether to save the extraction results to disk. Default: True + save_path : str, optional + Path to save the extraction results. Default: "./HEM_results" + calculate_impacts : bool, optional + Whether to calculate environmental impacts. Default: True + extension : str, optional + The extension to use for impact calculations. Can be "all" or a specific extension name. + Default: "all" + specific_impact : str, optional + If provided, only this specific impact will be calculated. + If None, all impacts in extensions are calculated. + Default: None + save_impacts : bool, optional + Whether to save the impact results to disk. Default: True + save_core_IO : bool, optional + Whether to save the core IO results along with the extraction. Default: True + save_details : bool, optional + Whether to save detailed results of the extraction. Default: True + return_results : bool, optional + Whether to return the HEM results as a list. Default: False + + Returns + ------- + list + If return_results is True, returns a list of HEM results. Otherwise, returns None. + + Raises + ------ + ValueError + If neither regions nor sectors are specified. + ValueError + If specific_impact is provided but extension is "all". + + """ # TODO: Option to whether or not add results as an attribute in PyMRIO object. 
if (regions is None) & (sectors is None): @@ -3403,9 +3455,9 @@ def apply_HEM( HEM_results = [] if calculate_impacts: - if impact_account == "all": + if extension == "all": if specific_impact is not None: - raise ValueError("If specific_impact is given, impact_account must not be 'all'.") + raise ValueError("If specific_impact is given, extension must not be 'all'.") for impact in self.get_extensions(): impact_extension = getattr(self, impact) @@ -3418,7 +3470,7 @@ def apply_HEM( else: HEM_results.append(HEM_object) else: - impact_extension = getattr(self, impact_account) + impact_extension = getattr(self, extension) if impact_extension.S is None: impact_extension.S = calc_S(impact_extension.F, self.x) @@ -3428,7 +3480,7 @@ def apply_HEM( HEM_object.calculate_impacts(intensities=impact_extension.S.loc[specific_impact, :]) if save_impacts: - HEM_object.save_impacts(impact_account=impact_account, specific_impact=specific_impact) + HEM_object.save_impacts(impact_account=extension, specific_impact=specific_impact) if return_results: HEM_results.append(HEM_object) return HEM_results From c305805b3abe3c505eea51c93e6ffc7675d85d27 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 15:27:49 +0200 Subject: [PATCH 10/18] changed variable name --- tests/test_hem.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/test_hem.py b/tests/test_hem.py index 332b2d8..56f4c35 100644 --- a/tests/test_hem.py +++ b/tests/test_hem.py @@ -275,7 +275,7 @@ def test_hem_io_system_minimum(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=False, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -297,7 +297,7 @@ def test_hem_io_system_multipliers(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -319,7 +319,7 @@ def 
test_hem_io_system_impact(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -341,7 +341,7 @@ def test_hem_io_system_impact_multipliers(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -354,7 +354,7 @@ def test_hem_io_system_impact_all_specific(td_small_MRIO): """Test the HEM calculation with impact calculation and specific impact.""" IO_object = td_small_MRIO IO_object.calc_system() - with pytest.raises(ValueError, match="If specific_impact is given, impact_account must not be 'all'."): + with pytest.raises(ValueError, match="If specific_impact is given, extension must not be 'all'."): IO_object.apply_HEM( regions=["reg1"], sectors=["sector1", "sector2"], @@ -364,7 +364,7 @@ def test_hem_io_system_impact_all_specific(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="all", + extension="all", specific_impact="ext_type_11", save_impacts=False, save_core_IO=False, @@ -386,7 +386,7 @@ def test_hem_io_system_impact_account_specific(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="extensions_one", + extension="extensions_one", specific_impact="ext_type_11", save_impacts=False, save_core_IO=False, @@ -408,7 +408,7 @@ def test_hem_io_system_impact_account(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=True, - impact_account="extensions_one", + extension="extensions_one", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -430,7 +430,7 @@ def test_hem_io_system_L12_allocation(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=False, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -452,7 
+452,7 @@ def test_hem_io_system_one_region(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=False, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -474,7 +474,7 @@ def test_hem_io_system_one_sector(td_small_MRIO): save_extraction=False, save_path=None, calculate_impacts=False, - impact_account="all", + extension="all", specific_impact=None, save_impacts=False, save_core_IO=False, @@ -496,7 +496,7 @@ def test_hem_io_system_save_all(td_small_MRIO, save_path="./tests/hem_save_test_ save_extraction=True, save_path=save_path, calculate_impacts=True, - impact_account="all", + extension="all", specific_impact=None, save_impacts=True, save_core_IO=True, @@ -520,7 +520,7 @@ def test_hem_io_system_save_specific(td_small_MRIO, save_path="./tests/hem_save_ save_extraction=True, save_path=save_path, calculate_impacts=True, - impact_account="extensions_one", + extension="extensions_one", specific_impact="ext_type_11", save_impacts=True, save_core_IO=True, From befb5d7306f96de8ef0c1502ab87954dedfce454 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Wed, 16 Jul 2025 18:25:01 +0200 Subject: [PATCH 11/18] Added to-do list --- TODO.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 TODO.md diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..3d7a4eb --- /dev/null +++ b/TODO.md @@ -0,0 +1,5 @@ + +- [ ] Add HEM relevant tests to tests/test_integrations. +- [ ] Update math and terminology documentation. +- [ ] Ensure HEM is properly included in api_doc. +- [ ] Add functionality to load HEM results into a HEM object. 
\ No newline at end of file From 165d77e629320ad6dc669fade89d55ab8c894cbc Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 15:35:08 +0200 Subject: [PATCH 12/18] testing with pymrio.load_test() as well now --- tests/test_hem.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/tests/test_hem.py b/tests/test_hem.py index 56f4c35..660ce33 100644 --- a/tests/test_hem.py +++ b/tests/test_hem.py @@ -8,6 +8,8 @@ import pandas.testing as pdt import pytest +import pymrio + # the function which should be tested here from pymrio.core.mriosystem import Extension, IOSystem from pymrio.tools.iohem import HEM @@ -227,7 +229,7 @@ def test_hem_extraction(td_small_MRIO): A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, - meta=None, + IOSystem_meta=None, save_path=None, ) HEM_object.make_extraction( @@ -247,7 +249,7 @@ def test_hem_extraction_impacts(td_small_MRIO): A=td_small_MRIO.A, x=td_small_MRIO.x, L=td_small_MRIO.L, - meta=None, + IOSystem_meta=None, save_path=None, ) HEM_object.make_extraction( @@ -483,13 +485,15 @@ def test_hem_io_system_one_sector(td_small_MRIO): ) -def test_hem_io_system_save_all(td_small_MRIO, save_path="./tests/hem_save_test_all", cleanup=True): +def test_hem_io_system_save_all(save_path="./tests/hem_save_test_all", cleanup=True): """Test the HEM calculation with all save options.""" - IO_object = td_small_MRIO + IO_object = pymrio.load_test() + extract_regions = [IO_object.regions[0]] + extract_sectors = list(IO_object.sectors[:2]) IO_object.calc_system() IO_object.apply_HEM( - regions=["reg1"], - sectors=["sector1", "sector2"], + regions=extract_regions, + sectors=extract_sectors, extraction_type="1.2", multipliers=True, downstream_allocation_matrix="A12", @@ -507,21 +511,25 @@ def test_hem_io_system_save_all(td_small_MRIO, save_path="./tests/hem_save_test_ shutil.rmtree(save_path) -def test_hem_io_system_save_specific(td_small_MRIO, save_path="./tests/hem_save_test_specific", 
cleanup=True): +def test_hem_io_system_save_specific(save_path="./tests/hem_save_test_specific", cleanup=True): """Test the HEM calculation with specific save options.""" - IO_object = td_small_MRIO + IO_object = pymrio.load_test() + extract_regions = [IO_object.regions[0]] + extract_sectors = list(IO_object.sectors[:2]) + extension = list(IO_object.get_extensions())[0] + specific_impact = [list(IO_object.get_extensions(data=True))[0].get_index()[0]] IO_object.calc_system() IO_object.apply_HEM( - regions=["reg1"], - sectors=["sector1", "sector2"], + regions=extract_regions, + sectors=extract_sectors, extraction_type="1.2", multipliers=True, downstream_allocation_matrix="A12", save_extraction=True, save_path=save_path, calculate_impacts=True, - extension="extensions_one", - specific_impact="ext_type_11", + extension=extension, + specific_impact=specific_impact, save_impacts=True, save_core_IO=True, save_details=True, @@ -529,3 +537,4 @@ def test_hem_io_system_save_specific(td_small_MRIO, save_path="./tests/hem_save_ ) if cleanup: shutil.rmtree(save_path) + From 71c0abe998cb6ae6976aa4323132ff34ab943db5 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 15:35:52 +0200 Subject: [PATCH 13/18] added HEM + ruff reorganise --- pymrio/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pymrio/__init__.py b/pymrio/__init__.py index 32a28ba..5ad506f 100644 --- a/pymrio/__init__.py +++ b/pymrio/__init__.py @@ -46,6 +46,7 @@ download_oecd, download_wiod2013, ) +from pymrio.tools.iohem import HEM from pymrio.tools.iomath import ( calc_A, calc_accounts, @@ -93,8 +94,8 @@ to_long, ) from pymrio.tools.tshelper import ( - apply_method, apply_function, + apply_method, extract_from_mrioseries, ) from pymrio.version import __version__ @@ -123,6 +124,8 @@ "archive", "load_test", "ReadError", + # tools.iohem + "HEM", # tools.iomath "calc_A", "calc_accounts", From 2b620f45533dd332bed7797551ba13a925c0fa0e Mon Sep 17 00:00:00 2001 From: Kajwan Date: 
Thu, 17 Jul 2025 15:36:24 +0200 Subject: [PATCH 14/18] ignores some of the test save folders might make, if user forgets to set cleanup=True in test --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 41d11db..d285650 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,8 @@ Session.vim /**/build/ /__pycache__ /tests/__pycache__/ +/tests/hem_save_test_all/ +/tests/hem_save_test_specific/ /.cache /.idea /**/.ipynb_checkpoints/ From f24785d63dc7d02893cfa2a33ff45f8a78e80baf Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 19:23:15 +0200 Subject: [PATCH 15/18] added tests for loading functionality --- tests/test_hem.py | 125 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 4 deletions(-) diff --git a/tests/test_hem.py b/tests/test_hem.py index 660ce33..4f707f1 100644 --- a/tests/test_hem.py +++ b/tests/test_hem.py @@ -12,7 +12,7 @@ # the function which should be tested here from pymrio.core.mriosystem import Extension, IOSystem -from pymrio.tools.iohem import HEM +from pymrio.tools.iohem import HEM, load_extraction from pymrio.tools.iomath import calc_S TESTPATH = os.path.dirname(os.path.abspath(__file__)) @@ -375,7 +375,7 @@ def test_hem_io_system_impact_all_specific(td_small_MRIO): ) -def test_hem_io_system_impact_account_specific(td_small_MRIO): +def test_hem_io_system_extension_specific(td_small_MRIO): """Test the HEM calculation with impact account and specific impact.""" IO_object = td_small_MRIO IO_object.calc_system() @@ -397,7 +397,7 @@ def test_hem_io_system_impact_account_specific(td_small_MRIO): ) -def test_hem_io_system_impact_account(td_small_MRIO): +def test_hem_io_system_extension(td_small_MRIO): """Test the HEM calculation with impact account.""" IO_object = td_small_MRIO IO_object.calc_system() @@ -485,7 +485,7 @@ def test_hem_io_system_one_sector(td_small_MRIO): ) -def test_hem_io_system_save_all(save_path="./tests/hem_save_test_all", cleanup=True): +def 
test_hem_io_system_save_all(save_path="./tests/hem_save_test_all", cleanup=False): """Test the HEM calculation with all save options.""" IO_object = pymrio.load_test() extract_regions = [IO_object.regions[0]] @@ -538,3 +538,120 @@ def test_hem_io_system_save_specific(save_path="./tests/hem_save_test_specific", if cleanup: shutil.rmtree(save_path) +def test_hem_load(save_path="./tests/hem_load_test", cleanup=True): + """Test the HEM calculation with all save options.""" + IO_object = pymrio.load_test() + extract_regions = [IO_object.regions[0]] + extract_sectors = list(IO_object.sectors[:2]) + IO_object.calc_system() + IO_object.apply_HEM( + regions=extract_regions, + sectors=extract_sectors, + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path=save_path, + calculate_impacts=True, + extension="all", + specific_impact=None, + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=False, + ) + + HEM_extraction = load_extraction( + extraction_path=f"{save_path}/{IO_object.regions[0]}", + load_multipliers=True, + load_core=True, + load_details=True, + ) + + if cleanup: + shutil.rmtree(save_path) + +def test_hem_load_calculate(save_path="./tests/hem_load_test", cleanup=True): + """Test the HEM calculation with all save options.""" + IO_object = pymrio.load_test() + extract_regions = [IO_object.regions[0]] + extract_sectors = list(IO_object.sectors[:2]) + IO_object.calc_system() + HEM_initial = IO_object.apply_HEM( + regions=extract_regions, + sectors=extract_sectors, + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path=save_path, + calculate_impacts=True, + extension="all", + specific_impact=None, + save_impacts=True, + save_core_IO=True, + save_details=True, + return_results=True, + ) + + HEM_object = load_extraction( + extraction_path=HEM_initial[0].extraction_save_path, + load_multipliers=True, + load_core=True, + 
load_details=True, + ) + + HEM_object.calculate_impacts( + intensities=next(IO_object.get_extensions(data=True), None).S + ) + + if cleanup: + shutil.rmtree(save_path) + +def test_hem_load_calculate_save(save_path="./tests/hem_load_calculate_save", cleanup=True): + """Test the HEM calculation with all save options.""" + IO_object = pymrio.load_test() + extract_regions = [IO_object.regions[1]] + extract_sectors = list(IO_object.sectors[:2]) + IO_object.calc_system() + HEM_initial = IO_object.apply_HEM( + regions=extract_regions, + sectors=extract_sectors, + extraction_type="1.2", + multipliers=True, + downstream_allocation_matrix="A12", + save_extraction=True, + save_path=save_path, + calculate_impacts=False, + extension="all", + specific_impact=None, + save_impacts=False, + save_core_IO=True, + save_details=True, + return_results=True, + )[0] + + HEM_object = load_extraction( + extraction_path=HEM_initial.extraction_save_path, + load_multipliers=True, + load_core=True, + load_details=True, + ) + + extension = next(IO_object.get_extensions(data=True), None) + extension_name = next(IO_object.get_extensions(data=False), None) + intensities = calc_S( + extension.F, IO_object.x + ) + + HEM_object.calculate_impacts( + intensities=intensities + ) + + HEM_object.save_impacts( + extension=extension_name, + specific_impact=None, + ) + + if cleanup: + shutil.rmtree(save_path) From 1ce746f9823db99de609d67141d181d1b072c70b Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 19:23:35 +0200 Subject: [PATCH 16/18] added code for loading extractions --- pymrio/tools/iohem.py | 295 ++++++++++++++++++++++++++++++++---------- 1 file changed, 224 insertions(+), 71 deletions(-) diff --git a/pymrio/tools/iohem.py b/pymrio/tools/iohem.py index 8ff752b..e75d2ac 100644 --- a/pymrio/tools/iohem.py +++ b/pymrio/tools/iohem.py @@ -14,7 +14,7 @@ class HEM: """Class for Hypothetical Extraction Method (HEM) results.""" - def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, 
meta=None, save_path=None) -> None: + def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, IOSystem_meta=None, save_path=None) -> None: """ Initialize the HEM class with the IOSystem or core IO data. @@ -31,7 +31,7 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav If not provided, the one from the IOSystem will be used. L : pd.DataFrame, optional Leontief inverse matrix. If not provided, it will be calculated from A. - meta : dict, optional + IOSystem_meta : dict, optional Metadata dictionary containing information about the IOSystem or extraction. save_path : str or Path, optional Path to save the extraction results. If not provided, it will be set to None. @@ -42,7 +42,7 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav self.x = x self.L = L self.meta = { - "IO System meta": meta, + "IO System meta": IOSystem_meta, } else: self.Y = IOSystem.Y @@ -54,6 +54,8 @@ def __init__(self, IOSystem=None, Y=None, A=None, x=None, L=None, meta=None, sav } self.save_path = Path(save_path or "./") + self.L22 = None + self.H = None def make_extraction( self, @@ -150,79 +152,88 @@ def make_extraction( self.index_extraction = index_extraction self.index_other = index_other - if extraction_type in ["1.2", "2a.2", "3a.2"]: - # TODO: Turn different extraction types into functions that this method can call. 
- # Extracting blocks - Y1 = Y.loc[Y.index.isin(index_extraction), :] - Y2 = Y.loc[Y.index.isin(index_other), :] - A11 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_extraction)] - A12 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_other)] - A22 = A.loc[A.index.isin(index_other), A.columns.isin(index_other)] - A21 = A.loc[A.index.isin(index_other), A.columns.isin(index_extraction)] - - # Calculating HEM matrices - I11 = pd.DataFrame( - data=np.eye(len(A11)), - index=A11.index, - columns=A11.columns, + if extraction_type not in ["1.2", "2a.2", "3a.2"]: + raise NotImplementedError( + "Only extraction types '1.2', '2a.2', '3a.2' are implemented at the moment.\n" + "Please implement the extraction type you need or use one of the implemented ones.\n" + "For more information see Table 4 in https://doi.org/10.1111/jiec.13522." ) + # TODO: Turn different extraction types into functions that this method can call. + # ? How should we deal with the interpretation of the account in those cases? 
+ # Extracting blocks + Y1 = Y.loc[Y.index.isin(index_extraction), :] + Y2 = Y.loc[Y.index.isin(index_other), :] + A11 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_extraction)] + A12 = A.loc[A.index.isin(index_extraction), A.columns.isin(index_other)] + A22 = A.loc[A.index.isin(index_other), A.columns.isin(index_other)] + A21 = A.loc[A.index.isin(index_other), A.columns.isin(index_extraction)] + + # Calculating HEM matrices + I11 = pd.DataFrame( + data=np.eye(len(A11)), + index=A11.index, + columns=A11.columns, + ) + + if (self.L22 is not None) and (self.L22.index.equals(A22.index)) and (self.L22.columns.equals(A22.columns)): + # If L22 is already provided, use it directly + self.L22 = self.L22 + else: self.L22 = iomath.calc_L(A22) + if (self.H is not None) and (self.H.index.equals(A11.index)) and (self.H.columns.equals(A11.columns)): + # If H is already provided, use it directly + self.H = self.H + else: self.H = pd.DataFrame( data=np.linalg.inv(I11 - A11 - A12.dot(self.L22.dot(A21))), index=A11.index, columns=A11.columns ) - # Calculating different accounts - self.production_downstream_all = pd.DataFrame( - data=np.diag(v=self.L22.dot(Y2.sum(axis=1))), index=self.L22.index, columns=self.L22.index - ) - - # Allocating downstream production - if downstream_allocation_matrix == "A12": - self.downstream_allocation_matrix = A12 + # Calculating different accounts + self.production_downstream_all = pd.DataFrame( + data=np.diag(v=self.L22.dot(Y2.sum(axis=1))), index=self.L22.index, columns=self.L22.index + ) - elif downstream_allocation_matrix == "L12": - if self.L is None: - self.L = iomath.calc_L(A) + # Allocating downstream production + if downstream_allocation_matrix == "A12": + self.downstream_allocation_matrix = A12 - L12 = self.L.loc[index_extraction, index_other] - L12_normalised = L12.div(L12.sum(axis=0), axis=1) - self.downstream_allocation_matrix = L12_normalised - else: - raise ValueError("Downstream allocation matrix must be either 'A12' or 
'L12'.") + elif downstream_allocation_matrix == "L12": + if self.L is None: + self.L = iomath.calc_L(A) - self.production_downstream = self.downstream_allocation_matrix.dot(self.production_downstream_all) + L12 = self.L.loc[index_extraction, index_other] + L12_normalised = L12.div(L12.sum(axis=0), axis=1) + self.downstream_allocation_matrix = L12_normalised + else: + raise ValueError("Downstream allocation matrix must be either 'A12' or 'L12'.") - self.demand_final_diagonal = pd.DataFrame(data=np.diag(v=Y1.sum(axis=1)), index=Y1.index, columns=Y1.index) - self.demand_intermediate_diagonal = pd.DataFrame( - data=np.diag(v=self.production_downstream.sum(axis=1)), - index=self.production_downstream.index, - columns=self.production_downstream.index, - ) + self.production_downstream = self.downstream_allocation_matrix.dot(self.production_downstream_all) - self.production = self.H.dot(other=(self.demand_final_diagonal + self.demand_intermediate_diagonal)) - self.production_upstream_first_tier = A21.dot(self.production) - self.production_upstream = self.L22.dot(self.production_upstream_first_tier) - - if multipliers: - self.M_production = self.production.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) - self.M_production_upstream_first_tier = self.production_upstream_first_tier.div( - x.loc[index_extraction, "indout"], axis=1 - ).replace(np.nan, 0) - self.M_upstream = self.production_upstream.div(x.loc[index_extraction, "indout"], axis=1).replace( - np.nan, 0 - ) - self.M_downstream = self.production_downstream.div(x.loc[index_extraction, "indout"], axis=0).replace( - np.nan, 0 - ) + self.demand_final_diagonal = pd.DataFrame(data=np.diag(v=Y1.sum(axis=1)), index=Y1.index, columns=Y1.index) + self.demand_intermediate_diagonal = pd.DataFrame( + data=np.diag(v=self.production_downstream.sum(axis=1)), + index=self.production_downstream.index, + columns=self.production_downstream.index, + ) - else: - raise NotImplementedError( - "Only extraction types '1.2', 
'2a.2', '3a.2' are implemented at the moment.\n" - "Please implement the extraction type you need or use one of the implemented ones.\n" - "For more information see Table 4 in https://doi.org/10.1111/jiec.13522." + self.production = self.H.dot(other=(self.demand_final_diagonal + self.demand_intermediate_diagonal)) + self.production_upstream_first_tier = A21.dot(self.production) + self.production_upstream = self.L22.dot(self.production_upstream_first_tier) + + if multipliers: + self.M_production = self.production.div(x.loc[index_extraction, "indout"], axis=0).replace(np.nan, 0) + self.M_production_upstream_first_tier = self.production_upstream_first_tier.div( + x.loc[index_extraction, "indout"], axis=1 + ).replace(np.nan, 0) + self.M_upstream = self.production_upstream.div(x.loc[index_extraction, "indout"], axis=1).replace( + np.nan, 0 + ) + self.M_downstream = self.production_downstream.div(x.loc[index_extraction, "indout"], axis=0).replace( + np.nan, 0 ) + return self def calculate_impacts(self, intensities=None): @@ -313,13 +324,22 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False # Makes subfolders for individual regions and/or sectors, # if it is clearly that a single region and/or sector has been extracted. # Will make sure that things are not overwritten, if multiple regions and/or sectors are extracted in a loop. 
- if (len(self.extraction_regions) == 1) and (len(self.extraction_sectors) == 1): + if self.extraction_regions is None: + n_regions = None + else: + n_regions = len(self.extraction_regions) + if self.extraction_sectors is None: + n_sectors = None + else: + n_sectors = len(self.extraction_sectors) + + if (n_regions == 1) and (n_sectors == 1): extraction_save_path = save_path / f"{self.extraction_regions[0]}_{self.extraction_sectors[0]}" - elif len(self.extraction_regions) == 1: + elif n_regions == 1: extraction_save_path = save_path / f"{self.extraction_regions[0]}" - elif len(self.extraction_sectors) == 1: + elif n_sectors == 1: extraction_save_path = save_path / f"{self.extraction_sectors[0]}" else: @@ -381,16 +401,16 @@ def save_extraction(self, save_path=None, save_core_IO=False, save_details=False with open(extraction_save_path / "meta.json", "w") as json_file: json.dump(self.meta, json_file, indent=4) - def save_impacts(self, impact_account=None, specific_impact=None): + def save_impacts(self, extension=None, specific_impact=None): """ Save the impacts of the hypothetical extraction to the specified path. Parameters ---------- - impact_account : str, optional + extension : str, optional Account name for the impacts. If not provided, the impacts will be saved in a general "impacts" folder. - save_path : str or Path, optional - Path to save the impacts. If not provided, the save path from the IOSystem will be used. + specific_impact : str, optional + Specific impact name for the impacts. If provided, the impacts will be saved in a subfolder with this name. Raises ------ @@ -398,12 +418,12 @@ def save_impacts(self, impact_account=None, specific_impact=None): If no save path is provided. 
""" - if (impact_account is None) & (specific_impact is None): + if (extension is None) & (specific_impact is None): save_path = Path(self.extraction_save_path) / "impacts" - elif (impact_account is None) & (specific_impact is not None): + elif (extension is None) & (specific_impact is not None): save_path = Path(self.extraction_save_path) / "impacts" / specific_impact else: - save_path = Path(self.extraction_save_path) / "impacts" / impact_account + save_path = Path(self.extraction_save_path) / "impacts" / extension save_path.mkdir(parents=True, exist_ok=True) pd.DataFrame(self.intensities).to_csv(save_path / "extensions.txt", sep="\t", index=False, header=False) @@ -418,5 +438,138 @@ def save_impacts(self, impact_account=None, specific_impact=None): self.M_impact_upstream.to_csv(save_path / "M_impact_upstream.txt", sep="\t") self.M_impact_downstream.to_csv(save_path / "M_impact_downstream.txt", sep="\t") +# %% + +def load_extraction(extraction_path, load_multipliers=True, load_core=True, load_details=True): + """Load extraction results from the specified path. + + Parameters + ---------- + extraction_path : str or Path + Path to the extraction results. + load_multipliers : bool, optional + Whether to load the multipliers (M_production, M_production_upstream_first_tier, M_upstream, M_downstream). + load_core : bool, optional + Whether to load the core IO data (A, Y). + load_details : bool, optional + Whether to load additional details + (production_downstream_all, demand_final_diagonal, demand_intermediate_diagonal). + + Returns + ------- + HEM + An instance of the HEM class with the loaded extraction results. 
+ """ + extraction_path = Path(extraction_path) + + if list(extraction_path.glob("meta.json")): + with open(extraction_path / "meta.json") as json_file: + meta = json.load(json_file) + + if list(extraction_path.glob("index_extraction.txt")): + index_extraction = pd.read_csv(extraction_path / "index_extraction.txt", sep="\t", header=None) + if index_extraction.shape[1] == 1: + index_extraction = pd.Index(index_extraction.iloc[:, 0]).unique() + dimensions = [0] + elif index_extraction.shape[1] == 2: + index_extraction = pd.MultiIndex.from_frame(index_extraction) + dimensions = [0, 1] + + if list(extraction_path.glob("index_other.txt")): + index_other = pd.read_csv(extraction_path / "index_other.txt", sep="\t", header=None) + if index_other.shape[1] == 1: + index_other = pd.Index(index_other.iloc[:, 0]).unique() + elif index_other.shape[1] == 2: + index_other = pd.MultiIndex.from_frame(index_other) + + + if list(extraction_path.glob("L22.txt")): + L22 = pd.read_csv(extraction_path / "L22.txt", sep="\t", index_col=dimensions, header=dimensions) + if list(extraction_path.glob("H.txt")): + H = pd.read_csv(extraction_path / "H.txt", sep="\t", index_col=dimensions, header=dimensions) + + if list(extraction_path.glob(f"{meta['downstream_allocation_matrix']}.txt")): + downstream_allocation_matrix = pd.read_csv( + extraction_path / f"{meta['downstream_allocation_matrix']}.txt", sep="\t", index_col=dimensions, header=dimensions + ) + + if list(extraction_path.glob("production.txt")): + production = pd.read_csv(extraction_path / "production.txt", sep="\t", index_col=dimensions, header=dimensions) + if list(extraction_path.glob("production_upstream_first_tier.txt")): + production_upstream_first_tier = pd.read_csv( + extraction_path / "production_upstream_first_tier.txt", sep="\t", index_col=dimensions, header=dimensions + ) + if list(extraction_path.glob("production_upstream.txt")): + production_upstream = pd.read_csv( + extraction_path / "production_upstream.txt", sep="\t", 
index_col=dimensions, header=dimensions + ) + if list(extraction_path.glob("production_downstream.txt")): + production_downstream = pd.read_csv( + extraction_path / "production_downstream.txt", sep="\t", index_col=dimensions, header=dimensions + ) + + if load_multipliers: + if list(extraction_path.glob("M_production.txt")): + M_production = pd.read_csv(extraction_path / "M_production.txt", sep="\t", index_col=dimensions, header=dimensions) + if list(extraction_path.glob("M_production_upstream_first_tier.txt")): + M_production_upstream_first_tier = pd.read_csv( + extraction_path / "M_production_upstream_first_tier.txt", sep="\t", index_col=dimensions, header=dimensions + ) + if list(extraction_path.glob("M_upstream.txt")): + M_upstream = pd.read_csv(extraction_path / "M_upstream.txt", sep="\t", index_col=dimensions, header=dimensions) + if list(extraction_path.glob("M_downstream.txt")): + M_downstream = pd.read_csv(extraction_path / "M_downstream.txt", sep="\t", index_col=dimensions, header=dimensions) + + if load_core: + if list(extraction_path.glob("A.txt")): + A = pd.read_csv(extraction_path / "A.txt", sep="\t", index_col=dimensions, header=dimensions) + if list(extraction_path.glob("Y.txt")): + Y = pd.read_csv(extraction_path / "Y.txt", sep="\t", index_col=dimensions, header=dimensions) + + if load_details: + if list(extraction_path.glob("production_downstream_all.txt")): + production_downstream_all = pd.read_csv( + extraction_path / "production_downstream_all.txt", sep="\t", index_col=dimensions, header=dimensions + ) + if list(extraction_path.glob("demand_final_diagonal.txt")): + demand_final_diagonal = pd.read_csv( + extraction_path / "demand_final_diagonal.txt", sep="\t", index_col=dimensions, header=dimensions + ) + if list(extraction_path.glob("demand_intermediate_diagonal.txt")): + demand_intermediate_diagonal = pd.read_csv( + extraction_path / "demand_intermediate_diagonal.txt", sep="\t", index_col=dimensions, header=dimensions + ) + HEM_extraction = 
HEM() + HEM_extraction.extraction_save_path = extraction_path + HEM_extraction.meta = meta + HEM_extraction.index_extraction = index_extraction + HEM_extraction.index_other = index_other + + HEM_extraction.L22 = L22 + HEM_extraction.H = H + + HEM_extraction.downstream_allocation_matrix = downstream_allocation_matrix + HEM_extraction.production = production + HEM_extraction.production_upstream_first_tier = production_upstream_first_tier + HEM_extraction.production_upstream = production_upstream + HEM_extraction.production_downstream = production_downstream + + if load_multipliers: + HEM_extraction.M_production = M_production + HEM_extraction.M_production_upstream_first_tier = M_production_upstream_first_tier + HEM_extraction.M_upstream = M_upstream + HEM_extraction.M_downstream = M_downstream + + if load_core: + HEM_extraction.A = A + HEM_extraction.Y = Y + + if load_details: + HEM_extraction.production_downstream_all = production_downstream_all + HEM_extraction.demand_final_diagonal = demand_final_diagonal + HEM_extraction.demand_intermediate_diagonal = demand_intermediate_diagonal + + return HEM_extraction # %% + From b4d955c350cd9bb6e6357010b89e7700a7fab959 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 19:23:45 +0200 Subject: [PATCH 17/18] minor bug fix --- pymrio/core/mriosystem.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pymrio/core/mriosystem.py b/pymrio/core/mriosystem.py index 9b8465f..272854a 100644 --- a/pymrio/core/mriosystem.py +++ b/pymrio/core/mriosystem.py @@ -3466,9 +3466,10 @@ def apply_HEM( HEM_object.calculate_impacts(intensities=impact_extension.S) if save_impacts: - HEM_object.save_impacts(impact_account=impact) - else: + HEM_object.save_impacts(extension=impact) + if return_results: HEM_results.append(HEM_object) + else: impact_extension = getattr(self, extension) if impact_extension.S is None: @@ -3480,10 +3481,12 @@ def apply_HEM( 
HEM_object.calculate_impacts(intensities=impact_extension.S.loc[specific_impact, :]) if save_impacts: - HEM_object.save_impacts(impact_account=extension, specific_impact=specific_impact) + HEM_object.save_impacts(extension=extension, specific_impact=specific_impact) if return_results: HEM_results.append(HEM_object) - return HEM_results + if return_results: + return HEM_results + elif return_results: HEM_results.append(HEM_object) return HEM_results From 5ebed1cc8e443caf2ca3da4d844147546e98f188 Mon Sep 17 00:00:00 2001 From: Kajwan Date: Thu, 17 Jul 2025 19:23:49 +0200 Subject: [PATCH 18/18] update --- TODO.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index 3d7a4eb..51a06f7 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,5 @@ -- [] Add HEM relevant tests to tests/test_integrations. +- [x] Add HEM relevant tests to tests/test_integrations. - [] Udpdate math and terminology documentation. - [] Ensure HEM is properly included in api_doc. -- [] Add functionality to load HEM results into a HEM object. \ No newline at end of file +- [x] Add functionality to load HEM results into a HEM object. \ No newline at end of file