From 70cc7e72a5296262069551d6dc05469c244967b5 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Wed, 20 Mar 2024 12:47:13 +1300
Subject: [PATCH 1/8] Upgrade ParquetSharp to access row group statistics

---
 ParquetSharp.Dataset/ParquetSharp.Dataset.csproj | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ParquetSharp.Dataset/ParquetSharp.Dataset.csproj b/ParquetSharp.Dataset/ParquetSharp.Dataset.csproj
index 2ebb5c5..b194588 100644
--- a/ParquetSharp.Dataset/ParquetSharp.Dataset.csproj
+++ b/ParquetSharp.Dataset/ParquetSharp.Dataset.csproj
@@ -25,7 +25,7 @@
-
+

From 1a5e9f49ccac2d3fdb27c90c63082b2191734824 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Mon, 18 Mar 2024 15:59:54 +1300
Subject: [PATCH 2/8] Support filtering Parquet row groups using statistics

---
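Notes: a row group is skipped only when its column statistics prove the filter
cannot match any value in it, so min/max bounds are compared rather than exact
values. For the range filter this is the closed-interval overlap test: the
filter range [start, end] intersects the stats range [min, max] iff
start <= max && end >= min, which is the predicate IntRangeStatisticsEvaluator
implements below. A minimal usage sketch (hypothetical dataset path;
DatasetReader, NoPartitioning, Col and ToBatches are the existing public API,
and the loop assumes an async context):

    var dataset = new DatasetReader("/path/to/dataset", new NoPartitioning());
    // Row groups whose statistics cannot contain id == 15 are never read
    using var reader = dataset.ToBatches(Col.Named("id").IsEqualTo(15));
    while (await reader.ReadNextRecordBatchAsync() is { } batch)
    {
        using (batch)
        {
            // Surviving rows are further narrowed by the per-batch filter mask
        }
    }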
 .../Filter/TestIntFilter.cs                   | 127 ++++++++++++++++++
 .../Filter/TestRowGroupSelector.cs            | 121 +++++++++++++++++
 .../TestDatasetReader.cs                      |  26 ++++
 ParquetSharp.Dataset/ColExtensions.cs         |   6 +-
 ParquetSharp.Dataset/DatasetStreamReader.cs   |  11 +-
 ParquetSharp.Dataset/Filter/AndFilter.cs      |   5 +
 .../Filter/BaseStatisticsEvaluator.cs         |  54 ++++++++
 .../Filter/ColumnValueFilter.cs               |  18 ++-
 .../Filter/IntEqualityStatisticsEvaluator.cs  |  51 +++++++
 .../Filter/IntRangeStatisticsEvaluator.cs     |  53 ++++++++
 .../Filter/LogicalStatistics.cs               |  57 ++++++++
 ParquetSharp.Dataset/Filter/OrFilter.cs       |   5 +
 .../Filter/RowGroupSelector.cs                | 101 ++++++++++++++
 ParquetSharp.Dataset/IFilter.cs               |   7 +
 14 files changed, 637 insertions(+), 5 deletions(-)
 create mode 100644 ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
 create mode 100644 ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
 create mode 100644 ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
 create mode 100644 ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
 create mode 100644 ParquetSharp.Dataset/Filter/LogicalStatistics.cs
 create mode 100644 ParquetSharp.Dataset/Filter/RowGroupSelector.cs

diff --git a/ParquetSharp.Dataset.Test/Filter/TestIntFilter.cs b/ParquetSharp.Dataset.Test/Filter/TestIntFilter.cs
index 5376770..8a7b6a6 100644
--- a/ParquetSharp.Dataset.Test/Filter/TestIntFilter.cs
+++ b/ParquetSharp.Dataset.Test/Filter/TestIntFilter.cs
@@ -1,7 +1,9 @@
 using System;
+using System.Collections.Generic;
 using System.Linq;
 using Apache.Arrow;
 using NUnit.Framework;
+using ParquetSharp.Dataset.Filter;
 
 namespace ParquetSharp.Dataset.Test.Filter;
 
@@ -128,6 +130,33 @@ public void TestComputeIntRangeMask((long, long) filterRange)
         TestComputeIntRangeMask<ulong, UInt64Array>(rangeStart, rangeEnd, ULongValues, val => checked((long)val));
     }
 
+    [Theory]
+    public void TestIntEqualityIncludeRowGroup(long filterValue)
+    {
+        TestIntEqualityIncludeRowGroup(filterValue, SByteValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, ShortValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, IntValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, LongValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, ByteValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, UShortValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, UIntValues, val => val);
+        TestIntEqualityIncludeRowGroup(filterValue, ULongValues, val => checked((long)val));
+    }
+
+    [Theory]
+    public void TestIntRangeIncludeRowGroup((long, long) filterRange)
+    {
+        var (rangeStart, rangeEnd) = filterRange;
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, SByteValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, ShortValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, IntValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, LongValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, ByteValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, UShortValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, UIntValues, val => val);
+        TestIntRangeIncludeRowGroup(rangeStart, rangeEnd, ULongValues, val => checked((long)val));
+    }
+
     private static void TestComputeIntEqualityMask<T, TArray>(long filterValue, T[] values, Func<T, long> checkedCast)
         where T : struct
         where TArray : PrimitiveArray<T>
@@ -203,6 +232,104 @@ private static void TestComputeIntRangeMask<T, TArray>(long rangeStart
         }
     }
 
+    private static void TestIntEqualityIncludeRowGroup<T>(long filterValue, T[] values, Func<T, long> checkedCast)
+        where T : IComparable<T>
+    {
+        var filter = Col.Named("x").IsEqualTo(filterValue);
+
+        var statsRanges = values
+            .SelectMany(min => values.Select(max => (min, max)))
+            .Where(range => range.max.CompareTo(range.min) >= 0)
+            .ToArray();
+        foreach (var statsRange in statsRanges)
+        {
+            var rowGroupStats = new Dictionary<string, LogicalStatistics>
+            {
+                { "x", new LogicalStatistics<T>(statsRange.min, statsRange.max) }
+            };
+
+            var filterValueInRange = true;
+            try
+            {
+                var longMin = checkedCast(statsRange.min);
+                if (filterValue < longMin)
+                {
+                    filterValueInRange = false;
+                }
+            }
+            catch (OverflowException)
+            {
+                filterValueInRange = false;
+            }
+
+            try
+            {
+                var longMax = checkedCast(statsRange.max);
+                if (filterValue > longMax)
+                {
+                    filterValueInRange = false;
+                }
+            }
+            catch (OverflowException)
+            {
+                // Stats maximum doesn't fit in a long, so the filter value cannot exceed it
+            }
+
+            var includeRowGroup = filter.IncludeRowGroup(rowGroupStats);
+            Assert.That(
+                includeRowGroup, Is.EqualTo(filterValueInRange),
+                $"Expected {typeof(T)} stats range [{statsRange.min}, {statsRange.max}] inclusion to be {filterValueInRange}");
+        }
+    }
+
+    private static void TestIntRangeIncludeRowGroup<T>(long rangeStart, long rangeEnd, T[] values, Func<T, long> checkedCast)
+        where T : IComparable<T>
+    {
+        var filter = Col.Named("x").IsInRange(rangeStart, rangeEnd);
+
+        var statsRanges = values
+            .SelectMany(min => values.Select(max => (min, max)))
+            .Where(range => range.max.CompareTo(range.min) >= 0)
+            .ToArray();
+        foreach (var statsRange in statsRanges)
+        {
+            var rowGroupStats = new Dictionary<string, LogicalStatistics>
+            {
+                { "x", new LogicalStatistics<T>(statsRange.min, statsRange.max) }
+            };
+
+            var rangesOverlap = true;
+            try
+            {
+                var longMin = checkedCast(statsRange.min);
+                if (longMin > rangeEnd)
+                {
+                    rangesOverlap = false;
+                }
+            }
+            catch (OverflowException)
+            {
+                rangesOverlap = false;
+            }
+
+            try
+            {
+                var longMax = checkedCast(statsRange.max);
+                if (longMax < rangeStart)
+                {
+                    rangesOverlap = false;
+                }
+            }
+            catch (OverflowException)
+            {
+                // Stats maximum doesn't fit in a long, so the filter range cannot be above it
+            }
+
+            var includeRowGroup = filter.IncludeRowGroup(rowGroupStats);
+            Assert.That(
+                includeRowGroup, Is.EqualTo(rangesOverlap),
+                $"Expected {typeof(T)} stats range [{statsRange.min}, {statsRange.max}] inclusion to be {rangesOverlap}");
+        }
+    }
+
     private static TArray BuildArray<T, TArray>(T[] values)
         where T : struct
         where TArray : IArrowArray
diff --git a/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
new file mode 100644
index 0000000..d3e1f65
--- /dev/null
+++ b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
@@ -0,0 +1,121 @@
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Arrow;
+using NUnit.Framework;
+using ParquetSharp.Arrow;
+using ParquetSharp.Dataset.Filter;
+
+namespace ParquetSharp.Dataset.Test.Filter;
+
+[TestFixture]
+public class TestRowGroupSelector
+{
+    [Test]
+    public void TestFilterPartitionColumn()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var batch0 = GenerateBatch(0, 10);
+        var batch1 = GenerateBatch(10, 20);
+        WriteParquetFile(filePath, new[] { batch0, batch1 }, includeStats: true);
+
+        var filter = Col.Named("part").IsEqualTo(5);
+        var rowGroupSelector = new RowGroupSelector(filter);
+
+        using var reader = new FileReader(filePath);
+        var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
+        Assert.That(rowGroups, Is.Null);
+    }
+
+    [Test]
+    public void TestNoStatistics()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var batch0 = GenerateBatch(0, 10);
+        var batch1 = GenerateBatch(10, 20);
+        WriteParquetFile(filePath, new[] { batch0, batch1 }, includeStats: false);
+
+        var filter = Col.Named("id").IsEqualTo(5);
+        var rowGroupSelector = new RowGroupSelector(filter);
+
+        using var reader = new FileReader(filePath);
+        var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
+        Assert.That(rowGroups, Is.EqualTo(new[] { 0, 1 }));
+    }
+
+    [Test]
+    public void TestFilterIntColumnValue()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var batch0 = GenerateBatch(0, 10);
+        var batch1 = GenerateBatch(10, 20);
+        var batch2 = GenerateBatch(20, 30);
+        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: true);
+
+        var filter = Col.Named("id").IsEqualTo(15);
+        var rowGroupSelector = new RowGroupSelector(filter);
+
+        using var reader = new FileReader(filePath);
+        var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
+        Assert.That(rowGroups, Is.EqualTo(new[] { 1 }));
+    }
+
+    [Test]
+    public void TestFilterIntColumnRange()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var batch0 = GenerateBatch(0, 10);
+        var batch1 = GenerateBatch(10, 20);
+        var batch2 = GenerateBatch(20, 30);
+        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: true);
+
+        var filter = Col.Named("id").IsInRange(15, 25);
+        var rowGroupSelector = new RowGroupSelector(filter);
+
+        using var reader = new FileReader(filePath);
+        var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
+        Assert.That(rowGroups, Is.EqualTo(new[] { 1, 2 }));
+    }
+
+    private static RecordBatch GenerateBatch(int idStart, int idEnd)
+    {
+        const int rowsPerId = 10;
+        var builder = new RecordBatch.Builder();
+        var idValues = Enumerable.Range(idStart, idEnd - idStart)
+            .SelectMany(idVal => Enumerable.Repeat(idVal, rowsPerId))
+            .ToArray();
+        var xValues = Enumerable.Range(0, idValues.Length).Select(i => i * 0.1f).ToArray();
+        builder.Append("id", false, new Int32Array.Builder().Append(idValues));
+        builder.Append("x", false, new FloatArray.Builder().Append(xValues));
+        return builder.Build();
+    }
+
+    private static void WriteParquetFile(string path, IReadOnlyList<RecordBatch> batches, bool includeStats)
+    {
+        using var writerPropertiesBuilder = new WriterPropertiesBuilder();
+        if (includeStats)
+        {
+            writerPropertiesBuilder.EnableStatistics();
+        }
+        else
+        {
+            writerPropertiesBuilder.DisableStatistics();
+        }
+
+        using var writerProperties = writerPropertiesBuilder.Build();
+        using var writer = new FileWriter(path, batches[0].Schema, writerProperties);
+        foreach (var batch in batches)
+        {
+            writer.WriteRecordBatch(batch);
+        }
+
+        writer.Close();
+    }
+}
diff --git a/ParquetSharp.Dataset.Test/TestDatasetReader.cs b/ParquetSharp.Dataset.Test/TestDatasetReader.cs
index 88fa162..ec80906 100644
--- a/ParquetSharp.Dataset.Test/TestDatasetReader.cs
+++ b/ParquetSharp.Dataset.Test/TestDatasetReader.cs
@@ -182,6 +182,32 @@ public async Task TestFilterOnFileColumn()
         await VerifyData(reader, new Dictionary<int, int> { { 0, 10 }, { 1, 10 } });
     }
 
+    [Test]
+    public async Task TestAllRowGroupsInFileExcluded()
+    {
+        using var tmpDir = new DisposableDirectory();
+        using var batch0 = GenerateBatch(0);
+        using var batch1 = GenerateBatch(1);
+        using var batch2 = GenerateBatch(2);
+        using var batch3 = GenerateBatch(3);
+        WriteParquetFile(tmpDir.AbsPath("data0.parquet"), new[] { batch0, batch1 });
+        WriteParquetFile(tmpDir.AbsPath("data1.parquet"), new[] { batch2, batch3 });
+
+        var schema = new Apache.Arrow.Schema.Builder()
+            .Field(new Field("id", new Int32Type(), false))
+            .Field(new Field("x", new FloatType(), false))
+            .Build();
+        var dataset = new DatasetReader(
+            tmpDir.DirectoryPath,
+            new NoPartitioning(),
+            schema: schema);
+
+        var filter = Col.Named("id").IsEqualTo(2);
+        using var reader = dataset.ToBatches(filter);
+
+        await VerifyData(reader, new Dictionary<int, int> { { 2, 10 } });
+    }
+
     [Test]
     public async Task TestFilterOnExcludedFileColumn()
     {
diff --git a/ParquetSharp.Dataset/ColExtensions.cs b/ParquetSharp.Dataset/ColExtensions.cs
index 0784195..13383b6 100644
--- a/ParquetSharp.Dataset/ColExtensions.cs
+++ b/ParquetSharp.Dataset/ColExtensions.cs
@@ -14,7 +14,8 @@ public static class ColExtensions
     /// <returns>Created filter</returns>
     public static IFilter IsEqualTo(this Col column, long value)
     {
-        return new ColumnValueFilter(column.Name, new IntEqualityEvaluator(value, column.Name));
+        return new ColumnValueFilter(
+            column.Name, new IntEqualityEvaluator(value, column.Name), new IntEqualityStatisticsEvaluator(value));
     }
 
     /// <summary>
@@ -26,7 +27,8 @@ public static IFilter IsEqualTo(this Col column, long value)
     /// <returns>Created filter</returns>
     public static IFilter IsInRange(this Col column, long start, long end)
     {
-        return new ColumnValueFilter(column.Name, new IntRangeEvaluator(start, end, column.Name));
+        return new ColumnValueFilter(
+            column.Name, new IntRangeEvaluator(start, end, column.Name), new IntRangeStatisticsEvaluator(start, end));
     }
 
     /// <summary>
diff --git a/ParquetSharp.Dataset/DatasetStreamReader.cs b/ParquetSharp.Dataset/DatasetStreamReader.cs
index f46766b..383f751 100644
--- a/ParquetSharp.Dataset/DatasetStreamReader.cs
+++ b/ParquetSharp.Dataset/DatasetStreamReader.cs
@@ -26,9 +26,11 @@ public DatasetStreamReader(
         _readerProperties = readerProperties;
         _arrowReaderProperties = arrowReaderProperties;
         _requiredFields = new HashSet<string>(schema.FieldsList.Select(f => f.Name));
+        _rowGroupSelector = null;
         if (_filter != null)
         {
             _requiredFields.UnionWith(_filter.Columns());
+            _rowGroupSelector = new RowGroupSelector(_filter);
         }
     }
 
@@ -73,7 +75,7 @@ public DatasetStreamReader(
         }
 
         var filterMask = _filter.ComputeMask(recordBatch);
-        if (filterMask == null)
+        if (filterMask == null || filterMask.IncludedCount == recordBatch.Length)
        {
             return recordBatch;
         }
@@ -109,8 +111,12 @@ private void GetNextReader()
         {
             _currentFileReader = new FileReader(
                 _fragmentEnumerator.Current.FilePath, _readerProperties, _arrowReaderProperties);
+
+            var rowGroups = _rowGroupSelector?.GetRequiredRowGroups(_currentFileReader);
             var columnIndices = GetFileColumnIndices(_currentFileReader);
-            _currentFragmentReader = _currentFileReader.GetRecordBatchReader(columns: columnIndices);
+
+            _currentFragmentReader = _currentFileReader.GetRecordBatchReader(
+                rowGroups: rowGroups, columns: columnIndices);
         }
         else
         {
@@ -141,6 +147,7 @@ private int[] GetFileColumnIndices(FileReader fileReader)
     private readonly ReaderProperties? _readerProperties;
     private readonly ArrowReaderProperties? _arrowReaderProperties;
     private readonly FragmentExpander _fragmentExpander;
+    private readonly RowGroupSelector? _rowGroupSelector;
     private readonly HashSet<string> _requiredFields;
     private IArrowArrayStream? _currentFragmentReader = null;
     private FileReader? _currentFileReader = null;
diff --git a/ParquetSharp.Dataset/Filter/AndFilter.cs b/ParquetSharp.Dataset/Filter/AndFilter.cs
index 08ac9e3..b6fae7a 100644
--- a/ParquetSharp.Dataset/Filter/AndFilter.cs
+++ b/ParquetSharp.Dataset/Filter/AndFilter.cs
@@ -17,6 +17,11 @@ public bool IncludePartition(PartitionInformation partitionInformation)
         return _first.IncludePartition(partitionInformation) && _second.IncludePartition(partitionInformation);
     }
 
+    public bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics)
+    {
+        return _first.IncludeRowGroup(columnStatistics) && _second.IncludeRowGroup(columnStatistics);
+    }
+
     public FilterMask? ComputeMask(RecordBatch dataBatch)
     {
         var firstMask = _first.ComputeMask(dataBatch);
diff --git a/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
new file mode 100644
index 0000000..be488e3
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
@@ -0,0 +1,54 @@
+using System;
+
+namespace ParquetSharp.Dataset.Filter;
+
+/// <summary>
+/// Base class for evaluating whether a row group should be read based on column statistics.
+/// Defaults to including row groups if handling a type isn't overridden.
+/// </summary>
+internal class BaseStatisticsEvaluator
+{
+    public bool IncludeRowGroup(LogicalStatistics untypedStats)
+    {
+        return untypedStats switch
+        {
+            LogicalStatistics<bool> stats => IncludeRowGroup(stats),
+            LogicalStatistics<byte> stats => IncludeRowGroup(stats),
+            LogicalStatistics<ushort> stats => IncludeRowGroup(stats),
+            LogicalStatistics<uint> stats => IncludeRowGroup(stats),
+            LogicalStatistics<ulong> stats => IncludeRowGroup(stats),
+            LogicalStatistics<sbyte> stats => IncludeRowGroup(stats),
+            LogicalStatistics<short> stats => IncludeRowGroup(stats),
+            LogicalStatistics<int> stats => IncludeRowGroup(stats),
+            LogicalStatistics<long> stats => IncludeRowGroup(stats),
+            LogicalStatistics<Half> stats => IncludeRowGroup(stats),
+            LogicalStatistics<float> stats => IncludeRowGroup(stats),
+            LogicalStatistics<double> stats => IncludeRowGroup(stats),
+            _ => true
+        };
+    }
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<bool> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<byte> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<ushort> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<uint> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<ulong> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<sbyte> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<short> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<int> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<long> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<Half> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<float> stats) => true;
+
+    protected virtual bool IncludeRowGroup(LogicalStatistics<double> stats) => true;
+}
diff --git a/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs b/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
index be348f8..3efa165 100644
--- a/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
+++ b/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
@@ -7,10 +7,14 @@ namespace ParquetSharp.Dataset.Filter;
 
 internal sealed class ColumnValueFilter : IFilter
 {
-    internal ColumnValueFilter(string columnName, BaseFilterEvaluator evaluator)
+    internal ColumnValueFilter(
+        string columnName,
+        BaseFilterEvaluator evaluator,
+        BaseStatisticsEvaluator? statsEvaluator = null)
     {
         _columnName = columnName;
         _evaluator = evaluator;
+        _statsEvaluator = statsEvaluator;
     }
 
     public bool IncludePartition(PartitionInformation partitionInfo)
@@ -28,6 +32,17 @@ public bool IncludePartition(PartitionInformation partitionInfo)
         return true;
     }
 
+    public bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics)
+    {
+        if (columnStatistics.TryGetValue(_columnName, out var statistics))
+        {
+            return _statsEvaluator?.IncludeRowGroup(statistics) ?? true;
+        }
+
+        // Filter column is not a Parquet column or does not have statistics
+        return true;
+    }
+
     public FilterMask? ComputeMask(RecordBatch dataBatch)
     {
         if (dataBatch.Schema.FieldsLookup.Contains(_columnName))
@@ -47,4 +62,5 @@ public IEnumerable<string> Columns()
 
     private readonly string _columnName;
     private readonly BaseFilterEvaluator _evaluator;
+    private readonly BaseStatisticsEvaluator? _statsEvaluator;
 }
diff --git a/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
new file mode 100644
index 0000000..af108b3
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
@@ -0,0 +1,51 @@
+namespace ParquetSharp.Dataset.Filter;
+
+internal sealed class IntEqualityStatisticsEvaluator : BaseStatisticsEvaluator
+{
+    public IntEqualityStatisticsEvaluator(long value)
+    {
+        _value = value;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<byte> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<ushort> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<uint> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<ulong> stats)
+    {
+        return _value >= 0 && (ulong)_value >= stats.Min && (ulong)_value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<sbyte> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<short> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<int> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<long> stats)
+    {
+        return _value >= stats.Min && _value <= stats.Max;
+    }
+
+    private readonly long _value;
+}
diff --git a/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
new file mode 100644
index 0000000..cb52b59
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
@@ -0,0 +1,53 @@
+namespace ParquetSharp.Dataset.Filter;
+
+internal sealed class IntRangeStatisticsEvaluator : BaseStatisticsEvaluator
+{
+    public IntRangeStatisticsEvaluator(long start, long end)
+    {
+        _start = start;
+        _end = end;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<byte> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<ushort> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<uint> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<ulong> stats)
+    {
+        return (_end >= 0 && (ulong)_end >= stats.Min) && (_start < 0 || (ulong)_start <= stats.Max);
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<sbyte> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<short> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<int> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    protected override bool IncludeRowGroup(LogicalStatistics<long> stats)
+    {
+        return _end >= stats.Min && _start <= stats.Max;
+    }
+
+    private readonly long _start;
+    private readonly long _end;
+}
diff --git a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
new file mode 100644
index 0000000..efb6f6a
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
@@ -0,0 +1,57 @@
+using System;
+
+namespace ParquetSharp.Dataset.Filter;
+
+public abstract class LogicalStatistics
+{
+    public static LogicalStatistics? FromStatistics(Statistics statistics, ColumnDescriptor descriptor)
+    {
+        using var logicalType = descriptor.LogicalType;
+        checked
+        {
+            return (statistics, logicalType) switch
+            {
+                (Statistics<bool> stats, NoneLogicalType) => new LogicalStatistics<bool>(stats.Min, stats.Max),
+                (Statistics<int> stats, NoneLogicalType) => new LogicalStatistics<int>(stats.Min, stats.Max),
+                (Statistics<long> stats, NoneLogicalType) => new LogicalStatistics<long>(stats.Min, stats.Max),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: true }) => new LogicalStatistics<sbyte>(
+                    (sbyte)stats.Min, (sbyte)stats.Max),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: false }) => new LogicalStatistics<byte>(
+                    (byte)unchecked((uint)stats.Min), (byte)unchecked((uint)stats.Max)),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: true }) => new LogicalStatistics<short>(
+                    (short)stats.Min, (short)stats.Max),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: false }) => new LogicalStatistics<ushort>(
+                    (ushort)unchecked((uint)stats.Min), (ushort)unchecked((uint)stats.Max)),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: true }) => new LogicalStatistics<int>(
+                    stats.Min, stats.Max),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: false }) => new LogicalStatistics<uint>(
+                    unchecked((uint)stats.Min), unchecked((uint)stats.Max)),
+                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: true }) => new LogicalStatistics<long>(
+                    stats.Min, stats.Max),
+                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: false }) => new LogicalStatistics<ulong>(
+                    unchecked((ulong)stats.Min), unchecked((ulong)stats.Max)),
+                (Statistics<FixedLenByteArray> stats, Float16LogicalType) => new LogicalStatistics<Half>(
+                    LogicalRead.ToHalf(stats.Min), LogicalRead.ToHalf(stats.Max)),
+                (Statistics<float> stats, NoneLogicalType) => new LogicalStatistics<float>(stats.Min, stats.Max),
+                (Statistics<double> stats, NoneLogicalType) => new LogicalStatistics<double>(stats.Min, stats.Max),
+                _ => null,
+            };
+        }
+    }
+}
+
+/// <summary>
+/// Parquet column statistics converted to logical typed values
+/// </summary>
+public sealed class LogicalStatistics<T> : LogicalStatistics
+{
+    internal LogicalStatistics(T min, T max)
+    {
+        Min = min;
+        Max = max;
+    }
+
+    public T Min { get; }
+
+    public T Max { get; }
+}
diff --git a/ParquetSharp.Dataset/Filter/OrFilter.cs b/ParquetSharp.Dataset/Filter/OrFilter.cs
index 4404cdd..486f2ed 100644
--- a/ParquetSharp.Dataset/Filter/OrFilter.cs
+++ b/ParquetSharp.Dataset/Filter/OrFilter.cs
@@ -17,6 +17,11 @@ public bool IncludePartition(PartitionInformation partitionInformation)
         return _first.IncludePartition(partitionInformation) || _second.IncludePartition(partitionInformation);
     }
 
+    public bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics)
+    {
+        return _first.IncludeRowGroup(columnStatistics) || _second.IncludeRowGroup(columnStatistics);
+    }
+
     public IEnumerable<string> Columns()
     {
         return _first.Columns().Concat(_second.Columns());
diff --git a/ParquetSharp.Dataset/Filter/RowGroupSelector.cs b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
new file mode 100644
index 0000000..36c3d64
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
@@ -0,0 +1,101 @@
+using System;
+using System.Collections.Generic;
+using ParquetSharp.Arrow;
+
+namespace ParquetSharp.Dataset.Filter;
+
+/// <summary>
+/// Selects row groups to read from a Parquet file based on the applied filter and
+/// row group statistics
+/// </summary>
+internal sealed class RowGroupSelector
+{
+    public RowGroupSelector(IFilter filter)
+    {
+        _filter = filter;
+    }
+
+    /// <summary>
+    /// Get the indices of row groups to read, or return null to read all row groups
+    /// </summary>
+    /// <param name="reader">The Arrow file reader to get row groups for</param>
+    public int[]? GetRequiredRowGroups(FileReader reader)
+    {
+        var numRowGroups = reader.NumRowGroups;
+        var columnIndices = GetParquetColumnIndices(reader);
+        if (columnIndices.Count == 0)
+        {
+            // Filter fields are not fields in the Parquet file
+            return null;
+        }
+
+        using var parquetReader = reader.ParquetReader;
+        using var fileMetadata = parquetReader.FileMetaData;
+        var schemaDescriptor = fileMetadata.Schema;
+
+        var rowGroups = new List<int>();
+        var columnStatistics = new Dictionary<string, LogicalStatistics>(columnIndices.Count);
+        for (var rowGroupIdx = 0; rowGroupIdx < numRowGroups; ++rowGroupIdx)
+        {
+            using var rowGroup = parquetReader.RowGroup(rowGroupIdx);
+            foreach (var (columnName, columnIndex) in columnIndices)
+            {
+                var metadata = rowGroup.MetaData.GetColumnChunkMetaData(columnIndex);
+                using var statistics = metadata.Statistics;
+                if (statistics?.HasMinMax ?? false)
+                {
+                    var logicalStatistics = LogicalStatistics.FromStatistics(
+                        statistics, schemaDescriptor.Column(columnIndex));
+                    if (logicalStatistics != null)
+                    {
+                        columnStatistics[columnName] = logicalStatistics;
+                    }
+                }
+                // Else if statistics are not available, we may still be able to exclude this
+                // row group based on another column if an "and" condition is used.
+            }
+
+            if (_filter.IncludeRowGroup(columnStatistics))
+            {
+                rowGroups.Add(rowGroupIdx);
+            }
+
+            columnStatistics.Clear();
+        }
+
+        return rowGroups.ToArray();
+    }
+
+    /// <summary>
+    /// Get the column indices in the Parquet file corresponding to filter fields
+    /// </summary>
+    /// <param name="reader">The Arrow file reader</param>
+    private Dictionary<string, int> GetParquetColumnIndices(FileReader reader)
+    {
+        var columnIndices = new Dictionary<string, int>();
+        var schema = reader.Schema;
+        var manifest = reader.SchemaManifest;
+        foreach (var column in _filter.Columns())
+        {
+            if (schema.FieldsLookup.Contains(column))
+            {
+                var fieldIndex = schema.GetFieldIndex(column);
+                var columnIndex = manifest.SchemaField(fieldIndex).ColumnIndex;
+                if (columnIndex >= 0)
+                {
+                    columnIndices[column] = columnIndex;
+                }
+                else
+                {
+                    // This shouldn't happen, as FieldsLookup only searches top-level columns
+                    throw new Exception(
+                        $"Cannot filter on field {fieldIndex} ('{column}'), which is not a leaf-level Parquet column");
+                }
+            }
+        }
+
+        return columnIndices;
+    }
+
+    private readonly IFilter _filter;
+}
diff --git a/ParquetSharp.Dataset/IFilter.cs b/ParquetSharp.Dataset/IFilter.cs
index e3d93df..f5984e3 100644
--- a/ParquetSharp.Dataset/IFilter.cs
+++ b/ParquetSharp.Dataset/IFilter.cs
@@ -16,6 +16,13 @@ public interface IFilter
     /// <returns>True if the partition data should be read</returns>
     bool IncludePartition(PartitionInformation partitionInformation);
 
+    /// <summary>
+    /// Whether to read data for a Parquet row group
+    /// </summary>
+    /// <param name="columnStatistics">Dictionary of statistics for the filter columns, keyed by name</param>
+    /// <returns>True if the row group should be read</returns>
+    bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics);
+
     /// <summary>
     /// Compute a boolean mask indicating which rows in a batch should
     /// be included. Can return null to indicate that all rows are included.
From 382dd131d224b17f6546a5cea9776e9bb051f59f Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 09:52:57 +1300
Subject: [PATCH 3/8] Set initial list capacities where possible

---
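Notes: sizing a List<T> up front avoids the grow-and-copy reallocations that
otherwise happen as items are appended when the final count is already known.
Illustrative only:

    // One backing-array allocation instead of repeated doubling resizes
    var arrays = new List<IArrowArray>(recordBatch.ColumnCount);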
 ParquetSharp.Dataset/DatasetStreamReader.cs           | 4 ++--
 ParquetSharp.Dataset/Filter/RowGroupSelector.cs       | 2 +-
 ParquetSharp.Dataset/FragmentExpander.cs              | 2 +-
 ParquetSharp.Dataset/Partitioning/HivePartitioning.cs | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/ParquetSharp.Dataset/DatasetStreamReader.cs b/ParquetSharp.Dataset/DatasetStreamReader.cs
index 383f751..292c97c 100644
--- a/ParquetSharp.Dataset/DatasetStreamReader.cs
+++ b/ParquetSharp.Dataset/DatasetStreamReader.cs
@@ -85,7 +85,7 @@ public DatasetStreamReader(
             return null;
         }
 
-        var arrays = new List<IArrowArray>();
+        var arrays = new List<IArrowArray>(recordBatch.ColumnCount);
         for (var colIdx = 0; colIdx < recordBatch.ColumnCount; ++colIdx)
         {
             var filterApplier = new ArrayMaskApplier(filterMask);
@@ -128,7 +128,7 @@ private void GetNextReader()
     private int[] GetFileColumnIndices(FileReader fileReader)
     {
         var fileFields = fileReader.Schema.FieldsList;
-        var columnIndices = new List<int>();
+        var columnIndices = new List<int>(_requiredFields.Count);
         for (var fieldIdx = 0; fieldIdx < fileFields.Count; ++fieldIdx)
         {
             if (_requiredFields.Contains(fileFields[fieldIdx].Name))
diff --git a/ParquetSharp.Dataset/Filter/RowGroupSelector.cs b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
index 36c3d64..dce17b1 100644
--- a/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
+++ b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
@@ -33,7 +33,7 @@ public RowGroupSelector(IFilter filter)
         using var fileMetadata = parquetReader.FileMetaData;
         var schemaDescriptor = fileMetadata.Schema;
 
-        var rowGroups = new List<int>();
+        var rowGroups = new List<int>(numRowGroups);
         var columnStatistics = new Dictionary<string, LogicalStatistics>(columnIndices.Count);
         for (var rowGroupIdx = 0; rowGroupIdx < numRowGroups; ++rowGroupIdx)
         {
diff --git a/ParquetSharp.Dataset/FragmentExpander.cs b/ParquetSharp.Dataset/FragmentExpander.cs
index d07cb12..25bd5e4 100644
--- a/ParquetSharp.Dataset/FragmentExpander.cs
+++ b/ParquetSharp.Dataset/FragmentExpander.cs
@@ -19,7 +19,7 @@ public FragmentExpander(Apache.Arrow.Schema resultSchema)
     public RecordBatch ExpandBatch(RecordBatch fragmentBatch, PartitionInformation partitionInfo)
     {
         var fieldCount = _resultSchema.FieldsList.Count;
-        var arrays = new List<IArrowArray>();
+        var arrays = new List<IArrowArray>(fieldCount);
         var fragmentFields = new HashSet<string>(fragmentBatch.Schema.FieldsList.Select(f => f.Name));
         var partitionFields = new HashSet<string>(partitionInfo.Batch.Schema.FieldsList.Select(f => f.Name));
         for (var i = 0; i < fieldCount; ++i)
diff --git a/ParquetSharp.Dataset/Partitioning/HivePartitioning.cs b/ParquetSharp.Dataset/Partitioning/HivePartitioning.cs
index d1c5adf..6ec761a 100644
--- a/ParquetSharp.Dataset/Partitioning/HivePartitioning.cs
+++ b/ParquetSharp.Dataset/Partitioning/HivePartitioning.cs
@@ -68,8 +68,8 @@ public HivePartitioning(Apache.Arrow.Schema schema)
 
     public PartitionInformation Parse(string[] pathComponents)
     {
-        var arrays = new List<IArrowArray>();
-        var fields = new List<Field>();
+        var arrays = new List<IArrowArray>(pathComponents.Length);
+        var fields = new List<Field>(pathComponents.Length);
 
         foreach (var dirName in pathComponents)
         {

From f86cab8f0d0e1effd1a38114afc2a8132e7934c4 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 10:49:43 +1300
Subject: [PATCH 4/8] Add tests for logical statistics creation

---
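Notes: FromStatistics reinterprets the physical-typed min/max that Parquet
stores as the column's logical type; for example a uint8 column is physically
int32 and its statistics must be narrowed to byte. A sketch of the intended
use, assuming a ParquetFileReader and its schema descriptor are already open
(this mirrors the GetStatistics helper in the test below):

    using var rowGroup = parquetReader.RowGroup(0);
    using var columnMetadata = rowGroup.MetaData.GetColumnChunkMetaData(0);
    using var stats = columnMetadata.Statistics;
    var logical = LogicalStatistics.FromStatistics(stats, schemaDescriptor.Column(0));
    if (logical is LogicalStatistics<byte> byteStats)
    {
        Console.WriteLine($"byte range: [{byteStats.Min}, {byteStats.Max}]");
    }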
 .../Filter/TestLogicalStatistics.cs              | 294 ++++++++++++++++++
 ParquetSharp.Dataset/Filter/LogicalStatistics.cs |   7 +-
 ParquetSharp.Dataset/Filter/RowGroupSelector.cs  |  11 +-
 3 files changed, 304 insertions(+), 8 deletions(-)
 create mode 100644 ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs

diff --git a/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs b/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs
new file mode 100644
index 0000000..383b51a
--- /dev/null
+++ b/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs
@@ -0,0 +1,294 @@
+using System;
+using System.Linq;
+using NUnit.Framework;
+using ParquetSharp.Dataset.Filter;
+
+namespace ParquetSharp.Dataset.Test.Filter;
+
+[TestFixture]
+public class TestLogicalStatistics
+{
+    [Test]
+    public void TestCreateSByteStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new sbyte?[] { -2, -4, 5, null, 1, 0 },
+            new sbyte?[] { -6, 7, 4 },
+            new sbyte?[] { 2, null, 3, 2 },
+            new sbyte?[] { sbyte.MaxValue, sbyte.MinValue },
+        };
+        var expectedMin = new sbyte[] { -4, -6, 2, sbyte.MinValue };
+        var expectedMax = new sbyte[] { 5, 7, 3, sbyte.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateShortStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new short?[] { -2, -4, 5, null, 1, 0 },
+            new short?[] { -6, 7, 4 },
+            new short?[] { 2, null, 3, 2 },
+            new short?[] { short.MaxValue, short.MinValue },
+        };
+        var expectedMin = new short[] { -4, -6, 2, short.MinValue };
+        var expectedMax = new short[] { 5, 7, 3, short.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateIntStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new int?[] { -2, -4, 5, null, 1, 0 },
+            new int?[] { -6, 7, 4 },
+            new int?[] { 2, null, 3, 2 },
+            new int?[] { int.MaxValue, int.MinValue },
+        };
+        var expectedMin = new[] { -4, -6, 2, int.MinValue };
+        var expectedMax = new[] { 5, 7, 3, int.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateLongStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new long?[] { -2, -4, 5, null, 1, 0 },
+            new long?[] { -6, 7, 4 },
+            new long?[] { 2, null, 3, 2 },
+            new long?[] { long.MaxValue, long.MinValue },
+        };
+        var expectedMin = new[] { -4, -6, 2, long.MinValue };
+        var expectedMax = new[] { 5, 7, 3, long.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateByteStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new byte?[] { 5, null, 1, 0 },
+            new byte?[] { 2, null, 2 },
+            new byte?[] { byte.MaxValue, byte.MaxValue - 1 },
+        };
+        var expectedMin = new byte[] { 0, 2, byte.MaxValue - 1 };
+        var expectedMax = new byte[] { 5, 2, byte.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateUShortStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new ushort?[] { 5, null, 1, 0 },
+            new ushort?[] { 2, null, 2 },
+            new ushort?[] { ushort.MaxValue, ushort.MaxValue - 1 },
+        };
+        var expectedMin = new ushort[] { 0, 2, ushort.MaxValue - 1 };
+        var expectedMax = new ushort[] { 5, 2, ushort.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateUIntStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new uint?[] { 5, null, 1, 0 },
+            new uint?[] { 2, null, 2 },
+            new uint?[] { uint.MaxValue, uint.MaxValue - 1 },
+        };
+        var expectedMin = new uint[] { 0, 2, uint.MaxValue - 1 };
+        var expectedMax = new uint[] { 5, 2, uint.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateULongStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new ulong?[] { 5, null, 1, 0 },
+            new ulong?[] { 2, null, 2 },
+            new ulong?[] { ulong.MaxValue, ulong.MaxValue - 1 },
+        };
+        var expectedMin = new ulong[] { 0, 2, ulong.MaxValue - 1 };
+        var expectedMax = new ulong[] { 5, 2, ulong.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateHalfStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new float?[] { -2.0f, -4.25f, 5.75f, null, float.NaN, 1, 0 }
+                .Select(f => f.HasValue ? (Half)f.Value : (Half?)null).ToArray(),
+            new Half?[] { Half.MinValue, null, Half.MaxValue },
+        };
+        var expectedMin = new[] { (Half)(-4.25f), Half.MinValue };
+        var expectedMax = new[] { (Half)5.75f, Half.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateFloatStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new float?[] { -2.0f, -4.25f, 5.75f, null, float.NaN, 1, 0 },
+            new float?[] { float.MinValue, null, float.MaxValue },
+        };
+        var expectedMin = new[] { -4.25f, float.MinValue };
+        var expectedMax = new[] { 5.75f, float.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestCreateDoubleStatistics()
+    {
+        var rowGroupValues = new[]
+        {
+            new double?[] { -2.0, -4.25, 5.75, null, double.NaN, 1, 0 },
+            new double?[] { double.MinValue, null, double.MaxValue },
+        };
+        var expectedMin = new[] { -4.25, double.MinValue };
+        var expectedMax = new[] { 5.75, double.MaxValue };
+
+        TestCreateLogicalStatistics(rowGroupValues, expectedMin, expectedMax);
+    }
+
+    [Test]
+    public void TestAllNullValues()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var rowGroupValues = new[]
+        {
+            new int?[] { null, null, null },
+        };
+
+        WriteParquet(filePath, rowGroupValues);
+        var statistics = GetStatistics(filePath);
+
+        // HasMinMax should be false, so statistics will be null
+        Assert.That(statistics.Length, Is.EqualTo(1));
+        Assert.That(statistics[0], Is.Null);
+    }
+
+    [Test]
+    public void TestCreateStringStatistics()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var rowGroupValues = new[]
+        {
+            new[] { "abc", "def", "ghi" }
+        };
+
+        WriteParquet(filePath, rowGroupValues);
+        var statistics = GetStatistics(filePath);
+
+        // String statistics are not currently supported
+        Assert.That(statistics.Length, Is.EqualTo(1));
+        Assert.That(statistics[0], Is.Null);
+    }
+
+    [Test]
+    public void TestCreateDateStatistics()
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        var rowGroupValues = new[]
+        {
+            new Date[] { new(2024, 4, 1), new(2024, 4, 2) }
+        };
+
+        WriteParquet(filePath, rowGroupValues);
+        var statistics = GetStatistics(filePath);
+
+        // Stats should be null, as although the physical type is int32,
+        // we don't currently support creation of logical statistics for this type.
+        Assert.That(statistics.Length, Is.EqualTo(1));
+        Assert.That(statistics[0], Is.Null);
+    }
+
+    private static void TestCreateLogicalStatistics<T>(T?[][] rowGroupValues, T[] expectedMin, T[] expectedMax)
+        where T : struct
+    {
+        using var tmpDir = new DisposableDirectory();
+        var filePath = tmpDir.AbsPath("test.parquet");
+
+        WriteParquet(filePath, rowGroupValues);
+        var statistics = GetStatistics(filePath);
+
+        for (var rowGroup = 0; rowGroup < rowGroupValues.Length; ++rowGroup)
+        {
+            var rowGroupStats = statistics[rowGroup];
+            Assert.That(rowGroupStats, Is.InstanceOf<LogicalStatistics<T>>());
+            var typedStatistics = rowGroupStats as LogicalStatistics<T>;
+
+            Assert.That(typedStatistics!.Min, Is.EqualTo(expectedMin[rowGroup]));
+            Assert.That(typedStatistics.Max, Is.EqualTo(expectedMax[rowGroup]));
+        }
+    }
+
+    private static LogicalStatistics?[] GetStatistics(string filePath)
+    {
+        using var fileReader = new ParquetFileReader(filePath);
+        using var fileMetadata = fileReader.FileMetaData;
+        var columnDescriptor = fileMetadata.Schema.Column(0);
+
+        var stats = new LogicalStatistics?[fileMetadata.NumRowGroups];
+
+        for (var rowGroup = 0; rowGroup < fileMetadata.NumRowGroups; ++rowGroup)
+        {
+            using var rowGroupReader = fileReader.RowGroup(rowGroup);
+
+            using var columnMetadata = rowGroupReader.MetaData.GetColumnChunkMetaData(0);
+            using var statistics = columnMetadata.Statistics;
+            Assert.That(statistics, Is.Not.Null);
+
+            stats[rowGroup] = LogicalStatistics.FromStatistics(statistics!, columnDescriptor);
+        }
+
+        return stats;
+    }
+
+    private static void WriteParquet<T>(string path, T[][] rowGroupValues)
+    {
+        var columns = new Column[]
+        {
+            new Column<T>("x")
+        };
+        using var writer = new ParquetFileWriter(path, columns);
+        foreach (var values in rowGroupValues)
+        {
+            using var rowGroupWriter = writer.AppendRowGroup();
+            using var colWriter = rowGroupWriter.NextColumn().LogicalWriter<T>();
+            colWriter.WriteBatch(values);
+        }
+
+        writer.Close();
+    }
+}
diff --git a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
index efb6f6a..cf815a6 100644
--- a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
+++ b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
@@ -4,8 +4,13 @@ namespace ParquetSharp.Dataset.Filter;
 
 public abstract class LogicalStatistics
 {
-    public static LogicalStatistics? FromStatistics(Statistics statistics, ColumnDescriptor descriptor)
+    public static LogicalStatistics? FromStatistics(Statistics? statistics, ColumnDescriptor descriptor)
     {
+        if (!(statistics?.HasMinMax ?? false))
+        {
+            return null;
+        }
+
         using var logicalType = descriptor.LogicalType;
         checked
         {
diff --git a/ParquetSharp.Dataset/Filter/RowGroupSelector.cs b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
index dce17b1..b67c17f 100644
--- a/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
+++ b/ParquetSharp.Dataset/Filter/RowGroupSelector.cs
@@ -42,14 +42,11 @@ public RowGroupSelector(IFilter filter)
             {
                 var metadata = rowGroup.MetaData.GetColumnChunkMetaData(columnIndex);
                 using var statistics = metadata.Statistics;
-                if (statistics?.HasMinMax ?? false)
+                var logicalStatistics = LogicalStatistics.FromStatistics(
+                    statistics, schemaDescriptor.Column(columnIndex));
+                if (logicalStatistics != null)
                 {
-                    var logicalStatistics = LogicalStatistics.FromStatistics(
-                        statistics, schemaDescriptor.Column(columnIndex));
-                    if (logicalStatistics != null)
-                    {
-                        columnStatistics[columnName] = logicalStatistics;
-                    }
+                    columnStatistics[columnName] = logicalStatistics;
                 }
                 // Else if statistics are not available, we may still be able to exclude this
                 // row group based on another column if an "and" condition is used.
From bba3cb5d24a1fa628b990aa38252458eb3a2f243 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 13:37:45 +1300
Subject: [PATCH 5/8] Refactor logical statistics visitor pattern

---
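Notes: the switch-based BaseStatisticsEvaluator is replaced with double
dispatch: LogicalStatistics<T>.Accept checks whether the visitor also
implements the typed ILogicalStatisticsVisitor<T, TOut> and calls the typed
Visit, otherwise it falls back to the untyped Visit. A minimal standalone
sketch of the same pattern (names here are illustrative, not the library's):

    interface IVisitor<TOut> { TOut Visit(Stats stats); }
    interface IVisitor<T, TOut> : IVisitor<TOut> { TOut Visit(Stats<T> stats); }

    abstract class Stats
    {
        public abstract TOut Accept<TOut>(IVisitor<TOut> visitor);
    }

    sealed class Stats<T> : Stats
    {
        // Dispatch to the typed overload when the visitor supports T
        public override TOut Accept<TOut>(IVisitor<TOut> visitor) =>
            visitor is IVisitor<T, TOut> typed ? typed.Visit(this) : visitor.Visit(this);
    }

This keeps evaluators open to new statistics types without maintaining a
central switch over every supported type.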
 .../Filter/TestLogicalStatistics.cs           |  4 +-
 .../Filter/BaseStatisticsEvaluator.cs         | 54 ------------------
 .../Filter/ColumnValueFilter.cs               | 10 ++--
 .../Filter/ILogicalStatisticsVisitor.cs       | 11 ++++
 .../Filter/IntEqualityStatisticsEvaluator.cs  | 31 +++++++----
 .../Filter/IntRangeStatisticsEvaluator.cs     | 31 +++++++----
 .../Filter/LogicalStatistics.cs               | 14 +++++
 7 files changed, 76 insertions(+), 79 deletions(-)
 delete mode 100644 ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
 create mode 100644 ParquetSharp.Dataset/Filter/ILogicalStatisticsVisitor.cs

diff --git a/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs b/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs
index 383b51a..3efdafa 100644
--- a/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs
+++ b/ParquetSharp.Dataset.Test/Filter/TestLogicalStatistics.cs
@@ -189,7 +189,9 @@ public void TestAllNullValues()
         WriteParquet(filePath, rowGroupValues);
         var statistics = GetStatistics(filePath);
 
-        // HasMinMax should be false, so statistics will be null
+        // HasMinMax should be false, so statistics will be null.
+        // In future we might want to allow statistics without Min/Max values though,
+        // if we want to expose other stats like the null count.
         Assert.That(statistics.Length, Is.EqualTo(1));
         Assert.That(statistics[0], Is.Null);
     }
diff --git a/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
deleted file mode 100644
index be488e3..0000000
--- a/ParquetSharp.Dataset/Filter/BaseStatisticsEvaluator.cs
+++ /dev/null
@@ -1,54 +0,0 @@
-using System;
-
-namespace ParquetSharp.Dataset.Filter;
-
-/// <summary>
-/// Base class for evaluating whether a row group should be read based on column statistics.
-/// Defaults to including row groups if handling a type isn't overridden.
-/// </summary>
-internal class BaseStatisticsEvaluator
-{
-    public bool IncludeRowGroup(LogicalStatistics untypedStats)
-    {
-        return untypedStats switch
-        {
-            LogicalStatistics<bool> stats => IncludeRowGroup(stats),
-            LogicalStatistics<byte> stats => IncludeRowGroup(stats),
-            LogicalStatistics<ushort> stats => IncludeRowGroup(stats),
-            LogicalStatistics<uint> stats => IncludeRowGroup(stats),
-            LogicalStatistics<ulong> stats => IncludeRowGroup(stats),
-            LogicalStatistics<sbyte> stats => IncludeRowGroup(stats),
-            LogicalStatistics<short> stats => IncludeRowGroup(stats),
-            LogicalStatistics<int> stats => IncludeRowGroup(stats),
-            LogicalStatistics<long> stats => IncludeRowGroup(stats),
-            LogicalStatistics<Half> stats => IncludeRowGroup(stats),
-            LogicalStatistics<float> stats => IncludeRowGroup(stats),
-            LogicalStatistics<double> stats => IncludeRowGroup(stats),
-            _ => true
-        };
-    }
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<bool> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<byte> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<ushort> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<uint> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<ulong> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<sbyte> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<short> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<int> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<long> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<Half> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<float> stats) => true;
-
-    protected virtual bool IncludeRowGroup(LogicalStatistics<double> stats) => true;
-}
diff --git a/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs b/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
index 3efa165..af218e0 100644
--- a/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
+++ b/ParquetSharp.Dataset/Filter/ColumnValueFilter.cs
@@ -1,6 +1,4 @@
-using System;
 using System.Collections.Generic;
-using System.Linq;
 using Apache.Arrow;
 
 namespace ParquetSharp.Dataset.Filter;
@@ -10,7 +8,7 @@ internal sealed class ColumnValueFilter : IFilter
     internal ColumnValueFilter(
         string columnName,
         BaseFilterEvaluator evaluator,
-        BaseStatisticsEvaluator? statsEvaluator = null)
+        ILogicalStatisticsVisitor<bool>? statsEvaluator = null)
     {
         _columnName = columnName;
         _evaluator = evaluator;
@@ -34,9 +32,9 @@ public bool IncludePartition(PartitionInformation partitionInfo)
 
     public bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics)
     {
-        if (columnStatistics.TryGetValue(_columnName, out var statistics))
+        if (_statsEvaluator != null && columnStatistics.TryGetValue(_columnName, out var statistics))
         {
-            return _statsEvaluator?.IncludeRowGroup(statistics) ?? true;
+            return statistics.Accept(_statsEvaluator);
         }
 
         // Filter column is not a Parquet column or does not have statistics
@@ -62,5 +60,5 @@ public IEnumerable<string> Columns()
 
     private readonly string _columnName;
     private readonly BaseFilterEvaluator _evaluator;
-    private readonly BaseStatisticsEvaluator? _statsEvaluator;
+    private readonly ILogicalStatisticsVisitor<bool>? _statsEvaluator;
 }
diff --git a/ParquetSharp.Dataset/Filter/ILogicalStatisticsVisitor.cs b/ParquetSharp.Dataset/Filter/ILogicalStatisticsVisitor.cs
new file mode 100644
index 0000000..7c3b5bf
--- /dev/null
+++ b/ParquetSharp.Dataset/Filter/ILogicalStatisticsVisitor.cs
@@ -0,0 +1,11 @@
+namespace ParquetSharp.Dataset.Filter;
+
+public interface ILogicalStatisticsVisitor<TOut>
+{
+    TOut Visit(LogicalStatistics stats);
+}
+
+public interface ILogicalStatisticsVisitor<T, TOut> : ILogicalStatisticsVisitor<TOut>
+{
+    TOut Visit(LogicalStatistics<T> stats);
+}
diff --git a/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
index af108b3..98cc380 100644
--- a/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
+++ b/ParquetSharp.Dataset/Filter/IntEqualityStatisticsEvaluator.cs
@@ -1,48 +1,61 @@
 namespace ParquetSharp.Dataset.Filter;
 
-internal sealed class IntEqualityStatisticsEvaluator : BaseStatisticsEvaluator
+internal sealed class IntEqualityStatisticsEvaluator
+    : ILogicalStatisticsVisitor<byte, bool>,
+      ILogicalStatisticsVisitor<ushort, bool>,
+      ILogicalStatisticsVisitor<uint, bool>,
+      ILogicalStatisticsVisitor<ulong, bool>,
+      ILogicalStatisticsVisitor<sbyte, bool>,
+      ILogicalStatisticsVisitor<short, bool>,
+      ILogicalStatisticsVisitor<int, bool>,
+      ILogicalStatisticsVisitor<long, bool>
 {
     public IntEqualityStatisticsEvaluator(long value)
     {
         _value = value;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<byte> stats)
+    public bool Visit(LogicalStatistics stats)
+    {
+        return true;
+    }
+
+    public bool Visit(LogicalStatistics<byte> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<ushort> stats)
+    public bool Visit(LogicalStatistics<ushort> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<uint> stats)
+    public bool Visit(LogicalStatistics<uint> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<ulong> stats)
+    public bool Visit(LogicalStatistics<ulong> stats)
     {
         return _value >= 0 && (ulong)_value >= stats.Min && (ulong)_value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<sbyte> stats)
+    public bool Visit(LogicalStatistics<sbyte> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<short> stats)
+    public bool Visit(LogicalStatistics<short> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<int> stats)
+    public bool Visit(LogicalStatistics<int> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<long> stats)
+    public bool Visit(LogicalStatistics<long> stats)
     {
         return _value >= stats.Min && _value <= stats.Max;
     }
diff --git a/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs b/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
index cb52b59..5864b1a 100644
--- a/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
+++ b/ParquetSharp.Dataset/Filter/IntRangeStatisticsEvaluator.cs
@@ -1,6 +1,14 @@
 namespace ParquetSharp.Dataset.Filter;
 
-internal sealed class IntRangeStatisticsEvaluator : BaseStatisticsEvaluator
+internal sealed class IntRangeStatisticsEvaluator
+    : ILogicalStatisticsVisitor<byte, bool>,
+      ILogicalStatisticsVisitor<ushort, bool>,
+      ILogicalStatisticsVisitor<uint, bool>,
+      ILogicalStatisticsVisitor<ulong, bool>,
+      ILogicalStatisticsVisitor<sbyte, bool>,
+      ILogicalStatisticsVisitor<short, bool>,
+      ILogicalStatisticsVisitor<int, bool>,
+      ILogicalStatisticsVisitor<long, bool>
 {
     public IntRangeStatisticsEvaluator(long start, long end)
     {
@@ -8,42 +16,47 @@ public IntRangeStatisticsEvaluator(long start, long end)
         _end = end;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<byte> stats)
+    public bool Visit(LogicalStatistics stats)
+    {
+        return true;
+    }
+
+    public bool Visit(LogicalStatistics<byte> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<ushort> stats)
+    public bool Visit(LogicalStatistics<ushort> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<uint> stats)
+    public bool Visit(LogicalStatistics<uint> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<ulong> stats)
+    public bool Visit(LogicalStatistics<ulong> stats)
     {
         return (_end >= 0 && (ulong)_end >= stats.Min) && (_start < 0 || (ulong)_start <= stats.Max);
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<sbyte> stats)
+    public bool Visit(LogicalStatistics<sbyte> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<short> stats)
+    public bool Visit(LogicalStatistics<short> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<int> stats)
+    public bool Visit(LogicalStatistics<int> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
 
-    protected override bool IncludeRowGroup(LogicalStatistics<long> stats)
+    public bool Visit(LogicalStatistics<long> stats)
     {
         return _end >= stats.Min && _start <= stats.Max;
     }
diff --git a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
index cf815a6..98d8750 100644
--- a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
+++ b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
@@ -43,6 +43,8 @@ public abstract class LogicalStatistics
             };
         }
     }
+
+    public abstract TOut Accept<TOut>(ILogicalStatisticsVisitor<TOut> visitor);
 }
 
 /// <summary>
@@ -59,4 +61,16 @@ internal LogicalStatistics(T min, T max)
     public T Min { get; }
 
     public T Max { get; }
+
+    public override TOut Accept<TOut>(ILogicalStatisticsVisitor<TOut> visitor)
+    {
+        if (visitor is ILogicalStatisticsVisitor<T, TOut> typedVisitor)
+        {
+            return typedVisitor.Visit(this);
+        }
+        else
+        {
+            return visitor.Visit(this);
+        }
+    }
 }

From 3787f91e2e6959ffa659bb618cc97ec624f0ecd1 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 14:32:29 +1300
Subject: [PATCH 6/8] Tidy up statistics construction

---
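Notes: the repeated `new LogicalStatistics<T>(...)` arms collapse into one
generic CreateStatistics helper, so each switch arm only states the
physical-to-logical conversion. Type inference picks TLogical from the
converter lambda, e.g. (illustrative call, mirroring the diff below):

    // TPhysical = int (physical storage), TLogical = sbyte (logical type)
    CreateStatistics(stats, val => (sbyte)val);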
 .../Filter/LogicalStatistics.cs | 44 +++++++++----------
 1 file changed, 21 insertions(+), 23 deletions(-)

diff --git a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
index 98d8750..b075df4 100644
--- a/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
+++ b/ParquetSharp.Dataset/Filter/LogicalStatistics.cs
@@ -16,34 +16,32 @@ public abstract class LogicalStatistics
         {
             return (statistics, logicalType) switch
             {
-                (Statistics<bool> stats, NoneLogicalType) => new LogicalStatistics<bool>(stats.Min, stats.Max),
-                (Statistics<int> stats, NoneLogicalType) => new LogicalStatistics<int>(stats.Min, stats.Max),
-                (Statistics<long> stats, NoneLogicalType) => new LogicalStatistics<long>(stats.Min, stats.Max),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: true }) => new LogicalStatistics<sbyte>(
-                    (sbyte)stats.Min, (sbyte)stats.Max),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: false }) => new LogicalStatistics<byte>(
-                    (byte)unchecked((uint)stats.Min), (byte)unchecked((uint)stats.Max)),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: true }) => new LogicalStatistics<short>(
-                    (short)stats.Min, (short)stats.Max),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: false }) => new LogicalStatistics<ushort>(
-                    (ushort)unchecked((uint)stats.Min), (ushort)unchecked((uint)stats.Max)),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: true }) => new LogicalStatistics<int>(
-                    stats.Min, stats.Max),
-                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: false }) => new LogicalStatistics<uint>(
-                    unchecked((uint)stats.Min), unchecked((uint)stats.Max)),
-                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: true }) => new LogicalStatistics<long>(
-                    stats.Min, stats.Max),
-                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: false }) => new LogicalStatistics<ulong>(
-                    unchecked((ulong)stats.Min), unchecked((ulong)stats.Max)),
-                (Statistics<FixedLenByteArray> stats, Float16LogicalType) => new LogicalStatistics<Half>(
-                    LogicalRead.ToHalf(stats.Min), LogicalRead.ToHalf(stats.Max)),
-                (Statistics<float> stats, NoneLogicalType) => new LogicalStatistics<float>(stats.Min, stats.Max),
-                (Statistics<double> stats, NoneLogicalType) => new LogicalStatistics<double>(stats.Min, stats.Max),
+                (Statistics<bool> stats, NoneLogicalType) => CreateStatistics(stats, val => val),
+                (Statistics<int> stats, NoneLogicalType) => CreateStatistics(stats, val => val),
+                (Statistics<long> stats, NoneLogicalType) => CreateStatistics(stats, val => val),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: true }) => CreateStatistics(stats, val => (sbyte)val),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 8, IsSigned: false }) => CreateStatistics(stats, val => (byte)unchecked((uint)val)),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: true }) => CreateStatistics(stats, val => (short)val),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 16, IsSigned: false }) => CreateStatistics(stats, val => (ushort)unchecked((uint)val)),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: true }) => CreateStatistics(stats, val => val),
+                (Statistics<int> stats, IntLogicalType { BitWidth: 32, IsSigned: false }) => CreateStatistics(stats, val => unchecked((uint)val)),
+                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: true }) => CreateStatistics(stats, val => val),
+                (Statistics<long> stats, IntLogicalType { BitWidth: 64, IsSigned: false }) => CreateStatistics(stats, val => unchecked((ulong)val)),
+                (Statistics<FixedLenByteArray> stats, Float16LogicalType) => CreateStatistics(stats, LogicalRead.ToHalf),
+                (Statistics<float> stats, NoneLogicalType) => CreateStatistics(stats, val => val),
+                (Statistics<double> stats, NoneLogicalType) => CreateStatistics(stats, val => val),
                 _ => null,
             };
         }
     }
 
+    private static LogicalStatistics CreateStatistics<TPhysical, TLogical>(
+        Statistics<TPhysical> statistics, Func<TPhysical, TLogical> converter)
+        where TPhysical : unmanaged
+    {
+        return new LogicalStatistics<TLogical>(converter(statistics.Min), converter(statistics.Max));
+    }
+
     public abstract TOut Accept<TOut>(ILogicalStatisticsVisitor<TOut> visitor);
 }

From 9ddc8d88467ea132ad8ffc111b8afb326472ae86 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 22:38:17 +1300
Subject: [PATCH 7/8] Use parameterized tests for RowGroupSelector

---
 .../Filter/TestRowGroupSelector.cs | 38 ++++++-------------
 1 file changed, 12 insertions(+), 26 deletions(-)

diff --git a/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
index d3e1f65..29c366e 100644
--- a/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
+++ b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
@@ -11,15 +11,17 @@ namespace ParquetSharp.Dataset.Test.Filter;
 public class TestRowGroupSelector
 {
     [Test]
-    public void TestFilterPartitionColumn()
+    public void TestFilterPartitionColumn([Values] bool enableStatistics)
     {
         using var tmpDir = new DisposableDirectory();
         var filePath = tmpDir.AbsPath("test.parquet");
 
         var batch0 = GenerateBatch(0, 10);
         var batch1 = GenerateBatch(10, 20);
-        WriteParquetFile(filePath, new[] { batch0, batch1 }, includeStats: true);
+        WriteParquetFile(filePath, new[] { batch0, batch1 }, includeStats: enableStatistics);
 
+        // Filter on an arbitrary field name that isn't found in the data file.
+        // This will happen when filtering on a field from the partitioning schema.
         var filter = Col.Named("part").IsEqualTo(5);
         var rowGroupSelector = new RowGroupSelector(filter);
 
@@ -29,25 +31,7 @@ public void TestFilterPartitionColumn()
     }
 
     [Test]
-    public void TestNoStatistics()
-    {
-        using var tmpDir = new DisposableDirectory();
-        var filePath = tmpDir.AbsPath("test.parquet");
-
-        var batch0 = GenerateBatch(0, 10);
-        var batch1 = GenerateBatch(10, 20);
-        WriteParquetFile(filePath, new[] { batch0, batch1 }, includeStats: false);
-
-        var filter = Col.Named("id").IsEqualTo(5);
-        var rowGroupSelector = new RowGroupSelector(filter);
-
-        using var reader = new FileReader(filePath);
-        var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
-        Assert.That(rowGroups, Is.EqualTo(new[] { 0, 1 }));
-    }
-
-    [Test]
-    public void TestFilterIntColumnValue()
+    public void TestFilterIntColumnValue([Values] bool enableStatistics)
     {
         using var tmpDir = new DisposableDirectory();
         var filePath = tmpDir.AbsPath("test.parquet");
@@ -55,18 +39,19 @@ public void TestFilterIntColumnValue()
         var batch0 = GenerateBatch(0, 10);
         var batch1 = GenerateBatch(10, 20);
         var batch2 = GenerateBatch(20, 30);
-        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: true);
+        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: enableStatistics);
 
         var filter = Col.Named("id").IsEqualTo(15);
         var rowGroupSelector = new RowGroupSelector(filter);
 
         using var reader = new FileReader(filePath);
         var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
-        Assert.That(rowGroups, Is.EqualTo(new[] { 1 }));
+        var expectedRowGroups = enableStatistics ? new[] { 1 } : new[] { 0, 1, 2 };
+        Assert.That(rowGroups, Is.EqualTo(expectedRowGroups));
     }
 
     [Test]
-    public void TestFilterIntColumnRange()
+    public void TestFilterIntColumnRange([Values] bool enableStatistics)
     {
         using var tmpDir = new DisposableDirectory();
         var filePath = tmpDir.AbsPath("test.parquet");
@@ -74,14 +59,15 @@ public void TestFilterIntColumnRange()
         var batch0 = GenerateBatch(0, 10);
         var batch1 = GenerateBatch(10, 20);
         var batch2 = GenerateBatch(20, 30);
-        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: true);
+        WriteParquetFile(filePath, new[] { batch0, batch1, batch2 }, includeStats: enableStatistics);
 
         var filter = Col.Named("id").IsInRange(15, 25);
         var rowGroupSelector = new RowGroupSelector(filter);
 
         using var reader = new FileReader(filePath);
         var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
-        Assert.That(rowGroups, Is.EqualTo(new[] { 1, 2 }));
+        var expectedRowGroups = enableStatistics ? new[] { 1, 2 } : new[] { 0, 1, 2 };
+        Assert.That(rowGroups, Is.EqualTo(expectedRowGroups));
     }
 
     private static RecordBatch GenerateBatch(int idStart, int idEnd)

From 178663e914b40fddd078f9ecc12ed23578aeb8a7 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 4 Apr 2024 22:40:50 +1300
Subject: [PATCH 8/8] Extra comment

---
 ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
index 29c366e..474ce4b 100644
--- a/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
+++ b/ParquetSharp.Dataset.Test/Filter/TestRowGroupSelector.cs
@@ -27,6 +27,7 @@ public void TestFilterPartitionColumn([Values] bool enableStatistics)
 
         using var reader = new FileReader(filePath);
         var rowGroups = rowGroupSelector.GetRequiredRowGroups(reader);
+        // Null row groups indicate that all row groups should be read
        Assert.That(rowGroups, Is.Null);
     }