Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ add_library(ParquetSharpNative SHARED
arrow/ArrowWriterPropertiesBuilder.cpp
arrow/FileReader.cpp
arrow/FileWriter.cpp
arrow/SchemaField.cpp
arrow/SchemaManifest.cpp
encryption/CryptoFactory.cpp
encryption/DecryptionConfiguration.cpp
encryption/EncryptionConfiguration.cpp
Expand Down
15 changes: 15 additions & 0 deletions cpp/arrow/FileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"
Expand Down Expand Up @@ -104,6 +105,20 @@ extern "C"
)
}

// Exposes the low-level parquet::ParquetFileReader used by an Arrow FileReader.
//
// reader:         the Arrow file reader to query.
// parquet_reader: out-parameter receiving the pointer returned by
//                 reader->parquet_reader().
//
// The body runs inside TRYCATCH (declared outside this view; presumably it
// turns a thrown C++ exception into the returned ExceptionInfo* - confirm in
// ExceptionInfo.h).
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_ParquetReader(
    FileReader* reader,
    parquet::ParquetFileReader** parquet_reader)
{
    TRYCATCH(
        parquet::ParquetFileReader* const underlying = reader->parquet_reader();
        *parquet_reader = underlying;
    )
}

// Exposes the schema manifest held by an Arrow FileReader.
//
// reader:   the Arrow file reader to query.
// manifest: out-parameter receiving the address of the object that
//           reader->manifest() refers to. No copy is made here, so the
//           pointer is only meaningful while that object exists
//           (NOTE(review): presumably for the lifetime of `reader` - confirm).
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_Manifest(
    FileReader* reader,
    const SchemaManifest** manifest)
{
    TRYCATCH(
        const SchemaManifest& schema_manifest = reader->manifest();
        *manifest = &schema_manifest;
    )
}

