Support buffered writing with the Arrow API (#428)
adamreeve authored Mar 7, 2024
1 parent 925d233 commit 010bdde
Showing 6 changed files with 163 additions and 8 deletions.
21 changes: 21 additions & 0 deletions cpp/arrow/FileWriter.cpp
@@ -97,11 +97,32 @@ extern "C"
)
}

PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_WriteRecordBatches(
    FileWriter* writer, struct ArrowArrayStream* stream)
{
    TRYCATCH
    (
        std::shared_ptr<arrow::RecordBatchReader> reader;
        PARQUET_ASSIGN_OR_THROW(reader, arrow::ImportRecordBatchReader(stream));
        std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
        PARQUET_ASSIGN_OR_THROW(batches, reader->ToRecordBatches());
        for (const auto& batch : batches)
        {
            PARQUET_THROW_NOT_OK(writer->WriteRecordBatch(*batch));
        }
    )
}

PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_NewRowGroup(FileWriter* writer, int64_t chunk_size)
{
    TRYCATCH(PARQUET_THROW_NOT_OK(writer->NewRowGroup(chunk_size));)
}

PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_NewBufferedRowGroup(FileWriter* writer)
{
    TRYCATCH(PARQUET_THROW_NOT_OK(writer->NewBufferedRowGroup());)
}

PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_WriteColumnChunk(
    FileWriter* writer, struct ArrowArray* c_array, struct ArrowSchema* c_array_type)
{
74 changes: 74 additions & 0 deletions csharp.test/Arrow/TestFileWriter.cs
@@ -128,6 +128,80 @@ public async Task TestWriteRecordBatch()
    await VerifyData(inStream, numRows);
}

[Test]
public async Task TestWriteBufferedRecordBatches()
{
    var fields = new[]
    {
        new Field("x", new Apache.Arrow.Types.Int32Type(), false),
        new Field("y", new Apache.Arrow.Types.FloatType(), false),
    };
    var schema = new Apache.Arrow.Schema(fields, null);

    RecordBatch GetBatch(int xVal, int numRows)
    {
        var arrays = new IArrowArray[]
        {
            new Int32Array.Builder()
                .AppendRange(Enumerable.Repeat(xVal, numRows))
                .Build(),
            new FloatArray.Builder()
                .AppendRange(Enumerable.Range(0, numRows).Select(i => i / 100.0f))
                .Build(),
        };
        return new RecordBatch(schema, arrays, numRows);
    }

    using var buffer = new ResizableBuffer();
    using (var outStream = new BufferOutputStream(buffer))
    {
        using var propertiesBuilder = new WriterPropertiesBuilder();
        propertiesBuilder.MaxRowGroupLength(250);
        using var writerProperties = propertiesBuilder.Build();
        using var writer = new FileWriter(outStream, schema, writerProperties);

        using var batch0 = GetBatch(0, 100);
        writer.WriteBufferedRecordBatch(batch0);
        using var batch1 = GetBatch(0, 100);
        writer.WriteBufferedRecordBatch(batch1);

        writer.NewBufferedRowGroup();

        using var batch2 = GetBatch(1, 100);
        writer.WriteBufferedRecordBatch(batch2);
        using var batch3 = GetBatch(1, 100);
        writer.WriteBufferedRecordBatch(batch3);

        writer.NewBufferedRowGroup();

        using var batch4 = GetBatch(2, 300);
        writer.WriteBufferedRecordBatch(batch4);

        writer.Close();
    }

    using var inStream = new BufferReader(buffer);
    using var fileReader = new FileReader(inStream);
    Assert.That(fileReader.NumRowGroups, Is.EqualTo(4));

    var expectedSizes = new[] {200, 200, 250, 50};
    var expectedXValues = new[] {0, 1, 2, 2};

    for (var rowGroupIdx = 0; rowGroupIdx < fileReader.NumRowGroups; ++rowGroupIdx)
    {
        using var batchReader = fileReader.GetRecordBatchReader(rowGroups: new[] {rowGroupIdx});
        using var batch = await batchReader.ReadNextRecordBatchAsync();
        Assert.That(batch, Is.Not.Null);
        Assert.That(batch.Length, Is.EqualTo(expectedSizes[rowGroupIdx]));
        var xVals = batch.Column("x") as Int32Array;
        Assert.That(xVals, Is.Not.Null);
        for (var i = 0; i < xVals!.Length; ++i)
        {
            Assert.That(xVals.GetValue(i), Is.EqualTo(expectedXValues[rowGroupIdx]));
        }
    }
}

[Test]
public async Task TestWriteRowGroupColumns()
{
57 changes: 52 additions & 5 deletions csharp/Arrow/FileWriter.cs
@@ -15,9 +15,13 @@ namespace ParquetSharp.Arrow
/// <summary>
/// Writes Parquet files using Arrow format data
///
- /// This may be used to write whole tables or record batches,
+ /// This may be used to write whole tables or record batches at once,
/// using the WriteTable or WriteRecordBatch methods.
///
/// You can also buffer writes of record batches to allow writing multiple
/// record batches within a Parquet row group, using WriteBufferedRecordBatch
/// and NewBufferedRowGroup to start a new row group.
///
/// For more control over writing, you can create a new row group with NewRowGroup,
/// then write all columns for the row group with the WriteColumn method.
/// All required columns must be written before starting the next row group
@@ -146,8 +150,8 @@ public unsafe Apache.Arrow.Schema Schema
/// <summary>
/// Write an Arrow table to Parquet
///
- /// The table data will be chunked into row groups that respect the maximum
- /// chunk size specified if required.
+ /// A new row group will be started, and the table data will be chunked into
+ /// row groups that respect the maximum chunk size specified if required.
/// This method requires that the columns in the table use equal chunking.
/// </summary>
/// <param name="table">The table to write</param>
@@ -161,8 +165,8 @@ public void WriteTable(Table table, long chunkSize = 1024 * 1024)
/// <summary>
/// Write a record batch to Parquet
///
- /// The data will be chunked into row groups that respect the maximum
- /// chunk size specified if required.
+ /// A new row group will be started, and the record batch data will be chunked
+ /// into row groups that respect the maximum chunk size specified if required.
/// </summary>
/// <param name="recordBatch">The record batch to write</param>
/// <param name="chunkSize">The maximum length of row groups to write</param>
@@ -172,6 +176,31 @@ public void WriteRecordBatch(RecordBatch recordBatch, long chunkSize = 1024 * 1024)
    WriteRecordBatchStream(arrayStream, chunkSize);
}

/// <summary>
/// Write a record batch to Parquet in buffered mode, allowing
/// multiple record batches to be written to the same row group.
///
/// New row groups are started if the data reaches the MaxRowGroupLength configured
/// in the WriterProperties.
/// </summary>
/// <param name="recordBatch">The record batch to write</param>
public void WriteBufferedRecordBatch(RecordBatch recordBatch)
{
    var arrayStream = new RecordBatchStream(recordBatch.Schema, new[] {recordBatch});
    WriteBufferedRecordBatches(arrayStream);
}

/// <summary>
/// Flush buffered data and start a new row group.
/// This can be used to force creation of a new row group when writing data
/// with WriteBufferedRecordBatch.
/// </summary>
public void NewBufferedRowGroup()
{
    ExceptionInfo.Check(FileWriter_NewBufferedRowGroup(_handle.IntPtr));
    GC.KeepAlive(_handle);
}

/// <summary>
/// Start writing a new row group to the file. After calling this method,
/// each column required in the schema must be written using WriteColumn
@@ -265,6 +294,18 @@ private unsafe void WriteRecordBatchStream(IArrowArrayStream arrayStream, long chunkSize)
    GC.KeepAlive(_handle);
}

/// <summary>
/// Write record batches in buffered mode
/// </summary>
private unsafe void WriteBufferedRecordBatches(IArrowArrayStream arrayStream)
{
    var cArrayStream = new CArrowArrayStream();
    CArrowArrayStreamExporter.ExportArrayStream(arrayStream, &cArrayStream);
    ExceptionInfo.Check(FileWriter_WriteRecordBatches(_handle.IntPtr, &cArrayStream));
    GC.KeepAlive(cArrayStream);
    GC.KeepAlive(_handle);
}

[DllImport(ParquetDll.Name)]
private static extern unsafe IntPtr FileWriter_OpenPath(
    [MarshalAs(UnmanagedType.LPUTF8Str)] string path, CArrowSchema* schema, IntPtr properties, IntPtr arrowProperties, out IntPtr writer);
@@ -279,9 +320,15 @@ private static extern unsafe IntPtr FileWriter_OpenStream(
[DllImport(ParquetDll.Name)]
private static extern unsafe IntPtr FileWriter_WriteTable(IntPtr writer, CArrowArrayStream* stream, long chunkSize);

[DllImport(ParquetDll.Name)]
private static extern unsafe IntPtr FileWriter_WriteRecordBatches(IntPtr writer, CArrowArrayStream* stream);

[DllImport(ParquetDll.Name)]
private static extern IntPtr FileWriter_NewRowGroup(IntPtr writer, long chunkSize);

[DllImport(ParquetDll.Name)]
private static extern IntPtr FileWriter_NewBufferedRowGroup(IntPtr writer);

[DllImport(ParquetDll.Name)]
private static extern unsafe IntPtr FileWriter_WriteColumnChunk(IntPtr writer, CArrowArray* array, CArrowSchema* arrayType);

4 changes: 2 additions & 2 deletions csharp/Encryption/CryptoFactory.cs
@@ -35,7 +35,7 @@ public unsafe CryptoFactory(KmsClientFactory kmsClientFactory)
/// </summary>
/// <param name="connectionConfig">The KMS connection configuration to use</param>
/// <param name="encryptionConfig">The encryption configuration to use</param>
/// <param name="filePath">The path to the Parquet file being written</param>
/// <param name="filePath">The path to the Parquet file being written. Can be null if internal key material is used.</param>
/// <returns>Encryption properties for the file</returns>
public FileEncryptionProperties GetFileEncryptionProperties(
KmsConnectionConfig connectionConfig,
@@ -58,7 +58,7 @@ public FileEncryptionProperties GetFileEncryptionProperties(
/// </summary>
/// <param name="connectionConfig">The KMS connection configuration to use</param>
/// <param name="decryptionConfig">The decryption configuration to use</param>
/// <param name="filePath">The path to the Parquet file being read</param>
/// <param name="filePath">The path to the Parquet file being read. Can be null if internal key material is used.</param>
/// <returns>Decryption properties for the file</returns>
public FileDecryptionProperties GetFileDecryptionProperties(
KmsConnectionConfig connectionConfig,
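
With internal key material (the default), no file path is required; a minimal
sketch, assuming a configured `cryptoFactory`, `connectionConfig` and
`encryptionConfig` are already in scope:

```csharp
// Sketch only: key material is stored inside the Parquet file itself
// (internal key material), so no file path is needed when building
// the encryption properties.
using var encryptionProperties = cryptoFactory.GetFileEncryptionProperties(
    connectionConfig, encryptionConfig, filePath: null);
```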
2 changes: 1 addition & 1 deletion csharp/ParquetSharp.csproj
@@ -12,7 +12,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<NoWarn>1591;</NoWarn>
- <Version>15.0.0-beta2</Version>
+ <Version>15.0.0-beta3</Version>
<Company>G-Research</Company>
<Authors>G-Research</Authors>
<Product>ParquetSharp</Product>
13 changes: 13 additions & 0 deletions docs/Arrow.md
@@ -164,6 +164,19 @@ if it contains more rows than the chunk size, which can be specified when writing
writer.WriteRecordBatch(recordBatch, chunkSize: 1024);
```

Calling `WriteRecordBatch` always starts a new row group, but since ParquetSharp 15.0.0,
you can also write buffered record batches,
so that multiple batches may be written to the same row group:

```csharp
writer.WriteBufferedRecordBatch(recordBatch);
```

When using `WriteBufferedRecordBatch`, data will be flushed when the `FileWriter`
is closed or `NewBufferedRowGroup` is called to start a new row group.
A new row group will also be started if the row group size reaches the `MaxRowGroupLength`
value configured in the `WriterProperties`.
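
For example, a minimal sketch combining these calls (assuming `batch0`, `batch1`
and `batch2` are `RecordBatch` instances matching the writer's schema, and that
`schema` and the output path are set up as in the earlier examples):

```csharp
using var writer = new FileWriter("data.parquet", schema);

// batch0 and batch1 are buffered and written into the same row group
writer.WriteBufferedRecordBatch(batch0);
writer.WriteBufferedRecordBatch(batch1);

// Flush the buffered data and force a new row group for batch2
writer.NewBufferedRowGroup();
writer.WriteBufferedRecordBatch(batch2);

// Closing the writer flushes any remaining buffered data
writer.Close();
```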

### Writing data one column at a time

Rather than writing record batches, you may also explicitly start Parquet row groups
