Commit 5e1fe14

Use zero-copy C data interface in Parquet adapter
Signed-off-by: Arham Chopra <[email protected]>
1 parent 518d3b5

2 files changed (+20 −34 lines)

cpp/csp/python/adapters/parquetadapterimpl.cpp

Lines changed: 16 additions & 18 deletions
@@ -17,8 +17,10 @@
 #include <csp/python/PyCppNode.h>
 #include <csp/python/PyNodeWrapper.h>
 #include <csp/python/NumpyConversions.h>
+#include <arrow/c/abi.h>
+#include <arrow/c/bridge.h>
 #include <arrow/io/memory.h>
-#include <arrow/ipc/reader.h>
+#include <arrow/table.h>
 #include <locale>
 #include <codecvt>

@@ -156,34 +158,30 @@ class ArrowTableGenerator : public csp::Generator<std::shared_ptr<arrow::Table>,
         {
             CSP_THROW( csp::python::PythonPassthrough, "" );
         }
-        if( nextVal == nullptr )
+        if( nextValPtr.get() == nullptr )
         {
             return false;
         }

-        if(!PyBytes_Check( nextVal ))
+        if( !PyCapsule_IsValid( nextValPtr.get(), "arrow_array_stream" ) )
         {
-            CSP_THROW( csp::TypeError, "Invalid arrow buffer type, expected bytes got " << Py_TYPE( nextVal ) -> tp_name );
+            CSP_THROW( csp::TypeError, "Invalid arrow data, expected PyCapsule got " << Py_TYPE( nextValPtr.get() ) -> tp_name );
         }
-        const char * data = PyBytes_AsString( nextVal );
-        if( !data )
-            CSP_THROW( csp::python::PythonPassthrough, "" );
-        auto size = PyBytes_Size( nextVal );
-        m_data = csp::python::PyObjectPtr::incref( nextVal );
-        std::shared_ptr<arrow::io::BufferReader> bufferReader = std::make_shared<arrow::io::BufferReader>(
-            reinterpret_cast<const uint8_t *>( data ), size );
-        std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader = arrow::ipc::RecordBatchStreamReader::Open( bufferReader.get() ).ValueOrDie();
-        auto result = reader->ToTable();
-        if( !result.ok() )
-            CSP_THROW( csp::RuntimeException, "Failed read arrow table from buffer" );
-        value = std::move( result.ValueUnsafe() );
+        // Extract the record batch reader from the capsule
+        struct ArrowArrayStream * c_stream = reinterpret_cast<struct ArrowArrayStream*>( PyCapsule_GetPointer( nextValPtr.get(), "arrow_array_stream" ) );
+        auto reader_result = arrow::ImportRecordBatchReader( c_stream );
+        if( !reader_result.ok() )
+            CSP_THROW( csp::ValueError, "Failed to load record batches through PyCapsule C Data interface: " << reader_result.status().ToString() );
+        auto reader = std::move( reader_result.ValueUnsafe() );
+        auto table_result = arrow::Table::FromRecordBatchReader( reader.get() );
+        if( !table_result.ok() )
+            CSP_THROW( csp::ValueError, "Failed to load table from record batches: " << table_result.status().ToString() );
+        value = std::move( table_result.ValueUnsafe() );
         return true;
     }
 private:
     csp::python::PyObjectPtr m_wrappedGenerator;
     csp::python::PyObjectPtr m_iter;
-    // We need to keep the last buffer in memory since arrow doesn't copy it but can refer to strings in it
-    csp::python::PyObjectPtr m_data;
 };

 template< typename CspCType>
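
For context, a minimal sketch of the producing side of this handshake (assuming pyarrow >= 14.0, where Table.__arrow_c_stream__ was introduced; the table contents are made up). The capsule is exactly what the hunk above validates with PyCapsule_IsValid and imports with arrow::ImportRecordBatchReader:

    import pyarrow as pa

    # Hypothetical table standing in for one yielded by the adapter's generator.
    table = pa.table({"symbol": ["AAPL", "MSFT"], "price": [189.5, 407.1]})

    # Export as a PyCapsule named "arrow_array_stream". The capsule wraps an
    # ArrowArrayStream struct; the C++ side unwraps it with PyCapsule_GetPointer
    # and hands it to arrow::ImportRecordBatchReader -- no IPC serialization.
    capsule = table.__arrow_c_stream__()
    print(capsule)  # <capsule object "arrow_array_stream" at 0x...>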

csp/adapters/parquet.py

Lines changed: 4 additions & 16 deletions
@@ -88,9 +88,7 @@ def __init__(
             if not _CAN_READ_ARROW_BINARY:
                 raise TypeError("CSP Cannot load binary arrows derived from pyarrow versions less than 4.0.1")
             wrapped = self._filenames_gen
-            self._filenames_gen = lambda starttime, endtime: self._arrow_in_memory_table_to_buffers(
-                wrapped, starttime, endtime
-            )
+            self._filenames_gen = lambda starttime, endtime: self._arrow_c_data_interface(wrapped, starttime, endtime)
             binary_arrow = True
         self._properties = {"split_columns_to_files": split_columns_to_files}
         if symbol_column:

@@ -116,22 +114,12 @@ def __init__(
         self._properties["allow_missing_files"] = allow_missing_files

     @classmethod
-    def _arrow_in_memory_table_to_buffers(cls, gen, startime, endtime):
-        # This is a temporary solution until we implement PyArrow in-memory suport:
-        # Currently when we try to read in c++ pyarrow structures that were created in python it crashes. We believe that the reason is difference in compilers. c++ interface is generally not portable across compilers and compiler versions. As current workaround we create a temporary files in python from in-memory tables and consume those in c++ code, this is bad and ugly apporoach.
-        # Possible solutions:
-        # 1. Try using pyarrow c interface (introduced in pyarrow 5.0.0), it doesn't seem like it's sufficient for our needs but we could try using it.
-        # 2. Compile the arrow adapter in the user environment - generally it's hard to do and is a huge opening to a bunch of issues, we should probably avoid this approach as much as possible.
-        # 3. The most "sane" solution is probably to use numpy array as intermediate layer, i.e transform pyarrow tables to arrow arrays (and pass a bunch of metadata about those arrays - to properly resolve typing). We probably won't be able to support ALL arrow types, we will limit ourselves to a reasonable set of types for which we will have to implement conversions.
+    def _arrow_c_data_interface(cls, gen, startime, endtime):
         for v in gen(startime, endtime):
             if not isinstance(v, pyarrow.Table):
                 raise TypeError(f"Expected PyTable from generator, got {type(v).__name__}")
-
-            sink = io.BytesIO()
-            with pyarrow.ipc.new_stream(sink, v.schema) as writer:
-                writer.write_table(v)
-
-            yield sink.getvalue()
+            # Use the PyCapsule C data interface to pass data zero copy
+            yield v.__arrow_c_stream__()

     @node
     def _reconstruct_struct_array_fields(self, s: ts["T"], fields: {str: ts[object]}, struct_typ: "T") -> ts["T"]:
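
To see why this path is zero copy, a small sketch (again assuming pyarrow >= 14.0; pa.RecordBatchReader.from_stream accepts any object implementing __arrow_c_stream__). A table re-imported through the C stream interface still points at the original buffers, whereas the old path serialized every table into a fresh BytesIO sink:

    import pyarrow as pa

    # Hypothetical data; only the buffer addresses matter for this check.
    orig = pa.table({"x": [1, 2, 3]})

    # Round-trip the table through the Arrow C stream interface.
    roundtrip = pa.RecordBatchReader.from_stream(orig).read_all()

    # The re-imported column's data buffer has the same address as the
    # original: the stream hands over pointers, not copies.
    orig_buf = orig.column("x").chunk(0).buffers()[1]
    rt_buf = roundtrip.column("x").chunk(0).buffers()[1]
    assert orig_buf.address == rt_buf.address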
