Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: TheoRadig/webhdfspp
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: hpi-epic/webhdfspp
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Able to merge. These branches can be automatically merged.
  • 4 commits
  • 9 files changed
  • 2 contributors

Commits on Nov 15, 2022

  1. track GET / PUT requests

    TheoRadig committed Nov 15, 2022
    Copy the full SHA
    b9485b6 View commit details

Commits on Jan 11, 2023

  1. Move to CMake 3

    tobodner authored Jan 11, 2023
    Copy the full SHA
    4315aeb View commit details

Commits on Jun 21, 2023

  1. Copy the full SHA
    8de0fdf View commit details

Commits on Sep 12, 2023

  1. Bump CMake version to 3.5

    tobodner authored Sep 12, 2023
    Copy the full SHA
    22e1a4a View commit details
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.8)
cmake_minimum_required(VERSION 3.5)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)

enable_testing()
10 changes: 8 additions & 2 deletions include/webhdfspp/webhdfspp.h
Original file line number Diff line number Diff line change
@@ -35,17 +35,23 @@ class non_copyable {
void operator=(non_copyable const &) = delete;
};

struct RequestTracker {
size_t get_count = 0;
size_t put_count = 0;
};

struct Options {
std::vector<std::pair<std::string, short>> namenodes;
char* ssl_cert = nullptr;
char* ssl_key = nullptr;
std::string scheme;
std::vector<std::string> header;
RequestTracker request_tracker;
};

