Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enabled appdirect mode for indexsearcher #222

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion AnnService/inc/Core/BKT/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ namespace SPTAG
ErrorCode SaveIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams);

ErrorCode LoadConfig(Helper::IniReader& p_reader);
ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams);
// Intel PM change
ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams, bool data_in_pm, bool graph_in_pm, std::string p_pm_path);
ErrorCode LoadIndexDataFromMemory(const std::vector<ByteArray>& p_indexBlobs);

ErrorCode BuildIndex(const void* p_data, SizeType p_vectorNum, DimensionType p_dimension);
Expand Down
90 changes: 79 additions & 11 deletions AnnService/inc/Core/Common/Dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,54 @@
#ifndef _SPTAG_COMMON_DATASET_H_
#define _SPTAG_COMMON_DATASET_H_

// Intel - for testing
#include<iostream>


// Intel PM change
#include <memkind.h>

// Alignment value used by this header: it is passed as the alignment argument
// of the Dataset buffer allocation and to the IS_ALIGNED / ROUND_UP helpers.
#define ALIGN 32

// ROUND_UP(X, Y): evaluates to X rounded up to the nearest multiple of Y.
// X is cast to uint64_t before the arithmetic. The macro is a pure expression:
// it does NOT assign to X, so the caller must store the result
// (e.g. `size = ROUND_UP(size, ALIGN);`). X and Y are expanded more than once,
// so do not pass arguments with side effects.
#define ROUND_UP(X, Y) \
((((uint64_t)(X) / (Y)) + ((uint64_t)(X) % (Y) != 0)) * (Y))

// IS_ALIGNED(X, Y): true when X is an exact multiple of Y. Both operands are
// cast to uint64_t before the modulo.
#define IS_ALIGNED(X, Y) ((uint64_t)(X) % (uint64_t)(Y) == 0)



namespace SPTAG
{
namespace COMMON
{
// Intel PM change
// Owns a memkind kind handle created by memkind_create_pmem() on a directory
// and destroys it with memkind_destroy_kind() when the wrapper is destroyed.
// A null `kind` means "not initialized"; the handle is created lazily by the
// first successful call to init().
struct KindWrapper {
    KindWrapper() = default;

    // The wrapper owns the handle: a copy would call memkind_destroy_kind()
    // twice on the same kind, so copying is disabled.
    KindWrapper(const KindWrapper&) = delete;
    KindWrapper& operator=(const KindWrapper&) = delete;

    ~KindWrapper() {
        if (isinit()) {
            memkind_destroy_kind(kind);
        }
    }

    // Returns true once init() has successfully created the kind.
    bool isinit() const { return kind != nullptr; }

    // Creates the PMEM kind on directory `dir` (max size 0 is passed through
    // to memkind_create_pmem unchanged). Calling it again after a successful
    // init is a no-op. Returns true when the kind is available afterwards.
    // On failure `kind` is left null so that isinit() keeps reporting false
    // instead of exposing an unspecified handle value.
    bool init(const char* dir) {
        if (isinit()) {
            return true;
        }

        if (dir == nullptr) {
            std::cerr << "KindWrapper::init: null PMEM directory" << '\n';
            return false;
        }

        const int err = memkind_create_pmem(dir, 0, &kind);
        if (err != 0) {
            kind = nullptr;
            std::cerr << "KindWrapper::init: memkind_create_pmem failed for '"
                      << dir << "' (error " << err << ")" << '\n';
            return false;
        }
        return true;
    }

