Skip to content

Commit cf55b07

Browse files
authored
Merge pull request #55 from michael-poehnl/iox-#51-runtime_cleanup_shm_resources
iox-#51: capability to destroy sender + receiver ports
2 parents cc734e1 + f8c4e41 commit cf55b07

File tree

9 files changed

+99
-43
lines changed

9 files changed

+99
-43
lines changed

iceoryx_posh/include/iceoryx_posh/internal/popo/base_port.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
#pragma once
1616

17-
#include "iceoryx_posh/internal/popo/base_port_data.hpp"
1817
#include "iceoryx_posh/iceoryx_posh_types.hpp"
18+
#include "iceoryx_posh/internal/popo/base_port_data.hpp"
1919

2020
namespace iox
2121
{
@@ -57,31 +57,32 @@ class BasePort
5757
operator bool() const;
5858

5959
/// @brief Reads Type of actual CaPro Port (sender/receiver...)
60-
/// @param Nothing
6160
/// @return m_portType Type of Port in struct BasePortType
6261
BasePortType getPortType() const noexcept;
6362

6463
/// @brief Reads Type of actual CaPro Port (sender/receiver...)
65-
/// @param Nothing
6664
/// @return m_portType Type of Port in struct BasePortType
6765
capro::ServiceDescription getCaProServiceDescription() const noexcept;
6866

6967
/// @brief Gets Application Name for the active port
70-
/// @param Nothing
7168
/// @return Application name as String
7269
cxx::CString100 getApplicationName() const noexcept;
7370

74-
7571
/// @brief Gets Interface Name for the active port
76-
/// @param Nothing
7772
/// @return Interface name as enum value
7873
Interfaces getInterface() const noexcept;
7974

8075
/// @brief Gets Id of thethe active port
81-
/// @param Nothing
8276
/// @return UniqueId name as Integer
8377
uint64_t getUniqueID() const noexcept;
8478

79+
/// @brief Indicate that this port can be destroyed
80+
void destroy() noexcept;
81+
82+
/// @brief Checks whether port can be destroyed
83+
/// @return true if it shall be destroyed, false if not
84+
bool toBeDestroyed() const noexcept;
85+
8586
protected:
8687
const MemberType_t* getMembers() const noexcept;
8788
MemberType_t* getMembers() noexcept;

iceoryx_posh/include/iceoryx_posh/internal/popo/base_port_data.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ struct BasePortData
7676

7777
static std::atomic<uint64_t> s_uniqueIdCounter;
7878
std::atomic<uint64_t> m_uniqueId{0};
79+
std::atomic_bool m_toBeDestroyed{false};
7980

8081
iox::relative_ptr<runtime::RunnableData> m_runnable;
8182
};

iceoryx_posh/include/iceoryx_posh/internal/roudi/shared_memory_manager.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ class SharedMemoryManager
192192
void deletePortsOfProcess(std::string processName);
193193
void deleteRunnableAndItsPorts(std::string runnableName);
194194

195+
void destroySenderPort(SenderPortType::MemberType_t* const senderPortData);
196+
197+
void destroyReceiverPort(ReceiverPortType::MemberType_t* const receiverPortData);
198+
195199
void printmempool();
196200
std::string GetShmAddrString();
197201
uint64_t getShmSizeInBytes() const;