PARQUETSHARP_EXPORT void FileReader_Free(FileReader* reader)
{
delete reader;
Expand Down
42 changes: 42 additions & 0 deletions cpp/arrow/SchemaField.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <arrow/c/abi.h>
#include <arrow/c/bridge.h>
#include <parquet/arrow/schema.h>
#include <parquet/exception.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet::arrow;

extern "C"
{
// Reports how many child fields a schema field has.
//
// field:  the schema field to inspect.
// length: out-parameter receiving field->children.size(), narrowed to int32_t.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_ChildrenLength(const SchemaField* field, int32_t* length)
{
    TRYCATCH(
        const auto child_count = field->children.size();
        *length = static_cast<int32_t>(child_count);
    )
}

// Looks up one child of a schema field by position.
//
// field: the schema field whose children are indexed.
// index: zero-based position of the child; must satisfy
//        0 <= index < field->children.size().
// child: out-parameter receiving the address of the selected child. The
//        pointer refers to an element stored inside `field`; nothing is copied.
//
// Throws std::out_of_range (inside TRYCATCH) when index is negative or past
// the end of the children.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_Child(const SchemaField* field, int32_t index, const SchemaField** child)
{
    TRYCATCH(
        // Reject negative indices as well as too-large ones: children[index]
        // performs no bounds checking of its own, so a negative value would
        // otherwise read outside the container.
        if (index < 0 || index >= static_cast<int32_t>(field->children.size()))
        {
            throw std::out_of_range("Child field index out of range");
        }
        *child = &(field->children[index]);
    )
}

// Reads the column index recorded on a schema field.
//
// field:        the schema field to inspect.
// column_index: out-parameter receiving the value of field->column_index.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_ColumnIndex(const SchemaField* field, int32_t* column_index)
{
    TRYCATCH(
        const auto recorded_index = field->column_index;
        *column_index = recorded_index;
    )
}

// Exports the Arrow field attached to a schema field through the Arrow C data
// interface.
//
// field:       the schema field whose `field` member is exported.
// arrow_field: destination ArrowSchema struct, filled in by arrow::ExportField.
//
// A non-OK status from arrow::ExportField is raised via PARQUET_THROW_NOT_OK.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaField_Field(const SchemaField* field, struct ArrowSchema* arrow_field)
{
    TRYCATCH(
        const auto& source_field = *(field->field);
        PARQUET_THROW_NOT_OK(arrow::ExportField(source_field, arrow_field));
    )
}
}
38 changes: 38 additions & 0 deletions cpp/arrow/SchemaManifest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include <parquet/arrow/schema.h>
#include <parquet/exception.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet::arrow;

extern "C"
{
// Reports how many entries a manifest's schema_fields container holds.
//
// manifest: the schema manifest to inspect.
// length:   out-parameter receiving manifest->schema_fields.size(), narrowed
//           to int32_t.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_SchemaFieldsLength(const SchemaManifest* manifest, int32_t* length)
{
    TRYCATCH(
        const auto field_count = manifest->schema_fields.size();
        *length = static_cast<int32_t>(field_count);
    )
}

// Looks up one entry of a manifest's schema_fields by position.
//
// manifest: the schema manifest whose schema_fields are indexed.
// index:    zero-based position of the field; must satisfy
//           0 <= index < manifest->schema_fields.size().
// field:    out-parameter receiving the address of the selected field. The
//           pointer refers to an element stored inside `manifest`; nothing is
//           copied.
//
// Throws std::out_of_range (inside TRYCATCH) when index is negative or past
// the end of schema_fields.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_SchemaField(const SchemaManifest* manifest, int32_t index, const SchemaField** field)
{
    TRYCATCH(
        // Reject negative indices as well as too-large ones: schema_fields[index]
        // performs no bounds checking of its own, so a negative value would
        // otherwise read outside the container.
        if (index < 0 || index >= static_cast<int32_t>(manifest->schema_fields.size()))
        {
            throw std::out_of_range("Field index out of range");
        }
        *field = &(manifest->schema_fields[index]);
    )
}

// Finds the schema field associated with a column index.
//
// manifest:     the schema manifest to query.
// column_index: the column index handed to SchemaManifest::GetColumnField.
// field:        out-parameter handed to SchemaManifest::GetColumnField, which
//               fills it in.
//
// A non-OK status from GetColumnField is raised via PARQUET_THROW_NOT_OK.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_GetColumnField(const SchemaManifest* manifest, int32_t column_index, const SchemaField** field)
{
    TRYCATCH(
        const auto lookup_status = manifest->GetColumnField(column_index, field);
        PARQUET_THROW_NOT_OK(lookup_status);
    )
}

// Finds the parent of a schema field within a manifest.
//
// manifest: the schema manifest to query.
// field:    the field whose parent is requested.
// parent:   out-parameter receiving whatever SchemaManifest::GetParent returns
//           for `field`.
PARQUETSHARP_EXPORT ExceptionInfo* SchemaManifest_GetParent(const SchemaManifest* manifest, const SchemaField* field, const SchemaField** parent)
{
    TRYCATCH(
        const SchemaField* const owner = manifest->GetParent(field);
        *parent = owner;
    )
}
}
201 changes: 201 additions & 0 deletions csharp.test/Arrow/TestFileReader.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow;
Expand Down Expand Up @@ -195,6 +196,168 @@ public async Task TestReadSelectedColumns()
Assert.That(rowsRead, Is.EqualTo(RowsPerRowGroup * NumRowGroups));
}

