[BUG] Issue reading hive partitioned dataset with NativeExecutionEngine #288

Open
LaurentErreca opened this issue Jan 9, 2022 · 9 comments · Fixed by #306

@LaurentErreca
Contributor

LaurentErreca commented Jan 9, 2022

I have a pandas DataFrame with a column DAY holding the day of the month (e.g. values from 1 to 31 for December):

from datetime import datetime
import pandas as pd

df = pd.DataFrame({'IBES': ['AAPL', 'AAPL', 'IBM', 'IBM'],
                   'EST_MEAN': [12.2, 10.0, 13.1, 13.5],
                   'EST_MEDIAN': [12.2, 12.0, 13.1, 13.2],
                   'BDATE': [datetime(2022, 1, 6), datetime(2022, 1, 7),
                             datetime(2022, 1, 6), datetime(2022, 1, 7)],
                   'DAY': [6, 7, 6, 7]
                  })

I save this DataFrame as a hive partitioned dataset, partitioned by DAY:

%%fsql

SELECT * FROM df
SAVE PREPARTITION BY DAY OVERWRITE PARQUET output_path

The result folder has a format similar to this:
! tree output_path
output_path
├── DAY=6
│   └── 02b4a05c12fa4791aca2931e47659ecc.parquet
└── DAY=7
    └── bd17a05a5bd948cc824e4730fd03b473.parquet

When I read the dataset using the Spark execution engine, there is no problem:

%%fsql spark

df_int = LOAD PARQUET output_path
SELECT * FROM df_int
PRINT df_int

But the same code fails using the native execution engine:

The above exception was the direct cause of the following exception:

FugueDataFrameInitError                   Traceback (most recent call last)
/tmp/ipykernel_7378/4230927618.py in <module>
----> 1 get_ipython().run_cell_magic('fsql', '', "\ndf_int = LOAD PARQUET output_path\nSELECT * FROM df_int\nPRINT df_int\n")

~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2404             with self.builtin_trap:
   2405                 args = (magic_arg_s, cell)
-> 2406                 result = fn(*args, **kwargs)
   2407             return result
   2408 

~/.conda/envs/fugue/lib/python3.8/site-packages/decorator.py in fun(*args, **kw)
    230             if not kwsyntax:
    231                 args, kw = fix(args, kw, sig)
--> 232             return caller(func, *(extras + args), **kw)
    233     fun.__name__ = func.__name__
    234     fun.__doc__ = func.__doc__

~/.conda/envs/fugue/lib/python3.8/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    185     # but it's overkill for just that one bit of state.
    186     def magic_deco(arg):
--> 187         call = lambda f, *a, **k: f(*a, **k)
    188 
    189         if callable(arg):

~/dev/fugue/fugue_notebook/env.py in fsql(self, line, cell, local_ns)
     88         except FugueSQLSyntaxError as ex:
     89             raise FugueSQLSyntaxError(str(ex)).with_traceback(None) from None
---> 90         dag.run(self.get_engine(line, {} if local_ns is None else local_ns))
     91         for k, v in dag.yields.items():
     92             if isinstance(v, YieldedDataFrame):

~/dev/fugue/fugue/workflow/workflow.py in run(self, *args, **kwargs)
   1516                 if ctb is None:  # pragma: no cover
   1517                     raise
-> 1518                 raise ex.with_traceback(ctb)
   1519             self._computed = True
   1520         return DataFrames(

~/dev/fugue/fugue/dataframe/pandas_dataframe.py in __init__(self, df, schema, metadata, pandas_df_wrapper)
     77             self._native = pdf
     78         except Exception as e:
---> 79             raise FugueDataFrameInitError from e
     80 
     81     @property

FugueDataFrameInitError:

I also observed that when you specify the list of columns to read, and that list does not include the partition column, it works fine:

%%fsql

df_int = LOAD PARQUET output_path COLUMNS IBES,EST_MEDIAN,EST_MEAN,BDATE
SELECT * FROM df_int
PRINT df_int

Environment:

  • Backend: pandas
  • Backend version: 1.3.5
  • Python version: 3.8.12
  • OS: linux
@LaurentErreca LaurentErreca changed the title from "[BUG] Issue reading hive partitioned dataset with NativeExecutionEngine when partition column contains integers" to "[BUG] Issue reading hive partitioned dataset with NativeExecutionEngine" Jan 10, 2022
@LaurentErreca
Contributor Author

LaurentErreca commented Jan 10, 2022

The problem is due to the dtype of the partition column which is set to pd.CategoricalDtype. By converting the dtype to str, the partitioned dataset is correctly read. The user will have to convert the type of the partition column if necessary.

Dirty hack to test this, in /triad/collections/schema.py, in the function append (line 232), in the branch where obj is a List:

    def append(self, obj: Any) -> "Schema":  # noqa: C901
        """Append schema like object to the current schema. Only new columns
        are allowed.

        :raises SchemaError: if a column exists or is invalid or obj is not convertible
        :return: the Schema object itself
        """
        try:
            if obj is None:
                return self
            elif isinstance(obj, pa.Field):
                self[obj.name] = obj.type
            elif isinstance(obj, str):
                self._append_pa_schema(expression_to_schema(obj))
            elif isinstance(obj, Dict):
                for k, v in obj.items():
                    self[k] = v
            elif isinstance(obj, pa.Schema):
                self._append_pa_schema(obj)
            elif isinstance(obj, pd.DataFrame):
                self._append_pa_schema(PD_UTILS.to_schema(obj))
            elif isinstance(obj, Tuple):  # type: ignore
                self[obj[0]] = obj[1]
            elif isinstance(obj, List):
                for x in obj:
                    # Hack: cast categorical (partition) columns to str so the
                    # schema can be inferred. Note that this mutates the input.
                    if isinstance(x, pd.DataFrame):
                        for col in x.columns:
                            if isinstance(x[col].dtype, pd.CategoricalDtype):
                                x[col] = x[col].astype(str)
                    self.append(x)
            else:
                raise SchemaError(f"Invalid schema to add {obj}")
            return self
        except SchemaError:
            raise
        except Exception as e:
            raise SchemaError(str(e))
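
The same conversion can be sketched outside triad, on a copy of the data rather than in place. This is only an illustration of the workaround described above, not a proposed fix: the helper name categoricals_to_str is hypothetical, and the small DataFrame merely simulates a partition column that came back as categorical.

```python
import pandas as pd


def categoricals_to_str(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with every categorical column cast to str."""
    out = df.copy()
    for col in out.columns:
        if isinstance(out[col].dtype, pd.CategoricalDtype):
            out[col] = out[col].astype(str)
    return out


# Assumption: simulate the partition column DAY being read back as categorical
df = pd.DataFrame({"IBES": ["AAPL", "IBM"], "DAY": pd.Categorical([6, 7])})

fixed = categoricals_to_str(df)
# The user converts the partition column back to its intended type if necessary
fixed["DAY"] = fixed["DAY"].astype("int64")
```

Working on a copy leaves the caller's DataFrame untouched, at the cost of one extra copy of the data.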

@goodwanghan
Collaborator

Ah, good catch. So in pyarrow, the dictionary type is the categorical type. But implementing support for it can be very hard. Converting categorical to string may be a more practical way? I am not sure yet; maybe we should spend the effort to support pyarrow dictionary. I will need to think about it.

@LaurentErreca
Contributor Author

Hi, I agree with you: it is better to try to support dictionary first. Also, the value type in the dictionary seems to be inferred correctly.

@LaurentErreca
Contributor Author

LaurentErreca commented Jan 10, 2022

What makes me think that pyarrow infers the types correctly:
When I tried a column of type object (string values) as the partition column, the error contained:
SchemaError: pyarrow.Field<IBES: dictionary<values=string, indices=int8, ordered=0>> is not supported

Then I also tried a column containing integers:
SchemaError: pyarrow.Field<DAY: dictionary<values=int64, indices=int8, ordered=0>> is not supported

@goodwanghan
Collaborator

We need to add this starting from triad, and then in Fugue.

@goodwanghan
Collaborator

And then we also need to add tons of unit tests and make it work for all backends.

@goodwanghan
Collaborator

We will try to solve it in #296

@LaurentErreca
Contributor Author

LaurentErreca commented Apr 4, 2022

I think that this issue cannot be closed with #306, because we still can't read a hive partitioned dataset with the native or Dask execution engine.

@goodwanghan
Collaborator

Sorry, let me reopen

@goodwanghan goodwanghan reopened this Apr 12, 2022
@goodwanghan goodwanghan modified the milestones: 0.6.6, 0.7.0 May 7, 2022
@goodwanghan goodwanghan removed this from the 0.7.0 milestone Jul 10, 2022