Skip to content

Commit

Permalink
ps: implement additional blocks and timestamp (fix #15)
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Nov 8, 2023
1 parent f523fe3 commit 80dd7df
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 55 deletions.
14 changes: 10 additions & 4 deletions examples/chat-pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,25 @@ class Program
{
// Use HMAC signing for Sync Interests
// Note: this is not generally recommended, but is used here for simplicity
SecurityOptions securityOptions(m_keyChain);
securityOptions.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl");
SecurityOptions secOpts(m_keyChain);
secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl");

// Sign data packets using SHA256 (for simplicity)
securityOptions.dataSigner->signingInfo.setSha256Signing();
secOpts.dataSigner->signingInfo.setSha256Signing();

// Do not fetch publications older than 10 seconds
SVSPubSubOptions opts;
opts.UseTimestamp = true;
opts.MaxPubAge = ndn::time::milliseconds(10000);

// Create the Pub/Sub instance
m_svsps = std::make_shared<SVSPubSub>(
ndn::Name(m_options.prefix),
ndn::Name(m_options.m_id),
face,
std::bind(&Program::onMissingData, this, _1),
securityOptions);
opts,
secOpts);

std::cout << "SVS client starting:" << m_options.m_id << std::endl;

Expand Down
33 changes: 23 additions & 10 deletions ndn-svs/mapping-provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@ MappingList::MappingList(const Block& block)
{
it->parse();

// SeqNo and ApplicationName
SeqNo seqNo = ndn::encoding::readNonNegativeInteger(it->elements().at(0));
Name name(it->elements().at(1));
pairs.emplace_back(seqNo, name);

// Additional blocks
std::vector<Block> blocks;
for (auto it2 = it->elements().begin() + 2; it2 != it->elements().end(); it2++)
blocks.push_back(*it2);

pairs.push_back({ seqNo, std::make_pair(name, blocks) });
continue;
}
}
Expand All @@ -54,10 +61,16 @@ MappingList::encode() const
ndn::encoding::EncodingBuffer enc;
size_t totalLength = 0;

for (const auto& [seq, name] : pairs)
for (const auto& [seq, mapping] : pairs)
{
size_t entryLength = 0;

// Additional blocks
for (const auto& block : mapping.second)
entryLength += ndn::encoding::prependBlock(enc, block);

// Name
size_t entryLength = ndn::encoding::prependBlock(enc, name.wireEncode());
entryLength += ndn::encoding::prependBlock(enc, mapping.first.wireEncode());

// SeqNo
entryLength += ndn::encoding::prependNonNegativeIntegerBlock(enc, tlv::SeqNo, seq);
Expand Down Expand Up @@ -91,12 +104,12 @@ MappingProvider::MappingProvider(const Name& syncPrefix,
}

void
MappingProvider::insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const Name& appName)
MappingProvider::insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry)
{
m_map[Name(nodeId).appendNumber(seqNo)] = appName;
m_map[Name(nodeId).appendNumber(seqNo)] = entry;
}

