Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Origin/local #3

Open
wants to merge 3 commits into
base: local
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions examples/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
#include "ndnsd/discovery/service-discovery.hpp"
#include <ndn-cxx/util/logger.hpp>

#include<iostream>
#include <list>
#include <iostream>

#include <boost/program_options/options_description.hpp>
#include <boost/program_options/variables_map.hpp>
Expand Down Expand Up @@ -58,10 +57,10 @@ class Consumer
{
NDN_LOG_INFO("Service info received");
auto status = (callback.status == ndnsd::discovery::ACTIVE)? "ACTIVE": "EXPIRED";
std::cout << "Status: " << status << std::endl;
NDN_LOG_INFO("Status: " << status);
for (const auto& item : callback.serviceDetails)
{
std::cout << item.first << ": " << item.second << std::endl;
NDN_LOG_INFO("Callback: " << item.first << ":" << item.second);
}
}

Expand Down Expand Up @@ -121,7 +120,6 @@ main(int argc, char* argv[])

try
{
std::cout << "Fetching service info for: " << serviceName << std::endl;
NDN_LOG_INFO("Fetching service info for: " << serviceName);
Consumer consumer(serviceName, flags);
consumer.execute();
Expand All @@ -130,11 +128,4 @@ main(int argc, char* argv[])
std::cerr << "ERROR: " << e.what() << std::endl;
NDN_LOG_ERROR("Cannot execute consumer, try again later: " << e.what());
}

try {
Consumer consumer(argv[1], flags);
consumer.execute();
}
catch (const std::exception& e) {
}
}
}
11 changes: 6 additions & 5 deletions examples/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
#include "ndnsd/discovery/service-discovery.hpp"
#include <ndn-cxx/util/logger.hpp>

#include<iostream>
#include <list>
#include <iostream>

// #include <list>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just remove?


NDN_LOG_INIT(ndnsd.examples.ProducerApp);

Expand Down Expand Up @@ -51,10 +52,10 @@ class Producer
{
NDN_LOG_INFO("Service publish callback received");
auto status = (callback.status == ndnsd::discovery::ACTIVE)? "ACTIVE": "EXPIRED";
std::cout << "\n Status: " << status << std::endl;
NDN_LOG_INFO("Status: " << status);
for (auto& item : callback.serviceDetails)
{
std::cout << item.first << ": " << item.second << std::endl;
NDN_LOG_INFO("Callback: " << item.first << ":" << item.second);
}
}

Expand All @@ -78,4 +79,4 @@ main(int argc, char* argv[])
std::cout << "Exception: " << e.what() << std::endl;
NDN_LOG_ERROR("Cannot execute producer, try again later: " << e.what());
}
}
}
2 changes: 1 addition & 1 deletion ndnsd/communication/sync-adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,4 @@ SyncProtocolAdapter::onPSyncUpdate(const std::vector<psync::MissingDataInfo>& up
m_syncUpdateCallback(dinfo);
}

} // namespace ndnsd
} // namespace ndnsd
6 changes: 3 additions & 3 deletions ndnsd/discovery/file-processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "file-processor.hpp"

#include<iostream>
#include <iostream>

#include <ndn-cxx/util/logger.hpp>

Expand Down Expand Up @@ -76,7 +76,7 @@ ServiceInfoFileProcessor::processFile()
const auto& key = details.first; //get_value<std::string >();
const auto& val = details.second.get_value<std::string >();

NDN_LOG_INFO("Reading file: "<< val);
NDN_LOG_DEBUG("Key: " << key << "Value: " << val);
m_serviceMetaInfo.insert(std::pair<std::string, std::string>(key, val));
}
}
Expand All @@ -92,4 +92,4 @@ ServiceInfoFileProcessor::processFile()
}

} // namespace discovery
} // namespace ndnsd
} // namespace ndnsd
2 changes: 1 addition & 1 deletion ndnsd/discovery/file-processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ class ServiceInfoFileProcessor
} // namespace discovery
} // namespace ndnsd

#endif // NDNSD_FILE_PROCESSOR_HPP
#endif // NDNSD_FILE_PROCESSOR_HPP
98 changes: 53 additions & 45 deletions ndnsd/discovery/service-discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ ServiceDiscovery::ServiceDiscovery(const std::string& filename,
std::bind(&ServiceDiscovery::processSyncUpdate, this, _1))
, m_discoveryCallback(discoveryCallback)
{

setUpdateProducerState();
// service is ACTIVE at this point
m_serviceStatus = ACTIVE;

setInterestFilter(m_producerState.applicationPrefix);

// listen on reload prefix as well.
Expand Down Expand Up @@ -89,6 +93,11 @@ ServiceDiscovery::setUpdateProducerState(bool update)
m_producerState.serviceLifetime = m_fileProcessor.getServiceLifetime();
m_producerState.publishTimestamp = ndn::time::system_clock::now();
m_producerState.serviceMetaInfo = m_fileProcessor.getServiceMeta();

// Reset the content of the wire after each reload.
// This could have been better, but leaving as it is for now.
if(m_wire.hasWire())
m_wire.reset();
}