    // members
    // Explicitly null-initialized so isinit() and the destructor are well
    // defined for every instance, not only ones with static storage duration.
    memkind_t kind = nullptr;
};
// NOTE(review): `static` at namespace scope in a header gives every
// translation unit that includes this file its own _pmem_kind instance;
// confirm that one PMEM kind per translation unit is intended.
static KindWrapper _pmem_kind;
// END Intel PM change


// structure to save Data and Graph
template <typename T>
class Dataset
Expand All @@ -26,26 +70,50 @@ namespace SPTAG

public:
Dataset() {}

Dataset(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool transferOnwership_ = true)
// BEGIN Intel PM Change
Dataset(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool transferOnwership_ = true, std::string p_pm_path="" )
{
Initialize(rows_, cols_, rowsInBlock_, capacity_, data_, transferOnwership_);
}
// END Intel PM Change
~Dataset()
{
if (ownData) _mm_free(data);
// BEGIN Intel PM Change
//if (ownData) _mm_free(data);
if (ownData) {
auto kind = memkind_detect_kind(data);
memkind_free(kind, data);
}

for (T* ptr : incBlocks) _mm_free(ptr);
incBlocks.clear();
}
void Initialize(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool transferOnwership_ = true)
void Initialize(SizeType rows_, DimensionType cols_, SizeType rowsInBlock_, SizeType capacity_, T* data_ = nullptr, bool transferOnwership_ = true, std::string p_pm_path="")
{
rows = rows_;
cols = cols_;
data = data_;
if (data_ == nullptr || !transferOnwership_)
{
ownData = true;
data = (T*)_mm_malloc(((size_t)rows) * cols * sizeof(T), ALIGN);
// Intel PM change
size_t size = ((size_t)rows) * cols * sizeof(T);

if (! IS_ALIGNED(size, ALIGN)) ROUND_UP(size, ALIGN);

if (p_pm_path != "")
{
if (!_pmem_kind.isinit()) {
_pmem_kind.init(p_pm_path.c_str());
}

data = (T*)memkind_malloc(_pmem_kind.kind, size);
}
else
{
data = (T*)memkind_malloc(MEMKIND_DEFAULT, size);
}

if (data_ != nullptr) memcpy(data, data_, ((size_t)rows) * cols * sizeof(T));
else std::memset(data, -1, ((size_t)rows) * cols * sizeof(T));
}
Expand Down Expand Up @@ -155,27 +223,27 @@ namespace SPTAG
return Save(ptr);
}

ErrorCode Load(std::shared_ptr<Helper::DiskPriorityIO> pInput, SizeType blockSize, SizeType capacity)
ErrorCode Load(std::shared_ptr<Helper::DiskPriorityIO> pInput, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
IOBINARY(pInput, ReadBinary, sizeof(SizeType), (char*)&rows);
IOBINARY(pInput, ReadBinary, sizeof(DimensionType), (char*)&cols);

Initialize(rows, cols, blockSize, capacity);
Initialize(rows, cols, blockSize, capacity, nullptr, true, p_pm_path);
IOBINARY(pInput, ReadBinary, sizeof(T) * cols * rows, (char*)data);
LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", name.c_str(), rows, cols);
return ErrorCode::Success;
}

ErrorCode Load(std::string sDataPointsFileName, SizeType blockSize, SizeType capacity)
ErrorCode Load(std::string sDataPointsFileName, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
LOG(Helper::LogLevel::LL_Info, "Load %s From %s\n", name.c_str(), sDataPointsFileName.c_str());
auto ptr = f_createIO();
if (ptr == nullptr || !ptr->Initialize(sDataPointsFileName.c_str(), std::ios::binary | std::ios::in)) return ErrorCode::FailedOpenFile;
return Load(ptr, blockSize, capacity);
return Load(ptr, blockSize, capacity, p_pm_path);
}

// Functions for loading models from memory mapped files
ErrorCode Load(char* pDataPointsMemFile, SizeType blockSize, SizeType capacity)
ErrorCode Load(char* pDataPointsMemFile, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
SizeType R;
DimensionType C;
Expand All @@ -185,7 +253,7 @@ namespace SPTAG
C = *((DimensionType*)pDataPointsMemFile);
pDataPointsMemFile += sizeof(DimensionType);

Initialize(R, C, blockSize, capacity, (T*)pDataPointsMemFile);
Initialize(R, C, blockSize, capacity, (T*)pDataPointsMemFile,true, p_pm_path);
LOG(Helper::LogLevel::LL_Info, "Load %s (%d,%d) Finish!\n", name.c_str(), R, C);
return ErrorCode::Success;
}
Expand Down
12 changes: 6 additions & 6 deletions AnnService/inc/Core/Common/NeighborhoodGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,30 +451,30 @@ namespace SPTAG
return m_pNeighborhoodGraph.BufferSize();
}

ErrorCode LoadGraph(std::shared_ptr<Helper::DiskPriorityIO> input, SizeType blockSize, SizeType capacity)
ErrorCode LoadGraph(std::shared_ptr<Helper::DiskPriorityIO> input, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
ErrorCode ret = ErrorCode::Success;
if ((ret = m_pNeighborhoodGraph.Load(input, blockSize, capacity)) != ErrorCode::Success) return ret;
if ((ret = m_pNeighborhoodGraph.Load(input, blockSize, capacity, p_pm_path)) != ErrorCode::Success) return ret;

m_iGraphSize = m_pNeighborhoodGraph.R();
m_iNeighborhoodSize = m_pNeighborhoodGraph.C();
return ret;
}

ErrorCode LoadGraph(std::string sGraphFilename, SizeType blockSize, SizeType capacity)
ErrorCode LoadGraph(std::string sGraphFilename, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
ErrorCode ret = ErrorCode::Success;
if ((ret = m_pNeighborhoodGraph.Load(sGraphFilename, blockSize, capacity)) != ErrorCode::Success) return ret;
if ((ret = m_pNeighborhoodGraph.Load(sGraphFilename, blockSize, capacity, p_pm_path)) != ErrorCode::Success) return ret;

m_iGraphSize = m_pNeighborhoodGraph.R();
m_iNeighborhoodSize = m_pNeighborhoodGraph.C();
return ret;
}

ErrorCode LoadGraph(char* pGraphMemFile, SizeType blockSize, SizeType capacity)
ErrorCode LoadGraph(char* pGraphMemFile, SizeType blockSize, SizeType capacity, std::string p_pm_path="")
{
ErrorCode ret = ErrorCode::Success;
if ((ret = m_pNeighborhoodGraph.Load(pGraphMemFile, blockSize, capacity)) != ErrorCode::Success) return ret;
if ((ret = m_pNeighborhoodGraph.Load(pGraphMemFile, blockSize, capacity, p_pm_path)) != ErrorCode::Success) return ret;

m_iGraphSize = m_pNeighborhoodGraph.R();
m_iNeighborhoodSize = m_pNeighborhoodGraph.C();
Expand Down
3 changes: 2 additions & 1 deletion AnnService/inc/Core/KDT/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ namespace SPTAG
ErrorCode SaveIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams);

ErrorCode LoadConfig(Helper::IniReader& p_reader);
ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams);
// Intel PM change
ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams, bool data_in_pm, bool graph_in_pm, std::string p_pm_path);
ErrorCode LoadIndexDataFromMemory(const std::vector<ByteArray>& p_indexBlobs);