Name
MappingEntryPair
MappingProvider::getMapping(const NodeID& nodeId, const SeqNo& seqNo)
{
return m_map.at(Name(nodeId).appendNumber(seqNo));
Expand All @@ -111,8 +124,8 @@ MappingProvider::onMappingQuery(const Interest& interest)
for (SeqNo i = query.low; i <= std::max(query.high, query.low); i++)
{
try {
Name name = getMapping(query.nodeId, i);
queryResponse.pairs.emplace_back(i, name);
auto mapping = getMapping(query.nodeId, i);
queryResponse.pairs.emplace_back(i, mapping);
}
catch (const std::exception&) {
// TODO: don't give up if not everything is found
Expand Down Expand Up @@ -160,12 +173,12 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info,
MappingList list(block);

// Add all mappings to self
for (const auto& [seq, name] : list.pairs) {
for (const auto& [seq, mapping] : list.pairs) {
try {
getMapping(info.nodeId, seq);
}
catch (const std::exception&) {
insertMapping(info.nodeId, seq, name);
insertMapping(info.nodeId, seq, mapping);
}
}

Expand Down
12 changes: 7 additions & 5 deletions ndn-svs/mapping-provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

namespace ndn::svs {

using MappingEntryPair = std::pair<Name, std::vector<Block>>;

/**
* @brief TLV type for mapping list
*/
Expand All @@ -45,7 +47,7 @@ class MappingList

public:
NodeID nodeId;
std::vector<std::pair<SeqNo, Name>> pairs;
std::vector<std::pair<SeqNo, MappingEntryPair>> pairs;
};

/**
Expand All @@ -65,17 +67,17 @@ class MappingProvider : noncopyable
using MappingListCallback = std::function<void(const MappingList&)>;

/**
* @brief Insert a mapping into the store
* @brief Insert a mapping entry into the store
*/
void
insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const Name& appName);
insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry);

/**
* @brief Get a mapping and throw if not found
*
* @returns Corresponding application name
*/
Name
MappingEntryPair
getMapping(const NodeID& nodeId, const SeqNo& seqNo);

/**
Expand Down Expand Up @@ -127,7 +129,7 @@ class MappingProvider : noncopyable

ndn::ScopedRegisteredPrefixHandle m_registeredPrefix;

std::map<Name, Name> m_map;
std::map<Name, MappingEntryPair> m_map;
};

} // namespace ndn::svs
Expand Down
118 changes: 87 additions & 31 deletions ndn-svs/svspubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix,
const Name& nodePrefix,
ndn::Face& face,
UpdateCallback updateCallback,
const SecurityOptions& securityOptions,
std::shared_ptr<DataStore> dataStore)
const SVSPubSubOptions& options,
const SecurityOptions& securityOptions)
: m_face(face)
, m_syncPrefix(syncPrefix)
, m_dataPrefix(nodePrefix)
, m_onUpdate(std::move(updateCallback))
, m_opts(options)
, m_securityOptions(securityOptions)
, m_svsync(syncPrefix, nodePrefix, face,
std::bind(&SVSPubSub::updateCallbackInternal, this, _1),
securityOptions, std::move(dataStore))
securityOptions, options.dataStore)
, m_mappingProvider(syncPrefix, nodePrefix, face, securityOptions)
{
m_svsync.getCore().setGetExtraBlockCallback(std::bind(&SVSPubSub::onGetExtraData, this, _1));
Expand All @@ -42,7 +43,8 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix,

SeqNo
SVSPubSub::publish(const Name& name, span<const uint8_t> value,
const Name& nodePrefix, time::milliseconds freshnessPeriod)
const Name& nodePrefix, time::milliseconds freshnessPeriod,
std::vector<Block> mappingBlocks)
{
// Segment the data if larger than MAX_DATA_SIZE
if (value.size() > MAX_DATA_SIZE) {
Expand All @@ -52,7 +54,8 @@ SVSPubSub::publish(const Name& name, span<const uint8_t> value,
NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix;
SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1;

for (size_t i = 0; i < nSegments; i++) {
for (size_t i = 0; i < nSegments; i++)
{
// Create encapsulated segment
auto segmentName = Name(name).appendVersion(0).appendSegment(i);
auto segment = Data(segmentName);
Expand All @@ -71,11 +74,12 @@ SVSPubSub::publish(const Name& name, span<const uint8_t> value,
}

// Insert mapping and manually update the sequence number
insertMapping(nid, seqNo, name);
insertMapping(nid, seqNo, name, mappingBlocks);
m_svsync.getCore().updateSeqNo(seqNo, nid);
return seqNo;
}
else {
else
{
ndn::Data data(name);
data.setContent(value);
data.setFreshnessPeriod(freshnessPeriod);
Expand All @@ -85,24 +89,43 @@ SVSPubSub::publish(const Name& name, span<const uint8_t> value,
}

SeqNo
SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix)
SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix,
std::vector<Block> mappingBlocks)
{
NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix;
SeqNo seqNo = m_svsync.publishData(data.wireEncode(), data.getFreshnessPeriod(), nid, ndn::tlv::Data);
insertMapping(nid, seqNo, data.getName());
insertMapping(nid, seqNo, data.getName(), mappingBlocks);
return seqNo;
}

void
SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name)
SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name,
std::vector<Block> additional)
{
// additional is a copy deliberately
// this way we can add well-known mappings to the list

// add timestamp block
if (m_opts.UseTimestamp) {
unsigned long now =
std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::system_clock::now().time_since_epoch()).count();
auto timestamp = Name::Component::fromNumber(now, tlv::TimestampNameComponent);
additional.push_back(timestamp);
}

// create mapping entry
MappingEntryPair entry = { name, additional };

// notify subscribers in next sync interest
if (m_notificationMappingList.nodeId == EMPTY_NAME || m_notificationMappingList.nodeId == nid)
{
m_notificationMappingList.nodeId = nid;
m_notificationMappingList.pairs.emplace_back(seqNo, name);
m_notificationMappingList.pairs.push_back({ seqNo, entry });
}

m_mappingProvider.insertMapping(nid, seqNo, name);
// send mapping to provider
m_mappingProvider.insertMapping(nid, seqNo, entry);
}

uint32_t
Expand Down Expand Up @@ -171,18 +194,14 @@ SVSPubSub::updateCallbackInternal(const std::vector<ndn::svs::MissingDataInfo>&
MissingDataInfo remainingInfo = stream;

// Attemt to find what we already know about mapping
// This typically refers to the Sync Interest mapping optimization,
// where the Sync Interest contains the notification mapping list
for (SeqNo i = remainingInfo.low; i <= remainingInfo.high; i++)
{
try
{
Name mapping = m_mappingProvider.getMapping(stream.nodeId, i);
for (const auto& sub : m_prefixSubscriptions)
{
if (sub.prefix.isPrefixOf(mapping))
{
m_fetchMap[std::pair(stream.nodeId, i)].push_back(sub);
}
}
// throws if mapping not found
this->processMapping(stream.nodeId, i);
remainingInfo.low++;
}
catch (const std::exception&)
Expand All @@ -205,17 +224,12 @@ SVSPubSub::updateCallbackInternal(const std::vector<ndn::svs::MissingDataInfo>&

m_mappingProvider.fetchNameMapping(truncatedRemainingInfo,
[this, remainingInfo, streamName] (const MappingList& list) {
for (const auto& sub : m_prefixSubscriptions)
{
for (const auto& [seq, name] : list.pairs)
{
if (sub.prefix.isPrefixOf(name))
{
m_fetchMap[std::pair(streamName, seq)].push_back(sub);
fetchAll();
}
}
}
bool queued = false;
for (const auto& [seq, mapping] : list.pairs)
queued |= this->processMapping(streamName, seq);

if (queued)
this->fetchAll();
}, -1);

remainingInfo.low += 11;
Expand All @@ -227,6 +241,48 @@ SVSPubSub::updateCallbackInternal(const std::vector<ndn::svs::MissingDataInfo>&
m_onUpdate(info);
}

bool
SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo)
{
// this will throw if mapping not found
auto mapping = m_mappingProvider.getMapping(nodeId, seqNo);

// check if timestamp is too old
if (m_opts.MaxPubAge > time::milliseconds::zero())
{
// look for the additional timestamp block
// if no timestamp block is present, we just skip this step
for (const auto& block : mapping.second)
{
if (block.type() != tlv::TimestampNameComponent)
continue;

unsigned long now =
std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::system_clock::now().time_since_epoch()).count();

unsigned long pubTime = Name::Component(block).toNumber();
unsigned long maxAge = time::microseconds(m_opts.MaxPubAge).count();

if (now - pubTime > maxAge)
return false;
}
}

// check if known mapping matches subscription
bool queued = false;
for (const auto& sub : m_prefixSubscriptions)
{
if (sub.prefix.isPrefixOf(mapping.first))
{
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub);
queued = true;
}
}

return queued;
}

void
SVSPubSub::fetchAll()
{
Expand Down
Loading

0 comments on commit 80dd7df

Please sign in to comment.