iceoryx_posh/include/iceoryx_posh/popo/publisher.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ namespace iox
2525
{
2626
namespace popo
2727
{
28-
2928
class Publisher
3029
{
3130
public:
@@ -40,13 +39,14 @@ class Publisher
4039
Publisher& operator=(Publisher&&) = default;
4140
Publisher(Publisher&& other) = default;
4241

43-
virtual ~Publisher() noexcept = default;
42+
virtual ~Publisher() noexcept;
4443

4544
/// @brief Allocate memory for the chunk to be sent
4645
/// @param[in] payloadSize size of shared memory to be allocated
4746
/// @param[in] useDynamicPayloadSizes bool value of using dynamic payload size
4847
/// @return Information about the chunk reserved
49-
virtual mepoo::ChunkHeader* allocateChunkWithHeader(uint32_t payloadSize, bool useDynamicPayloadSizes = false) noexcept;
48+
virtual mepoo::ChunkHeader* allocateChunkWithHeader(uint32_t payloadSize,
49+
bool useDynamicPayloadSizes = false) noexcept;
5050

5151
/// @brief Allocate memory for chunk to be sent
5252
/// @param[in] payloadSize size of shared memory to be allocated
@@ -66,7 +66,8 @@ class Publisher
6666
/// @param[in] chunkHeader Information of the chunk to be removed.
6767
virtual void freeChunk(mepoo::ChunkHeader* const chunkHeader) noexcept;
6868

69-
/// @brief Function for converting payload information to ChunkHeader , deleting particular chunk from chunkcontainer
69+
/// @brief Function for converting payload information to ChunkHeader , deleting particular chunk from
70+
/// chunkcontainer
7071
/// @param[in] payload payload of the chunk to be removed.
7172
virtual void freeChunk(void* const payload) noexcept;
7273

iceoryx_posh/include/iceoryx_posh/popo/subscriber.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ class Subscriber
127127
/// @return true if the references are set otherwise false
128128
bool isChunkReceiveSemaphoreSet() noexcept;
129129

130+
/// @brief Unset the semaphore if one is set
131+
void unsetChunkReceiveSemaphore() noexcept;
132+
130133
protected:
131134
// needed for unit testing
132135
Subscriber() noexcept;

iceoryx_posh/source/popo/base_port.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,15 @@ BasePort::operator bool() const
7474
return m_basePortDataPtr != nullptr;
7575
}
7676

77+
void BasePort::destroy() noexcept
78+
{
79+
getMembers()->m_toBeDestroyed.store(true, std::memory_order_relaxed);
80+
}
81+
82+
bool BasePort::toBeDestroyed() const noexcept
83+
{
84+
return getMembers()->m_toBeDestroyed.load(std::memory_order_relaxed);
85+
}
86+
7787
} // namespace popo
7888
} // namespace iox

iceoryx_posh/source/popo/publisher.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ Publisher::Publisher(const capro::ServiceDescription& service, const cxx::CStrin
2929
{
3030
}
3131

32+
Publisher::~Publisher() noexcept
33+
{
34+
m_sender.destroy();
35+
}
36+
3237
const void* Publisher::getLastChunk() const noexcept
3338
{
3439
assert(false && "Not yet supported");

iceoryx_posh/source/popo/subscriber.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ Subscriber::Subscriber() noexcept
3030
}
3131

3232
Subscriber::Subscriber(const capro::ServiceDescription& service, const cxx::CString100& runnableName) noexcept
33-
: m_receiver(
34-
runtime::PoshRuntime::getInstance().getMiddlewareReceiver(service, Interfaces::INTERNAL, runnableName))
33+
: m_receiver(runtime::PoshRuntime::getInstance().getMiddlewareReceiver(service, Interfaces::INTERNAL, runnableName))
3534
{
3635
}
3736

@@ -40,6 +39,7 @@ Subscriber::~Subscriber() noexcept
4039
unsetReceiveHandler();
4140
// TODO: Find an alternative like an RAII receive handler which
4241
// is called in the dtor. You cannot expect the user to call it before destruction
42+
m_receiver.destroy();
4343
}
4444

4545
void Subscriber::subscribe(const uint32_t cacheSize) noexcept
@@ -220,6 +220,12 @@ bool Subscriber::isChunkReceiveSemaphoreSet() noexcept
220220
return m_receiver.AreCallbackReferencesSet();
221221
}
222222

223+
void Subscriber::unsetChunkReceiveSemaphore() noexcept
224+
{
225+
m_receiver.UnsetCallbackReferences();
226+
}
227+
228+
223229
void Subscriber::eventCallbackMain() noexcept
224230
{
225231
while (m_callbackRunFlag)

iceoryx_posh/source/roudi/shared_memory_manager.cpp

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ void SharedMemoryManager::handleSenderPorts()
117117
// forward to interfaces
118118
sendToAllMatchingInterfacePorts(caproMessage, l_senderPort.getInterface());
119119
}
120+
// check if we have to destroy this sender port
121+
if (l_senderPort.toBeDestroyed())
122+
{
123+
destroySenderPort(l_senderPortData);
124+
}
120125
}
121126
}
122127

@@ -141,6 +146,11 @@ void SharedMemoryManager::handleReceiverPorts()
141146
l_receiverPort.dispatchCaProMessage(nackMessage);
142147
}
143148
}
149+
// check if we have to destroy this sender port
150+
if (l_receiverPort.toBeDestroyed())
151+
{
152+
destroyReceiverPort(l_receiverPortData);
153+
}
144154
}
145155
}
146156

