Skip to content

Commit

Permalink
Added managed streams support in IO classes. (#75)
Browse files Browse the repository at this point in the history
* ManagedRandomAccessFile/OutputStream to read/write to .Net IO.Streams (#74)

Adds ManagedRandomAccessFile and ManagedOutputStream, implementations of
the RandomAccessFile and OutputStream C++ interfaces that call back into
C# to read/write to a .Net IO.Stream.

This seems to be on par with performance from letting the C++ side
manage the file itself, that is pinvoke overhead appears minimal
relative to IO costs. The TestFloatTimeSeries benchmark has been updated
to write out by passing a filepath to C++, and by using
ManagedOutputStream.

Example output of TestFloatTimeSeries (with a slightly larger data size
than default):

 Generating data...
 Generated 31,200,000 rows in 0.78 sec

 Saving to CSV
 Saved to CSV (1,262,808,034 bytes) in 43.87 sec

 Saving to CSV.GZ
 Saved to CSV (317,658,548 bytes) in 92.55 sec

 Saving to Parquet
 Saved to Parquet (168,610,125 bytes) in 1.06 sec

 Saving to Parquet.Chunked (by date)
 Saved to Parquet.Chunked (361,316,340 bytes) in 6.74 sec

 Saving to Parquet.RowOriented
 Saved to Parquet.RowOriented (168,510,739 bytes) in 1.98 sec

 Saving to Parquet.FileStream
 Saved to Parquet.FileStream (168,609,640 bytes) in 1.05 sec

 Saving to Parquet.Chunked.FileStream (by date)
 Saved to Parquet.Chunked.FileStream (361,316,332 bytes) in 5.14 sec

 Saving to Parquet.RowOriented.FileStream
 Saved to Parquet.RowOriented.FileStream (168,510,254 bytes) in 2.02 sec

 Saving to Parquet.NET
 Saved to Parquet.NET (261,420,477 bytes) in 11.60 sec

As can be seen the FileStream tests are on par with using C++ directly,
and are all much faster than Parquet.NET.

* Adopt consistent coding convention.
Refactored the code in some place for clarity.
Added more unit tests and removed redundant one (exceptions are currently causing a segfault, WiP).
Moved to vcpkg-2019.09

* Fixed undefined behaviour.

* Updated dependencies.

* Bump version to 2.0.2-beta1
  • Loading branch information
GPSnoopy authored Oct 11, 2019
1 parent 50d790e commit f65b64e
Show file tree
Hide file tree
Showing 16 changed files with 809 additions and 16 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ add_library(ParquetSharpNative SHARED
GroupNode.cpp
KeyValueMetadata.cpp
LogicalType.cpp
ManagedRandomAccessFile.cpp
ManagedOutputStream.cpp
Node.cpp
OutputStream.cpp
ParquetFileReader.cpp
Expand Down
2 changes: 1 addition & 1 deletion cpp/ExceptionInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <exception>
#include <string>

struct ExceptionInfo
struct ExceptionInfo final
{
ExceptionInfo(const char* type, const char* message);
ExceptionInfo(const std::exception& exception);
Expand Down
100 changes: 100 additions & 0 deletions cpp/ManagedOutputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@

#include "cpp/ParquetSharpExport.h"
#include "ExceptionInfo.h"

#include <arrow/status.h>
#include <arrow/io/interfaces.h>

using arrow::Status;
using arrow::StatusCode;

typedef StatusCode(*WriteFunc)(const void*, int64_t, const char**);
typedef StatusCode(*TellFunc)(int64_t*, const char**);
typedef StatusCode(*FlushFunc)(const char**);
typedef StatusCode(*CloseFunc)(const char**);
typedef bool (*ClosedFunc)();

class ManagedOutputStream final : public arrow::io::OutputStream
{
public:

ManagedOutputStream(
const WriteFunc write,
const TellFunc tell,
const FlushFunc flush,
const CloseFunc close,
const ClosedFunc closed) :
write_(write),
tell_(tell),
flush_(flush),
close_(close),
closed_(closed)
{
}

~ManagedOutputStream()
{
}

Status Write(const void* data, int64_t nbytes) override
{
const char* exception = nullptr;
const auto statusCode = write_(data, nbytes, &exception);
return GetStatus(statusCode, exception);
}

Status Flush() override
{
const char* exception = nullptr;
const auto statusCode = flush_(&exception);
return GetStatus(statusCode, exception);
}

Status Close() override
{
const char* exception = nullptr;
const auto statusCode = close_(&exception);
return GetStatus(statusCode, exception);
}

Status Tell(int64_t* position) const override
{
const char* exception = nullptr;
const auto statusCode = tell_(position, &exception);
return GetStatus(statusCode, exception);
}

bool closed() const override
{
return this->closed_();
}

private:

static Status GetStatus(const StatusCode statusCode, const char* const exception)
{
return statusCode == StatusCode::OK
? Status::OK()
: Status(statusCode, exception);
}

const WriteFunc write_;
const TellFunc tell_;
const FlushFunc flush_;
const CloseFunc close_;
const ClosedFunc closed_;
};

extern "C"
{
PARQUETSHARP_EXPORT ExceptionInfo* ManagedOutputStream_Create(
const WriteFunc write,
const TellFunc tell,
const FlushFunc flush,
const CloseFunc close,
const ClosedFunc closed,
std::shared_ptr<ManagedOutputStream>** stream)
{
TRYCATCH(*stream = new std::shared_ptr<ManagedOutputStream>(new ManagedOutputStream(write, tell, flush, close, closed));)
}
}
130 changes: 130 additions & 0 deletions cpp/ManagedRandomAccessFile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@

#include "cpp/ParquetSharpExport.h"
#include "ExceptionInfo.h"

#include <arrow/status.h>
#include <arrow/buffer.h>
#include <arrow/io/interfaces.h>

using arrow::Status;
using arrow::StatusCode;

typedef StatusCode(*ReadFunc)(int64_t, int64_t*, void*, const char**);
typedef StatusCode(*CloseFunc)(const char**);
typedef StatusCode(*GetSizeFunc)(int64_t*, const char**);
typedef StatusCode(*TellFunc)(int64_t*, const char**);
typedef StatusCode(*SeekFunc)(int64_t, const char**);
typedef bool (*ClosedFunc)();

class ManagedRandomAccessFile final : public arrow::io::RandomAccessFile
{
public:

ManagedRandomAccessFile(
const ReadFunc read,
const CloseFunc close,
const GetSizeFunc getSize,
const TellFunc tell,
const SeekFunc seek,
const ClosedFunc closed) :
read_(read),
close_(close),
getSize_(getSize),
tell_(tell),
seek_(seek),
closed_(closed)
{
}

~ManagedRandomAccessFile()
{
}

Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
{
const char* exception = nullptr;
const auto statusCode = read_(nbytes, bytes_read, out, &exception);
return GetStatus(statusCode, exception);
}

Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
{
std::shared_ptr<arrow::ResizableBuffer> buffer;
RETURN_NOT_OK(arrow::AllocateResizableBuffer(nbytes, &buffer));

int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
if (bytes_read < nbytes)
{
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}

*out = buffer;
return Status::OK();
}

Status Close() override
{
const char* exception = nullptr;
const auto statusCode = close_(&exception);
return GetStatus(statusCode, exception);
}

Status Tell(int64_t* position) const override
{
const char* exception = nullptr;
const auto statusCode = tell_(position, &exception);
return GetStatus(statusCode, exception);
}

Status Seek(int64_t position) override
{
const char* exception = nullptr;
const auto statusCode = seek_(position, &exception);
return GetStatus(statusCode, exception);
}

Status GetSize(int64_t* size) override
{
const char* exception = nullptr;
const auto statusCode = getSize_(size, &exception);
return GetStatus(statusCode, exception);
}

bool closed() const override
{
return closed_();
}

private:

static Status GetStatus(const StatusCode statusCode, const char* const exception)
{
return statusCode == StatusCode::OK
? Status::OK()
: Status(statusCode, exception);
}

const ReadFunc read_;
const CloseFunc close_;
const GetSizeFunc getSize_;
const TellFunc tell_;
const SeekFunc seek_;
const ClosedFunc closed_;
};

extern "C"
{
PARQUETSHARP_EXPORT ExceptionInfo* ManagedRandomAccessFile_Create(
const ReadFunc read,
const CloseFunc close,
const GetSizeFunc getSize,
const TellFunc tell,
const SeekFunc seek,
const ClosedFunc closed,
std::shared_ptr<ManagedRandomAccessFile>** stream)
{
TRYCATCH(*stream = new std::shared_ptr<ManagedRandomAccessFile>(new ManagedRandomAccessFile(read, close, getSize, tell, seek, closed));)
}
}
6 changes: 3 additions & 3 deletions csharp.test/ParquetSharp.Test.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks Condition="'$(OS)'=='Windows_NT'">netcoreapp2.0;net461</TargetFrameworks>
Expand All @@ -15,8 +15,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.13.0" />
<PackageReference Include="Parquet.Net" Version="3.3.8" />
<PackageReference Include="NUnit3TestAdapter" Version="3.15.1" />
<PackageReference Include="Parquet.Net" Version="3.3.9" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion csharp.test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ public static int Main()
//TestColumn.TestPrimitives();
//TestParquetFileWriter.TestReadWriteParquetMultipleTasks();
//TestColumnReader.TestHasNext();
TestLogicalTypeRoundtrip.TestReaderWriteTypes(128, 2401, 1331);
//TestLogicalTypeRoundtrip.TestReaderWriteTypes(128, 2401, 1331);
//TestPhysicalTypeRoundtrip.TestReaderWriteTypes();
//TestParquetFileReader.TestReadFileCreateByPython();
//TestParquetFileReader.TestFileHandleHasBeenReleased();
//TestParquetFileWriter.TestWriteLongString();
TestManagedRandomAccessFile.TestWriteException();

// Ensure the finalizers are executed, so we can check whether they throw.
GC.Collect();
Expand Down
108 changes: 108 additions & 0 deletions csharp.test/TestManagedRandomAccessFile.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System.IO;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using NUnit.Framework;
using ParquetSharp.IO;

namespace ParquetSharp.Test
{
[TestFixture]
internal static class TestManagedRandomAccessFile
{
[Test]
public static void TestInMemoryRoundTrip()
{
var expected = Enumerable.Range(0, 1024 * 1024).ToArray();

using (var buffer = new MemoryStream())
{
// Write test data.
using (var output = new ManagedOutputStream(buffer))
using (var writer = new ParquetFileWriter(output, new Column[] { new Column<int>("ids") }))
using (var group = writer.AppendRowGroup())
using (var column = group.NextColumn().LogicalWriter<int>())
{
column.WriteBatch(expected);
}

// Seek back to start.
buffer.Seek(0, SeekOrigin.Begin);

// Read test data.
using (var input = new ManagedRandomAccessFile(buffer))
using (var reader = new ParquetFileReader(input))
using (var group = reader.RowGroup(0))
using (var column = group.Column(0).LogicalReader<int>())
{
Assert.AreEqual(expected, column.ReadAll(expected.Length));
}
}
}

[Test]
public static void TestWriteException()
{
var exception = Assert.Throws<ParquetException>(() =>
{
using (var buffer = new ErroneousWriterStream())
using (var output = new ManagedOutputStream(buffer))
using (new ParquetFileWriter(output, new Column[] {new Column<int>("ids")}))
{
}
});

Assert.That(
exception.Message,
Contains.Substring("this is an erroneous writer"));
}

[Test]
public static void TestReadExeption()
{
var expected = Enumerable.Range(0, 1024 * 1024).ToArray();

var exception = Assert.Throws<ParquetException>(() =>
{
using (var buffer = new ErroneousReaderStream())
{
using (var output = new ManagedOutputStream(buffer))
using (var writer = new ParquetFileWriter(output, new Column[] {new Column<int>("ids")}))
using (var group = writer.AppendRowGroup())
using (var column = group.NextColumn().LogicalWriter<int>())
{
column.WriteBatch(expected);
}
buffer.Seek(0, SeekOrigin.Begin);
using (var input = new ManagedRandomAccessFile(buffer))
using (new ParquetFileReader(input))
{
}
}
});

Assert.That(
exception.Message,
Contains.Substring("this is an erroneous reader"));
}

private sealed class ErroneousReaderStream : MemoryStream
{
public override int Read(byte[] buffer, int offset, int count)
{
throw new IOException("this is an erroneous reader");
}
}

private sealed class ErroneousWriterStream : MemoryStream
{
public override void Write(byte[] buffer, int offset, int count)
{
throw new IOException("this is an erroneous writer");
}
}
}
}
Loading

0 comments on commit f65b64e

Please sign in to comment.