diff --git a/.gitignore b/.gitignore index 259148f..b8a80f4 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ *.lo *.o *.obj +*.gcno # Precompiled Headers *.gch @@ -30,3 +31,4 @@ *.exe *.out *.app + diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..de5caec --- /dev/null +++ b/.gitmodules @@ -0,0 +1,15 @@ +[submodule "pixels-cpp"] + path = pixels-cpp + url = https://github.com/neo-SunBoyan/pixels-cpp.git +[submodule "third-party"] + path = third-party + url = https://github.com/protocolbuffers/protobuf.git +[submodule "pixels_fdw/pixels-cpp"] + path = pixels_fdw/pixels-cpp + url = https://github.com/neo-SunBoyan/pixels-cpp.git +[submodule "third-party/protobuf"] + path = third-party/protobuf + url = https://github.com/protocolbuffers/protobuf.git +[submodule "pixels_fdw/third-party/protobuf"] + path = pixels_fdw/third-party/protobuf + url = https://github.com/protocolbuffers/protobuf.git diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 419cbc8..47e2f71 --- a/README.md +++ b/README.md @@ -1,2 +1,4 @@ # pixels-postgres The PostgreSQL FDW for Pixels. 
+ +export PIXELS_FDW_SRC=/path/to/yours diff --git a/COPYRIGHT b/pixels_fdw/COPYRIGHT old mode 100644 new mode 100755 similarity index 100% rename from COPYRIGHT rename to pixels_fdw/COPYRIGHT diff --git a/pixels_fdw/Makefile b/pixels_fdw/Makefile new file mode 100755 index 0000000..9fe097b --- /dev/null +++ b/pixels_fdw/Makefile @@ -0,0 +1,46 @@ +MODULE_big = pixels_fdw +OBJS = pixels_fdw.o pixels-cpp/pixels-common/lib/physical/StorageFactory.o pixels-cpp/pixels-common/lib/physical/io/PhysicalLocalReader.o pixels-cpp/pixels-common/lib/physical/allocator/BufferPoolAllocator.o pixels-cpp/pixels-common/lib/physical/Request.o pixels-cpp/pixels-common/lib/physical/RequestBatch.o pixels-cpp/pixels-common/lib/physical/Storage.o pixels-cpp/pixels-common/lib/physical/BufferPool.o pixels-cpp/pixels-common/lib/physical/SchedulerFactory.o pixels-cpp/pixels-common/lib/physical/natives/ByteBuffer.o pixels-cpp/pixels-common/lib/physical/natives/PixelsRandomAccessFile.o pixels-cpp/pixels-common/lib/physical/natives/DirectIoLib.o pixels-cpp/pixels-common/lib/physical/natives/DirectRandomAccessFile.o pixels-cpp/pixels-common/lib/physical/storage/LocalFS.o pixels-cpp/pixels-common/lib/physical/scheduler/NoopScheduler.o pixels-cpp/pixels-common/lib/physical/scheduler/SortMergeScheduler.o pixels-cpp/pixels-common/lib/physical/StorageArrayScheduler.o pixels-cpp/pixels-common/lib/utils/ColumnSizeCSVReader.o pixels-cpp/pixels-common/lib/utils/ConfigFactory.o pixels-cpp/pixels-common/lib/utils/Constants.o pixels-cpp/pixels-common/lib/utils/String.o pixels-cpp/pixels-common/lib/profiler/CountProfiler.o pixels-cpp/pixels-common/lib/profiler/TimeProfiler.o pixels-cpp/pixels-common/lib/MergedRequest.o pixels-cpp/pixels-common/lib/exception/InvalidArgumentException.o PixelsFilter.o PixelsFdwPlanState.o PixelsFdwExecutionState.o pixels-cpp/pixels-proto/pixels.pb.o pixels_impl.o pixels-cpp/pixels-core/lib/TypeDescription.o pixels-cpp/pixels-core/lib/PixelsFooterCache.o 
pixels-cpp/pixels-core/lib/reader/DateColumnReader.o pixels-cpp/pixels-core/lib/reader/StringColumnReader.o pixels-cpp/pixels-core/lib/reader/ColumnReaderBuilder.o pixels-cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.o pixels-cpp/pixels-core/lib/reader/DecimalColumnReader.o pixels-cpp/pixels-core/lib/reader/IntegerColumnReader.o pixels-cpp/pixels-core/lib/reader/ColumnReader.o pixels-cpp/pixels-core/lib/reader/VarcharColumnReader.o pixels-cpp/pixels-core/lib/reader/PixelsReaderOption.o pixels-cpp/pixels-core/lib/reader/CharColumnReader.o pixels-cpp/pixels-core/lib/reader/TimestampColumnReader.o pixels-cpp/pixels-core/lib/encoding/Decoder.o pixels-cpp/pixels-core/lib/encoding/RunLenIntDecoder.o pixels-cpp/pixels-core/lib/encoding/RunLenIntEncoder.o pixels-cpp/pixels-core/lib/encoding/Encoder.o pixels-cpp/pixels-core/lib/vector/LongColumnVector.o pixels-cpp/pixels-core/lib/vector/TimestampColumnVector.o pixels-cpp/pixels-core/lib/vector/DecimalColumnVector.o pixels-cpp/pixels-core/lib/vector/BinaryColumnVector.o pixels-cpp/pixels-core/lib/vector/VectorizedRowBatch.o pixels-cpp/pixels-core/lib/vector/ByteColumnVector.o pixels-cpp/pixels-core/lib/vector/DateColumnVector.o pixels-cpp/pixels-core/lib/vector/ColumnVector.o pixels-cpp/pixels-core/lib/Category.o pixels-cpp/pixels-core/lib/PixelsBitMask.o pixels-cpp/pixels-core/lib/PixelsVersion.o pixels-cpp/pixels-core/lib/PixelsReaderImpl.o pixels-cpp/pixels-core/lib/PixelsReaderBuilder.o pixels-cpp/pixels-core/lib/utils/EncodingUtils.o pixels-cpp/pixels-core/lib/exception/PixelsFileVersionInvalidException.o pixels-cpp/pixels-core/lib/exception/PixelsFileMagicInvalidException.o pixels-cpp/pixels-core/lib/exception/PixelsReaderException.o +PGFILEDESC = "pixels_fdw - foreign data wrapper for pixels reader" + +SHLIB_LINK = -lm -lstdc++ -L$(PIXELS_FDW_SRC)/third-party/protobuf/cmake/build -lprotobuf + +EXTENSION = pixels_fdw +DATA = pixels_fdw--1.0.sql + +override PG_CXXFLAGS += -std=c++17 -I./include 
-I./pixels-cpp/pixels-common/include -I./pixels-cpp/pixels-core/include -I./pixels-cpp/pixels-proto -I./third-party/protobuf/src -I./third-party/protobuf/cmake/build/include + +ifdef CCFLAGS + override PG_CXXFLAGS += $(CCFLAGS) + override PG_CFLAGS += $(CCFLAGS) +endif + +COMPILE.cxx.bc = $(CLANG) -xc++ -Wno-ignored-attributes -Wno-register $(BITCODE_CXXFLAGS) $(CPPFLAGS) -emit-llvm -c + +REGRESS = pixels_fdw + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +SHLIB_PREREQS = submake-libpq +subdir = contrib/pixels_fdw +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +PROTOBUF_DIR=third-party/protobuf + +pull: + git submodule init + git submodule update --recursive --init + +deps: + mkdir -p "${PROTOBUF_DIR}/cmake/build" && cd "third-party/protobuf/cmake/build" && \ + cmake -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release ../.. -DCMAKE_POSITION_INDEPENDENT_CODE=ON \ + -Dprotobuf_BUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=./ && \ + make -j16 install + +%.bc : %.cpp + $(COMPILE.cxx.bc) $(CXXFLAGS) $(CPPFLAGS) -o $@ $< diff --git a/pixels_fdw/PixelsFdwExecutionState.cpp b/pixels_fdw/PixelsFdwExecutionState.cpp new file mode 100755 index 0000000..db61943 --- /dev/null +++ b/pixels_fdw/PixelsFdwExecutionState.cpp @@ -0,0 +1,437 @@ +// +// Created by liyu on 3/26/23. 
+//
+
+#include "PixelsFdwExecutionState.hpp"
+
+// NOTE(review): this copy of the patch has lost every "<...>" span (template
+// argument lists such as the element type of set/vector/shared_ptr and the
+// target type of static_pointer_cast). Those tokens are reproduced exactly as
+// found instead of being guessed; restore them from the upstream file before
+// building.
+
+// Copy `input` into `output`, lower-casing every byte up to and including the
+// terminating NUL, and return `output`.
+// `output` must have room for strlen(input) + 1 bytes. The Assert documents
+// the expectation that names are shorter than NAMEDATALEN - 1.
+char *
+tolowercase(const char *input, char *output)
+{
+    int i = 0;
+    Assert(strlen(input) < NAMEDATALEN - 1);
+    do
+    {
+        // tolower() is undefined for negative arguments other than EOF, so a
+        // byte >= 0x80 must be passed as unsigned char.
+        output[i] = (char) tolower((unsigned char) input[i]);
+    }
+    while (input[i++]);
+
+    return output;
+}
+
+// Close every reader still held by the three scan-state objects.
+// Any argument may be null: PixelsScanInitLocal() returns nullptr when there is
+// nothing to read, and a failed rescan() can leave members reset. The per-file
+// readers are themselves null once the last file has been handed out, so each
+// one is checked before close() is called.
+static void
+PixelsCloseReaders(PixelsReadBindData *bind_data,
+                   PixelsReadGlobalState *parallel_state,
+                   PixelsReadLocalState *scan_data) {
+    if (bind_data && bind_data->initialPixelsReader) {
+        bind_data->initialPixelsReader->close();
+    }
+    // NOTE(review): PixelsScanInitGlobal() copies bind_data's reader pointer, so
+    // this is presumably the same reader being closed a second time - confirm
+    // close() is idempotent. Kept because the original destructor did the same.
+    if (parallel_state && parallel_state->initialPixelsReader) {
+        parallel_state->initialPixelsReader->close();
+    }
+    if (scan_data) {
+        if (scan_data->currReader) {
+            scan_data->currReader->close();
+        }
+        if (scan_data->currPixelsRecordReader) {
+            scan_data->currPixelsRecordReader->close();
+        }
+        if (scan_data->nextPixelsRecordReader) {
+            scan_data->nextPixelsRecordReader->close();
+        }
+    }
+}
+
+// Build the execution state for one foreign scan.
+//   files      - List of string Values, one per Pixels file to read
+//   filters    - List whose cells are PixelsFilter pointers
+//   attrs_used - attribute numbers referenced by the query
+//   tupleDesc  - descriptor of the foreign table's rows
+// Opens the first file (bind), derives the column map and creates the global
+// and local read states. Exceptions from those steps propagate to the caller.
+PixelsFdwExecutionState::PixelsFdwExecutionState(List* files,
+                                                 List* filters,
+                                                 set attrs_used,
+                                                 TupleDesc tupleDesc) {
+    ListCell *file_lc;
+    foreach (file_lc, files) {
+        files_list.emplace_back(std::string(strVal(lfirst(file_lc))));
+    }
+    ListCell *filter_lc;
+    foreach (filter_lc, filters) {
+        filters_list.emplace_back((PixelsFilter*)lfirst(filter_lc));
+    }
+    // The parameter shadows the member of the same name. The original
+    // `attrs_used = attrs_used;` assigned the parameter to itself, leaving the
+    // member empty for rescan().
+    this->attrs_used = attrs_used;
+    tuple_desc = tupleDesc;
+    shared_ptr file_schema;
+    bind_data = PixelsFdwExecutionState::PixelsScanBind(files_list, filters_list, file_schema);
+    column_map = PixelsFdwExecutionState::PixelsGetColumnMap(file_schema, this->attrs_used, tuple_desc);
+    parallel_state = PixelsFdwExecutionState::PixelsScanInitGlobal(*bind_data);
+    scan_data = PixelsFdwExecutionState::PixelsScanInitLocal(*bind_data, *parallel_state, column_map);
+}
+
+// Close all open readers and release the scan-state objects.
+PixelsFdwExecutionState::~PixelsFdwExecutionState() {
+    PixelsCloseReaders(bind_data.get(), parallel_state.get(), scan_data.get());
+    bind_data.reset();
+    parallel_state.reset();
+    scan_data.reset();
+}
+
+
+// Return the batch index recorded in the local state for the file currently
+// being read. bind_data_p and global_state are not used.
+static uint32_t PixelsScanGetBatchIndex(const PixelsReadBindData *bind_data_p,
+                                        PixelsReadLocalState *local_state,
+                                        PixelsReadGlobalState *global_state) {
+    auto &data = (PixelsReadLocalState &)*local_state;
+    return data.curr_batch_index;
+}
+
+// Scan progress as a percentage: curFileId * 100 / number of files, or 100 when
+// the file list is empty. global_state is not used.
+static double PixelsProgress(const PixelsReadBindData *bind_data_p,
+                             const PixelsReadGlobalState *global_state) {
+    auto &bind_data = (PixelsReadBindData &)*bind_data_p;
+    if (bind_data.files.empty()) {
+        return 100.0;
+    }
+    auto percentage = bind_data.curFileId * 100.0 / bind_data.files.size();
+    return percentage;
+}
+
+// Row-count estimate: rows in the initial reader's file times the number of
+// files. NOTE(review): the product is narrowed to uint32_t on return.
+static uint32_t PixelsCardinality(const PixelsReadBindData *bind_data) {
+    auto &data = (PixelsReadBindData &)*bind_data;
+
+    return data.initialPixelsReader->getNumberOfRows() * data.files.size();
+}
+
+// Open the first file in `filenames` and package the bind data (reader, file
+// schema, file list, filters). The schema is also returned via `file_schema`.
+// Throws PixelsReaderException when `filenames` is empty.
+unique_ptr
+PixelsFdwExecutionState::PixelsScanBind(vector filenames,
+                                        vector filters_list,
+                                        shared_ptr &file_schema) {
+    if (filenames.empty()) {
+        throw PixelsReaderException("Pixels reader cannot take empty filename as parameter");
+    }
+
+    auto footerCache = std::make_shared();
+    auto builder = std::make_shared();
+
+    std::shared_ptr<::Storage> storage = StorageFactory::getInstance()->getStorage(::Storage::file);
+    std::shared_ptr pixelsReader = builder->setPath(filenames.at(0))
+                                          ->setStorage(storage)
+                                          ->setPixelsFooterCache(footerCache)
+                                          ->build();
+    file_schema = pixelsReader->getFileSchema();
+
+    auto result = make_unique();
+    result->initialPixelsReader = pixelsReader;
+    result->fileSchema = file_schema;
+    result->files = filenames;
+    result->filters = filters_list;
+
+    return std::move(result);
+}
+
+// Decide which columns of the Pixels file have to be read.
+// The result is indexed by *file field index* (that is how
+// PixelsScanInitLocal() consumes it): an entry is >= 0 when a used table
+// attribute has the same name, compared case-insensitively, and -1 otherwise.
+// Throws PixelsReaderException when a file column name is longer than
+// NAMEDATALEN.
+vector
+PixelsFdwExecutionState::PixelsGetColumnMap(const shared_ptr file_schema,
+                                            set attrs_used,
+                                            TupleDesc tupleDesc) {
+    const size_t attr_count = (size_t) tupleDesc->natts;
+    const size_t field_count = file_schema->getChildren().size();
+
+    // The original sized the map by natts but wrote it at file field indexes,
+    // which overruns the vector when the file has more columns than the table.
+    // It also reset one entry per attribute, which could erase an earlier
+    // match. Size for both index spaces and initialise once.
+    vector column_map;
+    column_map.assign(attr_count > field_count ? attr_count : field_count, -1);
+
+    for (int attr_idx = 0; attr_idx < tupleDesc->natts; attr_idx++)
+    {
+        AttrNumber attnum = attr_idx + 1 - FirstLowInvalidHeapAttributeNumber;
+        char pg_colname[255];
+
+        /* Skip columns we don't intend to use in query */
+        if (attrs_used.find(attnum) == attrs_used.end())
+            continue;
+
+        tolowercase(NameStr(TupleDescAttr(tupleDesc, attr_idx)->attname), pg_colname);
+        assert(file_schema->getCategory() == TypeDescription::STRUCT);
+        for (size_t field_idx = 0; field_idx < field_count; field_idx++)
+        {
+            auto field_name = file_schema->getFieldNames().at(field_idx);
+            char pixels_colname[255];
+            if (field_name.length() > NAMEDATALEN)
+                throw PixelsReaderException("pixels column name is too long");
+            tolowercase(field_name.c_str(), pixels_colname);
+            if (strcmp(pg_colname, pixels_colname) == 0)
+            {
+                // Marker value only: the consumer in this file tests ">= 0",
+                // and the original always stored 0 here.
+                column_map[field_idx] = 0;
+                break;
+            }
+        }
+    }
+    return column_map;
+}
+
+// Create the state shared by all readers of this scan: the initial reader, a
+// StorageArrayScheduler over the file list and one file cursor per device.
+// The "pixel.threads" property gives the thread count; a value <= 0 means one
+// thread per file.
+unique_ptr
+PixelsFdwExecutionState::PixelsScanInitGlobal(PixelsReadBindData &bind_data) {
+
+    auto result = make_unique();
+    result->initialPixelsReader = bind_data.initialPixelsReader;
+    int max_threads = std::stoi(ConfigFactory::Instance().getProperty("pixel.threads"));
+    if (max_threads <= 0) {
+        max_threads = (int) bind_data.files.size();
+    }
+    result->storageArrayScheduler = std::make_shared(bind_data.files, max_threads);
+    result->file_index.resize(result->storageArrayScheduler->getDeviceSum());
+    result->max_threads = max_threads;
+    result->batch_index = 0;
+    return std::move(result);
+}
+
+// Create the per-reader state: acquire a device id, record the names and ids of
+// the file columns selected by `column_map` (entries >= 0) and prime the first
+// file through PixelsParallelStateNext(). Returns nullptr when that call
+// reports there is no file to read.
+unique_ptr
+PixelsFdwExecutionState::PixelsScanInitLocal(PixelsReadBindData &bind_data,
+                                             PixelsReadGlobalState &parallel_state,
+                                             vector column_map) {
+    auto result = make_unique();
+    result->deviceID = parallel_state.storageArrayScheduler->acquireDeviceId();
+    auto file_schema = bind_data.fileSchema;
+    vector field_names;
+    vector field_ids;
+    for (int i = 0; i < (int) column_map.size(); i++) {
+        if (column_map[i] >= 0) {
+            field_names.emplace_back(file_schema->getFieldNames().at(i));
+            field_ids.emplace_back(i);
+        }
+    }
+    result->filters = bind_data.filters;
+    result->column_names = field_names;
+    result->column_ids = field_ids;
+    if(!PixelsParallelStateNext(bind_data, *result, parallel_state, true)) {
+        return nullptr;
+    }
+    return std::move(result);
+}
+
+// Advance `scan_data` to its next file.
+// Under the global lock it promotes the "next" file/batch indexes to "current"
+// and claims a new "next" file index for this device. After unlocking it closes
+// the old current reader, switches the buffer pool, promotes the prefetched
+// reader and opens a reader for the newly claimed file, if any.
+// Returns false, after resetting the buffer pool, when the device has no files
+// left. Throws InvalidArgumentException when the global state records a
+// file-open error.
+bool
+PixelsFdwExecutionState::PixelsParallelStateNext(const PixelsReadBindData &bind_data,
+                                                 PixelsReadLocalState &scan_data,
+                                                 PixelsReadGlobalState &parallel_state,
+                                                 bool is_init_state) {
+    unique_lock parallel_lock(parallel_state.lock);
+    if (parallel_state.error_opening_file) {
+        throw InvalidArgumentException("PixelsScanInitLocal: file open error.");
+    }
+
+    auto& StorageInstance = parallel_state.storageArrayScheduler;
+    if ((is_init_state &&
+         parallel_state.file_index.at(scan_data.deviceID) >= StorageInstance->getFileSum(scan_data.deviceID)) ||
+        scan_data.next_file_index >= StorageInstance->getFileSum(scan_data.deviceID)) {
+        ::BufferPool::Reset();
+        parallel_lock.unlock();
+        return false;
+    }
+
+    scan_data.curr_file_index = scan_data.next_file_index;
+    scan_data.curr_batch_index = scan_data.next_batch_index;
+    scan_data.next_file_index = parallel_state.file_index.at(scan_data.deviceID);
+    scan_data.next_batch_index = StorageInstance->getBatchID(scan_data.deviceID, scan_data.next_file_index);
+    scan_data.curr_file_name = scan_data.next_file_name;
+    parallel_state.file_index.at(scan_data.deviceID)++;
+    parallel_lock.unlock();
+
+    if(scan_data.currReader != nullptr) {
+        scan_data.currReader->close();
+    }
+
+    ::BufferPool::Switch();
+    scan_data.currReader = scan_data.nextReader;
+    scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader;
+    if (scan_data.currPixelsRecordReader != nullptr) {
+        auto currPixelsRecordReader = std::static_pointer_cast(scan_data.currPixelsRecordReader);
+        currPixelsRecordReader->read();
+    }
+    if(scan_data.next_file_index < StorageInstance->getFileSum(scan_data.deviceID)) {
+        auto footerCache = std::make_shared();
+        auto builder = std::make_shared();
+        std::shared_ptr<::Storage> storage = StorageFactory::getInstance()->getStorage(::Storage::file);
+        scan_data.next_file_name = StorageInstance->getFileName(scan_data.deviceID, scan_data.next_file_index);
+        scan_data.nextReader = builder->setPath(scan_data.next_file_name)
+                                      ->setStorage(storage)
+                                      ->setPixelsFooterCache(footerCache)
+                                      ->build();
+
+        PixelsReaderOption option = GetPixelsReaderOption(scan_data, parallel_state);
+        scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option);
+        auto nextPixelsRecordReader = std::static_pointer_cast(scan_data.nextPixelsRecordReader);
+        nextPixelsRecordReader->read();
+    } else {
+        // Last file already claimed: nothing to prefetch.
+        scan_data.nextReader = nullptr;
+        scan_data.nextPixelsRecordReader = nullptr;
+    }
+    return true;
+}
+
+// Build the reader options for local_state.nextReader: the selected columns,
+// all row groups, filter push-down with the local filters, and a batch size
+// taken from the "pixel.stride" property. global_state is not used.
+PixelsReaderOption
+PixelsFdwExecutionState::GetPixelsReaderOption(PixelsReadLocalState &local_state,
+                                               PixelsReadGlobalState &global_state) {
+    PixelsReaderOption option;
+    option.setSkipCorruptRecords(true);
+    option.setTolerantSchemaEvolution(true);
+    option.setEnableEncodedColumnVector(true);
+    option.setIncludeCols(local_state.column_names);
+    option.setRGRange(0, local_state.nextReader->getRowGroupNum());
+    option.setQueryId(1);
+    option.setEnabledFilterPushDown(true);
+    option.setFilters(local_state.filters);
+    int stride = std::stoi(ConfigFactory::Instance().getProperty("pixel.stride"));
+    option.setBatchSize(stride);
+    return option;
+}
+
+// Position the current batch on the next row that passes the filter mask.
+// When cur_row_index is -1 (a batch was just read) the batch cursor is moved
+// one step back and masked_next_offsets is rebuilt: for every passing row it
+// maps the index of the previous passing row (-1 before the first) to the
+// distance between the two. The cursor is then advanced by the distance stored
+// for cur_row_index, or past the end of the batch when there is no entry.
+void PixelsFdwExecutionState::GetNextOffsets() {
+    auto currPixelsRecordReader = std::static_pointer_cast(scan_data->currPixelsRecordReader);
+    if (cur_row_index == -1) {
+        scan_data->vectorizedRowBatch->increment(-1);
+        masked_next_offsets.clear();
+        int masked_next_offset = 1;
+        for (int j = 0; j < scan_data->vectorizedRowBatch->count(); j++) {
+            if (enable_filter_pushdown && !currPixelsRecordReader->getFilterMask()->get(j)) {
+                masked_next_offset++;
+            }
+            else {
+                masked_next_offsets.insert(std::make_pair(j - masked_next_offset, masked_next_offset));
+                masked_next_offset = 1;
+            }
+        }
+        // The per-row "offset: ..." dump to std::cout that used to follow was
+        // debugging output and has been removed.
+    }
+    if (masked_next_offsets.find(cur_row_index) == masked_next_offsets.end()) {
+        // No further passing row in this batch.
+        scan_data->vectorizedRowBatch->increment(scan_data->vectorizedRowBatch->count());
+        cur_row_index = PIXELS_FDW_MAX_COLUMN_LENGTH;
+    }
+    else {
+        scan_data->vectorizedRowBatch->increment(masked_next_offsets[cur_row_index]);
+        cur_row_index += masked_next_offsets[cur_row_index];
+    }
+}
+
+// Make sure the current batch has a row to return, reading further batches and
+// moving on to further files as needed. Returns false when there is no local
+// state or every file has been consumed.
+bool PixelsFdwExecutionState::GetNextBatch() {
+    if (!scan_data) {
+        return false;
+    }
+    if (scan_data->currPixelsRecordReader == nullptr) {
+        if(!PixelsParallelStateNext(*bind_data, *scan_data, *parallel_state)) {
+            return false;
+        }
+    }
+    auto currPixelsRecordReader = std::static_pointer_cast(scan_data->currPixelsRecordReader);
+    if (scan_data->vectorizedRowBatch == nullptr) {
+        scan_data->vectorizedRowBatch = currPixelsRecordReader->readBatch(false);
+        cur_row_index = -1;
+        GetNextOffsets();
+    }
+    while (scan_data->vectorizedRowBatch->isEndOfFile()) {
+        if (currPixelsRecordReader->isEndOfFile()) {
+            currPixelsRecordReader.reset();
+            if(!PixelsParallelStateNext(*bind_data, *scan_data, *parallel_state)) {
+                return false;
+            }
+            currPixelsRecordReader = std::static_pointer_cast(scan_data->currPixelsRecordReader);
+            if (scan_data->vectorizedRowBatch == nullptr) {
+                scan_data->vectorizedRowBatch = currPixelsRecordReader->readBatch(false);
+                cur_row_index = -1;
+                GetNextOffsets();
+            }
+        }
+        else {
+            scan_data->vectorizedRowBatch = currPixelsRecordReader->readBatch(false);
+            cur_row_index = -1;
+            GetNextOffsets();
+        }
+    }
+    return true;
+}
+
+// Store the current row into `slot` as a virtual tuple and advance to the next
+// passing row. Returns false when the scan is exhausted.
+// Every produced value is marked non-null. Throws PixelsReaderException for a
+// decimal with precision above 18 and for unsupported column categories.
+bool PixelsFdwExecutionState::next(TupleTableSlot* slot) {
+    if (!GetNextBatch()) {
+        return false;
+    }
+    for (int i = 0; i < scan_data->column_ids.size(); i++) {
+        int column_id = scan_data->column_ids.at(i);
+        auto col = scan_data->vectorizedRowBatch->cols.at(i);
+        auto colSchema = bind_data->fileSchema->getChildren().at(column_id);
+        switch (colSchema->getCategory()) {
+            case TypeDescription::SHORT: {
+                auto intCol = std::static_pointer_cast(col);
+                slot->tts_isnull[column_id] = false;
+                slot->tts_values[column_id] = Int16GetDatum(*((short*)(intCol->current())));
+                break;
+            }
+            case TypeDescription::INT: {
+                auto intCol = std::static_pointer_cast(col);
+                slot->tts_isnull[column_id] = false;
+                slot->tts_values[column_id] = Int32GetDatum(*((int*)(intCol->current())));
+                break;
+            }
+            case TypeDescription::LONG: {
+                auto intCol = std::static_pointer_cast(col);
+                slot->tts_isnull[column_id] = false;
+                slot->tts_values[column_id] = Int64GetDatum(*((long*)(intCol->current())));
+                break;
+            }
+            case TypeDescription::DATE: {
+                auto intCol = std::static_pointer_cast(col);
+                slot->tts_isnull[column_id] = false;
+                // Shift the stored day count from the Unix epoch to the
+                // PostgreSQL epoch.
+                slot->tts_values[column_id] = DateADTGetDatum(*((int*)(intCol->current())) + (UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE));
+                break;
+            }
+            case TypeDescription::DECIMAL: {
+                auto decimalCol = std::static_pointer_cast(col);
+                if (decimalCol->getPrecision() > 18) {
+                    throw PixelsReaderException("Pixels reader do not support longer decimal");
+                }
+                // lose precision
+                Datum numeric_data = DirectFunctionCall1(float8_numeric,
+                                                         Float8GetDatum(float8(*((long*)(decimalCol->current())) / std::pow(10, decimalCol->getScale()))));
+                slot->tts_isnull[column_id] = false;
+                slot->tts_values[column_id] = numeric_data;
+                break;
+            }
+            case TypeDescription::VARCHAR:
+            case TypeDescription::CHAR:
+            {
+                auto binaryCol = std::static_pointer_cast(col);
+                string_t *string_value = (string_t*)(binaryCol->current());
+                // Copy the bytes into a palloc'd varlena (header + payload).
+                int64 bytea_len = string_value->GetSize() + VARHDRSZ;
+                bytea *b = (bytea*)palloc0(bytea_len);
+                SET_VARSIZE(b, bytea_len);
+                memcpy(VARDATA(b), string_value->GetData(), string_value->GetSize());
+                slot->tts_isnull[column_id] = false;
+                slot->tts_values[column_id] = PointerGetDatum(b);
+                break;
+            }
+            default: {
+                throw PixelsReaderException("Pixels reader do not support other types now");
+                break;
+            }
+        }
+    }
+    // Advance to the next passing row (same stepping as GetNextOffsets()).
+    if (masked_next_offsets.find(cur_row_index) == masked_next_offsets.end()) {
+        scan_data->vectorizedRowBatch->increment(scan_data->vectorizedRowBatch->count());
+        cur_row_index = PIXELS_FDW_MAX_COLUMN_LENGTH;
+    }
+    else {
+        scan_data->vectorizedRowBatch->increment(masked_next_offsets[cur_row_index]);
+        cur_row_index += masked_next_offsets[cur_row_index];
+    }
+    ExecStoreVirtualTuple(slot);
+    return true;
+}
+
+// Restart the scan from the first file: close and drop the current state, then
+// rebuild it exactly as the constructor does.
+// The original dereferenced scan_data and its readers unconditionally. They are
+// null once the last file has been reached, or when there was nothing to scan,
+// so the shared null-safe close is used instead.
+void
+PixelsFdwExecutionState::rescan() {
+    PixelsCloseReaders(bind_data.get(), parallel_state.get(), scan_data.get());
+    bind_data.reset();
+    parallel_state.reset();
+    scan_data.reset();
+    shared_ptr file_schema;
+    bind_data = PixelsFdwExecutionState::PixelsScanBind(files_list, filters_list, file_schema);
+    column_map = PixelsFdwExecutionState::PixelsGetColumnMap(file_schema, attrs_used, tuple_desc);
+    parallel_state = PixelsFdwExecutionState::PixelsScanInitGlobal(*bind_data);
+    scan_data = PixelsFdwExecutionState::PixelsScanInitLocal(*bind_data, *parallel_state, column_map);
+
+}
+
+// Factory wrapper: allocate a PixelsFdwExecutionState with `new`. The caller
+// owns the returned object.
+PixelsFdwExecutionState*
+createPixelsFdwExecutionState(List* filenames,
+                              List* filters,
+                              set attrs_used,
+                              TupleDesc tupleDesc) {
+    return new PixelsFdwExecutionState(filenames, filters, attrs_used, tupleDesc);
+}
diff --git a/pixels_fdw/PixelsFdwPlanState.cpp b/pixels_fdw/PixelsFdwPlanState.cpp
new file mode 100755
index 0000000..99e2cbb
--- /dev/null
+++ b/pixels_fdw/PixelsFdwPlanState.cpp
@@ -0,0 +1,67 @@
+//
+// Created by liyu on 3/26/23.
+//
+
+#include "PixelsFdwPlanState.hpp"
+#include "physical/StorageArrayScheduler.h"
+#include "profiler/CountProfiler.h"
+
+// NOTE(review): this copy of the patch has lost every "<...>" span (template
+// parameter and argument lists). Those tokens are reproduced exactly as found
+// instead of being guessed; restore them from the upstream file before
+// building.
+
+// Build the planner-side state for one foreign table.
+// Copies the cells of `files` and `col_filters` into the member lists, opens
+// the first file to read its row count, and keeps `options`.
+// Throws PixelsReaderException when no file is given.
+// NOTE(review): files_list / filters_list are appended to without being
+// assigned here - presumably they are initialised to NIL in the class
+// declaration; confirm.
+PixelsFdwPlanState::PixelsFdwPlanState(List* files,
+                                       List* col_filters,
+                                       List* options) {
+    ListCell *file_lc;
+    foreach (file_lc, files) {
+        files_list = lappend(files_list, lfirst(file_lc));
+    }
+    if (list_length(files_list) == 0) {
+        throw PixelsReaderException("Pixels reader cannot take empty filename as parameter");
+    }
+
+    ListCell *filter_lc;
+    foreach (filter_lc, col_filters) {
+        filters_list = lappend(filters_list, lfirst(filter_lc));
+    }
+
+    auto footerCache = std::make_shared();
+    auto builder = std::make_shared();
+
+    std::shared_ptr<::Storage> storage = StorageFactory::getInstance()->getStorage(::Storage::file);
+    std::shared_ptr pixelsReader = builder->setPath(string(strVal(lfirst(list_head(files_list)))))
+                                          ->setStorage(storage)
+                                          ->setPixelsFooterCache(footerCache)
+                                          ->build();
+    initialPixelsReader = pixelsReader;
+    row_count = initialPixelsReader->getNumberOfRows();
+    plan_options = options;
+    attrs_used = bms_make_singleton(1 - FirstLowInvalidHeapAttributeNumber);
+}
+
+// Close the reader opened by the constructor, if any.
+PixelsFdwPlanState::~PixelsFdwPlanState() {
+    if (initialPixelsReader) {
+        initialPixelsReader->close();
+    }
+    initialPixelsReader.reset();
+}
+
+// Reference to the list of file names (callers may modify it).
+List*&
+PixelsFdwPlanState::getFilesList() {
+    return files_list;
+}
+
+// Reference to the list of filters (callers may modify it).
+List*&
+PixelsFdwPlanState::getFiltersList() {
+    return filters_list;
+}
+
+// Row count of the first file, as read in the constructor.
+uint64_t
+PixelsFdwPlanState::getRowCount() {
+    return row_count;
+}
+
+
+// Factory wrapper: allocate a PixelsFdwPlanState with `new`. The caller owns
+// the returned object.
+PixelsFdwPlanState*
+createPixelsFdwPlanState(List* files,
+                         List* col_filters,
+                         List* options) {
+    return new PixelsFdwPlanState(files, col_filters, options);
+}
diff --git a/pixels_fdw/PixelsFilter.cpp b/pixels_fdw/PixelsFilter.cpp
new file mode 100644
index 0000000..4624932
--- /dev/null
+++ b/pixels_fdw/PixelsFilter.cpp
@@ -0,0 +1,329 @@
+
+#include "PixelsFilter.hpp"
+
+
+// Create a leaf filter node of kind `type` on column `cname`. Only the constant
+// matching the column's type (integer, decimal or string) is read when the
+// filter is applied. Children start out empty; attach them with
+// setLChild() / setRChild().
+PixelsFilter::PixelsFilter(PixelsFilterType type,
+                           std::string cname,
+                           long ivalue,
+                           double dvalue,
+                           string_t svalue) {
+    pixelsFilterType = type;
+    column_name = cname;
+    integer_value = ivalue;
+    decimal_value = dvalue;
+    string_value = svalue;
+    // The destructor deletes both children, so they must never be left
+    // indeterminate (harmless if the class declaration already defaults them).
+    lchild = nullptr;
+    rchild = nullptr;
+}
+
+// Delete both sub-trees; the node owns its children.
+PixelsFilter::~PixelsFilter() {
+    // delete on a null pointer is a no-op, so no guard is needed.
+    delete lchild;
+    delete rchild;
+}
+
+std::string PixelsFilter::getColumnName() {
+    return column_name;
+}
+
+PixelsFilterType PixelsFilter::getFilterType() {
+    return pixelsFilterType;
+}
+
+void PixelsFilter::setColumnName(std::string cname) {
+    column_name = cname;
+}
+
+long PixelsFilter::getIntegerValue()
+{
+    return integer_value;
+}
+
+double PixelsFilter::getDecimalValue() {
+    return decimal_value;
+}
+
+string_t PixelsFilter::getStringValue() {
+    return string_value;
+}
+
+PixelsFilter *PixelsFilter::getLChild() {
+    return lchild;
+}
+
+// Attach `lc` as the left child. The destructor will delete it.
+void PixelsFilter::setLChild(PixelsFilter *lc) {
+    lchild = lc;
+}
+
+PixelsFilter *PixelsFilter::getRChild() {
+    return rchild;
+}
+
+// Attach `rc` as the right child. The destructor will delete it.
+void PixelsFilter::setRChild(PixelsFilter *rc) {
+    rchild = rc;
+}
+
+// Return a new node with the same type, column and constants.
+// NOTE(review): the children are not copied, so copying a conjunction node
+// yields a node without operands - confirm that is intended.
+PixelsFilter *PixelsFilter::copy() {
+    return new PixelsFilter(pixelsFilterType, column_name, integer_value, decimal_value, string_value);
+}
+
+// Compare eight consecutive values starting at `data` against `constant` with
+// AVX2 and return the result as a bit mask, bit k describing element k.
+// 4-byte T uses one 256-bit load; 8-byte T uses two and combines the two 4-bit
+// results. Which comparison is performed is selected at compile time by the
+// std::is_same tests (their arguments are missing from this copy). Branches
+// that return the complement (~) leave the bits above bit 7 set; the callers
+// in this file narrow the result to uint8_t.
+// Throws InvalidArgumentException for any other element size.
+template
+int PixelsFilter::CompareAvx2(void * data,
+                              T constant) {
+    __m256i vector;
+    __m256i vector_next;
+    __m256i constants;
+    __m256i mask;
+    if constexpr(sizeof(T) == 4) {
+        // Unaligned load: callers pass `base + i`, and nothing visible here
+        // guarantees 32-byte alignment, which _mm256_load_si256 requires.
+        vector = _mm256_loadu_si256((__m256i *)data);
+        constants = _mm256_set1_epi32(constant);
+        if constexpr(std::is_same()) {
+            // value == constant
+            mask = _mm256_cmpeq_epi32(vector, constants);
+            return _mm256_movemask_ps((__m256)mask);
+        } else if constexpr(std::is_same()) {
+            // constant > value
+            mask = _mm256_cmpgt_epi32(constants, vector);
+            return _mm256_movemask_ps((__m256)mask);
+        } else if constexpr(std::is_same()) {
+            // !(value > constant)
+            mask = _mm256_cmpgt_epi32(vector, constants);
+            return ~_mm256_movemask_ps((__m256)mask);
+        } else if constexpr(std::is_same()) {
+            // value > constant
+            mask = _mm256_cmpgt_epi32(vector, constants);
+            return _mm256_movemask_ps((__m256)mask);
+        } else if constexpr(std::is_same()) {
+            // !(constant > value)
+            mask = _mm256_cmpgt_epi32(constants, vector);
+            return ~_mm256_movemask_ps((__m256)mask);
+        }
+    } else if constexpr(sizeof(T) == 8) {
+        constants = _mm256_set1_epi64x(constant);
+        // Elements 0-3 and 4-7; unaligned loads for the reason given above.
+        vector = _mm256_loadu_si256((__m256i *)data);
+        vector_next = _mm256_loadu_si256((__m256i *)((uint8_t *)data + 32));
+        int result = 0;
+        if constexpr(std::is_same()) {
+            // value == constant
+            mask = _mm256_cmpeq_epi64(vector, constants);
+            result = _mm256_movemask_pd((__m256d)mask);
+            mask = _mm256_cmpeq_epi64(vector_next, constants);
+            result += _mm256_movemask_pd((__m256d)mask) << 4;
+            return result;
+        } else if constexpr(std::is_same()) {
+            // constant > value
+            mask = _mm256_cmpgt_epi64(constants, vector);
+            result = _mm256_movemask_pd((__m256d)mask);
+            mask = _mm256_cmpgt_epi64(constants, vector_next);
+            result += _mm256_movemask_pd((__m256d)mask) << 4;
+            return result;
+        } else if constexpr(std::is_same()) {
+            // !(value > constant)
+            mask = _mm256_cmpgt_epi64(vector, constants);
+            result = _mm256_movemask_pd((__m256d)mask);
+            mask = _mm256_cmpgt_epi64(vector_next, constants);
+            result += _mm256_movemask_pd((__m256d)mask) << 4;
+            return ~result;
+        } else if constexpr(std::is_same()) {
+            // value > constant
+            mask = _mm256_cmpgt_epi64(vector, constants);
+            result = _mm256_movemask_pd((__m256d)mask);
+            mask = _mm256_cmpgt_epi64(vector_next, constants);
+            result += _mm256_movemask_pd((__m256d)mask) << 4;
+            return result;
+        } else if constexpr(std::is_same()) {
+            // !(constant > value)
+            mask = _mm256_cmpgt_epi64(constants, vector);
+            result = _mm256_movemask_pd((__m256d)mask);
+            mask = _mm256_cmpgt_epi64(constants, vector_next);
+            result += _mm256_movemask_pd((__m256d)mask) << 4;
+            return ~result;
+        }
+    } else {
+        throw InvalidArgumentException("We didn't support other sizes yet to do filter SIMD");
+    }
+}
+
+
+// Evaluate one comparison against every row of `vector`, writing the outcome of
+// OP::Operation(value, constant) into `filter_mask`.
+// The constant is ivalue for SHORT/INT/LONG/DATE, dvalue scaled by 10^scale and
+// rounded for DECIMAL, and svalue for the string/binary categories.
+// With ENABLE_SIMD_FILTER the numeric paths process rows eight at a time
+// through CompareAvx2 and finish the remainder with the scalar loop.
+template
+void PixelsFilter::TemplatedFilterOperation(std::shared_ptr vector,
+                                            const long ivalue,
+                                            const double dvalue,
+                                            const string_t svalue,
+                                            PixelsBitMask &filter_mask,
+                                            std::shared_ptr type) {
+    switch (type->getCategory()) {
+        case TypeDescription::SHORT:
+        case TypeDescription::INT: {
+            int constant_value = (int)ivalue;
+            auto longColumnVector = std::static_pointer_cast(vector);
+            int i = 0;
+#ifdef ENABLE_SIMD_FILTER
+            for (; i < vector->length - vector->length % 8; i += 8) {
+                uint8_t mask = CompareAvx2(longColumnVector->intVector + i, constant_value);
+                filter_mask.setByteAligned(i, mask);
+            }
+#endif
+            for (; i < vector->length; i++) {
+                filter_mask.set(i, OP::Operation(longColumnVector->intVector[i],
+                                                 constant_value));
+            }
+            break;
+        }
+        case TypeDescription::LONG: {
+            long constant_value = (long)ivalue;
+            auto longColumnVector = std::static_pointer_cast(vector);
+            int i = 0;
+#ifdef ENABLE_SIMD_FILTER
+            for (; i < vector->length - vector->length % 8; i += 8) {
+                uint8_t mask = CompareAvx2(longColumnVector->longVector + i, constant_value);
+                filter_mask.setByteAligned(i, mask);
+            }
+#endif
+            for(; i < vector->length; i++) {
+                filter_mask.set(i, OP::Operation(longColumnVector->longVector[i],
+                                                 constant_value));
+            }
+            break;
+        }
+        case TypeDescription::DATE: {
+            int constant_value = (int)ivalue;
+            auto dateColumnVector = std::static_pointer_cast(vector);
+            int i = 0;
+#ifdef ENABLE_SIMD_FILTER
+            for (; i < vector->length - vector->length % 8; i += 8) {
+                uint8_t mask = CompareAvx2(dateColumnVector->dates + i, constant_value);
+                filter_mask.setByteAligned(i, mask);
+            }
+#endif
+            for (; i < vector->length; i++) {
+                filter_mask.set(i, OP::Operation(dateColumnVector->dates[i],
+                                                 constant_value));
+            }
+            break;
+        }
+        case TypeDescription::DECIMAL: {
+            auto decimalColumnVector = std::static_pointer_cast(vector);
+            int scale = decimalColumnVector->getScale();
+            // Bring the constant to the column's scaled-integer representation.
+            double decimal_value = dvalue * std::pow(10, scale);
+            long long_value = std::lround(decimal_value);
+            int i = 0;
+#ifdef ENABLE_SIMD_FILTER
+            for (; i < vector->length - vector->length % 8; i += 8) {
+                // Was `constant_value`, which is not declared in this case and
+                // broke the build whenever ENABLE_SIMD_FILTER was defined.
+                uint8_t mask = CompareAvx2(decimalColumnVector->vector + i, long_value);
+                filter_mask.setByteAligned(i, mask);
+            }
+#endif
+            for (; i < vector->length; i++) {
+                filter_mask.set(i, OP::Operation(decimalColumnVector->vector[i],
+                                                 long_value));
+            }
+            break;
+        }
+        case TypeDescription::STRING:
+        case TypeDescription::BINARY:
+        case TypeDescription::VARBINARY:
+        case TypeDescription::CHAR:
+        case TypeDescription::VARCHAR: {
+            string_t constant_value = svalue;
+            auto binaryColumnVector = std::static_pointer_cast(vector);
+            for (int i = 0; i < vector->length; i++) {
+                filter_mask.set(i, OP::Operation(binaryColumnVector->vector[i],
+                                                 constant_value));
+            }
+            break;
+        }
+        default:
+            // Other categories leave the mask untouched. FilterOperationSwitch,
+            // the caller in this file, rejects them before getting here.
+            break;
+    }
+}
+
+// Dispatch one comparison to TemplatedFilterOperation according to the column
+// category. Does nothing when `filter_mask` already selects no row.
+// Throws InvalidArgumentException for unsupported categories.
+template
+void PixelsFilter::FilterOperationSwitch(std::shared_ptr vector,
+                                         const long ivalue,
+                                         const double dvalue,
+                                         const string_t svalue,
+                                         PixelsBitMask &filter_mask,
+                                         std::shared_ptr type) {
+    if (filter_mask.isNone()) {
+        return;
+    }
+    switch (type->getCategory()) {
+        case TypeDescription::SHORT:
+        case TypeDescription::INT:
+        case TypeDescription::DATE:
+            TemplatedFilterOperation(vector, ivalue, dvalue, svalue, filter_mask, type);
+            break;
+        case TypeDescription::LONG:
+            TemplatedFilterOperation(vector, ivalue, dvalue, svalue, filter_mask, type);
+            break;
+        case TypeDescription::DECIMAL:
+            TemplatedFilterOperation(vector, ivalue, dvalue, svalue, filter_mask, type);
+            break;
+        case TypeDescription::STRING:
+        case TypeDescription::BINARY:
+        case TypeDescription::VARBINARY:
+        case TypeDescription::CHAR:
+        case TypeDescription::VARCHAR:
+            TemplatedFilterOperation(vector, ivalue, dvalue, svalue, filter_mask, type);
+            break;
+        default:
+            throw InvalidArgumentException("Unsupported type for filter. ");
+    }
+}
+
+// Evaluate this filter tree against `vector` and narrow `filterMask` to the
+// rows that satisfy it.
+//   AND node     - each existing child is evaluated into its own fresh mask and
+//                  AND-ed into filterMask.
+//   OR node      - the children's masks are OR-ed together and the union is
+//                  AND-ed into filterMask.
+//   COMPARE node - evaluated directly on filterMask.
+// Any other node type trips assert(0).
+void
+PixelsFilter::ApplyFilter(std::shared_ptr vector,
+                          PixelsBitMask& filterMask,
+                          std::shared_ptr type) {
+    switch (pixelsFilterType) {
+        case PixelsFilterType::CONJUNCTION_AND: {
+            if (lchild) {
+                PixelsBitMask lchildMask(filterMask.maskLength);
+                lchild->ApplyFilter(vector, lchildMask, type);
+                filterMask.And(lchildMask);
+            }
+            if (rchild) {
+                // This branch used to call lchild->ApplyFilter(), so the right
+                // operand was never evaluated and a missing left child was
+                // dereferenced.
+                PixelsBitMask rchildMask(filterMask.maskLength);
+                rchild->ApplyFilter(vector, rchildMask, type);
+                filterMask.And(rchildMask);
+            }
+            break;
+        }
+        case PixelsFilterType::CONJUNCTION_OR: {
+            PixelsFilter *first = lchild ? lchild : rchild;
+            PixelsFilter *second = (lchild && rchild) ? rchild : nullptr;
+            // The first operand is evaluated straight into the accumulator
+            // instead of being OR-ed into a fresh mask. NOTE(review): a fresh
+            // PixelsBitMask presumably selects every row (otherwise the
+            // isNone() check in FilterOperationSwitch would skip every
+            // comparison), in which case OR-ing into it could never clear a
+            // bit - confirm against PixelsBitMask.
+            PixelsBitMask orMask(filterMask.maskLength);
+            if (first) {
+                first->ApplyFilter(vector, orMask, type);
+            }
+            if (second) {
+                PixelsBitMask secondMask(filterMask.maskLength);
+                second->ApplyFilter(vector, secondMask, type);
+                orMask.Or(secondMask);
+            }
+            filterMask.And(orMask);
+            break;
+        }
+        case PixelsFilterType::COMPARE_EQ: {
+            FilterOperationSwitch(vector, integer_value, decimal_value, string_value, filterMask, type);
+            break;
+        }
+        case PixelsFilterType::COMPARE_GTEQ: {
+            FilterOperationSwitch(vector, integer_value, decimal_value, string_value, filterMask, type);
+            break;
+        }
+        case PixelsFilterType::COMPARE_LTEQ: {
+            FilterOperationSwitch(vector, integer_value, decimal_value, string_value, filterMask, type);
+            break;
+        }
+        case PixelsFilterType::COMPARE_GT: {
+            FilterOperationSwitch(vector, integer_value, decimal_value, string_value, filterMask, type);
+            break;
+        }
+        case PixelsFilterType::COMPARE_LT: {
+            FilterOperationSwitch(vector, integer_value, decimal_value, string_value, filterMask, type);
+            break;
+        }
+        default:
+            assert(0);
+            break;
+    }
+}
+
+// Factory wrapper: allocate a PixelsFilter with `new`. The caller owns the
+// returned node until it is attached to a parent.
+PixelsFilter *createPixelsFilter(PixelsFilterType type,
+                                 std::string cname,
+                                 long ivalue,
+                                 double dvalue,
+                                 string_t svalue) {
+    return new PixelsFilter(type, cname, ivalue, dvalue, svalue);
+}
diff --git a/pixels_fdw/README.md b/pixels_fdw/README.md
new file mode 100755
index 
0000000..a303549
--- /dev/null
+++ b/pixels_fdw/README.md
@@ -0,0 +1,29 @@
+# pixels-postgres
+The PostgreSQL FDW for Pixels.
+
+export PIXELS_FDW_SRC=/path/to/yours
+
+Place this folder inside the contrib directory of your PostgreSQL source tree (postgresql-xx.xx/contrib).
+
+In "postgresql.conf", set this line: "shared_preload_libraries = 'pixels_fdw' # (change requires restart)"
+
+make pull
+make deps
+make
+sudo make install
+
+CREATE EXTENSION pixels_fdw;
+
+CREATE SERVER pixels_server FOREIGN DATA WRAPPER pixels_fdw;
+
+create foreign table example (
+    id int,
+    name varchar,
+    birthday date,
+    score decimal(15, 2)
+)
+server pixels_server
+options (
+    filename '|/path1|/path2|/path3|',
+    filters 'id > 1 & score < 90'
+);
\ No newline at end of file
diff --git a/pixels_fdw/include/PixelsFdwExecutionState.hpp b/pixels_fdw/include/PixelsFdwExecutionState.hpp
new file mode 100755
index 0000000..652a6f0
--- /dev/null
+++ b/pixels_fdw/include/PixelsFdwExecutionState.hpp
@@ -0,0 +1,108 @@
+//
+// Created by liyu on 3/26/23. 
+// +#pragma once + +#define STANDARD_VECTOR_SIZE 2048U +#define PIXELS_FDW_MAX_DEC_WIDTH 18 +#define PIXELS_FDW_MAX_COLUMN_LENGTH INT64_MAX + +#include +#include +#include +#include +#include +#include +#include "PixelsReadGlobalState.hpp" +#include "PixelsReadLocalState.hpp" +#include "PixelsReadBindData.hpp" +#include "PixelsFilter.hpp" +#include "physical/storage/LocalFS.h" +#include "physical/natives/ByteBuffer.h" +#include "physical/natives/DirectRandomAccessFile.h" +#include "physical/io/PhysicalLocalReader.h" +#include "physical/StorageFactory.h" +#include "physical/StorageArrayScheduler.h" +#include "profiler/CountProfiler.h" +#include "PixelsReaderImpl.h" +#include "PixelsReaderBuilder.h" +#include +#include +#include +#include "physical/scheduler/NoopScheduler.h" +#include "physical/SchedulerFactory.h" +#include "PixelsVersion.h" +#include "PixelsFooterCache.h" +#include "exception/PixelsReaderException.h" +#include "reader/PixelsReaderOption.h" +#include "TypeDescription.h" +#include "vector/ColumnVector.h" +#include "vector/LongColumnVector.h" +#include "physical/BufferPool.h" +#include "profiler/TimeProfiler.h" +#include "TypeDescription.h" +#include + +extern "C" { +#include "postgres.h" +#include "fmgr.h" +#include "utils/date.h" +#include "utils/numeric.h" +#include "utils/fmgrprotos.h" +#include "catalog/pg_type.h" +#include "executor/tuptable.h" +#include "executor/spi.h" +} + +using namespace std; + + +class PixelsFdwExecutionState { +public: + PixelsFdwExecutionState(List* files, + List* filters, + set attrs_used, + TupleDesc tupleDesc); + ~PixelsFdwExecutionState(); + static unique_ptr PixelsScanInitGlobal(PixelsReadBindData &bind_data); + static unique_ptr PixelsScanInitLocal(PixelsReadBindData &bind_data, + PixelsReadGlobalState ¶llel_state, + vector column_map); + static unique_ptr PixelsScanBind(vector files, + vector filters, + shared_ptr &file_schema); + static vector PixelsGetColumnMap(const shared_ptr file_schema, + set attrs_used, + 
TupleDesc tupleDesc); + static bool PixelsParallelStateNext(const PixelsReadBindData &bind_data, + PixelsReadLocalState &scan_data, + PixelsReadGlobalState ¶llel_state, + bool is_init_state = false); + static PixelsReaderOption GetPixelsReaderOption(PixelsReadLocalState &local_state, + PixelsReadGlobalState &global_state); + void GetNextOffsets(); + bool GetNextBatch(); + bool next(TupleTableSlot* slot); + void rescan(); +private: + vector files_list; + vector filters_list; + set attrs_used; + vector column_map; + vector types; + TupleDesc tuple_desc; + int64_t cur_row_index = -1; + map masked_next_offsets; + unique_ptr bind_data; + unique_ptr scan_data; + unique_ptr parallel_state; + PixelsReaderOption reader_option; + vector selected_column_name; + vector selected_column_idx; + bool enable_filter_pushdown = true; +}; + +PixelsFdwExecutionState* createPixelsFdwExecutionState(List* files, + List* filters, + set attrs_used, + TupleDesc tupleDesc); \ No newline at end of file diff --git a/pixels_fdw/include/PixelsFdwPlanState.hpp b/pixels_fdw/include/PixelsFdwPlanState.hpp new file mode 100755 index 0000000..04c70f1 --- /dev/null +++ b/pixels_fdw/include/PixelsFdwPlanState.hpp @@ -0,0 +1,72 @@ +// +// Created by liyu on 3/26/23. 
+// +#pragma once + +#define STANDARD_VECTOR_SIZE 2048U + +#include +#include +#include +#include +#include +#include + +#include "physical/storage/LocalFS.h" +#include "physical/natives/ByteBuffer.h" +#include "physical/natives/DirectRandomAccessFile.h" +#include "physical/io/PhysicalLocalReader.h" +#include "physical/StorageFactory.h" +#include "PixelsReaderImpl.h" +#include "PixelsReaderBuilder.h" +#include "PixelsFilter.hpp" +#include +#include +#include +#include "physical/scheduler/NoopScheduler.h" +#include "physical/SchedulerFactory.h" +#include "PixelsVersion.h" +#include "PixelsFooterCache.h" +#include "exception/PixelsReaderException.h" +#include "reader/PixelsReaderOption.h" +#include "TypeDescription.h" +#include "vector/ColumnVector.h" +#include "vector/LongColumnVector.h" +#include "physical/BufferPool.h" +#include "profiler/TimeProfiler.h" +#include "TypeDescription.h" + +extern "C" { +#include "postgres.h" +#include "fmgr.h" +#include "catalog/pg_type.h" +#include "executor/tuptable.h" +#include "executor/spi.h" +} + +using namespace std; + + +class PixelsFdwPlanState { +public: + PixelsFdwPlanState(List* files, + List* filters, + List* options); + ~PixelsFdwPlanState(); + List*& getFilesList(); + List*& getFiltersList(); + uint64_t getRowCount(); + Bitmapset* attrs_used; + +private: + std::shared_ptr initialPixelsReader; + List* files_list = NIL; + List* filters_list = NIL; + uint64_t row_count; + List* plan_options; +}; + + +PixelsFdwPlanState* createPixelsFdwPlanState(List* files, + List* filters, + List* options); \ No newline at end of file diff --git a/pixels_fdw/include/PixelsFilter.hpp b/pixels_fdw/include/PixelsFilter.hpp new file mode 100644 index 0000000..041f72b --- /dev/null +++ b/pixels_fdw/include/PixelsFilter.hpp @@ -0,0 +1,116 @@ +// +// Created by liyu on 6/23/23. 
+// +#pragma once + +#include +#include "PixelsBitMask.h" +#include "vector/ColumnVector.h" +#include "TypeDescription.h" +#include "string_t.hpp" +#include +#include +#include + +enum class PixelsFilterType : uint8_t { + CONJUNCTION_AND = 0, + CONJUNCTION_OR, + COMPARE_EQ, + COMPARE_GTEQ, + COMPARE_LTEQ, + COMPARE_GT, + COMPARE_LT +}; + +class PixelsFilterOp { +public: + struct Equals { + template + static inline bool Operation(const T &left, const T &right) { + return left == right; + } + }; + + struct GreaterThan { + template + static inline bool Operation(const T &left, const T &right) { + return left > right; + } + }; + + struct GreaterThanEquals { + template + static inline bool Operation(const T &left, const T &right) { + return !GreaterThan::Operation(right, left); + } + }; + + struct LessThan { + template + static inline bool Operation(const T &left, const T &right) { + return GreaterThan::Operation(right, left); + } + }; + + struct LessThanEquals { + template + static inline bool Operation(const T &left, const T &right) { + return !GreaterThan::Operation(left, right); + } + }; +}; + +class PixelsFilter { +public: + PixelsFilter(PixelsFilterType type, + std::string cname, + long ivalue, + double dvalue, + string_t svalue); + ~PixelsFilter(); + std::string getColumnName(); + PixelsFilterType getFilterType(); + void setColumnName(std::string cname); + long getIntegerValue(); + double getDecimalValue(); + string_t getStringValue(); + PixelsFilter *getLChild(); + void setLChild(PixelsFilter *lc); + PixelsFilter *getRChild(); + void setRChild(PixelsFilter *rc); + PixelsFilter *copy(); + void ApplyFilter(std::shared_ptr vector, + PixelsBitMask& filterMask, + std::shared_ptr type); + template + static int CompareAvx2(void * data, T constant); + template + static void TemplatedFilterOperation(std::shared_ptr vector, + const long ivalue, + const double dvalue, + const string_t svalue, + PixelsBitMask &filter_mask, + std::shared_ptr type); + + template + static void 
FilterOperationSwitch(std::shared_ptr vector, + const long ivalue, + const double dvalue, + const string_t svalue, + PixelsBitMask &filter_mask, + std::shared_ptr type); +private: + PixelsFilterType pixelsFilterType; + std::string column_name; + long integer_value; + double decimal_value; + string_t string_value; + PixelsFilter *lchild = nullptr; + PixelsFilter *rchild = nullptr; +}; + +PixelsFilter* createPixelsFilter(PixelsFilterType type, + std::string cname, + const long ivalue, + const double dvalue, + string_t svalue); diff --git a/pixels_fdw/include/PixelsReadBindData.hpp b/pixels_fdw/include/PixelsReadBindData.hpp new file mode 100755 index 0000000..f35c686 --- /dev/null +++ b/pixels_fdw/include/PixelsReadBindData.hpp @@ -0,0 +1,21 @@ +// +// Created by liyu on 3/27/23. +// + +#ifndef EXAMPLE_C_PIXELSREADBINDDATA_HPP +#define EXAMPLE_C_PIXELSREADBINDDATA_HPP + + +#include "PixelsReader.h" +#include "PixelsFilter.hpp" + + +struct PixelsReadBindData { + std::shared_ptr initialPixelsReader; + std::shared_ptr fileSchema; + std::vector files; + std::vector filters; + std::atomic curFileId; +}; + +#endif // EXAMPLE_C_PIXELSREADBINDDATA_HPP diff --git a/pixels_fdw/include/PixelsReadGlobalState.hpp b/pixels_fdw/include/PixelsReadGlobalState.hpp new file mode 100755 index 0000000..50ce3c3 --- /dev/null +++ b/pixels_fdw/include/PixelsReadGlobalState.hpp @@ -0,0 +1,33 @@ +// +// Created by liyu on 3/26/23. +// + +#include "PixelsReader.h" +#include "physical/StorageArrayScheduler.h" + +struct PixelsReadGlobalState { + std::mutex lock; + + //! The initial reader from the bind phase + std::shared_ptr initialPixelsReader; + + //! Mutexes to wait for a file that is currently being opened + std::unique_ptr file_mutexes; + + //! Signal to other threads that a file failed to open, letting every thread abort. + bool error_opening_file = false; + + std::shared_ptr storageArrayScheduler; + + //! Index of file currently up for scanning + std::vector file_index; + + //! 
Batch index of the next row group to be scanned + uint64_t batch_index; + + uint64_t max_threads; + + uint64_t MaxThreads() const { + return max_threads; + } +}; diff --git a/pixels_fdw/include/PixelsReadLocalState.hpp b/pixels_fdw/include/PixelsReadLocalState.hpp new file mode 100755 index 0000000..db76408 --- /dev/null +++ b/pixels_fdw/include/PixelsReadLocalState.hpp @@ -0,0 +1,40 @@ +// +// Created by liyu on 3/26/23. +// + + +#include "PixelsReader.h" +#include "PixelsFilter.hpp" +#include "reader/PixelsRecordReader.h" + +struct PixelsReadLocalState { + PixelsReadLocalState() { + curr_file_index = 0; + next_file_index = 0; + curr_batch_index = 0; + next_batch_index = 0; + rowOffset = 0; + currPixelsRecordReader = nullptr; + nextPixelsRecordReader = nullptr; + vectorizedRowBatch = nullptr; + currReader = nullptr; + nextReader = nullptr; + } + std::shared_ptr currPixelsRecordReader; + std::shared_ptr nextPixelsRecordReader; + std::shared_ptr vectorizedRowBatch; + int deviceID; + int rowOffset; + std::vector column_ids; + std::vector column_names; + std::vector filters; + std::shared_ptr currReader; + std::shared_ptr nextReader; + uint64_t curr_file_index; + uint64_t next_file_index; + uint64_t curr_batch_index; + uint64_t next_batch_index; + std::string next_file_name; + std::string curr_file_name; + +}; diff --git a/pixels_fdw/include/PixelsRelMetaData.hpp b/pixels_fdw/include/PixelsRelMetaData.hpp new file mode 100755 index 0000000..2c1636e --- /dev/null +++ b/pixels_fdw/include/PixelsRelMetaData.hpp @@ -0,0 +1,7 @@ +#include "PixelsReader.h" + + +struct PixelsRelMetaData { + std::shared_ptr initialPixelsReader; + std::vector files; +}; \ No newline at end of file diff --git a/pixels_fdw/include/PixelsTableFilter.hpp b/pixels_fdw/include/PixelsTableFilter.hpp new file mode 100644 index 0000000..845288d --- /dev/null +++ b/pixels_fdw/include/PixelsTableFilter.hpp @@ -0,0 +1,101 @@ +/* #include +#include +#include + +enum class TableFilterType : uint8_t { + 
CONSTANT_COMPARISON = 0, // constant comparison (e.g. =C, >C, >=C, Copy() const = 0; + virtual bool Equals(const TableFilter &other) const { + return filter_type != other.filter_type; + } + virtual std::unique_ptr ToExpression(const Expression &column) const = 0; + + virtual void Serialize(Serializer &serializer) const; + static std::unique_ptr Deserialize(Deserializer &deserializer); + +public: + template + TARGET &Cast() { + if (filter_type != TARGET::TYPE) { + throw InternalException("Failed to cast table to type - table filter type mismatch"); + } + return reinterpret_cast(*this); + } + + template + const TARGET &Cast() const { + if (filter_type != TARGET::TYPE) { + throw InternalException("Failed to cast table to type - table filter type mismatch"); + } + return reinterpret_cast(*this); + } +}; + +class TableFilterSet { +public: + std::unordered_map> filters; + +public: + void PushFilter(uint64_t column_index, std::unique_ptr filter); + + bool Equals(TableFilterSet &other) { + if (filters.size() != other.filters.size()) { + return false; + } + for (auto &entry : filters) { + auto other_entry = other.filters.find(entry.first); + if (other_entry == other.filters.end()) { + return false; + } + if (!entry.second->Equals(*other_entry->second)) { + return false; + } + } + return true; + } + static bool Equals(TableFilterSet *left, TableFilterSet *right) { + if (left == right) { + return true; + } + if (!left || !right) { + return false; + } + return left->Equals(*right); + } + + void Serialize(Serializer &serializer) const; + static TableFilterSet Deserialize(Deserializer &deserializer); +}; + +class DynamicTableFilterSet { +public: + void ClearFilters(const PhysicalOperator &op); + void PushFilter(const PhysicalOperator &op, idx_t column_index, unique_ptr filter); + + bool HasFilters() const; + unique_ptr GetFinalTableFilters(const PhysicalTableScan &scan, + optional_ptr existing_filters) const; + +private: + mutable mutex lock; + reference_map_t> filters; +}; + */ 
\ No newline at end of file diff --git a/pixels_fdw/include/string_t.hpp b/pixels_fdw/include/string_t.hpp new file mode 100755 index 0000000..8d5f728 --- /dev/null +++ b/pixels_fdw/include/string_t.hpp @@ -0,0 +1,201 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/common/types/string_type.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include +#include +#include +#include +#include + + +template +const T Load(void *ptr) { + T ret; + memcpy(&ret, ptr, sizeof(ret)); // NOLINT + return ret; +} + +template +void Store(const T &val, void *ptr) { + memcpy(ptr, (void *)&val, sizeof(val)); // NOLINT +} + +struct string_t { + +public: + static constexpr uint32_t PREFIX_BYTES = 4 * sizeof(char); + static constexpr uint32_t INLINE_BYTES = 12 * sizeof(char); + static constexpr uint32_t HEADER_SIZE = sizeof(uint32_t) + PREFIX_BYTES; + static constexpr uint32_t MAX_STRING_SIZE = UINT32_MAX; + static constexpr uint32_t PREFIX_LENGTH = PREFIX_BYTES; + static constexpr uint32_t INLINE_LENGTH = INLINE_BYTES; + + string_t() = default; + explicit string_t(uint32_t len) { + value.inlined.length = len; + } + string_t(const char *data, uint32_t len) { + value.inlined.length = len; + assert(data || GetSize() == 0); + if (IsInlined()) { + // zero initialize the prefix first + // this makes sure that strings with length smaller than 4 still have an equal prefix + memset(value.inlined.inlined, 0, INLINE_BYTES); + if (GetSize() == 0) { + return; + } + // small string: inlined + memcpy(value.inlined.inlined, data, GetSize()); + } else { + // large string: store pointer + memcpy(value.pointer.prefix, data, PREFIX_LENGTH); + value.pointer.ptr = (char *)data; // NOLINT + } + } + + string_t(const char *data) // NOLINT: Allow implicit conversion from `const char*` + : string_t(data, static_cast(strlen(data))) { + } + string_t(const std::string &value) // NOLINT: Allow 
implicit conversion from `const char*` + : string_t(value.c_str(), static_cast(value.size())) { + } + + bool IsInlined() const { + return GetSize() <= INLINE_LENGTH; + } + + const char *GetData() const { + return IsInlined() ? const_cast(value.inlined.inlined) : value.pointer.ptr; + } + const char *GetDataUnsafe() const { + return GetData(); + } + + char *GetDataWriteable() const { + return IsInlined() ? (char *)value.inlined.inlined : value.pointer.ptr; // NOLINT + } + + const char *GetPrefix() const { + return value.inlined.inlined; + } + + char *GetPrefixWriteable() { + return value.inlined.inlined; + } + + uint32_t GetSize() const { + return value.inlined.length; + } + + bool Empty() const { + return value.inlined.length == 0; + } + + std::string GetString() const { + return std::string(GetData(), GetSize()); + } + + void Finalize() { + // set trailing NULL byte + if (GetSize() <= INLINE_LENGTH) { + // fill prefix with zeros if the length is smaller than the prefix length + memset(value.inlined.inlined + GetSize(), 0, INLINE_BYTES - GetSize()); + } else { + // copy the data into the prefix + memset(value.pointer.prefix, 0, PREFIX_BYTES); + } + } + + struct StringComparisonOperators { + static inline bool Equals(const string_t &a, const string_t &b) { + uint64_t a_bulk_comp = Load((void*)&a); + uint64_t b_bulk_comp = Load((void*)&b); + if (a_bulk_comp != b_bulk_comp) { + // Either length or prefix are different -> not equal + return false; + } + // they have the same length and same prefix! 
+ a_bulk_comp = Load((void*)(&a) + 8u); + b_bulk_comp = Load((void*)(&b) + 8u); + if (a_bulk_comp == b_bulk_comp) { + // either they are both inlined (so compare equal) or point to the same string (so compare equal) + return true; + } + if (!a.IsInlined()) { + // 'long' strings of the same length -> compare pointed value + if (memcmp(a.value.pointer.ptr, b.value.pointer.ptr, a.GetSize()) == 0) { + return true; + } + } + // either they are short string of same length but different content + // or they point to string with different content + // either way, they can't represent the same underlying string + return false; + } + // compare up to shared length. if still the same, compare lengths + static bool GreaterThan(const string_t &left, const string_t &right) { + const uint32_t left_length = static_cast(left.GetSize()); + const uint32_t right_length = static_cast(right.GetSize()); + const uint32_t min_length = std::min(left_length, right_length); + +#ifndef DUCKDB_DEBUG_NO_INLINE + uint32_t a_prefix = Load((void*)(left.GetPrefix())); + uint32_t b_prefix = Load((void*)(right.GetPrefix())); + + // Utility to move 0xa1b2c3d4 into 0xd4c3b2a1, basically inverting the order byte-a-byte + auto byte_swap = [](uint32_t v) -> uint32_t { + uint32_t t1 = (v >> 16u) | (v << 16u); + uint32_t t2 = t1 & 0x00ff00ff; + uint32_t t3 = t1 & 0xff00ff00; + return (t2 << 8u) | (t3 >> 8u); + }; + + // Check on prefix ----- + // We dont' need to mask since: + // if the prefix is greater(after bswap), it will stay greater regardless of the extra bytes + // if the prefix is smaller(after bswap), it will stay smaller regardless of the extra bytes + // if the prefix is equal, the extra bytes are guaranteed to be /0 for the shorter one + + if (a_prefix != b_prefix) { + return byte_swap(a_prefix) > byte_swap(b_prefix); + } +#endif + auto memcmp_res = memcmp(left.GetData(), right.GetData(), min_length); + return memcmp_res > 0 || (memcmp_res == 0 && left_length > right_length); + } + }; + + bool 
operator==(const string_t &r) const { + return StringComparisonOperators::Equals(*this, r); + } + + bool operator!=(const string_t &r) const { + return !(*this == r); + } + + bool operator>(const string_t &r) const { + return StringComparisonOperators::GreaterThan(*this, r); + } + bool operator<(const string_t &r) const { + return r > *this; + } +private: + union { + struct { + uint32_t length; + char prefix[4]; + char *ptr; + } pointer; + struct { + uint32_t length; + char inlined[12]; + } inlined; + } value; +}; \ No newline at end of file diff --git a/pixels_fdw/pixels-cpp b/pixels_fdw/pixels-cpp new file mode 160000 index 0000000..da395d1 --- /dev/null +++ b/pixels_fdw/pixels-cpp @@ -0,0 +1 @@ +Subproject commit da395d1faccdbb814b54a8f7d31d7673b009a85c diff --git a/pixels_fdw/pixels-cxx.properties b/pixels_fdw/pixels-cxx.properties new file mode 100755 index 0000000..1f0917d --- /dev/null +++ b/pixels_fdw/pixels-cxx.properties @@ -0,0 +1,31 @@ +# pixels c++ reader configurations + + +# valid values: noop, sortmerge, ratelimited +read.request.scheduler=noop +read.request.merge.gap=2097152 + +# localfs properties +localfs.block.size=4096 +localfs.enable.direct.io=true +localfs.enable.async.io=true +# the lib of async is iouring or aio +localfs.async.lib=iouring +# pixel.stride must be the same as the stride size in pxl data +pixel.stride=10000 +# the work thread to run pixels. -1 means using all CPU cores +pixel.threads=-1 +# column size path. It is optional. If no column size path is designated, the +# size of first pixels data is used. For example: +# pixel.column.size.path=/scratch/liyu/opt/pixels/cpp/pixels-duckdb/benchmark/clickbench/clickbench-size.csv +pixel.column.size.path= + +# the work thread to run parquet. -1 means using all CPU cores +parquet.threads=-1 + +# storage device identifier directory depth +# this parameter defines the directory depth that determines the storage device. 
+# for example, we have three SSDs, the path is /data/ssd1, /data/ssd2 and /data/ssd3, so the depth is 2 +# another example: we have three SSDs, the path is /ssd1, /ssd2 and /ssd3, so the depth is 1 +# this parameter helps us allocate SSD to specific threads +storage.directory.depth=1 diff --git a/pixels_fdw/pixels_fdw--1.0.sql b/pixels_fdw/pixels_fdw--1.0.sql new file mode 100755 index 0000000..ea8c2eb --- /dev/null +++ b/pixels_fdw/pixels_fdw--1.0.sql @@ -0,0 +1,18 @@ +/* contrib/pixels_fdw/pixels_fdw--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pixels_fdw" to load this pixels. \quit + +CREATE FUNCTION pixels_fdw_handler() +RETURNS fdw_handler +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION pixels_fdw_validator(text[], oid) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FOREIGN DATA WRAPPER pixels_fdw + HANDLER pixels_fdw_handler + VALIDATOR pixels_fdw_validator; diff --git a/pixels_fdw/pixels_fdw.c b/pixels_fdw/pixels_fdw.c new file mode 100755 index 0000000..2c61ab7 --- /dev/null +++ b/pixels_fdw/pixels_fdw.c @@ -0,0 +1,64 @@ +#include "postgres.h" +#include "fmgr.h" + +#include "commands/explain.h" +#include "foreign/fdwapi.h" + + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* FDW routines */ +extern void pixelsGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +extern void pixelsGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +extern ForeignScan *pixelsGetForeignPlan(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses, + Plan *outer_plan); +extern TupleTableSlot *pixelsIterateForeignScan(ForeignScanState *node); +extern void pixelsBeginForeignScan(ForeignScanState *node, int eflags); +extern void pixelsEndForeignScan(ForeignScanState *node); +extern void pixelsReScanForeignScan(ForeignScanState *node); +extern bool 
pixelsAnalyzeForeignTable (Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages); +extern void pixelsExplainForeignScan(ForeignScanState *node, ExplainState *es); +extern bool pixelsIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, + RangeTblEntry *rte); +extern Datum pixels_fdw_validator_impl(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(pixels_fdw_validator); +Datum +pixels_fdw_validator(PG_FUNCTION_ARGS) +{ + return pixels_fdw_validator_impl(fcinfo); +} + +PG_FUNCTION_INFO_V1(pixels_fdw_handler); +Datum +pixels_fdw_handler(PG_FUNCTION_ARGS) +{ + FdwRoutine *fdwroutine = makeNode(FdwRoutine); + + fdwroutine->GetForeignRelSize = pixelsGetForeignRelSize; + fdwroutine->GetForeignPaths = pixelsGetForeignPaths; + fdwroutine->GetForeignPlan = pixelsGetForeignPlan; + fdwroutine->BeginForeignScan = pixelsBeginForeignScan; + fdwroutine->IterateForeignScan = pixelsIterateForeignScan; + fdwroutine->ReScanForeignScan = pixelsReScanForeignScan; + fdwroutine->EndForeignScan = pixelsEndForeignScan; + fdwroutine->AnalyzeForeignTable = pixelsAnalyzeForeignTable; + fdwroutine->ExplainForeignScan = pixelsExplainForeignScan; + fdwroutine->IsForeignScanParallelSafe = pixelsIsForeignScanParallelSafe; + + PG_RETURN_POINTER(fdwroutine); +} + diff --git a/pixels_fdw/pixels_fdw.control b/pixels_fdw/pixels_fdw.control new file mode 100755 index 0000000..d5d184b --- /dev/null +++ b/pixels_fdw/pixels_fdw.control @@ -0,0 +1,5 @@ +# pixels_fdw extension +comment = 'foreign-data wrapper for pixels reader' +default_version = '1.0' +module_pathname = '$libdir/pixels_fdw' +relocatable = true diff --git a/pixels_fdw/pixels_impl.cpp b/pixels_fdw/pixels_impl.cpp new file mode 100755 index 0000000..5697154 --- /dev/null +++ b/pixels_fdw/pixels_impl.cpp @@ -0,0 +1,888 @@ +#include "PixelsFdwExecutionState.hpp" +#include "PixelsFdwPlanState.hpp" + +extern "C" +{ +#include "postgres.h" +#include "access/htup_details.h" +#include "access/nbtree.h" +#include 
"access/reloptions.h" +#include "access/sysattr.h" +#include "catalog/pg_foreign_table.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "executor/spi.h" +#include "executor/tuptable.h" +#include "foreign/foreign.h" +#include "foreign/fdwapi.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "nodes/makefuncs.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/planmain.h" +#include "optimizer/restrictinfo.h" +#include "parser/parse_coerce.h" +#include "parser/parse_func.h" +#include "parser/parse_oper.h" +#include "utils/builtins.h" +#include "utils/jsonb.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/regproc.h" +#include "utils/rel.h" +#include "utils/typcache.h" +#include "access/table.h" +#include "optimizer/optimizer.h" +#include "catalog/pg_am_d.h" +#include "nodes/pathnodes.h" +} + +#define MAX_PIXELS_OPTION_LENGTH 500 + +static void* +pixelsGetOption(Oid relid, char* option_name) +{ + if (!option_name) { + elog(ERROR, + "empty option name"); + return nullptr; + } + ForeignTable *table; + ForeignServer *server; + ForeignDataWrapper *wrapper; + List *options; + ListCell *lc; + + table = GetForeignTable(relid); + server = GetForeignServer(table->serverid); + wrapper = GetForeignDataWrapper(server->fdwid); + + options = NIL; + options = list_concat(options, wrapper->options); + options = list_concat(options, server->options); + options = list_concat(options, table->options); + + foreach(lc, table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, option_name) == 0) + { + void* result_option = (char *) palloc0(MAX_PIXELS_OPTION_LENGTH); + memcpy(result_option, + defGetString(def), + MAX_PIXELS_OPTION_LENGTH); + return result_option; + } + } + elog(ERROR, + "unknown option '%s'", + option_name); + return nullptr; +} + +static List* +pixelsGetOptions(Oid relid) +{ + 
ForeignTable *table;
+    ForeignServer *server;
+    ForeignDataWrapper *wrapper;
+    List *options;
+    ListCell *lc;
+
+    table = GetForeignTable(relid);
+    server = GetForeignServer(table->serverid);
+    wrapper = GetForeignDataWrapper(server->fdwid);
+
+    options = NIL;
+    options = list_concat(options, wrapper->options);
+    options = list_concat(options, server->options);
+    options = list_concat(options, table->options);
+
+    return options;
+}
+/*
+ * Scanner states used by parse_filenames_list().
+ */
+typedef enum
+{
+    FPS_START = 0,   /* between names */
+    FPS_IDENT,       /* inside an unquoted name; a space ends it */
+    FPS_QUOTE        /* inside a name opened by '|'; the next '|' ends it */
+} FileParserState;
+/*
+ * Split `str` into file names and append each one to `filenames` as a
+ * String node (lappend + makeString).
+ *
+ * The scan runs over a pstrdup copy of `str`: the terminator of each name is
+ * overwritten with '\0' and the appended strings point into that copy.
+ * In FPS_START spaces are skipped, '|' opens a delimited name that begins at
+ * the following character, and any other character begins an unquoted name.
+ *
+ * NOTE(review): a name is appended only when its terminator is seen. An
+ * unquoted name, or a '|' name with no closing '|', that runs to the end of
+ * `str` is never appended. Also, a non-space character right after a closing
+ * '|' starts an unquoted name, so "|a|b|" appends "a" and then scans "b|" as
+ * an unquoted name that is dropped - confirm this is the intended grammar.
+ */
+static void
+parse_filenames_list(const char *str, List* &filenames)
+{
+    char *cur = pstrdup(str);
+    char *f = cur;   // start of the name currently being scanned
+    FileParserState state = FPS_START;
+    while (*cur)
+    {
+        switch (state)
+        {
+            case FPS_START:
+                switch (*cur)
+                {
+                    case ' ':
+                        /* just skip */
+                        break;
+                    case '|':
+                        f = cur + 1;
+                        state = FPS_QUOTE;
+                        break;
+                    default:
+                        /* XXX we should check that *cur is a valid path symbol
+                         * but let's skip it for now */
+                        state = FPS_IDENT;
+                        f = cur;
+                        break;
+                }
+                break;
+            case FPS_IDENT:
+                switch (*cur)
+                {
+                    case ' ':
+                        *cur = '\0';   // terminate the name in place
+                        filenames = lappend(filenames, makeString(f));
+                        state = FPS_START;
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            case FPS_QUOTE:
+                switch (*cur)
+                {
+                    case '|':
+                        *cur = '\0';   // terminate the name in place
+                        filenames = lappend(filenames, makeString(f));
+                        state = FPS_START;
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            default:
+                elog(ERROR, "pixels_fdw: unknown filename parse state");
+        }
+        cur++;
+    }
+}
+/*
+ * Token classes of the filter option. parse_filter_type() uses the value as
+ * an index into its `regexs` vector (regexs.at(t)), so the order here has to
+ * match the order in which the patterns are added there.
+ */
+typedef enum
+{
+    FT_DIGIT = 0,
+    FT_DECIMAL,
+    FT_AND,
+    FT_OR,
+    FT_GTEQ,
+    FT_LTEQ,
+    FT_EQ,
+    FT_GT,
+    FT_LT,
+    FT_LB,
+    FT_RB,
+    FT_WORD,
+    FT_MISMATCH   // no pattern matched; also pushed as a sentinel
+} FilterType;
+
+
+static void
+parse_filter_type(const char *str,
+                  PixelsFilter* &all_filters,
+                  List* &refered_cols)
+{
+    std::string s = std::string(str);
+    assert(!s.empty());
+    std::regex delim("\\s+");
+    std::regex_token_iterator split(s.begin(), s.end(), delim, -1);
+    std::regex_token_iterator rend;
+
+    std::vector regexs;
+    regexs.emplace_back(std::regex("\\d+"));
+    
regexs.emplace_back(std::regex("\\d*\\.\\d*")); + regexs.emplace_back(std::regex("&")); + regexs.emplace_back(std::regex("\\|")); + regexs.emplace_back(std::regex(">=")); + regexs.emplace_back(std::regex("<=")); + regexs.emplace_back(std::regex("==")); + regexs.emplace_back(std::regex(">")); + regexs.emplace_back(std::regex("<")); + regexs.emplace_back(std::regex("\\(")); + regexs.emplace_back(std::regex("\\)")); + regexs.emplace_back(std::regex("\\w+")); + + std::vector optypes; + std::vector opnames; + + while (split != rend) { + std::string sub = *split++; + FilterType t = FT_DIGIT; + for (; t < FT_MISMATCH; t = (FilterType)(t + 1)) { + if (std::regex_match(sub, regexs.at(t))) { + break; + } + } + if (t == FT_MISMATCH) { + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option \"%s\"", + sub.c_str()))); + } + optypes.emplace_back(t); + opnames.emplace_back(sub); + } + + optypes.emplace_back(FT_MISMATCH); // Invalid + + std::stack optypes_stack; + std::stack> opnames_stack; + + int ipriority[] = {-1, -1, 4, 2, 6, 6, 6, 6, 6, 0, 7, -1, -1}; + int opriority[] = {-1, -1, 3, 1, 5, 5, 5, 5, 5, 7, 0, -1, -1}; + + std::stack filters; + + optypes_stack.push(FT_MISMATCH); // Invalid + for (int i = 0; i < optypes.size(); ) { + if (optypes.at(i) == FT_DIGIT || optypes.at(i) == FT_DECIMAL) { + opnames_stack.push(std::make_pair(opnames.front(), true)); + opnames.erase(opnames.begin()); + i++; + } + else if (optypes.at(i) == FT_WORD) { + char *refered_col = (char*)palloc0(opnames.front().length() + 1); + strcpy(refered_col, opnames.front().c_str()); + refered_cols = lappend(refered_cols, makeString(refered_col)); + opnames_stack.push(std::make_pair(opnames.front(), false)); + opnames.erase(opnames.begin()); + i++; + } + else { + if (opriority[optypes.at(i)] > ipriority[optypes_stack.top()]) { + optypes_stack.push(optypes.at(i)); + i++; + } + else if (opriority[optypes.at(i)] < ipriority[optypes_stack.top()]) { + switch 
(optypes_stack.top()) { + case FT_AND: { + PixelsFilter *and_filter = createPixelsFilter(PixelsFilterType::CONJUNCTION_AND, std::string(), 0, 0, string_t()); + if (filters.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + PixelsFilter *filter_2 = filters.top(); + filters.pop(); + PixelsFilter *filter_1 = filters.top(); + filters.pop(); + and_filter->setLChild(filter_1); + and_filter->setRChild(filter_2); + filters.push(and_filter); + break; + } + case FT_OR: { + PixelsFilter *or_filter = createPixelsFilter(PixelsFilterType::CONJUNCTION_OR, std::string(), 0, 0, string_t()); + if (filters.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + PixelsFilter *filter_2 = filters.top(); + filters.pop(); + PixelsFilter *filter_1 = filters.top(); + filters.pop(); + or_filter->setLChild(filter_1); + or_filter->setRChild(filter_2); + filters.push(or_filter); + break; + } + case FT_GTEQ: { + if (opnames_stack.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + std::pair oprand_2 = opnames_stack.top(); + opnames_stack.pop(); + std::pair oprand_1 = opnames_stack.top(); + opnames_stack.pop(); + if (oprand_1.second == oprand_2.second) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + if (!oprand_1.second) { + long ivalue; + double dvalue; + sscanf(oprand_2.first.c_str(), "%ld", &ivalue); + sscanf(oprand_2.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_2.first.c_str()); + char *cname = (char*)palloc0(oprand_1.first.size()); + strcpy(cname, oprand_1.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_GTEQ, oprand_1.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); 
+ } + else { + long ivalue; + double dvalue; + sscanf(oprand_1.first.c_str(), "%ld", &ivalue); + sscanf(oprand_1.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_1.first.c_str()); + char *cname = (char*)palloc0(oprand_2.first.size()); + strcpy(cname, oprand_2.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_LTEQ, oprand_2.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + break; + } + case FT_LTEQ: { + if (opnames_stack.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + std::pair oprand_2 = opnames_stack.top(); + opnames_stack.pop(); + std::pair oprand_1 = opnames_stack.top(); + opnames_stack.pop(); + if (oprand_1.second == oprand_2.second) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + if (!oprand_1.second) { + long ivalue; + double dvalue; + sscanf(oprand_2.first.c_str(), "%ld", &ivalue); + sscanf(oprand_2.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_2.first.c_str()); + char *cname = (char*)palloc0(oprand_1.first.size()); + strcpy(cname, oprand_1.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_LTEQ, oprand_1.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + else { + long ivalue; + double dvalue; + sscanf(oprand_1.first.c_str(), "%ld", &ivalue); + sscanf(oprand_1.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_1.first.c_str()); + char *cname = (char*)palloc0(oprand_2.first.size()); + strcpy(cname, oprand_2.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_GTEQ, oprand_2.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + break; + } + case FT_EQ: { + if (opnames_stack.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + 
errmsg("pixels_fdw: invalid filter option, parse error ")); + } + std::pair oprand_2 = opnames_stack.top(); + opnames_stack.pop(); + std::pair oprand_1 = opnames_stack.top(); + opnames_stack.pop(); + if (oprand_1.second == oprand_2.second) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + if (!oprand_1.second) { + long ivalue; + double dvalue; + sscanf(oprand_2.first.c_str(), "%ld", &ivalue); + sscanf(oprand_2.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_2.first.c_str()); + char *cname = (char*)palloc0(oprand_1.first.size()); + strcpy(cname, oprand_1.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_LTEQ, oprand_1.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + else { + long ivalue; + double dvalue; + sscanf(oprand_1.first.c_str(), "%ld", &ivalue); + sscanf(oprand_1.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_1.first.c_str()); + char *cname = (char*)palloc0(oprand_2.first.size()); + strcpy(cname, oprand_2.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_GTEQ, oprand_2.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + break; + } + case FT_GT: { + if (opnames_stack.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + std::pair oprand_2 = opnames_stack.top(); + opnames_stack.pop(); + std::pair oprand_1 = opnames_stack.top(); + opnames_stack.pop(); + if (oprand_1.second == oprand_2.second) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + if (!oprand_1.second) { + long ivalue; + double dvalue; + sscanf(oprand_2.first.c_str(), "%ld", &ivalue); + sscanf(oprand_2.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_2.first.c_str()); + char 
*cname = (char*)palloc0(oprand_1.first.size()); + strcpy(cname, oprand_1.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_GT, oprand_1.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + else { + long ivalue; + double dvalue; + sscanf(oprand_1.first.c_str(), "%ld", &ivalue); + sscanf(oprand_1.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_1.first.c_str()); + char *cname = (char*)palloc0(oprand_2.first.size()); + strcpy(cname, oprand_2.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_LT, oprand_2.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + break; + } + case FT_LT: { + if (opnames_stack.size() < 2) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + std::pair oprand_2 = opnames_stack.top(); + opnames_stack.pop(); + std::pair oprand_1 = opnames_stack.top(); + opnames_stack.pop(); + if (oprand_1.second == oprand_2.second) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + if (!oprand_1.second) { + long ivalue; + double dvalue; + sscanf(oprand_2.first.c_str(), "%ld", &ivalue); + sscanf(oprand_2.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_2.first.c_str()); + char *cname = (char*)palloc0(oprand_1.first.size()); + strcpy(cname, oprand_1.first.c_str()); + + PixelsFilter *gteq_filter = createPixelsFilter(PixelsFilterType::COMPARE_LT, oprand_1.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + else { + long ivalue; + double dvalue; + sscanf(oprand_1.first.c_str(), "%ld", &ivalue); + sscanf(oprand_1.first.c_str(), "%lf", &dvalue); + string_t svalue = string_t(oprand_1.first.c_str()); + char *cname = (char*)palloc0(oprand_2.first.size()); + strcpy(cname, oprand_2.first.c_str()); + + PixelsFilter *gteq_filter = 
createPixelsFilter(PixelsFilterType::COMPARE_GT, oprand_2.first, ivalue, dvalue, svalue); + filters.push(gteq_filter); + } + break; + } + default: { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error ")); + } + } + optypes_stack.pop(); + } + else { + optypes_stack.pop(); + i++; + } + if (!opnames.empty()) { + opnames.erase(opnames.begin()); + } + } + } + if (opnames_stack.size() != 0 || filters.size() != 1) { + ereport(ERROR, + errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid filter option, parse error")); + } + all_filters = filters.top(); +} + +static void +search_and_merge(const char *col_name, PixelsFilter *root, PixelsFilter *&new_filter) { + if (!root->getColumnName().empty()) { + if ((root->getColumnName().compare(std::string(col_name)) == 0)) { + new_filter = root->copy(); + return; + } + return; + } + else { + assert(root->getFilterType() == PixelsFilterType::CONJUNCTION_AND || root->getFilterType() == PixelsFilterType::CONJUNCTION_OR); + assert(root->getLChild()); + assert(root->getRChild()); + PixelsFilter *new_lchild = nullptr; + search_and_merge(col_name, root->getLChild(), new_lchild); + PixelsFilter *new_rchild = nullptr; + search_and_merge(col_name, root->getRChild(), new_rchild); + if (!new_lchild && !new_rchild) { + return; + } + else if (new_lchild && !new_rchild) { + new_filter = new_lchild; + return; + } + else if (!new_lchild && new_rchild) { + new_filter = new_rchild; + return; + } + else { + new_filter = root->copy(); + new_filter->setColumnName(std::string(col_name)); + new_filter->setLChild(new_lchild); + new_filter->setRChild(new_rchild); + return; + } + } +} + +static void +separate_filters(PixelsFilter* &all_filters, + List* &refered_cols, + List* &col_filters) { + ListCell *lc; + foreach (lc, refered_cols) { + char *col_name = strVal(lfirst(lc)); + PixelsFilter* col_filter = nullptr; + search_and_merge(col_name, all_filters, col_filter); + 
if(col_filter) { + col_filters = lappend(col_filters, col_filter); + } + } +} + +extern "C" void +pixelsGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) { + PixelsFdwPlanState *fdw_private; + char* filename = (char*)pixelsGetOption(foreigntableid, + "filename"); + List* filenames = NIL; + parse_filenames_list(filename, filenames); + char* filters = (char*)pixelsGetOption(foreigntableid, + "filters"); + PixelsFilter* all_filters; + List* refered_cols = NIL; + parse_filter_type(filters, all_filters, refered_cols); + List* col_filters = NIL; + separate_filters(all_filters, refered_cols, col_filters); + List* options = pixelsGetOptions(foreigntableid); + fdw_private = createPixelsFdwPlanState(filenames, + col_filters, + options); + baserel->fdw_private = fdw_private; + baserel->tuples = fdw_private->getRowCount(); + baserel->rows = fdw_private->getRowCount(); +} + +static void +estimate_costs(PlannerInfo *root, + RelOptInfo *baserel, + PixelsFdwPlanState *fdw_private, + Cost *startup_cost, + Cost *run_cost, + Cost *total_cost) +{ + double ntuples; + + ntuples = baserel->tuples * clauselist_selectivity(root, + baserel->baserestrictinfo, + 0, + JOIN_INNER, + NULL); + + /* + * Here we assume that parquet tuple cost is the same as regular tuple cost + * even though this is probably not true in many cases. Maybe we'll come up + * with a smarter idea later. Also we use actual number of rows in selected + * rowgroups to calculate cost as we need to process those rows regardless + * of whether they're gonna be filtered out or not. 
+ */ + *run_cost = fdw_private->getRowCount() * cpu_tuple_cost; + *startup_cost = baserel->baserestrictcost.startup; + *total_cost = *startup_cost + *run_cost; + + baserel->rows = ntuples; +} + +static void +extract_used_attributes(RelOptInfo *baserel) +{ + PixelsFdwPlanState *fdw_private = (PixelsFdwPlanState *) baserel->fdw_private; + ListCell *lc; + + pull_varattnos((Node *) baserel->reltarget->exprs, + baserel->relid, + &fdw_private->attrs_used); + + foreach(lc, baserel->baserestrictinfo) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + pull_varattnos((Node *) rinfo->clause, + baserel->relid, + &fdw_private->attrs_used); + } + + if (bms_is_empty(fdw_private->attrs_used)) + { + bms_free(fdw_private->attrs_used); + fdw_private->attrs_used = bms_make_singleton(1 - FirstLowInvalidHeapAttributeNumber); + } +} + +extern "C" void +pixelsGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) +{ + PixelsFdwPlanState *fdw_private = (PixelsFdwPlanState *) baserel->fdw_private; + Cost startup_cost; + Cost run_cost; + Cost total_cost; + + /* Estimate costs */ + estimate_costs(root, baserel, + fdw_private, + &startup_cost, + &run_cost, + &total_cost); + + extract_used_attributes(baserel); + + add_path(baserel, + (Path *) + create_foreignscan_path(root, + baserel, + NULL, /* default pathtarget */ + baserel->rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + baserel->lateral_relids, + NULL, /* no extra plan */ + NIL)); + /* assume there is no parallel paths, so run_cost omitted*/ +} + +extern "C" ForeignScan * +pixelsGetForeignPlan(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses, + Plan *outer_plan) +{ + PixelsFdwPlanState *fdw_private = (PixelsFdwPlanState *) baserel->fdw_private; + List *params = NIL; + List *attrs_used = NIL; + AttrNumber attr; + Index scan_relid = baserel->relid; + + /* + * We have no native ability to evaluate restriction clauses, 
so we just + * put all the scan_clauses into the plan node's qual list for the + * executor to check. So all we have to do here is strip RestrictInfo + * nodes from the clauses and ignore pseudoconstants (which will be + * handled elsewhere). + */ + scan_clauses = extract_actual_clauses(scan_clauses, + false); + + attr = -1; + while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0) + attrs_used = lappend_int(attrs_used, attr); + + params = lappend(params, fdw_private->getFilesList()); + params = lappend(params, fdw_private->getFiltersList()); + params = lappend(params, attrs_used); + + /* Create the ForeignScan node */ + return make_foreignscan(tlist, + scan_clauses, + scan_relid, + NIL, /* no expressions to evaluate */ + params, + NIL, /* no custom tlist */ + NIL, /* no remote quals */ + outer_plan); +} + +extern "C" void +pixelsExplainForeignScan(ForeignScanState *node, ExplainState *es) +{ + char* filename = (char*)pixelsGetOption(RelationGetRelid(node->ss.ss_currentRelation), + "filename"); + ExplainPropertyText("Pixels File Names: ", + filename, + es); + char* filters = (char*)pixelsGetOption(RelationGetRelid(node->ss.ss_currentRelation), + "filters"); + ExplainPropertyText("Pixels Table Filters: ", + filters, + es); +} + +extern "C" void +pixelsBeginForeignScan(ForeignScanState *node, int eflags) +{ + List *fdw_private = ((ForeignScan *)(node->ss.ps.plan))->fdw_private; + ListCell *lc, *lc2; + List *filenames = NIL; + List *filters = NIL; + List *attrs_list; + std::set attrs_used; + int i = 0; + foreach (lc, fdw_private) + { + switch(i) + { + case 0: + filenames = (List *) lfirst(lc); + break; + case 1: + filters = (List *) lfirst(lc); + break; + case 2: + attrs_list = (List *) lfirst(lc); + foreach (lc2, attrs_list) + attrs_used.insert(lfirst_int(lc2)); + break; + } + ++i; + } + PixelsFdwExecutionState *festate; + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 
+ */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + festate = createPixelsFdwExecutionState(filenames, + filters, + attrs_used, + node->ss.ss_ScanTupleSlot->tts_tupleDescriptor); + node->fdw_state = (void *) festate; +} + + +/* + * pixelsIterateForeignScan + * Read next record from the data file and store it into the + * ScanTupleSlot as a virtual tuple + */ +extern "C" TupleTableSlot * +pixelsIterateForeignScan(ForeignScanState *node) +{ + PixelsFdwExecutionState *festate = (PixelsFdwExecutionState *) node->fdw_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + ExecClearTuple(slot); + if (festate->next(slot)) { + return slot; + } + return NULL; +} + +extern "C" void +pixelsReScanForeignScan(ForeignScanState *node) +{ + PixelsFdwExecutionState *festate = (PixelsFdwExecutionState *) node->fdw_state; + festate->rescan(); +} + +extern "C" void +pixelsEndForeignScan(ForeignScanState *node) +{ + PixelsFdwExecutionState *festate = (PixelsFdwExecutionState *) node->fdw_state; + delete festate; +} + +extern "C" bool +pixelsAnalyzeForeignTable(Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages) { + return false; +} + +extern "C" bool +pixelsIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, + RangeTblEntry *rte) { + return false; +} + +extern "C" Datum +pixels_fdw_validator_impl(PG_FUNCTION_ARGS) { + List *options = untransformRelOptions(PG_GETARG_DATUM(0)); + Oid catalog = PG_GETARG_OID(1); + ListCell *opt_lc; + bool filename_provided = false; + bool filters_provided = false; + + /* Only check table options */ + if (catalog != ForeignTableRelationId) + PG_RETURN_VOID(); + + foreach(opt_lc, options) + { + DefElem *def = (DefElem *) lfirst(opt_lc); + + if (strcmp(def->defname, "filename") == 0) + { + char *filename = pstrdup(defGetString(def)); + if (filename) { + filename_provided = true; + } + } + else if (strcmp(def->defname, "filters") == 0) + { + char *filters = pstrdup(defGetString(def)); + if (filters) { + 
filters_provided = true; + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("pixels_fdw: invalid option \"%s\"", + def->defname))); + } + } + + if (!filename_provided) + elog(ERROR, "pixels_fdw: filename is required"); + + PG_RETURN_VOID(); +} \ No newline at end of file diff --git a/pixels_fdw/test/data/example.pxl b/pixels_fdw/test/data/example.pxl new file mode 100755 index 0000000..9ec1b42 Binary files /dev/null and b/pixels_fdw/test/data/example.pxl differ diff --git a/pixels_fdw/test/data/example_0.pxl b/pixels_fdw/test/data/example_0.pxl new file mode 100755 index 0000000..9ec1b42 Binary files /dev/null and b/pixels_fdw/test/data/example_0.pxl differ diff --git a/pixels_fdw/test/data/example_1.pxl b/pixels_fdw/test/data/example_1.pxl new file mode 100755 index 0000000..9ec1b42 Binary files /dev/null and b/pixels_fdw/test/data/example_1.pxl differ diff --git a/pixels_fdw/third-party/protobuf b/pixels_fdw/third-party/protobuf new file mode 160000 index 0000000..fc6ae67 --- /dev/null +++ b/pixels_fdw/third-party/protobuf @@ -0,0 +1 @@ +Subproject commit fc6ae678ad4319d66864ec3713b4c9262278bd68