[C++][Parquet] ReaderProperties::GetStream() support to use the origin InputStream #48229

@Smith-Cruise

Describe the enhancement requested

We have already implemented page-level I/O coalescing (based on the PageIndex) and prebuffering ourselves, so we want the original InputStream to be used when the ColumnPageReader is created.

std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
    // .....
    std::shared_ptr<ArrowInputStream> stream;
    if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
        ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) {
      // PARQUET-1698: if read coalescing is enabled, read from pre-buffered
      // segments.
      PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
      stream = std::make_shared<::arrow::io::BufferReader>(buffer);
    } else {
      stream = properties_.GetStream(source_, col_range.offset, col_range.length);
    }
    // .....
}

In the current implementation, Arrow's built-in prebuffering (I/O coalescing) works at the column level (one range per column chunk), so each ReadRange is too large for our use case.
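To make the size difference concrete, here is a minimal sketch of page-level coalescing. It is not our actual implementation and uses no Arrow types: `ByteRange`, `CoalescePageRanges`, `TotalBytes`, and the `hole_limit` parameter are all hypothetical names chosen for illustration. It assumes the page offsets and compressed sizes are already known (for example from the PageIndex) and merges only the pages that are actually needed.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a byte range; NOT ::arrow::io::ReadRange.
struct ByteRange {
  int64_t offset;
  int64_t length;
};

// Merge the ranges of the pages we need. Two ranges are merged when the gap
// between them is at most `hole_limit` bytes (overlapping ranges always merge).
// Input need not be sorted; empty ranges are dropped.
std::vector<ByteRange> CoalescePageRanges(std::vector<ByteRange> pages,
                                          int64_t hole_limit) {
  pages.erase(std::remove_if(pages.begin(), pages.end(),
                             [](const ByteRange& r) { return r.length <= 0; }),
              pages.end());
  std::sort(pages.begin(), pages.end(),
            [](const ByteRange& a, const ByteRange& b) {
              return a.offset < b.offset;
            });

  std::vector<ByteRange> out;
  for (const ByteRange& p : pages) {
    if (!out.empty()) {
      ByteRange& last = out.back();
      const int64_t last_end = last.offset + last.length;
      if (p.offset - last_end <= hole_limit) {
        // Close enough: extend the previous range to cover this page too.
        const int64_t new_end = std::max(last_end, p.offset + p.length);
        last.length = new_end - last.offset;
        continue;
      }
    }
    out.push_back(p);
  }
  return out;
}

// Sum of the lengths of all ranges, i.e. the bytes that would be fetched.
int64_t TotalBytes(const std::vector<ByteRange>& ranges) {
  int64_t total = 0;
  for (const ByteRange& r : ranges) total += r.length;
  return total;
}
```

With made-up numbers: for a 10000-byte column chunk at offset 1000 split into 1000-byte pages, needing only the pages at offsets 1000, 2000, and 8000 with `hole_limit = 512` gives two ranges covering 3000 bytes in total, whereas a column-level range would fetch all 10000 bytes.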

std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
    std::shared_ptr<ArrowInputFile> source, int64_t start, int64_t num_bytes) {
  if (buffered_stream_enabled_) {
    // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
    // of source
    PARQUET_ASSIGN_OR_THROW(
        std::shared_ptr<::arrow::io::InputStream> safe_stream,
        ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes));
    PARQUET_ASSIGN_OR_THROW(
        auto stream, ::arrow::io::BufferedInputStream::Create(buffer_size_, pool_,
                                                              safe_stream, num_bytes));
    return stream;
  } else {
    PARQUET_ASSIGN_OR_THROW(auto data, source->ReadAt(start, num_bytes));

    if (data->size() != num_bytes) {
      std::stringstream ss;
      ss << "Tried reading " << num_bytes << " bytes starting at position " << start
         << " from file but only got " << data->size();
      throw ParquetException(ss.str());
    }
    return std::make_shared<::arrow::io::BufferReader>(data);
  }
}

However, in properties_.GetStream(), enabling buffered_stream_enabled_ wraps the stream in a BufferedInputStream, which copies our data one more time (our InputStream is already zero-copy). Disabling buffered_stream_enabled_ instead forces the whole column chunk to be read into memory.

I think we could add a new flag, bool enable_original_input_stream_ = false. When it is set to true, the user-provided InputStream is used directly to read pages.
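A rough sketch of how the proposed flag could slot into the existing branching is shown below. This is only an illustration under stated assumptions, not a patch against Arrow: every type here (`InputStream`, `RandomAccessSource`, `SourceStream`, `BufferedStream`, `EagerBufferStream`, `InMemorySource`, `ReaderPropertiesSketch`) is a simplified, hypothetical stand-in so the example compiles on its own, and the precedence of the new flag over buffered_stream_enabled_ is my assumption and open for discussion.

```cpp
#include <algorithm>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins; none of these are Arrow or Parquet types.
struct InputStream {
  virtual ~InputStream() = default;
  virtual std::string kind() const = 0;  // only used to show which path ran
};

struct RandomAccessSource {
  virtual ~RandomAccessSource() = default;
  // Stream over [start, start + num_bytes), served by the source itself.
  virtual std::shared_ptr<InputStream> GetStream(int64_t start, int64_t num_bytes) = 0;
  // Eagerly read the range; may return fewer bytes near end of file.
  virtual std::vector<uint8_t> ReadAt(int64_t start, int64_t num_bytes) = 0;
};

struct SourceStream : InputStream {
  std::string kind() const override { return "original"; }
};

// Models the extra buffering layer added when buffered streams are enabled.
struct BufferedStream : InputStream {
  explicit BufferedStream(std::shared_ptr<InputStream> raw) : raw_(std::move(raw)) {}
  std::string kind() const override { return "buffered(" + raw_->kind() + ")"; }
  std::shared_ptr<InputStream> raw_;
};

// Models reading the whole range into memory up front.
struct EagerBufferStream : InputStream {
  explicit EagerBufferStream(std::vector<uint8_t> data) : data_(std::move(data)) {}
  std::string kind() const override { return "eager"; }
  std::vector<uint8_t> data_;
};

struct InMemorySource : RandomAccessSource {
  explicit InMemorySource(std::vector<uint8_t> bytes) : bytes_(std::move(bytes)) {}
  std::shared_ptr<InputStream> GetStream(int64_t, int64_t) override {
    return std::make_shared<SourceStream>();
  }
  std::vector<uint8_t> ReadAt(int64_t start, int64_t num_bytes) override {
    const int64_t size = static_cast<int64_t>(bytes_.size());
    if (start < 0 || num_bytes < 0 || start >= size) return {};
    const int64_t end = std::min(size, start + num_bytes);
    return std::vector<uint8_t>(bytes_.begin() + start, bytes_.begin() + end);
  }
  std::vector<uint8_t> bytes_;
};

struct ReaderPropertiesSketch {
  bool buffered_stream_enabled = false;
  bool enable_original_input_stream = false;  // the proposed flag

  std::shared_ptr<InputStream> GetStream(RandomAccessSource& source, int64_t start,
                                         int64_t num_bytes) const {
    if (enable_original_input_stream) {
      // Proposed path: hand back the source's own stream, with no extra wrapper.
      return source.GetStream(start, num_bytes);
    }
    if (buffered_stream_enabled) {
      // Existing path 1: wrap the source stream in a buffering layer.
      return std::make_shared<BufferedStream>(source.GetStream(start, num_bytes));
    }
    // Existing path 2: read the whole range into memory and check its size.
    std::vector<uint8_t> data = source.ReadAt(start, num_bytes);
    if (static_cast<int64_t>(data.size()) != num_bytes) {
      throw std::runtime_error("short read");
    }
    return std::make_shared<EagerBufferStream>(std::move(data));
  }
};
```

Because the flag defaults to false, the two existing code paths stay unchanged in this sketch; only callers that opt in would receive the unwrapped stream.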

I'd be glad to help implement this.

Component(s)

C++, Parquet
