59 changes: 43 additions & 16 deletions python/pyspark/pandas/frame.py
@@ -11196,28 +11196,55 @@ def any(
Series([], dtype: bool)
"""
axis = validate_axis(axis)
if axis != 0:
raise NotImplementedError('axis should be either 0 or "index" currently.')

column_labels = self._internal.column_labels
if bool_only:
column_labels = self._bool_column_labels(column_labels)
if len(column_labels) == 0:
return ps.Series([], dtype=bool)
if axis == 0:
applied: List[PySparkColumn] = []
for label in column_labels:
scol = self._internal.spark_column_for(label)
if skipna:
# When skipna=True, nulls count as False
any_col = F.max(scol.cast("boolean"))
applied.append(F.when(any_col.isNull(), False).otherwise(any_col))
else:
# When skipna=False, nulls count as True
any_col = F.max(scol.cast("boolean"))
applied.append(F.when(any_col.isNull(), True).otherwise(any_col))
return self._result_aggregated(column_labels, applied)
elif axis == 1:
from pyspark.pandas.series import first_series

applied: List[PySparkColumn] = []
for label in column_labels:
scol = self._internal.spark_column_for(label)
if skipna:
# When skipna=True, nulls count as False
any_col = F.max(scol.cast("boolean"))
applied.append(F.when(any_col.isNull(), False).otherwise(any_col))
else:
# When skipna=False, nulls count as True
any_col = F.max(scol.cast("boolean"))
applied.append(F.when(any_col.isNull(), True).otherwise(any_col))

return self._result_aggregated(column_labels, applied)
sdf = self._internal.spark_frame.select(
*self._internal.index_spark_columns,
F.greatest(
*[
F.coalesce(
self._internal.spark_column_for(label).cast("boolean"),
# When skipna=True, nulls count as False and vice versa
F.lit(not skipna),
)
for label in column_labels
],
F.lit(False), # F.greatest needs at least two arguments; handles the one-column case
).alias(SPARK_DEFAULT_SERIES_NAME),
)
return first_series(
DataFrame(
InternalFrame(
spark_frame=sdf,
index_spark_columns=self._internal.index_spark_columns,
index_names=self._internal.index_names,
index_fields=self._internal.index_fields,
column_labels=[None],
)
)
)
else:
# axis=None (reduce over both axes to a single boolean) is not supported yet
raise NotImplementedError('axis should be 0, 1, "index", or "columns" currently.')

def _bool_column_labels(self, column_labels: List[Label]) -> List[Label]:
"""
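For reference, the row-wise reduction that the new axis=1 branch builds can be reproduced with plain PySpark: F.coalesce substitutes the literal `not skipna` for nulls, and F.greatest then acts as a row-wise OR over the boolean-cast columns. The sketch below is illustrative only; the SparkSession, column names, and sample data are assumptions, not part of the patch.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [(True, None), (False, None), (None, None)],
    "a boolean, b boolean",
)

skipna = True
row_any = F.greatest(
    *[
        # Nulls become False when skipna=True, True when skipna=False.
        F.coalesce(F.col(name).cast("boolean"), F.lit(not skipna))
        for name in sdf.columns
    ],
    F.lit(False),  # F.greatest needs at least two arguments.
)
sdf.select(row_any.alias("any")).show()
# With skipna=True the three rows evaluate to True, False, False;
# with skipna=False they evaluate to True, True, True.

The patched any() wraps the same expression in an InternalFrame and returns it through first_series so the result keeps the original index.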
45 changes: 41 additions & 4 deletions python/pyspark/pandas/tests/computation/test_any_all.py
@@ -135,6 +135,29 @@ def test_any(self):
self.assert_eq(psdf.any(bool_only=True), pdf.any(bool_only=True))
self.assert_eq(psdf.any(bool_only=False), pdf.any(bool_only=False))

# Test axis=1
self.assert_eq(psdf.any(axis=1), pdf.any(axis=1))
self.assert_eq(psdf.any(axis=1, bool_only=True), pdf.any(axis=1, bool_only=True))
self.assert_eq(psdf.any(axis=1, bool_only=False), pdf.any(axis=1, bool_only=False))

# Test axis='index'
self.assert_eq(psdf.any(axis="index"), pdf.any(axis="index"))
self.assert_eq(
psdf.any(axis="index", bool_only=True), pdf.any(axis="index", bool_only=True)
)
self.assert_eq(
psdf.any(axis="index", bool_only=False), pdf.any(axis="index", bool_only=False)
)

# Test axis='columns'
self.assert_eq(psdf.any(axis="columns"), pdf.any(axis="columns"))
self.assert_eq(
psdf.any(axis="columns", bool_only=True), pdf.any(axis="columns", bool_only=True)
)
self.assert_eq(
psdf.any(axis="columns", bool_only=False), pdf.any(axis="columns", bool_only=False)
)

columns.names = ["X", "Y"]
pdf.columns = columns
psdf.columns = columns
@@ -143,10 +166,10 @@ def test_any(self):
self.assert_eq(psdf.any(bool_only=True), pdf.any(bool_only=True))
self.assert_eq(psdf.any(bool_only=False), pdf.any(bool_only=False))

with self.assertRaisesRegex(
NotImplementedError, 'axis should be either 0 or "index" currently.'
):
psdf.any(axis=1)
# Test axis=1
self.assert_eq(psdf.any(axis=1), pdf.any(axis=1))
self.assert_eq(psdf.any(axis=1, bool_only=True), pdf.any(axis=1, bool_only=True))
self.assert_eq(psdf.any(axis=1, bool_only=False), pdf.any(axis=1, bool_only=False))

# Test skipna parameter
pdf = pd.DataFrame(
@@ -156,26 +179,40 @@

# bools and np.nan
self.assert_eq(psdf[["A", "B"]].any(skipna=False), pdf[["A", "B"]].any(skipna=False))
self.assert_eq(
psdf[["A", "B"]].any(axis=1, skipna=False), pdf[["A", "B"]].any(axis=1, skipna=False)
)
# bools and None
self.assert_eq(psdf[["A", "C"]].any(skipna=False), pdf[["A", "C"]].any(skipna=False))
# bools, np.nan, and None
self.assert_eq(psdf[["B", "C"]].any(skipna=False), pdf[["B", "C"]].any(skipna=False))
# np.nan, and None
self.assert_eq(psdf[["D"]].any(skipna=False), pdf[["D"]].any(skipna=False))
self.assert_eq(psdf[["D"]].any(axis=1, skipna=False), pdf[["D"]].any(axis=1, skipna=False))

# np.nan only
self.assert_eq(
ps.DataFrame([np.nan]).any(skipna=False),
pd.DataFrame([np.nan]).any(skipna=False),
almost=True,
)
self.assert_eq(
ps.DataFrame([np.nan]).any(axis=1, skipna=False),
pd.DataFrame([np.nan]).any(axis=1, skipna=False),
almost=True,
)

# None only
self.assert_eq(
ps.DataFrame([None]).any(skipna=True),
pd.DataFrame([None]).any(skipna=True),
almost=True,
)
self.assert_eq(
ps.DataFrame([None]).any(axis=1, skipna=True),
pd.DataFrame([None]).any(axis=1, skipna=True),
almost=True,
)


class FrameAnyAllTests(
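The skipna expectations exercised above follow plain pandas semantics: with skipna=False missing values are kept, and a NaN, being a non-zero float, counts as True; with skipna=True they are dropped, so an all-missing row or column yields False. A quick illustration against pandas alone (independent of the patch; assumes a recent pandas version):

import numpy as np
import pandas as pd

pdf = pd.DataFrame({"A": [True, False], "B": [np.nan, np.nan]})
print(pdf.any(axis=1, skipna=True))   # row 0: True, row 1: False (NaNs dropped)
print(pdf.any(axis=1, skipna=False))  # row 0: True, row 1: True  (NaN is truthy)
print(pdf["B"].any(skipna=True))      # False (all values skipped)
print(pdf["B"].any(skipna=False))     # True  (NaN kept and truthy)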