Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions csharp/LogicalBatchReader/NestedReader.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;

namespace ParquetSharp.LogicalBatchReader
{
Expand All @@ -11,28 +12,36 @@ internal sealed class NestedReader<TItem> : ILogicalBatchReader<Nested<TItem>>
public NestedReader(ILogicalBatchReader<TItem> innerReader, int bufferLength)
{
_innerReader = innerReader;
_buffer = new TItem[bufferLength];
_bufferLength = bufferLength;
}

public int ReadBatch(Span<Nested<TItem>> destination)
{
// Read batches of values from the underlying reader and convert them to nested values
var totalRead = 0;
while (totalRead < destination.Length)
var buffer = ArrayPool<TItem>.Shared.Rent(_bufferLength);
try
{
var readSize = Math.Min(destination.Length - totalRead, _buffer.Length);
var valuesRead = _innerReader.ReadBatch(_buffer.AsSpan(0, readSize));
for (var i = 0; i < valuesRead; ++i)
while (totalRead < destination.Length)
{
destination[totalRead + i] = new Nested<TItem>(_buffer[i]);
}
var readSize = Math.Min(destination.Length - totalRead, buffer.Length);
var valuesRead = _innerReader.ReadBatch(buffer.AsSpan(0, readSize));
for (var i = 0; i < valuesRead; ++i)
{
destination[totalRead + i] = new Nested<TItem>(buffer[i]);
}

totalRead += valuesRead;
if (valuesRead < readSize)
{
break;
totalRead += valuesRead;
if (valuesRead < readSize)
{
break;
}
}
}
finally
{
ArrayPool<TItem>.Shared.Return(buffer);
}

return totalRead;
}
Expand All @@ -48,6 +57,6 @@ public long Skip(long numRowsToSkip)
}

private readonly ILogicalBatchReader<TItem> _innerReader;
private readonly TItem[] _buffer;
private readonly int _bufferLength;
}
}
27 changes: 18 additions & 9 deletions csharp/LogicalBatchWriter/NestedWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;

namespace ParquetSharp.LogicalBatchWriter
{
Expand All @@ -14,28 +15,36 @@ public NestedWriter(
{
_firstInnerWriter = firstInnerWriter;
_innerWriter = innerWriter;
_buffer = new TItem[bufferLength];
_bufferLength = bufferLength;
}

public void WriteBatch(ReadOnlySpan<Nested<TItem>> values)
{
var offset = 0;
var writer = _firstInnerWriter;
while (offset < values.Length)
var buffer = ArrayPool<TItem>.Shared.Rent(_bufferLength);
try
{
var batchSize = Math.Min(values.Length - offset, _buffer.Length);
for (var i = 0; i < batchSize; ++i)
while (offset < values.Length)
{
_buffer[i] = values[offset + i].Value;
var batchSize = Math.Min(values.Length - offset, buffer.Length);
for (var i = 0; i < batchSize; ++i)
{
buffer[i] = values[offset + i].Value;
}
writer.WriteBatch(buffer.AsSpan(0, batchSize));
offset += batchSize;
writer = _innerWriter;
}
writer.WriteBatch(_buffer.AsSpan(0, batchSize));
offset += batchSize;
writer = _innerWriter;
}
finally
{
ArrayPool<TItem>.Shared.Return(buffer);
}
}

private readonly ILogicalBatchWriter<TItem> _firstInnerWriter;
private readonly ILogicalBatchWriter<TItem> _innerWriter;
private readonly TItem[] _buffer;
private readonly int _bufferLength;
}
}
23 changes: 14 additions & 9 deletions csharp/LogicalColumnReader.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Buffers;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using ParquetSharp.Schema;
using ParquetSharp.LogicalBatchReader;

namespace ParquetSharp
Expand Down Expand Up @@ -159,17 +158,23 @@ public override TReturn Apply<TReturn>(ILogicalColumnReaderVisitor<TReturn> visi

public IEnumerator<TElement> GetEnumerator()
{
var buffer = new TElement[BufferLength];

while (HasNext)
var buffer = ArrayPool<TElement>.Shared.Rent(BufferLength);
try
{
var read = ReadBatch(buffer);

for (int i = 0; i != read; ++i)
while (HasNext)
{
yield return buffer[i];
var read = ReadBatch(buffer);

for (int i = 0; i != read; ++i)
{
yield return buffer[i];
}
}
}
finally
{
ArrayPool<TElement>.Shared.Return(buffer);
}
}

IEnumerator IEnumerable.GetEnumerator()
Expand Down