[Test]
public void TestAccessUnderlyingReader()
{
    using var data = new ResizableBuffer();
    WriteTestFile(data);

    using var input = new BufferReader(data);
    using var arrowReader = new FileReader(input);
    using var underlyingReader = arrowReader.ParquetReader;

    // The underlying reader must expose per-row-group column statistics:
    // for column 1, every row group is expected to report a min/max that
    // spans exactly that group's range of row numbers.
    foreach (var groupIndex in Enumerable.Range(0, NumRowGroups))
    {
        var expectedMin = groupIndex * RowsPerRowGroup;
        var expectedMax = (groupIndex + 1) * RowsPerRowGroup - 1;

        using var rowGroupReader = underlyingReader.RowGroup(groupIndex);
        using var chunkMetadata = rowGroupReader.MetaData.GetColumnChunkMetaData(1);
        using var intStatistics = chunkMetadata.Statistics as Statistics<int>;

        Assert.That(intStatistics, Is.Not.Null);
        Assert.That(intStatistics!.HasMinMax);
        Assert.That(intStatistics.Min, Is.EqualTo(expectedMin));
        Assert.That(intStatistics.Max, Is.EqualTo(expectedMax));
    }
}

[Test]
public void TestAccessUnderlyingReaderAfterDisposed()
{
    using var data = new ResizableBuffer();
    WriteTestFile(data);

    using var input = new BufferReader(data);

    // Obtain the underlying reader, then let the owning Arrow reader go out
    // of scope (and be disposed) before the underlying reader is touched.
    ParquetFileReader orphanedReader;
    {
        using var arrowReader = new FileReader(input);
        orphanedReader = arrowReader.ParquetReader;
    }

    using (orphanedReader)
    {
        // Any access through the orphaned reader must fail with a clear message.
        TestDelegate readMetadata = () => { _ = orphanedReader.FileMetaData; };
        var thrown = Assert.Throws<NullReferenceException>(readMetadata);
        Assert.That(thrown!.Message, Does.Contain("owning parent has been disposed"));
    }
}

[Test]
public void TestSchemaManifest()
{
    using var data = new ResizableBuffer();
    WriteNestedTestFile(data);

    using var input = new BufferReader(data);
    using var arrowReader = new FileReader(input);

    var topLevelFields = arrowReader.SchemaManifest.SchemaFields;

    // The nested test file has two top-level fields: a struct and an int column.
    Assert.That(topLevelFields.Count, Is.EqualTo(2));

    // First top-level field: the struct "test_struct".
    var structNode = topLevelFields[0];
    var structArrow = structNode.Field;

    Assert.That(structArrow.Name, Is.EqualTo("test_struct"));
    Assert.That(structArrow.DataType.TypeId, Is.EqualTo(ArrowTypeId.Struct));

    // The struct itself is expected to report a column index of -1, while its
    // two members map to columns 0 and 1.
    Assert.That(structNode.ColumnIndex, Is.EqualTo(-1));
    var members = structNode.Children;
    Assert.That(members.Count, Is.EqualTo(2));
    Assert.That(members[0].ColumnIndex, Is.EqualTo(0));
    Assert.That(members[1].ColumnIndex, Is.EqualTo(1));

    var memberA = members[0].Field;
    var memberB = members[1].Field;
    Assert.That(memberA.Name, Is.EqualTo("a"));
    Assert.That(memberA.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));
    Assert.That(memberB.Name, Is.EqualTo("b"));
    Assert.That(memberB.DataType.TypeId, Is.EqualTo(ArrowTypeId.Float));

    // Second top-level field: the childless int column "x" at column 2.
    var leafNode = topLevelFields[1];
    Assert.That(leafNode.Children.Count, Is.EqualTo(0));
    Assert.That(leafNode.ColumnIndex, Is.EqualTo(2));

    var leafArrow = leafNode.Field;
    Assert.That(leafArrow.Name, Is.EqualTo("x"));
    Assert.That(leafArrow.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));
}

[Test]
public void TestSchemaManifestGetSingleField()
{
    using var data = new ResizableBuffer();
    WriteNestedTestFile(data);

    using var input = new BufferReader(data);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Index 1 is the last valid top-level field ("x").
    var lastField = schemaManifest.SchemaField(1);
    Assert.That(lastField, Is.Not.Null);

    var lastArrowField = lastField.Field;
    Assert.That(lastArrowField.Name, Is.EqualTo("x"));
    Assert.That(lastArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));

    // Index 2 is one past the end and must be rejected.
    TestDelegate readPastEnd = () => schemaManifest.SchemaField(2);
    var thrown = Assert.Throws<ParquetException>(readPastEnd);
    Assert.That(thrown!.Message, Does.Contain("out of range"));
}

