-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathl1-rpc.hpp
133 lines (100 loc) · 4.46 KB
/
l1-rpc.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#ifndef L1_RPC_H_
#define L1_RPC_H_
#include <vector>
#include <deque>
#include <thread>
#include <map>
#include <mutex>
#include <condition_variable>
#include <zmq.hpp>
#include <ch_frb_io.hpp>
#include <mask_stats.hpp>
const int default_port_l1_rpc = 5555;
// implementation detail: a struct used to communicate between I/O threads and the RpcServer.
class l1_backend_queue;
class chunk_status_map {
public:
void set(const std::string& filename, const std::string& status, const std::string& error_message);
bool get(const std::string& filename, std::string& status, std::string& error_message);
protected:
// result codes for write_chunk_request() calls:
// filename -> pair(status, error_message)
std::map<std::string, std::pair<std::string, std::string> > _write_chunk_status;
// (and the mutex for it)
std::mutex _status_mutex;
};
// The main L1 RPC server object.
class L1RpcServer {
public:
// Creates a new RPC server listening on the given port, and reading
// from the ring buffers of the given stream.
L1RpcServer(std::shared_ptr<ch_frb_io::intensity_network_stream> stream,
std::shared_ptr<const ch_frb_l1::mask_stats_map> maskstats,
std::vector<std::shared_ptr<const bonsai::dedisperser> > bonsais =
std::vector<std::shared_ptr<const bonsai::dedisperser> >(),
const std::string &port = "",
const std::string &cmdline = "",
std::vector<std::tuple<int, std::string, std::shared_ptr<const rf_pipelines::pipeline_object> > > monitors =
std::vector<std::tuple<int, std::string, std::shared_ptr<const rf_pipelines::pipeline_object> > >(),
zmq::context_t* ctx = NULL
);
~L1RpcServer();
// Main RPC service loop. Does not return.
void run();
// Start the RPC service thread.
std::thread start();
// Returns True if an RPC shutdown() call has been received.
bool is_shutdown();
// equivalent to receiving a shutdown() RPC.
void do_shutdown();
// For testing: enqueue the given chunk for writing.
void enqueue_write_request(std::shared_ptr<ch_frb_io::assembled_chunk>,
std::string filename,
int priority = 0);
protected:
// responds to the given RPC request, either sending immediate
// reply or queuing work for worker threads.
int _handle_request(zmq::message_t* client, zmq::message_t* request);
void _check_backend_queue();
// retrieves assembled_chunks overlapping the given range of
// FPGA-count values from the ring buffers for the given beam IDs.
void _get_chunks(const std::vector<int> &beams,
uint64_t min_fpga, uint64_t max_fpga,
std::vector<std::shared_ptr<ch_frb_io::assembled_chunk> > &chunks);
int _send_frontend_message(zmq::message_t& clientmsg,
zmq::message_t& tokenmsg,
zmq::message_t& contentmsg);
private:
// The command line that launched this L1 process
std::string _command_line;
// ZeroMQ context
zmq::context_t* _ctx;
// true if we created _ctx and thus should delete it
bool _created_ctx;
// Client-facing socket
zmq::socket_t _frontend;
// The "backend queue" is used by the I/O threads, to send WriteChunk_Reply
// objects back to the RpcServer, when write requests complete.
std::shared_ptr<l1_backend_queue> _backend_queue;
// Pool of I/O threads (one thread for each physical device), which accept
// write_chunk_requests from the RpcServer.
ch_frb_io::output_device_pool _output_devices;
// Port the client-facing socket is listening on.
std::string _port;
std::shared_ptr<chunk_status_map> _chunk_status;
// Only protects _shutdown!
std::mutex _q_mutex;
// flag when we are shutting down.
bool _shutdown;
// the stream we are serving RPC requests for.
std::shared_ptr<ch_frb_io::intensity_network_stream> _stream;
// objects holding RFI mask statistics
std::shared_ptr<const ch_frb_l1::mask_stats_map> _mask_stats;
// Bonsai dedisperser objects (used for latency reporting)
std::vector<std::shared_ptr<const bonsai::dedisperser> > _bonsais;
// Latency monitors
std::vector<std::tuple<int, std::string, std::shared_ptr<const rf_pipelines::pipeline_object> > > _latencies;
// server start time
struct timeval _time0;
};
#endif