class IoService : non_copyable {
public:
static IoService *New(const Options &options);
static IoService *New(std::shared_ptr<Options> options);
virtual Status Run() = 0;
virtual void Stop() = 0;
virtual ~IoService();
@@ -97,7 +103,7 @@ class OutputStream : non_copyable {
class FileSystem : non_copyable {
public:
virtual ~FileSystem();
static Status New(const Options &options,
static Status New(std::shared_ptr<Options> options,
std::shared_ptr<IoService> io_service, FileSystem **fsptr);
virtual Status Delete(const std::string &path, bool recursive) = 0;
virtual Status GetFileStatus(const std::string &path,
20 changes: 10 additions & 10 deletions lib/webhdfspp/filesystem.cc
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ namespace webhdfspp {

class WebHdfsFileSystem : public FileSystem {
public:
WebHdfsFileSystem(const Options &options,
WebHdfsFileSystem(std::shared_ptr<Options> options,
std::shared_ptr<IoService> io_service);
virtual Status Delete(const std::string &path, bool recursive) override;
virtual Status GetFileStatus(const std::string &path,
@@ -47,7 +47,7 @@ class WebHdfsFileSystem : public FileSystem {
Status List(const std::string &path, std::shared_ptr<FileStatuses> file_statuses) override;

private:
Options options_;
std::shared_ptr<Options> options_;
std::shared_ptr<IoServiceImpl> io_service_;
int active_endpoint_;

@@ -58,22 +58,22 @@ class WebHdfsFileSystem : public FileSystem {

FileSystem::~FileSystem() {}

Status FileSystem::New(const Options &options,
Status FileSystem::New(std::shared_ptr<Options> options,
std::shared_ptr<IoService> io_service,
FileSystem **fsptr) {
*fsptr = new WebHdfsFileSystem(options, io_service);
return Status::OK();
}

WebHdfsFileSystem::WebHdfsFileSystem(const Options &options,
WebHdfsFileSystem::WebHdfsFileSystem(std::shared_ptr<Options> options,
std::shared_ptr<IoService> io_service)
: options_(options),
io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)),
active_endpoint_(0) {}

Status WebHdfsFileSystem::Delete(const std::string &path, bool recursive) {
const auto &nn = options_.namenodes[active_endpoint_];
const auto &scheme = options_.scheme;
const auto &nn = options_->namenodes[active_endpoint_];
const auto &scheme = options_->scheme;

URIBuilder builder;
auto uri = builder.Scheme(scheme)
@@ -96,8 +96,8 @@ Status WebHdfsFileSystem::Delete(const std::string &path, bool recursive) {

Status WebHdfsFileSystem::GetFileStatus(const std::string &path,
FileStatus *stat) {
const auto &nn = options_.namenodes[active_endpoint_];
const auto &scheme = options_.scheme;
const auto &nn = options_->namenodes[active_endpoint_];
const auto &scheme = options_->scheme;

URIBuilder builder;
auto uri = builder.Scheme(scheme)
@@ -164,8 +164,8 @@ Status WebHdfsFileSystem::Open(const std::string &path,
}

Status WebHdfsFileSystem::List(const std::string &path, std::shared_ptr<FileStatuses> statuses) {
const auto &nn = options_.namenodes[active_endpoint_];
const auto &scheme = options_.scheme;
const auto &nn = options_->namenodes[active_endpoint_];
const auto &scheme = options_->scheme;

URIBuilder builder;
auto uri = builder.Scheme(scheme)
6 changes: 3 additions & 3 deletions lib/webhdfspp/inputstream.cc
Original file line number Diff line number Diff line change
@@ -22,17 +22,17 @@ namespace webhdfspp {

InputStream::~InputStream() {}

InputStreamImpl::InputStreamImpl(const Options &options,
InputStreamImpl::InputStreamImpl(std::shared_ptr<Options> options,
const std::string &path,
std::shared_ptr<IoServiceImpl> io_service, int active_endpoint)
: options_(options), path_(path), io_service_(io_service), active_endpoint_(active_endpoint) {}

Status InputStreamImpl::PositionRead(
size_t max_read_bytes, size_t offset,
const std::function<size_t(const char *, size_t)> &on_data_arrived) {
const auto nn = options_.namenodes[active_endpoint_];
const auto nn = options_->namenodes[active_endpoint_];
URIBuilder builder;
auto uri = builder.Scheme(options_.scheme)
auto uri = builder.Scheme(options_->scheme)
.Host(nn.first)
.Port(nn.second)
.Path("/webhdfs/v1" + path_)
4 changes: 2 additions & 2 deletions lib/webhdfspp/inputstream_impl.h
Original file line number Diff line number Diff line change
@@ -25,15 +25,15 @@ namespace webhdfspp {

class InputStreamImpl : public InputStream {
public:
InputStreamImpl(const Options &options,
InputStreamImpl(std::shared_ptr<Options> options,
const std::string &path,
std::shared_ptr<IoServiceImpl> io_service, int active_endpoint);
virtual Status PositionRead(size_t max_read_bytes, size_t offset,
const std::function<size_t(const char *, size_t)>
&on_data_arrived) override;

private:
const Options options_;
std::shared_ptr<Options> options_;
std::string path_;
std::shared_ptr<IoServiceImpl> io_service_;
int active_endpoint_;
18 changes: 11 additions & 7 deletions lib/webhdfspp/io_service.cc
Original file line number Diff line number Diff line change
@@ -27,11 +27,11 @@

namespace webhdfspp {

IoServiceImpl::IoServiceImpl(const Options &options) : options_(options) {}
IoServiceImpl::IoServiceImpl(std::shared_ptr<Options> options) : options_(options) {}

IoServiceImpl::~IoServiceImpl() {}

IoService *IoService::New(const Options &options) { return new IoServiceImpl(options); }
IoService *IoService::New(std::shared_ptr<Options> options) { return new IoServiceImpl(options); }

IoService::~IoService() {}

@@ -74,19 +74,19 @@ Status IoServiceImpl::DoNNRequest(const URIBuilder &uri,
}

void IoServiceImpl::AddCustomHeader(void *handle) const {
if (!options_.header.empty()) {
if (!options_->header.empty()) {
struct curl_slist *headers = nullptr;
for (const auto& header : options_.header) {
for (const auto& header : options_->header) {
headers = curl_slist_append(headers, header.c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
}
}

void IoServiceImpl::SetAuthentication(void *handle) const {
if (options_.ssl_cert && options_.ssl_key) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, options_.ssl_cert);
curl_easy_setopt(handle, CURLOPT_SSLKEY, options_.ssl_key);
if (options_->ssl_cert && options_->ssl_key) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, options_->ssl_cert);
curl_easy_setopt(handle, CURLOPT_SSLKEY, options_->ssl_key);
}
}

@@ -100,6 +100,7 @@ Status IoServiceImpl::DoDNGet(

AddCustomHeader(handle);
SetAuthentication(handle);
options_->request_tracker.get_count++;

curl_easy_setopt(handle, CURLOPT_URL, uri_str.c_str());
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, error_buffer);
@@ -164,6 +165,9 @@ Status IoServiceImpl::DoPutCreate(const URIBuilder &uri, const char* data, size_

SetAuthentication(handle);
AddCustomHeader(handle);

options_->request_tracker.put_count++;

curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(handle, CURLOPT_URL, uri_str.c_str());
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, error_buffer);
4 changes: 2 additions & 2 deletions lib/webhdfspp/io_service_impl.h
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ namespace webhdfspp {
class IoServiceImpl : public IoService,
public std::enable_shared_from_this<IoServiceImpl> {
public:
IoServiceImpl(const Options &options);
IoServiceImpl(std::shared_ptr<Options> options);
~IoServiceImpl();
virtual Status Run() override;
virtual void Stop() override;
@@ -55,7 +55,7 @@ class IoServiceImpl : public IoService,

void AddCustomHeader(void *handle) const;

const Options options_;
std::shared_ptr<Options> options_;
};
} // namespace webhdfspp

6 changes: 3 additions & 3 deletions lib/webhdfspp/outputstream.cc
Original file line number Diff line number Diff line change
@@ -6,14 +6,14 @@ namespace webhdfspp {

OutputStream::~OutputStream() {}

OutputStreamImpl::OutputStreamImpl(Options options, std::string path,
OutputStreamImpl::OutputStreamImpl(std::shared_ptr<Options> options, std::string path,
std::shared_ptr<IoServiceImpl> io_service, int active_endpoint, bool overwrite)
: options_(std::move(options)), path_(std::move(path)), io_service_(std::move(io_service)),
active_endpoint_(active_endpoint), overwrite_(overwrite) {}

Status OutputStreamImpl::WriteFile(const char* data, size_t nbyte) {
const auto& nn = options_.namenodes[active_endpoint_];
const auto& scheme = options_.scheme;
const auto& nn = options_->namenodes[active_endpoint_];
const auto& scheme = options_->scheme;
URIBuilder builder;
auto uri = builder.Scheme(scheme).Host(nn.first).Port(nn.second).Path("/webhdfs/v1" + path_).Param("op", "CREATE");

4 changes: 2 additions & 2 deletions lib/webhdfspp/outputstream_impl.h
Original file line number Diff line number Diff line change
@@ -25,14 +25,14 @@ namespace webhdfspp {

class OutputStreamImpl : public OutputStream {
public:
OutputStreamImpl(Options options,
OutputStreamImpl(std::shared_ptr<Options> options,
std::string path,
std::shared_ptr<IoServiceImpl> io_service, int active_endpoint, bool overwrite);

virtual Status WriteFile(const char* data, size_t nbyte) override;

private:
const Options options_;
std::shared_ptr<Options> options_;
std::string path_;
std::shared_ptr<IoServiceImpl> io_service_;
int active_endpoint_;