Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ CPP += -Ibitshuffle
INCFILES=ch_frb_io.hpp \
ch_frb_io_internals.hpp \
assembled_chunk_msgpack.hpp \
msgpack_binary_vector.hpp \
bitshuffle/bitshuffle.h bitshuffle/bitshuffle_core.h \
bitshuffle/bitshuffle_internals.h bitshuffle/iochain.h \
chlog.hpp
Expand Down
1 change: 1 addition & 0 deletions assembled_chunk_ringbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
max_fpga_flushed(0),
max_fpga_retrieved(0),
first_fpgacount(0),
first_packet_received(false),
ini_params(ini_params_),
beam_id(beam_id_),
stream_id(stream_id_),
Expand Down
1 change: 1 addition & 0 deletions ch_frb_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class intensity_network_stream : noncopyable {

// Returns the first fpgacount of the first chunk sent downstream by
// the given beam id.
// Raises runtime_error if the first packet has not been received yet.
uint64_t get_first_fpga_count(int beam);

// Returns the last FPGA count processed by each of the assembler,
Expand Down
8 changes: 4 additions & 4 deletions ch_frb_io_internals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ class assembled_chunk_ringbuf : noncopyable,
std::atomic<uint64_t> max_fpga_retrieved;
// The fpgacount of the first chunk produced by this stream
std::atomic<uint64_t> first_fpgacount;


// Set to 'true' in the first call to put_unassembled_packet().
std::atomic<bool> first_packet_received;

assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params, int beam_id, int stream_id);

~assembled_chunk_ringbuf();
Expand Down Expand Up @@ -347,9 +350,6 @@ class assembled_chunk_ringbuf : noncopyable,

output_device_pool output_devices;

// Set to 'true' in the first call to put_unassembled_packet().
bool first_packet_received = false;

// Helper function called in assembler thread, to add a new assembled_chunk to the ring buffer.
// Resets 'chunk' to a null pointer.
// Warning: only safe to call from assembler thread.
Expand Down
7 changes: 5 additions & 2 deletions intensity_network_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,12 @@ uint64_t intensity_network_stream::get_first_fpga_count(int beam) {
// Which of my assemblers (if any) is handling the requested beam?
int nbeams = this->ini_params.beam_ids.size();
for (int i=0; i<nbeams; i++)
if (this->ini_params.beam_ids[i] == beam)
if (this->ini_params.beam_ids[i] == beam) {
if (!this->assemblers[i]->first_packet_received)
throw runtime_error("ch_frb_io: get_first_fpga_count called, but first packet has not been received yet.");
return this->assemblers[i]->first_fpgacount;
return 0;
}
throw runtime_error("ch_frb_io internal error: beam_id not found in intensity_network_stream::get_first_fpga_count()");
}

void intensity_network_stream::get_max_fpga_count_seen(vector<uint64_t> &flushed,
Expand Down
73 changes: 73 additions & 0 deletions msgpack_binary_vector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#ifndef _MSGPACK_BINARY_VECTOR_HPP
#define _MSGPACK_BINARY_VECTOR_HPP

#include <vector>
#include <iostream>

#include <msgpack.hpp>

namespace ch_frb_io {

template<typename T>
class msgpack_binary_vector : public std::vector<T>
{};

}

namespace msgpack {
MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) {
namespace adaptor {

template<typename T>
struct convert<ch_frb_io::msgpack_binary_vector<T> > {
msgpack::object const& operator()(msgpack::object const& o,
ch_frb_io::msgpack_binary_vector<T>& v) const {
//std::cout << "msgpack_binary_vector: type " << o.type << std::endl;
if (o.type != msgpack::type::ARRAY)
throw std::runtime_error("msgpack_binary_vector: expected type ARRAY");
// Make sure array is big enough to check version
//std::cout << "msgpack_binary_vector: array size " << o.via.array.size << std::endl;
if (o.via.array.size != 3)
throw std::runtime_error("msgpack_binary_vector: expected array size 3");
msgpack::object* arr = o.via.array.ptr;
uint8_t version = arr[0].as<uint8_t>();
//std::cout << "version " << version << std::endl;
if (version != 1)
throw std::runtime_error("msgpack_binary_vector: expected version=1");
size_t n = arr[1].as<size_t>();
//std::cout << "msgpack_binary_vector: vector size " << n << std::endl; //", type " << arr[2].type << std::endl;
v.resize(n);
if (arr[2].type != msgpack::type::BIN)
throw msgpack::type_error();
//std::cout << "binary size " << arr[2].via.bin.size << " vs " << n << " x " <<
//sizeof(T) << " = " << (n * sizeof(T)) << std::endl;
if (arr[2].via.bin.size != n * sizeof(T))
throw msgpack::type_error();
memcpy(reinterpret_cast<void*>(v.data()), arr[2].via.bin.ptr, n * sizeof(T));
//std::cout << "msgpack_binary_vector: returned vector size " << v.size() << std::endl;
return o;
}
};

template<typename T>
struct pack<ch_frb_io::msgpack_binary_vector<T> > {
template <typename Stream>
packer<Stream>& operator()(msgpack::packer<Stream>& o, ch_frb_io::msgpack_binary_vector<T> const& v) const {
uint8_t version = 1;
o.pack_array(3);
o.pack(version);
o.pack(v.size());
o.pack_bin(v.size() * sizeof(T));
o.pack_bin_body(reinterpret_cast<const char*>(v.data()));
return o;
}
};

} // namespace adaptor
} // MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS)
} // namespace msgpack

#endif