[Test]
public void TestSchemaManifestGetColumnField()
{
    using var data = new ResizableBuffer();
    WriteNestedTestFile(data);

    using var input = new BufferReader(data);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Column 2 is expected to resolve to the top-level int field "x".
    var columnField = schemaManifest.GetColumnField(2);
    Assert.That(columnField, Is.Not.Null);

    var columnArrowField = columnField.Field;
    Assert.That(columnArrowField.Name, Is.EqualTo("x"));
    Assert.That(columnArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Int32));

    // Column 3 does not exist; the error message must name the bad index.
    TestDelegate readMissingColumn = () => schemaManifest.GetColumnField(3);
    var thrown = Assert.Throws<ParquetException>(readMissingColumn);
    Assert.That(thrown!.Message, Does.Contain("Column index 3"));
}

[Test]
public void TestSchemaManifestGetFieldParent()
{
    using var data = new ResizableBuffer();
    WriteNestedTestFile(data);

    using var input = new BufferReader(data);
    using var arrowReader = new FileReader(input);

    var schemaManifest = arrowReader.SchemaManifest;

    // Column 1 is a member of the struct, so its parent is "test_struct".
    var memberField = schemaManifest.GetColumnField(1);
    var owner = schemaManifest.GetParent(memberField);

    Assert.That(owner, Is.Not.Null);
    var ownerArrowField = owner!.Field;
    Assert.That(ownerArrowField.Name, Is.EqualTo("test_struct"));
    Assert.That(ownerArrowField.DataType.TypeId, Is.EqualTo(ArrowTypeId.Struct));

    // The struct is a top-level field, so it has no parent of its own.
    var ownerOfOwner = schemaManifest.GetParent(owner);
    Assert.That(ownerOfOwner, Is.Null);
}

[Test]
public void TestAccessSchemaManifestFieldAfterDisposed()
{
    using var data = new ResizableBuffer();
    WriteTestFile(data);

    using var input = new BufferReader(data);

    // Grab a schema field, then let the owning Arrow reader go out of scope
    // (and be disposed) before the field is used.
    SchemaField orphanedField;
    {
        using var arrowReader = new FileReader(input);
        var schemaManifest = arrowReader.SchemaManifest;
        orphanedField = schemaManifest.SchemaFields[0];
    }

    // Any access through the orphaned field must fail with a clear message.
    TestDelegate readField = () => { _ = orphanedField.Field; };
    var thrown = Assert.Throws<NullReferenceException>(readField);
    Assert.That(thrown!.Message, Does.Contain("owning parent has been disposed"));
}

private static void WriteTestFile(ResizableBuffer buffer)
{
var columns = new Column[]
Expand Down Expand Up @@ -226,6 +389,44 @@ private static void WriteTestFile(ResizableBuffer buffer)
fileWriter.Close();
}

