diff --git a/kedro-datasets/kedro_datasets/polars/__init__.py b/kedro-datasets/kedro_datasets/polars/__init__.py index 3ea77eec4..ce9fce0df 100644 --- a/kedro-datasets/kedro_datasets/polars/__init__.py +++ b/kedro-datasets/kedro_datasets/polars/__init__.py @@ -4,6 +4,9 @@ import lazy_loader as lazy +# Import the PolarsExcelDataset +from .polars_excel_dataset import PolarsExcelDataset + # https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901 CSVDataset: Any EagerPolarsDataset: Any diff --git a/kedro-datasets/kedro_datasets/polars/data.xlsx b/kedro-datasets/kedro_datasets/polars/data.xlsx new file mode 100644 index 000000000..07e5ef9be Binary files /dev/null and b/kedro-datasets/kedro_datasets/polars/data.xlsx differ diff --git a/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py b/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py index 5914ce4d7..26f922cf1 100644 --- a/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py +++ b/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py @@ -1,206 +1,335 @@ -"""``EagerPolarsDataset`` loads/saves data from/to a data file using an underlying -filesystem (e.g.: local, S3, GCS). It uses polars to handle the -type of read/write target. -""" -from __future__ import annotations - -from copy import deepcopy -from io import BytesIO -from pathlib import PurePosixPath -from typing import Any - -import fsspec -import polars as pl -from kedro.io.core import ( - AbstractVersionedDataset, - DatasetError, - Version, - get_filepath_str, - get_protocol_and_path, -) - - -class EagerPolarsDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]): - """``polars.EagerPolarsDataset`` loads/saves data from/to a data file using an underlying - filesystem (e.g.: local, S3, GCS). It uses polars to handle the dynamically select the - appropriate type of read/write on a best effort basis. - - Example usage for the `YAML API `_: - - .. code-block:: yaml - - cars: - type: polars.EagerPolarsDataset - file_format: parquet - filepath: s3://data/01_raw/company/cars.parquet - load_args: - low_memory: True - save_args: - compression: "snappy" - - Example using Python API: - - .. code-block:: pycon - - >>> from kedro_datasets.polars import EagerPolarsDataset - >>> import polars as pl - >>> - >>> data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) - >>> - >>> dataset = EagerPolarsDataset(filepath=tmp_path / "test.parquet", file_format="parquet") - >>> dataset.save(data) - >>> reloaded = dataset.load() - >>> assert data.equals(reloaded) - - """ - - DEFAULT_LOAD_ARGS = {} # type: dict[str, Any] - DEFAULT_SAVE_ARGS = {} # type: dict[str, Any] - - def __init__( # noqa: PLR0913 - self, - *, - filepath: str, - file_format: str, - load_args: dict[str, Any] | None = None, - save_args: dict[str, Any] | None = None, - version: Version | None = None, - credentials: dict[str, Any] | None = None, - fs_args: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - ): - """Creates a new instance of ``EagerPolarsDataset`` pointing to a concrete data file - on a specific filesystem. The appropriate polars load/save methods are dynamically - identified by string matching on a best effort basis. - - Args: - filepath: Filepath in POSIX format to a file prefixed with a protocol like - `s3://`. - If prefix is not provided, `file` protocol (local filesystem) - will be used. - The prefix should be any protocol supported by ``fsspec``. - Key assumption: The first argument of either load/save method points to - a filepath/buffer/io type location. There are some read/write targets such - as 'clipboard' or 'records' that will fail since they do not take a filepath - like argument. - file_format: String which is used to match the appropriate load/save method on a - best effort basis. For example if 'csv' is passed, the `polars.read_csv` and - `polars.DataFrame.write_csv` methods will be identified. An error will - be raised unless there is at least one matching `read_` - or `write_`. - load_args: Polars options for loading CSV files. - Here you can find all available arguments: - https://pola-rs.github.io/polars/py-polars/html/reference/io.html - All defaults are preserved. - save_args: Polars options for saving files. - Here you can find all available arguments: - https://pola-rs.github.io/polars/py-polars/html/reference/io.html - All defaults are preserved. - version: If specified, should be an instance of - ``kedro.io.core.Version``. If its ``load`` attribute is - None, the latest version will be loaded. If its ``save`` - attribute is None, save version will be autogenerated. - credentials: Credentials required to get access to the underlying filesystem. - E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. - fs_args: Extra arguments to pass into underlying filesystem class constructor - (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). - metadata: Any arbitrary metadata. - This is ignored by Kedro, but may be consumed by users or external plugins. - Raises: - DatasetError: Will be raised if at least less than one appropriate read or write - methods are identified. - """ - - self._file_format = file_format.lower() - - _fs_args = deepcopy(fs_args) or {} - _fs_open_args_load = _fs_args.pop("open_args_load", {}) - _fs_open_args_save = _fs_args.pop("open_args_save", {}) - _credentials = deepcopy(credentials) or {} - - protocol, path = get_protocol_and_path(filepath) - if protocol == "file": - _fs_args.setdefault("auto_mkdir", True) - - self._protocol = protocol - self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) - self.metadata = metadata - - super().__init__( - filepath=PurePosixPath(path), - version=version, - exists_function=self._fs.exists, - glob_function=self._fs.glob, - ) - - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save - - def load(self) -> pl.DataFrame: - load_path = get_filepath_str(self._get_load_path(), self._protocol) - load_method = getattr(pl, f"read_{self._file_format}", None) - - if not load_method: - raise DatasetError( - f"Unable to retrieve 'polars.read_{self._file_format}' method, please" - " ensure that your " - "'file_format' parameter has been defined correctly as per the Polars" - " API" - " https://pola-rs.github.io/polars/py-polars/html/reference/io.html" - ) - with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: - return load_method(fs_file, **self._load_args) - - def save(self, data: pl.DataFrame) -> None: - save_path = get_filepath_str(self._get_save_path(), self._protocol) - save_method = getattr(data, f"write_{self._file_format}", None) - - if not save_method: - raise DatasetError( - f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' " - "method, please " - "ensure that your 'file_format' parameter has been defined correctly as" - " per the Polars API " - "https://pola-rs.github.io/polars/py-polars/html/reference/io.html" - ) - buf = BytesIO() - save_method(buf, **self._save_args) - with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: - fs_file.write(buf.getvalue()) - self._invalidate_cache() - - def _exists(self) -> bool: - try: - load_path = get_filepath_str(self._get_load_path(), self._protocol) - except DatasetError: - return False - - return self._fs.exists(load_path) - - def _describe(self) -> dict[str, Any]: - return { - "file_format": self._file_format, - "filepath": self._filepath, - "protocol": self._protocol, - "load_args": self._load_args, - "save_args": self._save_args, - "version": self._version, - } - - def _release(self) -> None: - super()._release() - self._invalidate_cache() - - def _invalidate_cache(self) -> None: - """Invalidate underlying filesystem caches.""" - filepath = get_filepath_str(self._filepath, self._protocol) - self._fs.invalidate_cache(filepath) +"""``EagerPolarsDataset`` loads/saves data from/to a data file using an underlying +filesystem (e.g.: local, S3, GCS). It uses polars to handle the +type of read/write target. +""" + +from __future__ import annotations + +from copy import deepcopy +from io import BytesIO +from pathlib import PurePosixPath +from typing import Any + +import fsspec +import polars as pl +from kedro.io.core import ( + AbstractVersionedDataset, + DatasetError, + Version, + get_filepath_str, + get_protocol_and_path, +) + + +class EagerPolarsDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]): + """``polars.EagerPolarsDataset`` loads/saves data from/to a data file using an underlying + filesystem (e.g.: local, S3, GCS). It uses polars to handle the dynamically select the + appropriate type of read/write on a best effort basis. + + Example usage for the `YAML API `_: + + .. code-block:: yaml + + cars: + type: polars.EagerPolarsDataset + file_format: parquet + filepath: s3://data/01_raw/company/cars.parquet + load_args: + low_memory: True + save_args: + compression: "snappy" + + Example using Python API: + + .. code-block:: pycon + + >>> from kedro_datasets.polars import EagerPolarsDataset + >>> import polars as pl + >>> + >>> data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) + >>> + >>> dataset = EagerPolarsDataset(filepath=tmp_path / "test.parquet", file_format="parquet") + >>> dataset.save(data) + >>> reloaded = dataset.load() + >>> assert data.equals(reloaded) + + """ + + DEFAULT_LOAD_ARGS = {} # type: dict[str, Any] + DEFAULT_SAVE_ARGS = {} # type: dict[str, Any] + + def __init__( # noqa: PLR0913 + self, + *, + filepath: str, + file_format: str, + load_args: dict[str, Any] | None = None, + save_args: dict[str, Any] | None = None, + version: Version | None = None, + credentials: dict[str, Any] | None = None, + fs_args: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + ): + """Creates a new instance of ``EagerPolarsDataset`` pointing to a concrete data file + on a specific filesystem. The appropriate polars load/save methods are dynamically + identified by string matching on a best effort basis. + + Args: + filepath: Filepath in POSIX format to a file prefixed with a protocol like + `s3://`. + If prefix is not provided, `file` protocol (local filesystem) + will be used. + The prefix should be any protocol supported by ``fsspec``. + Key assumption: The first argument of either load/save method points to + a filepath/buffer/io type location. There are some read/write targets such + as 'clipboard' or 'records' that will fail since they do not take a filepath + like argument. + file_format: String which is used to match the appropriate load/save method on a + best effort basis. For example if 'csv' is passed, the `polars.read_csv` and + `polars.DataFrame.write_csv` methods will be identified. An error will + be raised unless there is at least one matching `read_` + or `write_`. + load_args: Polars options for loading CSV files. + Here you can find all available arguments: + https://pola-rs.github.io/polars/py-polars/html/reference/io.html + All defaults are preserved. + save_args: Polars options for saving files. + Here you can find all available arguments: + https://pola-rs.github.io/polars/py-polars/html/reference/io.html + All defaults are preserved. + version: If specified, should be an instance of + ``kedro.io.core.Version``. If its ``load`` attribute is + None, the latest version will be loaded. If its ``save`` + attribute is None, save version will be autogenerated. + credentials: Credentials required to get access to the underlying filesystem. + E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. + fs_args: Extra arguments to pass into underlying filesystem class constructor + (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + metadata: Any arbitrary metadata. + This is ignored by Kedro, but may be consumed by users or external plugins. + Raises: + DatasetError: Will be raised if at least less than one appropriate read or write + methods are identified. + """ + + self._file_format = file_format.lower() + + _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) + _credentials = deepcopy(credentials) or {} + + protocol, path = get_protocol_and_path(filepath) + if protocol == "file": + _fs_args.setdefault("auto_mkdir", True) + + self._protocol = protocol + self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) + self.metadata = metadata + + super().__init__( + filepath=PurePosixPath(path), + version=version, + exists_function=self._fs.exists, + glob_function=self._fs.glob, + ) + + self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) + if load_args is not None: + self._load_args.update(load_args) + self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) + if save_args is not None: + self._save_args.update(save_args) + + _fs_open_args_save.setdefault("mode", "wb") + self._fs_open_args_load = _fs_open_args_load + self._fs_open_args_save = _fs_open_args_save + + def load(self) -> pl.DataFrame: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + load_method = getattr(pl, f"read_{self._file_format}", None) + + if not load_method: + raise DatasetError( + f"Unable to retrieve 'polars.read_{self._file_format}' method, please" + " ensure that your " + "'file_format' parameter has been defined correctly as per the Polars" + " API" + " https://pola-rs.github.io/polars/py-polars/html/reference/io.html" + ) + with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: + return load_method(fs_file, **self._load_args) + + def save(self, data: pl.DataFrame) -> None: + save_path = get_filepath_str(self._get_save_path(), self._protocol) + save_method = getattr(data, f"write_{self._file_format}", None) + + if not save_method: + raise DatasetError( + f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' " + "method, please " + "ensure that your 'file_format' parameter has been defined correctly as" + " per the Polars API " + "https://pola-rs.github.io/polars/py-polars/html/reference/io.html" + ) + buf = BytesIO() + save_method(buf, **self._save_args) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + fs_file.write(buf.getvalue()) + self._invalidate_cache() + + def _exists(self) -> bool: + try: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + except DatasetError: + return False + + return self._fs.exists(load_path) + + def _describe(self) -> dict[str, Any]: + return { + "file_format": self._file_format, + "filepath": self._filepath, + "protocol": self._protocol, + "load_args": self._load_args, + "save_args": self._save_args, + "version": self._version, + } + + def _release(self) -> None: + super()._release() + self._invalidate_cache() + + def _invalidate_cache(self) -> None: + """Invalidate underlying filesystem caches.""" + filepath = get_filepath_str(self._filepath, self._protocol) + self._fs.invalidate_cache(filepath) + + +from kedro_datasets.polars import PolarsEagerDataset +import polars as pl +from pathlib import Path + + +def main(): + # Specify the path to your Excel file in the "polars" directory + filepath = Path( + r"C:\Users\Stephanie Ewelu\kedro-plugins\kedro-datasets\kedro_datasets\polars\your_excel_file.xlsx" + ) + sheet_name = "Sheet1" # Adjust the sheet name as necessary + + # Initialize the dataset to load data from the specified Excel file + dataset = PolarsEagerDataset(filepath=str(filepath), sheet_name=sheet_name) + + # Load the dataset into a Polars DataFrame + try: + df = dataset.load() + print("Data loaded successfully:") + print(df) + except Exception as e: + print(f"Error loading data: {e}") + + # Process the DataFrame (example: filtering data where a column 'A' is greater than 10) + processed_df = df.filter(df["A"] > 10) # Example filtering operation + + # Print the processed data + print("Processed DataFrame:") + print(processed_df) + + # Save the processed data back to the Excel file + try: + dataset.save(processed_df) + print("Data saved successfully.") + except Exception as e: + print(f"Error saving data: {e}") + + +if __name__ == "__main__": + main() +from typing import Dict, Any +from pathlib import Path +import polars as pl +import pandas as pd +import openpyxl +from kedro.io import AbstractDataset +from kedro.io.core import DatasetError + + +class PolarsExcelDataset( + AbstractDataset[Dict[str, pl.DataFrame], Dict[str, pl.DataFrame]] +): + """ + Kedro Dataset for reading and writing multiple Polars DataFrames to/from an Excel file. + Example: + >>> dataset = PolarsExcelDataset("data.xlsx") + >>> data = dataset.load() # Returns a dictionary of Polars DataFrames + >>> dataset.save({"sheet1": df1, "sheet2": df2}) # Saves multiple DataFrames to Excel + """ + + def __init__(self, filepath: str): + """ + Initialize PolarsExcelDataset. + + Args: + filepath (str): Path where the dataset will be stored. + """ + self._filepath = Path(filepath) + + def _load(self) -> Dict[str, pl.DataFrame]: + """Load multiple sheets into a dictionary of Polars DataFrames.""" + if not self._filepath.exists(): + raise DatasetError(f"File not found: {self._filepath}") + + try: + # Use pandas to read all sheets + pandas_data = pd.read_excel(self._filepath, sheet_name=None) + # Convert each sheet to a Polars DataFrame + return { + sheet_name: pl.DataFrame(df) for sheet_name, df in pandas_data.items() + } + except Exception as e: + raise DatasetError(f"Failed to load dataset: {e}") + + def _save(self, data: Dict[str, pl.DataFrame]) -> None: + """Save multiple Polars DataFrames as different sheets.""" + if not isinstance(data, dict) or not all( + isinstance(df, pl.DataFrame) for df in data.values() + ): + raise DatasetError("Data must be a dictionary of Polars DataFrames.") + + try: + # Convert Polars DataFrame to Pandas DataFrame and then save with openpyxl + with pd.ExcelWriter(self._filepath, engine="openpyxl") as writer: + for sheet_name, df in data.items(): + if not isinstance(sheet_name, str) or len(sheet_name) > 31: + raise DatasetError( + f"Invalid sheet name: {sheet_name}. Sheet names must be strings and <= 31 characters." + ) + df.to_pandas().to_excel(writer, sheet_name=sheet_name, index=False) + except Exception as e: + raise DatasetError(f"Failed to save dataset: {e}") + + def _exists(self) -> bool: + """Check if the dataset exists.""" + return self._filepath.exists() + + def _describe(self) -> Dict[str, Any]: + """Return dataset metadata.""" + return { + "filepath": str(self._filepath), + "exists": self._exists(), + } + + +# Create sample data +df1 = pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) +df2 = pl.DataFrame({"colA": [4, 5], "colB": ["x", "y"]}) +data = {"sheet1": df1, "sheet2": df2} + +# Save and load data +dataset = PolarsExcelDataset("data.xlsx") +dataset.save(data) +loaded_data = dataset.load() + +print(loaded_data) diff --git a/kedro-datasets/kedro_datasets/polars/polars_excel_dataset.py b/kedro-datasets/kedro_datasets/polars/polars_excel_dataset.py new file mode 100644 index 000000000..b872551f8 --- /dev/null +++ b/kedro-datasets/kedro_datasets/polars/polars_excel_dataset.py @@ -0,0 +1,76 @@ +from typing import Dict, Any +from pathlib import Path +import polars as pl +import pandas as pd +import openpyxl +from kedro.io import AbstractDataset +from kedro.io.core import DatasetError + +class PolarsExcelDataset(AbstractDataset[Dict[str, pl.DataFrame], Dict[str, pl.DataFrame]]): + """ + Kedro Dataset for reading and writing multiple Polars DataFrames to/from an Excel file. + + Example: + >>> dataset = PolarsExcelDataset("data.xlsx") + >>> data = dataset.load() # Returns a dictionary of Polars DataFrames + >>> dataset.save({"sheet1": df1, "sheet2": df2}) # Saves multiple DataFrames to Excel + """ + + def __init__(self, filepath: str): + """ + Initialize PolarsExcelDataset. + + Args: + filepath (str): Path where the dataset will be stored. + """ + self._filepath = Path(filepath) + + def _load(self) -> Dict[str, pl.DataFrame]: + """Load multiple sheets into a dictionary of Polars DataFrames.""" + if not self._filepath.exists(): + raise DatasetError(f"File not found: {self._filepath}") + + try: + # Use pandas to read all sheets + pandas_data = pd.read_excel(self._filepath, sheet_name=None) + # Convert each sheet to a Polars DataFrame + return {sheet_name: pl.DataFrame(df) for sheet_name, df in pandas_data.items()} + except Exception as e: + raise DatasetError(f"Failed to load dataset: {e}") + + def _save(self, data: Dict[str, pl.DataFrame]) -> None: + """Save multiple Polars DataFrames as different sheets.""" + if not isinstance(data, dict) or not all(isinstance(df, pl.DataFrame) for df in data.values()): + raise DatasetError("Data must be a dictionary of Polars DataFrames.") + + try: + # Convert Polars DataFrame to Pandas DataFrame and then save with openpyxl + with pd.ExcelWriter(self._filepath, engine='openpyxl') as writer: + for sheet_name, df in data.items(): + if not isinstance(sheet_name, str) or len(sheet_name) > 31: + raise DatasetError(f"Invalid sheet name: {sheet_name}. Sheet names must be strings and <= 31 characters.") + df.to_pandas().to_excel(writer, sheet_name=sheet_name, index=False) + except Exception as e: + raise DatasetError(f"Failed to save dataset: {e}") + + def _exists(self) -> bool: + """Check if the dataset exists.""" + return self._filepath.exists() + + def _describe(self) -> Dict[str, Any]: + """Return dataset metadata.""" + return { + "filepath": str(self._filepath), + "exists": self._exists(), + } +# Create sample data +df1 = pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) +df2 = pl.DataFrame({"colA": [4, 5], "colB": ["x", "y"]}) +data = {"sheet1": df1, "sheet2": df2} + +# Save and load data +dataset = PolarsExcelDataset("data.xlsx") +dataset.save(data) +loaded_data = dataset.load() + +print(loaded_data) \ No newline at end of file diff --git a/kedro-datasets/kedro_datasets/polars/polars_multi_sheet.xlsx b/kedro-datasets/kedro_datasets/polars/polars_multi_sheet.xlsx new file mode 100644 index 000000000..f613f0e32 Binary files /dev/null and b/kedro-datasets/kedro_datasets/polars/polars_multi_sheet.xlsx differ diff --git a/kedro-datasets/kedro_datasets/polars/setup.py b/kedro-datasets/kedro_datasets/polars/setup.py new file mode 100644 index 000000000..ab9b3c75f --- /dev/null +++ b/kedro-datasets/kedro_datasets/polars/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup, find_packages + +setup( + name="kedro_datasets", # The name of your package + version="0.1", # Version of your package + packages=find_packages(), # Automatically find all subpackages + install_requires=[ # List any external dependencies here + "kedro>=0.18.0", # Include Kedro dependency (replace with your version) + "polars", # Include Polars package (if you're using it) + ], +)