@@ -344,21 +354,7 @@ void SharedMemoryManager::deletePortsOfProcess(std::string f_processName)
344354
SenderPortType l_sender(port);
345355
if (f_processName == l_sender.getApplicationName())
346356
{
347-
const auto& serviceDescription = l_sender.getCaProServiceDescription();
348-
removeEntryFromServiceRegistry(serviceDescription.getServiceIDString(),
349-
serviceDescription.getInstanceIDString());
350-
l_sender.cleanup();
351-
352-
capro::CaproMessage message(capro::CaproMessageType::STOP_OFFER, serviceDescription);
353-
m_portIntrospection.reportMessage(message);
354-
355-
sendToAllMatchingReceiverPorts(message, l_sender);
356-
357-
m_portIntrospection.removeSender(f_processName, serviceDescription);
358-
359-
// delete sender impl from list after StopOffer was processed
360-
l_shm->m_senderPortMembers.erase(port);
361-
DEBUG_PRINTF("Deleted SenderPortImpl of application %s\n", f_processName.c_str());
357+
destroySenderPort(port);
362358
}
363359
}
364360

@@ -367,21 +363,7 @@ void SharedMemoryManager::deletePortsOfProcess(std::string f_processName)
367363
ReceiverPortType l_receiver(port);
368364
if (f_processName == l_receiver.getApplicationName())
369365
{
370-
// do the complete cleanup for the receiver port for being able to erase it
371-
l_receiver.cleanup();
372-
373-
const auto& serviceDescription = l_receiver.getCaProServiceDescription();
374-
capro::CaproMessage message(capro::CaproMessageType::UNSUB, serviceDescription);
375-
message.m_requestPort = port;
376-
m_portIntrospection.reportMessage(message);
377-
378-
sendToAllMatchingSenderPorts(message, l_receiver);
379-
380-
m_portIntrospection.removeReceiver(f_processName, serviceDescription);
381-
382-
// delete receiver impl from list after unsubscribe was processed
383-
l_shm->m_receiverPortMembers.erase(port);
384-
DEBUG_PRINTF("Deleted ReceiverPortImpl of application %s\n", f_processName.c_str());
366+
destroyReceiverPort(port);
385367
}
386368
}
387369

@@ -435,6 +417,49 @@ void SharedMemoryManager::deleteRunnableAndItsPorts(std::string f_runnableName)
435417
}
436418
}
437419

420+
void SharedMemoryManager::destroySenderPort(SenderPortType::MemberType_t* const senderPortData)
421+
{
422+
SenderPortType senderPort(senderPortData);
423+
424+
const auto& serviceDescription = senderPort.getCaProServiceDescription();
425+
removeEntryFromServiceRegistry(serviceDescription.getServiceIDString(), serviceDescription.getInstanceIDString());
426+
senderPort.cleanup();
427+
428+
const capro::CaproMessage message(capro::CaproMessageType::STOP_OFFER, serviceDescription);
429+
m_portIntrospection.reportMessage(message);
430+
431+
sendToAllMatchingReceiverPorts(message, senderPort);
432+
sendToAllMatchingInterfacePorts(message, senderPort.getInterface());
433+
434+
m_portIntrospection.removeSender(senderPort.getApplicationName(), serviceDescription);
435+
436+
// delete sender impl from list after StopOffer was processed
437+
const auto shm = m_ShmInterface.getShmInterface();
438+
shm->m_senderPortMembers.erase(senderPortData);
439+
DEBUG_PRINTF("Destroyed SenderPortImpl\n");
440+
}
441+
442+
void SharedMemoryManager::destroyReceiverPort(ReceiverPortType::MemberType_t* const receiverPortData)
443+
{
444+
ReceiverPortType receiverPort(receiverPortData);
445+
446+
receiverPort.cleanup();
447+
448+
const auto& serviceDescription = receiverPort.getCaProServiceDescription();
449+
capro::CaproMessage message(capro::CaproMessageType::UNSUB, serviceDescription);
450+
message.m_requestPort = receiverPortData;
451+
m_portIntrospection.reportMessage(message);
452+
453+
sendToAllMatchingSenderPorts(message, receiverPort);
454+
455+
m_portIntrospection.removeReceiver(receiverPort.getApplicationName(), serviceDescription);
456+
457+
// delete receiver impl from list after unsubscribe was processed
458+
const auto shm = m_ShmInterface.getShmInterface();
459+
shm->m_receiverPortMembers.erase(receiverPortData);
460+
DEBUG_PRINTF("Destroyed ReceiverPortImpl\n");
461+
}
462+
438463
std::string SharedMemoryManager::GetShmAddrString()
439464
{
440465
return m_ShmInterface.getBaseAddrString();

0 commit comments

Comments
 (0)