ErrorCode BuildIndex(const void* p_data, SizeType p_vectorNum, DimensionType p_dimension);
Expand Down
8 changes: 5 additions & 3 deletions AnnService/inc/Core/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class VectorIndex

static std::shared_ptr<VectorIndex> CreateInstance(IndexAlgoType p_algo, VectorValueType p_valuetype);

static ErrorCode LoadIndex(const std::string& p_loaderFilePath, std::shared_ptr<VectorIndex>& p_vectorIndex);
// Intel PM Change
static ErrorCode LoadIndex(const std::string& p_loaderFilePath, std::shared_ptr<VectorIndex>& p_vectorIndex, bool data_in_pm=false, bool graph_in_pm=false, std::string p_pm_path="");

static ErrorCode LoadIndexFromFile(const std::string& p_file, std::shared_ptr<VectorIndex>& p_vectorIndex);

Expand All @@ -118,8 +119,9 @@ class VectorIndex
virtual ErrorCode SaveIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams) = 0;

virtual ErrorCode LoadConfig(Helper::IniReader& p_reader) = 0;

virtual ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams) = 0;

// Intel PM Change
virtual ErrorCode LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams, bool data_in_pm=false, bool graph_in_pm=false, std::string p_pm_path="") = 0;

virtual ErrorCode LoadIndexDataFromMemory(const std::vector<ByteArray>& p_indexBlobs) = 0;

Expand Down
11 changes: 8 additions & 3 deletions AnnService/src/Core/BKT/BKTIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,19 @@ namespace SPTAG
}