ndn::Name
Expand All @@ -104,9 +113,7 @@ ServiceDiscovery::processFalgs(const std::map<char, uint8_t>& flags,
const char type, bool optional)
{
if (flags.count(type) > 0)
{
return flags.find(type)->second;
}
else
{
if (!optional)
Expand Down Expand Up @@ -137,11 +144,11 @@ ServiceDiscovery::consumerHandler()
void
ServiceDiscovery::run()
{
try
{
try {
m_face.processEvents();
}
catch (const std::exception& ex) {
catch (const std::exception& ex)
{
std::cerr << ex.what() << std::endl;
NDN_LOG_ERROR("Face error: " << ex.what());
}
Expand Down Expand Up @@ -196,27 +203,6 @@ ServiceDiscovery::processInterest(const ndn::Name& name, const ndn::Interest& in
}
}

std::string
ServiceDiscovery::makeDataContent()
{
// reset the wire first
if(m_wire.hasWire())
m_wire.reset();
// |service-name|<applicationPrefix>|<key>|<val>|<key>|<val>|...and so on
std::string dataContent = "service-name";
dataContent += "|";
dataContent += m_producerState.applicationPrefix.toUri();

for (auto const& item : m_producerState.serviceMetaInfo)
{
dataContent += "|";
dataContent += item.first;
dataContent += "|";
dataContent += item.second;
}
return dataContent;
}

void
ServiceDiscovery::sendData(const ndn::Name& name)
{
Expand All @@ -225,13 +211,11 @@ ServiceDiscovery::sendData(const ndn::Name& name)
auto timeDiff = ndn::time::system_clock::now() - m_producerState.publishTimestamp;
auto timeToExpire = ndn::time::duration_cast<ndn::time::seconds>(timeDiff);

int status = (timeToExpire > m_producerState.serviceLifetime) ? EXPIRED : ACTIVE;
m_serviceStatus = (timeToExpire > m_producerState.serviceLifetime) ? EXPIRED : ACTIVE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move lines 211-214 to wireEncode too.
And you need to still have a local variable serviceStatus
and then check with global variable. Otherwise no need to wireEncode again.


ndn::Data replyData(name);
replyData.setFreshnessPeriod(1_ms);

auto dataContent = makeDataContent();
replyData.setContent(wireEncode(dataContent, status));
replyData.setContent(wireEncode());
m_keyChain.sign(replyData);
m_face.put(replyData);
NDN_LOG_INFO("Data sent for name: " << name);
Expand Down Expand Up @@ -272,7 +256,7 @@ ServiceDiscovery::onTimeout(const ndn::Interest& interest)
{
m_counter--;
}
NDN_LOG_INFO("Interest: " << interest.getName() << "timeout");
NDN_LOG_INFO("Interest: " << interest.getName() << "timed out");
}

void
Expand Down Expand Up @@ -313,6 +297,7 @@ ServiceDiscovery::processSyncUpdate(const std::vector<ndnsd::SyncDataInfo>& upda
{
consumerReply.serviceDetails.insert(std::pair<std::string, std::string>
("prefix", item.prefix.toUri()));
// Service is just published, so the status returned to producer will be active
consumerReply.status = ACTIVE;
m_discoveryCallback(consumerReply);
}
Expand All @@ -321,36 +306,60 @@ ServiceDiscovery::processSyncUpdate(const std::vector<ndnsd::SyncDataInfo>& upda

template<ndn::encoding::Tag TAG>
size_t
ServiceDiscovery::wireEncode(ndn::EncodingImpl<TAG>& encoder,
const std::string& info, int status) const
ServiceDiscovery::wireEncode(ndn::EncodingImpl<TAG>& encoder, const std::string& info) const
{
size_t totalLength = 0;
totalLength += prependStringBlock(encoder, tlv::ServiceInfo, info);
totalLength += prependNonNegativeIntegerBlock(encoder, tlv::ServiceStatus, status);
totalLength += prependNonNegativeIntegerBlock(encoder, tlv::ServiceStatus, m_serviceStatus);
totalLength += encoder.prependVarNumber(totalLength);
totalLength += encoder.prependVarNumber(tlv::DiscoveryData);

return totalLength;
}

const ndn::Block&
ServiceDiscovery::wireEncode(const std::string& info, int status)
ServiceDiscovery::wireEncode()
{
if (m_wire.hasWire())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If hasWire, you should simply resend. The below string should be saved in a class member and then reset wire when this string is modified/updated.

return m_wire;
{
// check if status has changed or not
ndn::Block wire(m_wire);
wire.parse();
auto it = wire.elements_begin();
if (it != wire.elements_end() && it->type() == tlv::ServiceStatus)
{
NDN_LOG_DEBUG("No change in Block content, returning old Block");
if (ndn::readNonNegativeInteger(*it) == m_serviceStatus)
return m_wire;
}
m_wire.reset();
}

// |service-name|<applicationPrefix>|<key>|<val>|<key>|<val>|...and so on
std::string dataContent = "service-name";
dataContent += "|";
dataContent += m_producerState.applicationPrefix.toUri();

for (auto const& item : m_producerState.serviceMetaInfo)
{
dataContent += "|";
dataContent += item.first;
dataContent += "|";
dataContent += item.second;
}

ndn::EncodingEstimator estimator;
size_t estimatedSize = wireEncode(estimator, info, status);
size_t estimatedSize = wireEncode(estimator, dataContent);

ndn::EncodingBuffer buffer(estimatedSize, 0);
wireEncode(buffer, info, status);
wireEncode(buffer, dataContent);
m_wire = buffer.block();

return m_wire;
}

std::map<std::string, std::string>
ServiceDiscovery::processData(std::string reply)
processData(std::string reply)
{
std::map<std::string, std::string> keyVal;
std::vector<std::string> items;
Expand All @@ -363,7 +372,7 @@ ServiceDiscovery::processData(std::string reply)
}

Reply
ServiceDiscovery::wireDecode(const ndn::Block& wire)
wireDecode(const ndn::Block& wire)
{
Reply consumerReply;
auto blockType = wire.type();
Expand All @@ -375,19 +384,17 @@ ServiceDiscovery::wireDecode(const ndn::Block& wire)
}

wire.parse();
m_wire = wire;
ndn::Block::element_const_iterator it = wire.elements_begin();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto


ndn::Block::element_const_iterator it = m_wire.elements_begin();

if (it != m_wire.elements_end() && it->type() == tlv::ServiceStatus) {
if (it != wire.elements_end() && it->type() == tlv::ServiceStatus) {
consumerReply.status = ndn::readNonNegativeInteger(*it);
++it;
}
else {
NDN_LOG_DEBUG("Service status is missing");
}

if (it != m_wire.elements_end() && it->type() == tlv::ServiceInfo) {
if (it != wire.elements_end() && it->type() == tlv::ServiceInfo) {
auto serviceMetaInfo = readString(*it);
consumerReply.serviceDetails = processData(readString(*it));
}
Expand All @@ -397,5 +404,6 @@ ServiceDiscovery::wireDecode(const ndn::Block& wire)

return consumerReply;
}

} // namespace discovery
} // namespace ndnsd
26 changes: 15 additions & 11 deletions ndnsd/discovery/service-discovery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ typedef struct Reply Reply;

typedef std::function<void(const Reply& serviceUpdates)> DiscoveryCallback;

std::map<std::string, std::string>
processData(std::string reply);

Reply
wireDecode(const ndn::Block& wire);

class ServiceDiscovery
{

Expand Down Expand Up @@ -192,9 +198,6 @@ class ServiceDiscovery
std::string
makeDataContent();

std::map<std::string, std::string>
processData(std::string reply);

private:
void
doUpdate(const ndn::Name& prefix);
Expand Down Expand Up @@ -231,13 +234,10 @@ class ServiceDiscovery

template<ndn::encoding::Tag TAG>
size_t
wireEncode(ndn::EncodingImpl<TAG>& block, const std::string& info, int status) const;
wireEncode(ndn::EncodingImpl<TAG>& block, const std::string& info) const;

const ndn::Block&
wireEncode(const std::string& info, int status);

Reply
wireDecode(const ndn::Block& wire);
wireEncode();

private:
ndn::Face m_face;
Expand All @@ -253,15 +253,19 @@ class ServiceDiscovery

uint8_t m_appType;
uint8_t m_counter;
uint8_t m_serviceStatus;

uint32_t m_syncProtocol;
uint32_t m_continuousDiscovery;

// Flag specific to consumer application, if set, will not stop consumer application
// but rather keep listening for service updates and send it back to user.
uint32_t m_contDiscovery;
SyncProtocolAdapter m_syncAdapter;
static const ndn::Name DEFAULT_CONSUMER_ONLY_NAME;
mutable ndn::Block m_wire;
DiscoveryCallback m_discoveryCallback;

};

} //namespace discovery
} //namespace ndnsd
#endif // NDNSD_SERVICE_DISCOVERY_HPP
#endif // NDNSD_SERVICE_DISCOVERY_HPP
Loading