Skip to content

Commit a324475

Browse files
committed
libpressio version 0.55.0
Major Changes + launch modules are now user extendable with the libpressio_ext/launch/external_launch.h header this will allow users to provide their own process launching mechanisms where traditional methods are blocked/disallowed. + Support for remote metrics servers which use a JSON protocol was added. This enables servers that use MPI to be nested beneath MPI and allows for metrics servers that have long startup times to remain running and respond to multiple metrics requests. The protocol format is still unstable and may change. + New meta-IO module called `copy_template` which copies a template from another file before dispatching to another IO module for writing. This enables users to copy most of an HDF5 file and overwrite only a portion of it when using it with external metrics. + POSSIBLE BREAKING CHANGE, `external:command` is now a write only setting. If the module supports it, the corresponding option is now external:commands which is a std::vector<std::string>. The old name sets this new parameter by splitting on spaces. + the external metrics module gained the ability to choose to intercept pressio_compressor_compress_many or pressio_compressor_compress via `external:use_many` + New higher level python interface `libpressio.py` It's interface is also unstable and may change frequently. Minor Changes + Support for newer versions of mgard which use mgard::mgard_library as the cmake target Bug Fixes + fixed some static analysis warnings in the test cases.
1 parent f587465 commit a324475

File tree

13 files changed

+819
-71
lines changed

13 files changed

+819
-71
lines changed

CMakeLists.txt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
2-
project(libpressio VERSION "0.54.1" LANGUAGES CXX C)
2+
project(libpressio VERSION "0.55.0" LANGUAGES CXX C)
33

44
#correct was to set a default build type
55
# https://blog.kitware.com/cmake-and-the-default-build-type/
@@ -53,11 +53,11 @@ add_library(libpressio
5353
./src/plugins/compressors/sampling.cc
5454
./src/plugins/compressors/resize.cc
5555
./src/plugins/compressors/transpose.cc
56+
./src/plugins/launch/external_forkexec.cc
5657
./src/plugins/metrics/ks_test.cc
5758
./src/plugins/metrics/kth_error.cc
5859
./src/plugins/metrics/composite.cc
5960
./src/plugins/metrics/external.cc
60-
./src/plugins/metrics/external_forkexec.cc
6161
./src/plugins/metrics/metrics_base.cc
6262
./src/plugins/metrics/size.cc
6363
./src/plugins/metrics/time.cc
@@ -67,6 +67,7 @@ add_library(libpressio
6767
./src/plugins/metrics/printer.cc
6868
./src/plugins/metrics/region_of_interest.cc
6969
./src/plugins/metrics/noop.cc
70+
./src/plugins/io/copy_template.cc
7071
./src/plugins/io/posix.cc
7172
./src/plugins/io/noop.cc
7273
./src/plugins/io/csv.cc
@@ -177,7 +178,11 @@ if(LIBPRESSIO_HAS_MGARD)
177178
PRIVATE
178179
${CMAKE_CURRENT_SOURCE_DIR}/src/plugins/compressors/mgard_plugin.cc
179180
)
181+
if(TARGET mgard::mgard)
180182
target_link_libraries(libpressio PRIVATE mgard::mgard)
183+
elseif(TARGET mgard::mgard_library)
184+
target_link_libraries(libpressio PRIVATE mgard::mgard_library)
185+
endif()
181186
endif()
182187

183188
option(LIBPRESSIO_HAS_ZFP "build the ZFP plugin" OFF)
@@ -281,7 +286,7 @@ if(LIBPRESSIO_HAS_MPI)
281286
find_package(MPI COMPONENTS CXX)
282287
target_sources(libpressio
283288
PRIVATE
284-
${CMAKE_CURRENT_SOURCE_DIR}/src/plugins/metrics/external_mpispawn.cc
289+
${CMAKE_CURRENT_SOURCE_DIR}/src/plugins/launch/external_mpispawn.cc
285290
)
286291
target_link_libraries(libpressio PRIVATE MPI::MPI_CXX)
287292
endif()
@@ -341,6 +346,19 @@ if(LIBPRESSIO_HAS_LIBDISTRIBUTED)
341346
)
342347
endif()
343348