template <typename T>
ErrorCode Index<T>::LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams)
ErrorCode Index<T>::LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams, bool data_in_pm , bool graph_in_pm, std::string p_pm_path )
{
if (p_indexStreams.size() < 4) return ErrorCode::LackOfInputs;

std::string data_pm_path = "";
if (data_in_pm) data_pm_path = p_pm_path;
std::string graph_pm_path = "";
if (graph_in_pm) graph_pm_path = p_pm_path;

ErrorCode ret = ErrorCode::Success;
if (p_indexStreams[0] == nullptr || (ret = m_pSamples.Load(p_indexStreams[0], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;
if (p_indexStreams[0] == nullptr || (ret = m_pSamples.Load(p_indexStreams[0], m_iDataBlockSize, m_iDataCapacity,data_pm_path)) != ErrorCode::Success) return ret;
if (p_indexStreams[1] == nullptr || (ret = m_pTrees.LoadTrees(p_indexStreams[1])) != ErrorCode::Success) return ret;
if (p_indexStreams[2] == nullptr || (ret = m_pGraph.LoadGraph(p_indexStreams[2], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;
if (p_indexStreams[2] == nullptr || (ret = m_pGraph.LoadGraph(p_indexStreams[2], m_iDataBlockSize, m_iDataCapacity, graph_pm_path)) != ErrorCode::Success) return ret;
if (p_indexStreams[3] == nullptr) m_deletedID.Initialize(m_pSamples.R(), m_iDataBlockSize, m_iDataCapacity);
else if ((ret = m_deletedID.Load(p_indexStreams[3], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;

Expand Down
11 changes: 8 additions & 3 deletions AnnService/src/Core/KDT/KDTIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,19 @@ namespace SPTAG
}

template <typename T>
ErrorCode Index<T>::LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams)
ErrorCode Index<T>::LoadIndexData(const std::vector<std::shared_ptr<Helper::DiskPriorityIO>>& p_indexStreams, bool data_in_pm , bool graph_in_pm , std::string p_pm_path )
{
if (p_indexStreams.size() < 4) return ErrorCode::LackOfInputs;

std::string data_pm_path = "";
if (data_in_pm) data_pm_path = p_pm_path;
std::string graph_pm_path = "";
if (graph_in_pm) graph_pm_path = p_pm_path;

ErrorCode ret = ErrorCode::Success;
if (p_indexStreams[0] == nullptr || (ret = m_pSamples.Load(p_indexStreams[0], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;
if (p_indexStreams[0] == nullptr || (ret = m_pSamples.Load(p_indexStreams[0], m_iDataBlockSize, m_iDataCapacity,data_pm_path)) != ErrorCode::Success) return ret;
if (p_indexStreams[1] == nullptr || (ret = m_pTrees.LoadTrees(p_indexStreams[1])) != ErrorCode::Success) return ret;
if (p_indexStreams[2] == nullptr || (ret = m_pGraph.LoadGraph(p_indexStreams[2], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;
if (p_indexStreams[2] == nullptr || (ret = m_pGraph.LoadGraph(p_indexStreams[2], m_iDataBlockSize, m_iDataCapacity, graph_pm_path)) != ErrorCode::Success) return ret;
if (p_indexStreams[3] == nullptr) m_deletedID.Initialize(m_pSamples.R(), m_iDataBlockSize, m_iDataCapacity);
else if ((ret = m_deletedID.Load(p_indexStreams[3], m_iDataBlockSize, m_iDataCapacity)) != ErrorCode::Success) return ret;

Expand Down
4 changes: 2 additions & 2 deletions AnnService/src/Core/VectorIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ VectorIndex::CreateInstance(IndexAlgoType p_algo, VectorValueType p_valuetype)


ErrorCode
VectorIndex::LoadIndex(const std::string& p_loaderFilePath, std::shared_ptr<VectorIndex>& p_vectorIndex)
VectorIndex::LoadIndex(const std::string& p_loaderFilePath, std::shared_ptr<VectorIndex>& p_vectorIndex, bool data_in_pm, bool graph_in_pm, std::string p_pm_path)
{
std::string folderPath(p_loaderFilePath);
if (!folderPath.empty() && *(folderPath.rbegin()) != FolderSep) folderPath += FolderSep;
Expand Down Expand Up @@ -472,7 +472,7 @@ VectorIndex::LoadIndex(const std::string& p_loaderFilePath, std::shared_ptr<Vect
handles.push_back(std::move(ptr));
}

if ((ret = p_vectorIndex->LoadIndexData(handles)) != ErrorCode::Success) return ret;
if ((ret = p_vectorIndex->LoadIndexData(handles,data_in_pm,graph_in_pm,p_pm_path)) != ErrorCode::Success) return ret;

if (iniReader.DoesSectionExist("MetaData"))
{
Expand Down
21 changes: 18 additions & 3 deletions AnnService/src/IndexSearcher/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class SearcherOptions : public Helper::ReaderOptions
AddOptionalOption(m_withMeta, "-a", "--withmeta", "Output metadata instead of vector id.");
AddOptionalOption(m_K, "-k", "--KNN", "K nearest neighbors for search.");
AddOptionalOption(m_batch, "-b", "--batchsize", "Batch query size.");
//Intel PM change
AddOptionalOption(m_pm_path, "-p", "--pm_path", "Path to Persistent Memory pool.");
AddOptionalOption(m_vectors_in_pm, "-e", "--vectors_in_pm", "Vectors will be stored in Persistent Memory pool.");
AddOptionalOption(m_graph_in_pm, "-g", "--graph_in_pm", "Graph will be stored in Persistent Memory pool.");
}

~SearcherOptions() {}
Expand All @@ -45,6 +49,10 @@ class SearcherOptions : public Helper::ReaderOptions
int m_K = 32;

int m_batch = 10000;

std::string m_pm_path = "";
bool m_vectors_in_pm = false;
bool m_graph_in_pm = false;
};

template <typename T>
Expand Down Expand Up @@ -140,6 +148,7 @@ int Process(std::shared_ptr<SearcherOptions> options, VectorIndex& index)
std::vector<std::set<SizeType>> truth(options->m_batch);
std::vector<QueryResult> results(options->m_batch, QueryResult(NULL, options->m_K, options->m_withMeta != 0));
std::vector<clock_t> latencies(options->m_batch + 1, 0);
std::vector<double> latency_stats(options->m_batch, 0);
int baseSquare = SPTAG::COMMON::Utils::GetBase<T>() * SPTAG::COMMON::Utils::GetBase<T>();

LOG(Helper::LogLevel::LL_Info, "[query]\t\t[maxcheck]\t[avg] \t[99%] \t[95%] \t[recall] \t[mem]\n");
Expand All @@ -150,7 +159,13 @@ int Process(std::shared_ptr<SearcherOptions> options, VectorIndex& index)
for (SizeType i = 0; i < numQuerys; i++) results[i].SetTarget(queryVectors->GetVector(startQuery + i));
if (ftruth.is_open()) LoadTruth(ftruth, truth, numQuerys, options->m_K);

SizeType subSize = (numQuerys - 1) / omp_get_num_threads() + 1;
SizeType subSize ;
#pragma omp parallel
{
#pragma omp single
subSize = (numQuerys - 1) / omp_get_num_threads() + 1;
}

for (int mc = 0; mc < maxCheck.size(); mc++)
{
index.SetParameter("MaxCheck", maxCheck[mc].c_str());
Expand Down Expand Up @@ -265,7 +280,7 @@ int main(int argc, char** argv)
}

std::shared_ptr<SPTAG::VectorIndex> vecIndex;
auto ret = SPTAG::VectorIndex::LoadIndex(options->m_indexFolder, vecIndex);
auto ret = SPTAG::VectorIndex::LoadIndex(options->m_indexFolder, vecIndex, options->m_vectors_in_pm, options->m_graph_in_pm, options->m_pm_path);
if (SPTAG::ErrorCode::Success != ret || nullptr == vecIndex)
{
LOG(Helper::LogLevel::LL_Error, "Cannot open index configure file!");
Expand Down Expand Up @@ -312,4 +327,4 @@ int main(int argc, char** argv)
default: break;
}
return 0;
}
}
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ else()
message (FATAL_ERROR "Could no find openmp!")
endif()

find_package(Boost 1.67 COMPONENTS system thread serialization wserialization regex filesystem)
find_package(Boost 1.66 COMPONENTS system thread serialization wserialization regex filesystem)
if (Boost_FOUND)
include_directories (${Boost_INCLUDE_DIR})
link_directories (${Boost_LIBRARY_DIR})
Expand All @@ -90,6 +90,8 @@ else()
message (FATAL_ERROR "Could not find Boost >= 1.67!")
endif()

link_libraries(memkind)

option(GPU "GPU" ON)
option(LIBRARYONLY "LIBRARYONLY" OFF)
add_subdirectory (AnnService)
Expand Down
2 changes: 1 addition & 1 deletion Test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ if (NOT LIBRARYONLY)
message (STATUS "BOOST_TEST_DYN_LINK")
endif()

find_package(Boost 1.67 COMPONENTS system thread serialization wserialization regex filesystem unit_test_framework)
find_package(Boost 1.66 COMPONENTS system thread serialization wserialization regex filesystem unit_test_framework)
if (Boost_FOUND)
include_directories (${Boost_INCLUDE_DIR})
link_directories (${Boost_LIBRARY_DIR})
Expand Down