Skip to content

Commit 24a8104

Browse files
authored
Dispose of Parquet RecordBatches when creating a new filtered batch (#25)
1 parent bfae663 commit 24a8104

File tree

5 files changed

+34
-12
lines changed

5 files changed

+34
-12
lines changed

ParquetSharp.Dataset.Benchmark/DatasetRead.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ public async Task<long> ReadAllFilesDirectly()
7676
{
7777
using var reader = new FileReader(filePath);
7878
using var batchReader = reader.GetRecordBatchReader();
79-
while (await batchReader.ReadNextRecordBatchAsync() is { } batch)
79+
while (await batchReader.ReadNextRecordBatchAsync() is { } batch_)
8080
{
81+
using var batch = batch_;
8182
rowsRead += batch.Length;
8283
}
8384
}
@@ -91,8 +92,9 @@ public async Task<long> ReadAllData()
9192
var dataset = new DatasetReader(_datasetDirectory, new HivePartitioning.Factory());
9293
using var reader = dataset.ToBatches();
9394
long rowsRead = 0;
94-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
95+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
9596
{
97+
using var batch = batch_;
9698
rowsRead += batch.Length;
9799
}
98100

@@ -106,8 +108,9 @@ public async Task<long> FilterPartitions()
106108
var filter = Col.Named("group").IsEqualTo("group-2").And(Col.Named("day").IsEqualTo(2));
107109
using var reader = dataset.ToBatches(filter);
108110
long rowsRead = 0;
109-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
111+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
110112
{
113+
using var batch = batch_;
111114
rowsRead += batch.Length;
112115
}
113116

@@ -121,8 +124,9 @@ public async Task<long> FilterFileData()
121124
var filter = Col.Named("id").IsEqualTo(5);
122125
using var reader = dataset.ToBatches(filter);
123126
long rowsRead = 0;
124-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
127+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
125128
{
129+
using var batch = batch_;
126130
rowsRead += batch.Length;
127131
}
128132

@@ -135,8 +139,9 @@ public async Task<long> FilterToFileColumns()
135139
var dataset = new DatasetReader(_datasetDirectory, new HivePartitioning.Factory());
136140
using var reader = dataset.ToBatches(columns: new[] { "id", "ints", "doubles" });
137141
long rowsRead = 0;
138-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
142+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
139143
{
144+
using var batch = batch_;
140145
rowsRead += batch.Length;
141146
}
142147

@@ -149,8 +154,9 @@ public async Task<long> FilterToSingleColumn()
149154
var dataset = new DatasetReader(_datasetDirectory, new HivePartitioning.Factory());
150155
using var reader = dataset.ToBatches(columns: new[] { "ints" });
151156
long rowsRead = 0;
152-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
157+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
153158
{
159+
using var batch = batch_;
154160
rowsRead += batch.Length;
155161
}
156162

ParquetSharp.Dataset.Test/TestEncryption.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public static async Task TestReadEncryptedData()
4747
using var reader = dataset.ToBatches();
4848

4949
var rowsRead = 0;
50-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
50+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
5151
{
52+
using var batch = batch_;
5253
rowsRead += batch.Length;
5354
}
5455

ParquetSharp.Dataset.Test/TestFilterParquet.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ private static async Task TestIntegerFilter(IFilter filter, Func<int, bool> incl
9898
var dataset = new DatasetReader(datasetDir.DirectoryPath);
9999
using var reader = dataset.ToBatches(filter);
100100
var valuesRead = new List<int?>();
101-
while (await reader.ReadNextRecordBatchAsync() is { } batch)
101+
while (await reader.ReadNextRecordBatchAsync() is { } batch_)
102102
{
103+
using var batch = batch_;
103104
var filteredBatchValues = batch.Column(0) as Int32Array;
104105
foreach (var value in filteredBatchValues!)
105106
{

ParquetSharp.Dataset/DatasetStreamReader.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading;
@@ -48,7 +49,7 @@ public DatasetStreamReader(
4849
var nextBatch = await _currentFragmentReader.ReadNextRecordBatchAsync(cancellationToken);
4950
if (nextBatch != null)
5051
{
51-
var filtered = FilterBatch(nextBatch);
52+
var filtered = FilterBatch(ref nextBatch);
5253
if (filtered == null)
5354
{
5455
// All rows excluded
@@ -70,9 +71,16 @@ public DatasetStreamReader(
7071
/// <summary>
7172
/// Return a record batch with rows filtered out using the current filter.
7273
/// Returns null if all rows are excluded.
74+
/// Takes ownership of the input record batch: when a new filtered batch is
75+
/// created, the input is disposed and the reference is set to null.
7376
/// </summary>
74-
private RecordBatch? FilterBatch(RecordBatch recordBatch)
77+
private RecordBatch? FilterBatch(ref RecordBatch? recordBatch)
7578
{
79+
if (recordBatch == null)
80+
{
81+
throw new ArgumentNullException(nameof(recordBatch));
82+
}
83+
7684
if (_filter == null)
7785
{
7886
return recordBatch;
@@ -97,7 +105,13 @@ public DatasetStreamReader(
97105
arrays.Add(filterApplier.MaskedArray);
98106
}
99107

100-
return new RecordBatch(recordBatch.Schema, arrays, filterMask.IncludedCount);
108+
var filteredBatch = new RecordBatch(recordBatch.Schema, arrays, filterMask.IncludedCount);
109+
110+
// Dispose input record batch so its memory can be immediately freed
111+
recordBatch.Dispose();
112+
recordBatch = null;
113+
114+
return filteredBatch;
101115
}
102116

103117
public void Dispose()

ParquetSharp.Dataset/ParquetSharp.Dataset.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<LangVersion>10.0</LangVersion>
66
<Nullable>enable</Nullable>
77
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
8-
<VersionPrefix>0.3.0</VersionPrefix>
8+
<VersionPrefix>0.3.1</VersionPrefix>
99
<Company>G-Research</Company>
1010
<Authors>G-Research</Authors>
1111
<Product>ParquetSharp.Dataset</Product>

0 commit comments

Comments
 (0)