349+
option(LIBPRESSIO_HAS_REMOTELAUNCH "build the remote external launch plugin" OFF)
350+
if(LIBPRESSIO_HAS_REMOTELAUNCH)
351+
set(LIBPRESSIO_FEATURES "${LIBPRESSIO_FEATURES} remotelaunch")
352+
find_package(nlohmann_json REQUIRED)
353+
find_package(CURL REQUIRED)
354+
target_link_libraries(libpressio PRIVATE nlohmann_json::nlohmann_json CURL::libcurl)
355+
target_sources(libpressio
356+
PRIVATE
357+
${CMAKE_CURRENT_SOURCE_DIR}/src/plugins/launch/external_remotelaunch.cc
358+
)
359+
360+
endif()
361+
344362
configure_file(
345363
${CMAKE_CURRENT_SOURCE_DIR}/src/pressio_version.h.in
346364
${CMAKE_CURRENT_BINARY_DIR}/include/pressio_version.h

src/plugins/metrics/external_launch.h renamed to include/libpressio_ext/launch/external_launch.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct extern_proc_results {
2727

2828
struct libpressio_launch_plugin: public pressio_configurable, public pressio_errorable {
2929
virtual ~libpressio_launch_plugin()=default;
30-
virtual extern_proc_results launch(std::string const&, std::string const&) const =0;
30+
virtual extern_proc_results launch(std::vector<std::string> const&) const =0;
3131
virtual std::unique_ptr<libpressio_launch_plugin> clone() const = 0;
3232
};
3333

src/plugins/io/copy_template.cc

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#include <iostream>
2+
#include <fstream>
3+
#include "pressio_compressor.h"
4+
#include "libpressio_ext/io/posix.h"
5+
#include "libpressio_ext/cpp/pressio.h"
6+
#include "libpressio_ext/cpp/options.h"
7+
#include "libpressio_ext/cpp/data.h"
8+
#include "libpressio_ext/cpp/io.h"
9+
#include "std_compat/memory.h"
10+
11+
struct copy_template_io : public libpressio_io_plugin {
12+
virtual struct pressio_data* read_impl(struct pressio_data* data) override {
13+
return impl->read(data);
14+
}
15+
virtual int write_impl(struct pressio_data const* data) override{
16+
if(not template_path.empty()) {
17+
if(copy_template()) return error_code();
18+
}
19+
return impl->write(data);
20+
}
21+
virtual int set_options_impl(struct pressio_options const& options) override{
22+
get_meta(options, "copy_template:io", io_plugins(), impl_id, impl);
23+
get(options, "copy_template:template_path", &template_path);
24+
get(options, "io:path", &path);
25+
return 0;
26+
}
27+
virtual struct pressio_options get_options_impl() const override{
28+
pressio_options opts;
29+
set_meta(opts, "copy_template:io", impl_id, impl);
30+
set(opts, "copy_template:template_path", template_path);
31+
set(opts, "io:path", path);
32+
return opts;
33+
}
34+
virtual struct pressio_options get_configuration_impl() const override{
35+
pressio_options opts;
36+
set(opts, "pressio:thread_safe", static_cast<int32_t>(pressio_thread_safety_single));
37+
return opts;
38+
}
39+
40+
41+
int patch_version() const override{
42+
return 1;
43+
}
44+
virtual const char* version() const override{
45+
return "0.0.1";
46+
}
47+
const char* prefix() const override {
48+
return "copy_template";
49+
}
50+
51+
std::shared_ptr<libpressio_io_plugin> clone() override {
52+
return compat::make_unique<copy_template_io>(*this);
53+
}
54+
55+
void set_name_impl(std::string const& new_name) override {
56+
impl->set_name(new_name + "/" + impl->prefix());
57+
}
58+
59+
private:
60+
int copy_template() {
61+
std::ifstream template_file(template_path, std::ios::binary);
62+
std::ofstream output_file(path, std::ios::binary);
63+
if(!template_file) return set_error(1, "template_file does not exist");
64+
if(!output_file) return set_error(2, "output_file does not exist");
65+
output_file << template_file.rdbuf();
66+
return 0;
67+
}
68+
69+
std::string path;
70+
std::string template_path;
71+
std::string impl_id = "posix";
72+
pressio_io impl = io_plugins().build("posix");
73+
};
74+
75+
static pressio_register io_copy_template_plugin(io_plugins(), "copy_template", [](){ return compat::make_unique<copy_template_io>(); });

src/plugins/metrics/external_forkexec.cc renamed to src/plugins/launch/external_forkexec.cc

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "external_launch.h"
1+
#include "libpressio_ext/launch/external_launch.h"
22
#include <memory>
33
#include <sstream>
44
#include <unistd.h>
@@ -7,7 +7,7 @@
77
#include "std_compat/memory.h"
88

99
struct external_forkexec: public libpressio_launch_plugin {
10-
extern_proc_results launch(std::string const& full_command, std::string const& workdir) const override {
10+
extern_proc_results launch(std::vector<std::string> const& full_command) const override {
1111
extern_proc_results results;
1212

1313
//create the pipe for stdout
@@ -47,12 +47,11 @@ extern_proc_results launch(std::string const& full_command, std::string const& w
4747
exit(-2);
4848
}
4949

50-
std::istringstream command_stream(full_command);
51-
std::vector<std::string> args_mem(
52-
std::istream_iterator<std::string>{command_stream},
53-
std::istream_iterator<std::string>());
5450
std::vector<char*> args;
55-
std::transform(std::begin(args_mem), std::end(args_mem),
51+
for(auto const& command: commands) {
52+
args.push_back(const_cast<char*>(command.c_str()));
53+
}
54+
std::transform(std::begin(full_command), std::end(full_command),
5655
std::back_inserter(args), [](std::string const& s){return const_cast<char*>(s.c_str());});
5756
args.push_back(nullptr);
5857
if(args.front() != nullptr) {
@@ -107,10 +106,26 @@ extern_proc_results launch(std::string const& full_command, std::string const& w
107106
return "forkexec";
108107
}
109108

109+
int set_options(pressio_options const& options) override {
110+
get(options, "external:workdir", &workdir);
111+
get(options, "external:commands", &commands);
112+
return 0;
113+
}
114+
115+
pressio_options get_options() const override {
116+
pressio_options options;
117+
set(options, "external:workdir", workdir);
118+
set(options, "external:commands", commands);
119+
return options;
120+
}
121+
110122

111123
std::unique_ptr<libpressio_launch_plugin> clone() const override {
112124
return compat::make_unique<external_forkexec>(*this);
113125
}
126+
127+
std::string workdir=".";
128+
std::vector<std::string> commands;
114129
};
115130

116131
static pressio_register launch_forkexec_plugin(launch_plugins(), "forkexec", [](){ return compat::make_unique<external_forkexec>();});

src/plugins/metrics/external_mpispawn.cc renamed to src/plugins/launch/external_mpispawn.cc

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
1-
#include "external_launch.h"
1+
#include "libpressio_ext/launch/external_launch.h"
22
#include <iterator>
33
#include <memory>
44
#include <sstream>
55
#include <mpi.h>
66
#include "std_compat/memory.h"
77

88
struct external_mpispawn: public libpressio_launch_plugin {
9-
extern_proc_results launch(std::string const& full_command, std::string const& workdir) const override {
9+
extern_proc_results launch(std::vector<std::string> const& full_command) const override {
1010
extern_proc_results results;
1111

12-
std::istringstream command_stream(full_command);
13-
std::vector<std::string> args_mem(
14-
std::istream_iterator<std::string>{command_stream},
15-
std::istream_iterator<std::string>());
1612
std::vector<char*> args;
17-
std::transform(std::begin(args_mem), std::end(args_mem),
13+
for (auto const& command: commands) {
14+
args.push_back(const_cast<char*>(command.c_str()));
15+
}
16+
std::transform(std::begin(full_command), std::end(full_command),
1817
std::back_inserter(args), [](std::string const& s){return const_cast<char*>(s.c_str());});
1918
args.push_back(nullptr);
2019

@@ -49,10 +48,25 @@ extern_proc_results launch(std::string const& full_command, std::string const& w
4948
return "mpispawn";
5049
}
5150

51+
int set_options(pressio_options const& options) override {
52+
get(options, "external:workdir", &workdir);
53+
get(options, "external:commands", &commands);
54+
return 0;
55+
}
56+
57+
pressio_options get_options() const override {
58+
pressio_options options;
59+
set(options, "external:workdir", workdir);
60+
set(options, "external:commands", commands);
61+
return options;
62+
}
63+
5264
std::unique_ptr<libpressio_launch_plugin> clone() const override {
5365
return compat::make_unique<external_mpispawn>(*this);
5466
}
5567

68+
std::string workdir=".";
69+
std::vector<std::string> commands;
5670
};
5771

5872
static pressio_register launch_spawn_plugin(launch_plugins(), "mpispawn", [](){ return compat::make_unique<external_mpispawn>();});
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include "libpressio_ext/launch/external_launch.h"
2+
#include <mutex>
3+
#include "std_compat/memory.h"
4+
#include <nlohmann/json.hpp>
5+
#include <curl/curl.h>
6+
#include <curl/easy.h>
7+
8+
static size_t write_to_std_string(char* txt, size_t size, size_t nelms, void* user_data) {
9+
std::string* user_str = reinterpret_cast<std::string*>(user_data);
10+
user_str->append(txt, size*nelms);
11+
return size*nelms;
12+
}
13+
14+
static std::mutex libpressio_curl_init_lock;
15+
struct libpressio_external_curl_manager {
16+
libpressio_external_curl_manager() {
17+
curl_global_init(CURL_GLOBAL_ALL);
18+
}
19+
~libpressio_external_curl_manager() {
20+
curl_global_cleanup();
21+
}
22+
23+
static std::shared_ptr<libpressio_external_curl_manager> get_library() {
24+
std::lock_guard<std::mutex> guard(libpressio_curl_init_lock);
25+
static std::weak_ptr<libpressio_external_curl_manager> weak{};
26+
if(auto observed = weak.lock())
27+
{
28+
return observed;
29+
} else {
30+
auto library = std::make_shared<libpressio_external_curl_manager>();
31+
weak = library;
32+
return library;
33+
}
34+
}
35+
};
36+
37+
38+
39+
struct external_remote: public libpressio_launch_plugin {
40+
external_remote(std::shared_ptr<libpressio_external_curl_manager>&& curl_singleton):
41+
curl_singleton(curl_singleton) {}
42+
43+
extern_proc_results launch(std::vector<std::string> const& full_command) const override {
44+
extern_proc_results results;
45+
nlohmann::json request;
46+
request["args"] = full_command;
47+
std::string request_str = request.dump();
48+
std::string response_str;
49+
char errbuf[CURL_ERROR_SIZE] = {0};
50+
51+
CURLcode ret;
52+
CURL *hnd;
53+
curl_slist* headers = nullptr;
54+
headers = curl_slist_append(headers, "Content-Type: application/json");
55+
hnd = curl_easy_init();
56+
curl_easy_setopt(hnd, CURLOPT_BUFFERSIZE, 102400L);
57+
curl_easy_setopt(hnd, CURLOPT_URL, connection_string.c_str());
58+
curl_easy_setopt(hnd, CURLOPT_NOPROGRESS, 1L);
59+
curl_easy_setopt(hnd, CURLOPT_USERAGENT, "curl/7.72.0");
60+
curl_easy_setopt(hnd, CURLOPT_MAXREDIRS, 50L);
61+
curl_easy_setopt(hnd, CURLOPT_HTTP_VERSION, (long)CURL_HTTP_VERSION_2TLS);
62+
curl_easy_setopt(hnd, CURLOPT_TCP_KEEPALIVE, 1L);
63+
curl_easy_setopt(hnd, CURLOPT_POST, 1L);
64+
curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, request_str.c_str());
65+
curl_easy_setopt(hnd, CURLOPT_POSTFIELDSIZE, request_str.size());
66+
curl_easy_setopt(hnd, CURLOPT_WRITEFUNCTION, &write_to_std_string);
67+
curl_easy_setopt(hnd, CURLOPT_WRITEDATA, &response_str);
68+
curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, headers);
69+
curl_easy_setopt(hnd, CURLOPT_ERRORBUFFER, errbuf);
70+
71+
ret = curl_easy_perform(hnd);
72+
73+
if(ret != CURLE_OK) {
74+
results.error_code = ret;
75+
if(strlen(errbuf)) {
76+
results.proc_stderr = std::string(errbuf);
77+
} else {
78+
results.proc_stderr = curl_easy_strerror(ret);
79+
}
80+
} else {
81+
try {
82+
nlohmann::json response = nlohmann::json::parse(response_str);
83+
results.proc_stdout = response["stdout"].get<std::string>();
84+
results.proc_stderr = response["stderr"].get<std::string>();
85+
results.return_code = response["return_code"].get<int>();
86+
} catch(nlohmann::json::exception const& e) {
87+
results.proc_stdout = "";
88+
results.proc_stderr = response_str + "\n\n" + e.what();
89+
results.error_code = -1;
90+
}
91+
}
92+
93+
curl_easy_cleanup(hnd);
94+
curl_slist_free_all(headers);
95+
hnd = NULL;
96+
97+
98+
return results;
99+
}
100+
const char* prefix() const override {
101+
return "remote";
102+
}
103+
104+
int set_options(pressio_options const& options) override {
105+
get(options, "external:connection_string", &connection_string);
106+
return 0;
107+
}
108+
109+
pressio_options get_options() const override {
110+
pressio_options options;
111+
set(options, "external:connection_string", connection_string);
112+
return options;
113+
}
114+
115+
std::unique_ptr<libpressio_launch_plugin> clone() const override {
116+
return compat::make_unique<external_remote>(*this);
117+
}
118+
119+
std::string connection_string;
120+
std::shared_ptr<libpressio_external_curl_manager> curl_singleton;
121+
};
122+
123+
static pressio_register launch_spawn_plugin(launch_plugins(), "remote", [](){
124+
return compat::make_unique<external_remote>(
125+
libpressio_external_curl_manager::get_library()
126+
);
127+
});

0 commit comments

Comments
 (0)