/// <summary>
/// Writes a Parquet file with a nested schema into <paramref name="buffer"/>:
/// a nullable struct "test_struct" with members "a" (int32) and "b" (float),
/// followed by a non-nullable int32 column "x". One record batch of
/// RowsPerRowGroup rows is written per iteration, NumRowGroups times.
/// </summary>
/// <param name="buffer">Destination buffer that receives the file contents.</param>
private static void WriteNestedTestFile(ResizableBuffer buffer)
{
    var structMembers = new[]
    {
        new Field("a", new Int32Type(), false),
        new Field("b", new FloatType(), false),
    };
    var structField = new Field("test_struct", new StructType(structMembers), true);
    var leafField = new Field("x", new Int32Type(), false);
    var schema = new Apache.Arrow.Schema(new[] { structField, leafField }, null);

    using var output = new BufferOutputStream(buffer);
    using var fileWriter = new FileWriter(output, schema);

    for (var batchIndex = 0; batchIndex < NumRowGroups; ++batchIndex)
    {
        // Each batch continues the row numbering where the previous one stopped.
        var firstRow = batchIndex * RowsPerRowGroup;

        var memberA = new Int32Array.Builder()
            .AppendRange(Enumerable.Range(firstRow, RowsPerRowGroup).ToArray())
            .Build();
        var memberB = new FloatArray.Builder()
            .AppendRange(Enumerable.Range(firstRow, RowsPerRowGroup).Select(i => i * 0.1f).ToArray())
            .Build();

        // Every struct slot is marked valid.
        var structValidity = new ArrowBuffer.BitmapBuilder()
            .AppendRange(true, RowsPerRowGroup)
            .Build();
        var structColumn = new StructArray(
            structField.DataType,
            RowsPerRowGroup,
            new IArrowArray[] { memberA, memberB },
            structValidity);

        var leafColumn = new Int32Array.Builder()
            .AppendRange(Enumerable.Range(firstRow, RowsPerRowGroup).ToArray())
            .Build();

        var columns = new List<IArrowArray> { structColumn, leafColumn };
        fileWriter.WriteRecordBatch(new RecordBatch(schema, columns, RowsPerRowGroup));
    }

    fileWriter.Close();
}

private const int NumRowGroups = 4;
private const int RowsPerRowGroup = 100;
}
Expand Down
30 changes: 30 additions & 0 deletions csharp/Arrow/FileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ public unsafe IArrowArrayStream GetRecordBatchReader(
return CArrowArrayStreamImporter.ImportArrayStream(&cStream);
}

/// <summary>
/// The low-level <see cref="ParquetFileReader"/> that backs this Arrow FileReader.
/// Each access fetches the native reader pointer again and wraps it in a new
/// <see cref="ParquetFileReader"/> whose handle is a child of this reader's handle.
/// </summary>
public ParquetFileReader ParquetReader =>
    new ParquetFileReader(
        new ChildParquetHandle(
            ExceptionInfo.Return<IntPtr>(_handle, FileReader_ParquetReader),
            _handle));

/// <summary>
/// The schema manifest, describing how the Arrow schema relates to the Parquet schema.
/// Each access fetches the native manifest pointer again and wraps it in a new
/// <see cref="SchemaManifest"/> whose handle is a child of this reader's handle.
/// </summary>
public SchemaManifest SchemaManifest =>
    new SchemaManifest(
        new ChildParquetHandle(
            ExceptionInfo.Return<IntPtr>(_handle, FileReader_Manifest),
            _handle));

public void Dispose()
{
_handle.Dispose();
Expand Down Expand Up @@ -165,6 +189,12 @@ private static extern IntPtr FileReader_OpenFile(
private static extern unsafe IntPtr FileReader_GetRecordBatchReader(
IntPtr reader, int* rowGroups, int rowGroupsCount, int* columns, int columnsCount, CArrowArrayStream* stream);

// Native binding: writes the underlying parquet file reader pointer for `reader`
// to `parquetReader`. The returned IntPtr is the native ExceptionInfo pointer;
// the ParquetReader property passes this method to ExceptionInfo.Return rather
// than calling it directly.
[DllImport(ParquetDll.Name)]
private static extern IntPtr FileReader_ParquetReader(IntPtr reader, out IntPtr parquetReader);

// Native binding: writes the schema manifest pointer for `reader` to `manifest`.
// The returned IntPtr is the native ExceptionInfo pointer; the SchemaManifest
// property passes this method to ExceptionInfo.Return rather than calling it
// directly.
[DllImport(ParquetDll.Name)]
private static extern IntPtr FileReader_Manifest(IntPtr reader, out IntPtr manifest);

[DllImport(ParquetDll.Name)]
private static extern void FileReader_Free(IntPtr reader);

Expand Down
Loading