Skip to content

Commit 11c5080

Browse files
authored
Fix reading all row groups when none are selected (#13)
1 parent 22f31ae commit 11c5080

File tree

4 files changed

+78
-12
lines changed

4 files changed

+78
-12
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System.Collections.Generic;
2+
using Apache.Arrow;
3+
using ParquetSharp.Dataset.Filter;
4+
5+
namespace ParquetSharp.Dataset.Test;
6+
7+
internal sealed class InstrumentedFilter : IFilter
8+
{
9+
public InstrumentedFilter(IFilter filter)
10+
{
11+
_filter = filter;
12+
}
13+
14+
public int IncludePartitionCallCount { get; private set; }
15+
16+
public int IncludeRowGroupCallCount { get; private set; }
17+
18+
public int ComputeMaskRowCount { get; private set; }
19+
20+
public bool IncludePartition(PartitionInformation partitionInformation)
21+
{
22+
++IncludePartitionCallCount;
23+
return _filter.IncludePartition(partitionInformation);
24+
}
25+
26+
public bool IncludeRowGroup(IReadOnlyDictionary<string, LogicalStatistics> columnStatistics)
27+
{
28+
++IncludeRowGroupCallCount;
29+
return _filter.IncludeRowGroup(columnStatistics);
30+
}
31+
32+
public FilterMask? ComputeMask(RecordBatch dataBatch)
33+
{
34+
ComputeMaskRowCount += dataBatch.Length;
35+
return _filter.ComputeMask(dataBatch);
36+
}
37+
38+
public IEnumerable<string> Columns()
39+
{
40+
return _filter.Columns();
41+
}
42+
43+
private readonly IFilter _filter;
44+
}

ParquetSharp.Dataset.Test/TestDatasetReader.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,18 @@ await VerifyData(
9797
new Dictionary<string, int> { { "a", 20 }, { "b", 20 } });
9898

9999
// Read filtered on partition
100-
var filter = Col.Named("part").IsEqualTo("b");
100+
var filter = new InstrumentedFilter(Col.Named("part").IsEqualTo("b"));
101101
using var filteredReader = dataset.ToBatches(filter);
102102
await VerifyData(
103103
filteredReader,
104104
new Dictionary<int, int> { { 2, 10 }, { 3, 10 } },
105105
new Dictionary<string, int> { { "b", 20 } });
106+
107+
// IncludePartition is called for the root directory and 2 subdirectories
108+
Assert.That(filter.IncludePartitionCallCount, Is.EqualTo(3));
109+
// No data files are used in the filter, so row group filtering is not used
110+
Assert.That(filter.IncludeRowGroupCallCount, Is.EqualTo(0));
111+
Assert.That(filter.ComputeMaskRowCount, Is.EqualTo(20));
106112
}
107113

108114
[Test]
@@ -322,10 +328,15 @@ public async Task TestFilterOnFileColumn()
322328
new NoPartitioning(),
323329
schema: schema);
324330

325-
var filter = Col.Named("id").IsInRange(0, 1);
331+
var filter = new InstrumentedFilter(Col.Named("id").IsInRange(0, 1));
326332
using var reader = dataset.ToBatches(filter);
327333

328334
await VerifyData(reader, new Dictionary<int, int> { { 0, 10 }, { 1, 10 } });
335+
336+
Assert.That(filter.IncludePartitionCallCount, Is.EqualTo(1));
337+
Assert.That(filter.IncludeRowGroupCallCount, Is.EqualTo(4));
338+
// Should only read data from one row group (10 rows) from each file
339+
Assert.That(filter.ComputeMaskRowCount, Is.EqualTo(20));
329340
}
330341

331342
[Test]
@@ -348,10 +359,15 @@ public async Task TestAllRowGroupsInFileExcluded()
348359
new NoPartitioning(),
349360
schema: schema);
350361

351-
var filter = Col.Named("id").IsEqualTo(2);
362+
var filter = new InstrumentedFilter(Col.Named("id").IsEqualTo(2));
352363
using var reader = dataset.ToBatches(filter);
353364

354365
await VerifyData(reader, new Dictionary<int, int> { { 2, 10 } });
366+
367+
Assert.That(filter.IncludePartitionCallCount, Is.EqualTo(1));
368+
Assert.That(filter.IncludeRowGroupCallCount, Is.EqualTo(4));
369+
// Should only read data from one row group (10 rows) from the second file
370+
Assert.That(filter.ComputeMaskRowCount, Is.EqualTo(10));
355371
}
356372

357373
[Test]

ParquetSharp.Dataset/DatasetStreamReader.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,22 +111,28 @@ private void GetNextReader()
111111
_currentFragmentReader?.Dispose();
112112
_currentFileReader?.Dispose();
113113

114-
if (_fragmentEnumerator.MoveNext())
114+
while (_fragmentEnumerator.MoveNext())
115115
{
116116
_currentFileReader = new FileReader(
117117
_fragmentEnumerator.Current.FilePath, _readerProperties, _arrowReaderProperties);
118118

119119
var rowGroups = _rowGroupSelector?.GetRequiredRowGroups(_currentFileReader);
120+
if (rowGroups != null && rowGroups.Length == 0)
121+
{
122+
_currentFileReader.Dispose();
123+
continue;
124+
}
125+
120126
var columnIndices = GetFileColumnIndices(_currentFileReader);
121127

122128
_currentFragmentReader = _currentFileReader.GetRecordBatchReader(
123129
rowGroups: rowGroups, columns: columnIndices);
130+
131+
return;
124132
}
125-
else
126-
{
127-
_currentFileReader = null;
128-
_currentFragmentReader = null;
129-
}
133+
134+
_currentFileReader = null;
135+
_currentFragmentReader = null;
130136
}
131137

132138
private int[] GetFileColumnIndices(FileReader fileReader)

ParquetSharp.Dataset/Filter/DateRangeEvaluator.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public void Visit(Date32Array array)
2626
var endNumber = _end.DayNumber - ArrowEpoch.DayNumber;
2727
if (inputArray.NullCount == 0)
2828
{
29-
var values = array.Values;
29+
var values = inputArray.Values;
3030
for (var i = 0; i < inputArray.Length; ++i)
3131
{
3232
var value = values[i];
@@ -38,7 +38,7 @@ public void Visit(Date32Array array)
3838
{
3939
for (var i = 0; i < inputArray.Length; ++i)
4040
{
41-
var value = array.GetValue(i);
41+
var value = inputArray.GetValue(i);
4242
var isInRange = value.HasValue && value.Value >= startNumber && value.Value <= endNumber;
4343
BitUtility.SetBit(mask, i, isInRange);
4444
}
@@ -54,7 +54,7 @@ public void Visit(Date64Array array)
5454
{
5555
for (var i = 0; i < inputArray.Length; ++i)
5656
{
57-
var value = array.GetDateOnly(i);
57+
var value = inputArray.GetDateOnly(i);
5858
var isInRange = value.HasValue && value.Value >= _start && value.Value <= _end;
5959
BitUtility.SetBit(mask, i, isInRange);
6060
}

0 commit comments

Comments
 (0)