diff --git a/examples/chat-pubsub.cpp b/examples/chat-pubsub.cpp index 615023d..fd4c1de 100644 --- a/examples/chat-pubsub.cpp +++ b/examples/chat-pubsub.cpp @@ -38,11 +38,16 @@ class Program { // Use HMAC signing for Sync Interests // Note: this is not generally recommended, but is used here for simplicity - SecurityOptions securityOptions(m_keyChain); - securityOptions.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl"); + SecurityOptions secOpts(m_keyChain); + secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl"); // Sign data packets using SHA256 (for simplicity) - securityOptions.dataSigner->signingInfo.setSha256Signing(); + secOpts.dataSigner->signingInfo.setSha256Signing(); + + // Do not fetch publications older than 10 seconds + SVSPubSubOptions opts; + opts.UseTimestamp = true; + opts.MaxPubAge = ndn::time::milliseconds(10000); // Create the Pub/Sub instance m_svsps = std::make_shared( @@ -50,7 +55,8 @@ class Program ndn::Name(m_options.m_id), face, std::bind(&Program::onMissingData, this, _1), - securityOptions); + opts, + secOpts); std::cout << "SVS client starting:" << m_options.m_id << std::endl; diff --git a/ndn-svs/mapping-provider.cpp b/ndn-svs/mapping-provider.cpp index 9b49d78..2bd6d56 100644 --- a/ndn-svs/mapping-provider.cpp +++ b/ndn-svs/mapping-provider.cpp @@ -40,9 +40,16 @@ MappingList::MappingList(const Block& block) { it->parse(); + // SeqNo and ApplicationName SeqNo seqNo = ndn::encoding::readNonNegativeInteger(it->elements().at(0)); Name name(it->elements().at(1)); - pairs.emplace_back(seqNo, name); + + // Additional blocks + std::vector blocks; + for (auto it2 = it->elements().begin() + 2; it2 != it->elements().end(); it2++) + blocks.push_back(*it2); + + pairs.push_back({ seqNo, std::make_pair(name, blocks) }); continue; } } @@ -54,10 +61,16 @@ MappingList::encode() const ndn::encoding::EncodingBuffer enc; size_t totalLength = 0; - for (const auto& [seq, name] : pairs) + for (const auto& [seq, mapping] : pairs) { + size_t entryLength = 0; + + // Additional blocks + for (const auto& block : mapping.second) + entryLength += ndn::encoding::prependBlock(enc, block); + // Name - size_t entryLength = ndn::encoding::prependBlock(enc, name.wireEncode()); + entryLength += ndn::encoding::prependBlock(enc, mapping.first.wireEncode()); // SeqNo entryLength += ndn::encoding::prependNonNegativeIntegerBlock(enc, tlv::SeqNo, seq); @@ -91,12 +104,12 @@ MappingProvider::MappingProvider(const Name& syncPrefix, } void -MappingProvider::insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const Name& appName) +MappingProvider::insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry) { - m_map[Name(nodeId).appendNumber(seqNo)] = appName; + m_map[Name(nodeId).appendNumber(seqNo)] = entry; } -Name +MappingEntryPair MappingProvider::getMapping(const NodeID& nodeId, const SeqNo& seqNo) { return m_map.at(Name(nodeId).appendNumber(seqNo)); @@ -111,8 +124,8 @@ MappingProvider::onMappingQuery(const Interest& interest) for (SeqNo i = query.low; i <= std::max(query.high, query.low); i++) { try { - Name name = getMapping(query.nodeId, i); - queryResponse.pairs.emplace_back(i, name); + auto mapping = getMapping(query.nodeId, i); + queryResponse.pairs.emplace_back(i, mapping); } catch (const std::exception&) { // TODO: don't give up if not everything is found @@ -160,12 +173,12 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info, MappingList list(block); // Add all mappings to self - for (const auto& [seq, name] : list.pairs) { + for (const auto& [seq, mapping] : list.pairs) { try { getMapping(info.nodeId, seq); } catch (const std::exception&) { - insertMapping(info.nodeId, seq, name); + insertMapping(info.nodeId, seq, mapping); } } diff --git a/ndn-svs/mapping-provider.hpp b/ndn-svs/mapping-provider.hpp index 0f363b2..73490d6 100644 --- a/ndn-svs/mapping-provider.hpp +++ b/ndn-svs/mapping-provider.hpp @@ -24,6 +24,8 @@ namespace ndn::svs { +using MappingEntryPair = std::pair>; + /** * @brief TLV type for mapping list */ @@ -45,7 +47,7 @@ class MappingList public: NodeID nodeId; - std::vector> pairs; + std::vector> pairs; }; /** @@ -65,17 +67,17 @@ class MappingProvider : noncopyable using MappingListCallback = std::function; /** - * @brief Insert a mapping into the store + * @brief Insert a mapping entry into the store */ void - insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const Name& appName); + insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry); /** * @brief Get a mapping and throw if not found * * @returns Corresponding application name */ - Name + MappingEntryPair getMapping(const NodeID& nodeId, const SeqNo& seqNo); /** @@ -127,7 +129,7 @@ class MappingProvider : noncopyable ndn::ScopedRegisteredPrefixHandle m_registeredPrefix; - std::map m_map; + std::map m_map; }; } // namespace ndn::svs diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 8122fd0..d78cf70 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -24,16 +24,17 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix, const Name& nodePrefix, ndn::Face& face, UpdateCallback updateCallback, - const SecurityOptions& securityOptions, - std::shared_ptr dataStore) + const SVSPubSubOptions& options, + const SecurityOptions& securityOptions) : m_face(face) , m_syncPrefix(syncPrefix) , m_dataPrefix(nodePrefix) , m_onUpdate(std::move(updateCallback)) + , m_opts(options) , m_securityOptions(securityOptions) , m_svsync(syncPrefix, nodePrefix, face, std::bind(&SVSPubSub::updateCallbackInternal, this, _1), - securityOptions, std::move(dataStore)) + securityOptions, options.dataStore) , m_mappingProvider(syncPrefix, nodePrefix, face, securityOptions) { m_svsync.getCore().setGetExtraBlockCallback(std::bind(&SVSPubSub::onGetExtraData, this, _1)); @@ -42,7 +43,8 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix, SeqNo SVSPubSub::publish(const Name& name, span value, - const Name& nodePrefix, time::milliseconds freshnessPeriod) + const Name& nodePrefix, time::milliseconds freshnessPeriod, + std::vector mappingBlocks) { // Segment the data if larger than MAX_DATA_SIZE if (value.size() > MAX_DATA_SIZE) { @@ -52,7 +54,8 @@ SVSPubSub::publish(const Name& name, span value, NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; - for (size_t i = 0; i < nSegments; i++) { + for (size_t i = 0; i < nSegments; i++) + { // Create encapsulated segment auto segmentName = Name(name).appendVersion(0).appendSegment(i); auto segment = Data(segmentName); @@ -71,11 +74,12 @@ SVSPubSub::publish(const Name& name, span value, } // Insert mapping and manually update the sequence number - insertMapping(nid, seqNo, name); + insertMapping(nid, seqNo, name, mappingBlocks); m_svsync.getCore().updateSeqNo(seqNo, nid); return seqNo; } - else { + else + { ndn::Data data(name); data.setContent(value); data.setFreshnessPeriod(freshnessPeriod); @@ -85,24 +89,43 @@ SVSPubSub::publish(const Name& name, span value, } SeqNo -SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix) +SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, + std::vector mappingBlocks) { NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; SeqNo seqNo = m_svsync.publishData(data.wireEncode(), data.getFreshnessPeriod(), nid, ndn::tlv::Data); - insertMapping(nid, seqNo, data.getName()); + insertMapping(nid, seqNo, data.getName(), mappingBlocks); return seqNo; } void -SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name) +SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, + std::vector additional) { + // additional is a copy deliberately + // this way we can add well-known mappings to the list + + // add timestamp block + if (m_opts.UseTimestamp) { + unsigned long now = + std::chrono::duration_cast + (std::chrono::system_clock::now().time_since_epoch()).count(); + auto timestamp = Name::Component::fromNumber(now, tlv::TimestampNameComponent); + additional.push_back(timestamp); + } + + // create mapping entry + MappingEntryPair entry = { name, additional }; + + // notify subscribers in next sync interest if (m_notificationMappingList.nodeId == EMPTY_NAME || m_notificationMappingList.nodeId == nid) { m_notificationMappingList.nodeId = nid; - m_notificationMappingList.pairs.emplace_back(seqNo, name); + m_notificationMappingList.pairs.push_back({ seqNo, entry }); } - m_mappingProvider.insertMapping(nid, seqNo, name); + // send mapping to provider + m_mappingProvider.insertMapping(nid, seqNo, entry); } uint32_t @@ -171,18 +194,14 @@ SVSPubSub::updateCallbackInternal(const std::vector& MissingDataInfo remainingInfo = stream; // Attemt to find what we already know about mapping + // This typically refers to the Sync Interest mapping optimization, + // where the Sync Interest contains the notification mapping list for (SeqNo i = remainingInfo.low; i <= remainingInfo.high; i++) { try { - Name mapping = m_mappingProvider.getMapping(stream.nodeId, i); - for (const auto& sub : m_prefixSubscriptions) - { - if (sub.prefix.isPrefixOf(mapping)) - { - m_fetchMap[std::pair(stream.nodeId, i)].push_back(sub); - } - } + // throws if mapping not found + this->processMapping(stream.nodeId, i); remainingInfo.low++; } catch (const std::exception&) @@ -205,17 +224,12 @@ SVSPubSub::updateCallbackInternal(const std::vector& m_mappingProvider.fetchNameMapping(truncatedRemainingInfo, [this, remainingInfo, streamName] (const MappingList& list) { - for (const auto& sub : m_prefixSubscriptions) - { - for (const auto& [seq, name] : list.pairs) - { - if (sub.prefix.isPrefixOf(name)) - { - m_fetchMap[std::pair(streamName, seq)].push_back(sub); - fetchAll(); - } - } - } + bool queued = false; + for (const auto& [seq, mapping] : list.pairs) + queued |= this->processMapping(streamName, seq); + + if (queued) + this->fetchAll(); }, -1); remainingInfo.low += 11; @@ -227,6 +241,48 @@ SVSPubSub::updateCallbackInternal(const std::vector& m_onUpdate(info); } +bool +SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) +{ + // this will throw if mapping not found + auto mapping = m_mappingProvider.getMapping(nodeId, seqNo); + + // check if timestamp is too old + if (m_opts.MaxPubAge > time::milliseconds::zero()) + { + // look for the additional timestamp block + // if no timestamp block is present, we just skip this step + for (const auto& block : mapping.second) + { + if (block.type() != tlv::TimestampNameComponent) + continue; + + unsigned long now = + std::chrono::duration_cast + (std::chrono::system_clock::now().time_since_epoch()).count(); + + unsigned long pubTime = Name::Component(block).toNumber(); + unsigned long maxAge = time::microseconds(m_opts.MaxPubAge).count(); + + if (now - pubTime > maxAge) + return false; + } + } + + // check if known mapping matches subscription + bool queued = false; + for (const auto& sub : m_prefixSubscriptions) + { + if (sub.prefix.isPrefixOf(mapping.first)) + { + m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); + queued = true; + } + } + + return queued; +} + void SVSPubSub::fetchAll() { diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index f640a0c..df84da6 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -27,6 +27,28 @@ namespace ndn::svs { +/** + * @brief Options for SVS pub/sub constructor + */ +struct SVSPubSubOptions +{ + /// @brief Interface to store data packets + std::shared_ptr dataStore = SVSync::DEFAULT_DATASTORE; + + /** + * @brief Send publication timestamp in mapping blocks. + * This option should be enabled in all instances for + * correct usage of the MaxPubAge option. + */ + bool UseTimestamp = true; + + /** + * @brief Maximum age of publications to be fetched. + * The UseTimestamp option should be enabled for this to work. + */ + time::milliseconds MaxPubAge = time::milliseconds::zero(); +}; + /** * @brief A pub/sub interface for SVS. * @@ -50,8 +72,8 @@ class SVSPubSub : noncopyable const Name& nodePrefix, ndn::Face& face, UpdateCallback updateCallback, - const SecurityOptions& securityOptions = SecurityOptions::DEFAULT, - std::shared_ptr dataStore = SVSync::DEFAULT_DATASTORE); + const SVSPubSubOptions& options = SVSPubSubOptions(), + const SecurityOptions& securityOptions = SecurityOptions::DEFAULT); virtual ~SVSPubSub() = default; @@ -84,11 +106,13 @@ class SVSPubSub : noncopyable * @param value data buffer * @param nodePrefix Name to publish the data under * @param freshnessPeriod freshness period for the data + * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) */ SeqNo publish(const Name& name, span value, const Name& nodePrefix = EMPTY_NAME, - time::milliseconds freshnessPeriod = FRESH_FOREVER); + time::milliseconds freshnessPeriod = FRESH_FOREVER, + std::vector mappingBlocks = {}); /** * @brief Subscribe to a application name prefix. @@ -133,9 +157,12 @@ class SVSPubSub : noncopyable * * @param data Data packet to publish * @param nodePrefix Name to publish the data under + * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) */ SeqNo - publishPacket(const Data& data, const Name& nodePrefix = EMPTY_NAME); + publishPacket(const Data& data, + const Name& nodePrefix = EMPTY_NAME, + std::vector mappingBlocks = {}); /** @brief Get the underlying sync */ SVSync& @@ -166,8 +193,18 @@ class SVSPubSub : noncopyable void onRecvExtraData(const Block& block); + /// @brief Insert a mapping entry into the store void - insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name); + insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, + std::vector additional); + + /** + * @brief Get and process mapping from store. + * @returns true if new publications were queued for fetch + * @throws std::exception error if mapping is not found + */ + bool + processMapping(const NodeID& nodeId, SeqNo seqNo); void fetchAll(); @@ -185,6 +222,7 @@ class SVSPubSub : noncopyable const Name m_syncPrefix; const Name m_dataPrefix; const UpdateCallback m_onUpdate; + const SVSPubSubOptions m_opts; const SecurityOptions m_securityOptions; SVSync m_svsync;