diff --git a/source/agent/webrtc/rtcFrame/binding.gyp b/source/agent/webrtc/rtcFrame/binding.gyp index 237815908..e24a39949 100644 --- a/source/agent/webrtc/rtcFrame/binding.gyp +++ b/source/agent/webrtc/rtcFrame/binding.gyp @@ -68,6 +68,7 @@ '<(source_rel_dir)/core/rtc_adapter/VideoReceiveAdapter.cc', '<(source_rel_dir)/core/rtc_adapter/VideoSendAdapter.cc', '<(source_rel_dir)/core/rtc_adapter/AudioSendAdapter.cc', + '<(source_rel_dir)/core/rtc_adapter/thread/ProcessThreadProxy.cc', '<(source_rel_dir)/core/rtc_adapter/thread/StaticTaskQueueFactory.cc', '<(source_rel_dir)/core/owt_base/SsrcGenerator.cc', '<(source_rel_dir)/core/owt_base/AudioUtilitiesNew.cpp', diff --git a/source/core/rtc_adapter/RtcAdapter.cc b/source/core/rtc_adapter/RtcAdapter.cc index b8749b7a0..99a6961c4 100644 --- a/source/core/rtc_adapter/RtcAdapter.cc +++ b/source/core/rtc_adapter/RtcAdapter.cc @@ -13,34 +13,13 @@ #include #include +#include #include -namespace rtc_adapter { - -class RTCProcessThread { -public: - RTCProcessThread(const char* task_name) - : m_processThread(webrtc::ProcessThread::Create(task_name)) - { - m_processThread->Start(); - } - ~RTCProcessThread() - { - m_processThread->Stop(); - } - webrtc::ProcessThread* unwrap() - { - return m_processThread.get(); - } -private: - std::unique_ptr m_processThread; -}; +namespace rtc_adapter { -static std::unique_ptr g_moduleThread - = std::make_unique("ModuleProcessThread"); -static std::unique_ptr g_pacerThread - = std::make_unique("PacerThread"); +static const int kCallDestroyTimeoutMs = 3000; class RtcAdapterImpl : public RtcAdapter, public CallOwner { @@ -87,6 +66,12 @@ RtcAdapterImpl::RtcAdapterImpl() RtcAdapterImpl::~RtcAdapterImpl() { + rtc::Event done; + m_taskQueue->PostTask(webrtc::ToQueuedTask([this, &done] { + m_call.reset(); + done.Set(); + })); + done.Wait(kCallDestroyTimeoutMs); } void RtcAdapterImpl::initCall() @@ -98,9 +83,9 @@ void RtcAdapterImpl::initCall() call_config.task_queue_factory = m_taskQueueFactory.get(); std::unique_ptr moduleThreadProxy = - std::make_unique(g_moduleThread->unwrap()); + std::make_unique("ModuleProcessThread"); std::unique_ptr pacerThreadProxy = - std::make_unique(g_pacerThread->unwrap()); + std::make_unique("PacerThread"); m_call.reset(webrtc::Call::Create( call_config, webrtc::Clock::GetRealTimeClock(), std::move(moduleThreadProxy), diff --git a/source/core/rtc_adapter/thread/ProcessThreadProxy.cc b/source/core/rtc_adapter/thread/ProcessThreadProxy.cc new file mode 100644 index 000000000..584465226 --- /dev/null +++ b/source/core/rtc_adapter/thread/ProcessThreadProxy.cc @@ -0,0 +1,87 @@ +// Copyright (C) <2020> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +#include "ProcessThreadProxy.h" +#include "RTCProcessThread.h" +#include + +namespace rtc_adapter { + +static std::mutex g_threadsMutex; +static std::unordered_map> g_processThreads; +static std::unordered_map g_threadCount; + +ProcessThreadProxy::ProcessThreadProxy(const char* taskName) + : m_name(taskName) +{ +} + +void ProcessThreadProxy::Start() +{ + // The lock is unnecessary if called from same thread + if (g_processThreads.count(m_name) == 0) { + g_processThreads[m_name] = std::make_unique(m_name.c_str()); + g_threadCount[m_name] = 0; + } + g_threadCount[m_name]++; +} + +void ProcessThreadProxy::Stop() +{ + // The lock is unnecessary if called from same thread + if (g_threadCount.count(m_name) > 0) { + g_threadCount[m_name]--; + if (g_threadCount[m_name] == 0) { + g_processThreads.erase(m_name); + } + } + +} + +void ProcessThreadProxy::WakeUp(webrtc::Module* module) +{ + // The lock is unnecessary if called from same thread + if (m_moduleProxyMap.count(module) > 0) { + webrtc::Module* moduleProxy = m_moduleProxyMap[module].get(); + if (g_processThreads.count(m_name) > 0) { + g_processThreads[m_name]->unwrap()->WakeUp(moduleProxy); + } + } +} + +void ProcessThreadProxy::PostTask(std::unique_ptr task) +{ + // The lock is unnecessary if called from same thread + if (g_processThreads.count(m_name) > 0) { + g_processThreads[m_name]->unwrap()->PostTask(std::move(task)); + } +} + +void ProcessThreadProxy::RegisterModule(webrtc::Module* module, const rtc::Location& from) +{ + // The lock is unnecessary if called from same thread + if (m_moduleProxyMap.count(module) == 0) { + m_moduleProxyMap.emplace(module, + std::make_unique(this, module)); + } + webrtc::Module* moduleProxy = m_moduleProxyMap[module].get(); + if (g_processThreads.count(m_name) > 0) { + g_processThreads[m_name]->unwrap()->RegisterModule(moduleProxy, from); + } +} + +void ProcessThreadProxy::DeRegisterModule(webrtc::Module* module) +{ + // The lock is unnecessary if called from same thread + if (m_moduleProxyMap.count(module) > 0) { + webrtc::Module* moduleProxy = m_moduleProxyMap[module].get(); + if (g_processThreads.count(m_name) > 0) { + g_processThreads[m_name]->unwrap()->DeRegisterModule(moduleProxy); + } + m_moduleProxyMap.erase(module); + } +} + +} // namespace rtc_adapter diff --git a/source/core/rtc_adapter/thread/ProcessThreadProxy.h b/source/core/rtc_adapter/thread/ProcessThreadProxy.h index 3297b9949..b90527909 100644 --- a/source/core/rtc_adapter/thread/ProcessThreadProxy.h +++ b/source/core/rtc_adapter/thread/ProcessThreadProxy.h @@ -6,58 +6,68 @@ #define RTC_ADAPTER_THREAD_PROCESS_THREAD_PROXY_ #include +#include #include -#include +#include namespace rtc_adapter { -// ProcessThreadProxy holds a pointer to actual ProcessThread -class ProcessThreadProxy : public webrtc::ProcessThread { +// Proxy the ProcessThreadAttached method to actual thread +class ModuleProxy : public webrtc::Module { public: - ProcessThreadProxy(webrtc::ProcessThread* processThread) - : m_processThread(processThread) + ModuleProxy(webrtc::ProcessThread* thread, webrtc::Module* module) + : m_thread(thread) + , m_module(module) + {} + + virtual int64_t TimeUntilNextProcess() override + { + return m_module->TimeUntilNextProcess(); + } + + virtual void Process() override { - RTC_DCHECK(m_processThread); + m_module->Process(); } + virtual void ProcessThreadAttached(webrtc::ProcessThread* processThread) override + { + m_module->ProcessThreadAttached(m_thread); + } +private: + webrtc::ProcessThread* m_thread; + webrtc::Module* m_module; +}; + +// ProcessThreadProxy is a proxy for global ProcessThread +class ProcessThreadProxy : public webrtc::ProcessThread { +public: + ProcessThreadProxy(const char* taskName); + // Implements ProcessThread - virtual void Start() override {} + virtual void Start() override; // Implements ProcessThread - // Stop() has no effect on proxy - virtual void Stop() override {} + virtual void Stop() override; // Implements ProcessThread // Call actual ProcessThread's WakeUp - virtual void WakeUp(webrtc::Module* module) override - { - m_processThread->WakeUp(module); - } + virtual void WakeUp(webrtc::Module* module) override; // Implements ProcessThread // Call actual ProcessThread's PostTask - virtual void PostTask(std::unique_ptr task) override - { - m_processThread->PostTask(std::move(task)); - } + virtual void PostTask(std::unique_ptr task) override; // Implements ProcessThread // Call actual ProcessThread's RegisterModule - virtual void RegisterModule(webrtc::Module* module, const rtc::Location& from) override - { - m_processThread->RegisterModule(module, from); - } + virtual void RegisterModule(webrtc::Module* module, const rtc::Location& from) override; // Implements ProcessThread // Call actual ProcessThread's DeRegisterModule - virtual void DeRegisterModule(webrtc::Module* module) override - { - m_processThread->DeRegisterModule(module); - } - + virtual void DeRegisterModule(webrtc::Module* module) override; private: - webrtc::ProcessThread* m_processThread; - // std::unordered_set m_modules; + std::string m_name; + std::unordered_map> m_moduleProxyMap; }; } // namespace rtc_adapter diff --git a/source/core/rtc_adapter/thread/RTCProcessThread.h b/source/core/rtc_adapter/thread/RTCProcessThread.h new file mode 100644 index 000000000..3ad050a7f --- /dev/null +++ b/source/core/rtc_adapter/thread/RTCProcessThread.h @@ -0,0 +1,34 @@ +// Copyright (C) <2020> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef RTC_ADAPTER_THREAD_RTC_PROCESS_THREAD__ +#define RTC_ADAPTER_THREAD_RTC_PROCESS_THREAD__ + +#include + +namespace rtc_adapter { + +class RTCProcessThread { +public: + RTCProcessThread(const char* task_name) + : m_processThread(webrtc::ProcessThread::Create(task_name)) + { + m_processThread->Start(); + } + ~RTCProcessThread() + { + m_processThread->Stop(); + } + + webrtc::ProcessThread* unwrap() + { + return m_processThread.get(); + } +private: + std::unique_ptr m_processThread; +}; + +} // namespace rtc_adapter + +#endif diff --git a/source/core/rtc_adapter/thread/StaticTaskQueueFactory.cc b/source/core/rtc_adapter/thread/StaticTaskQueueFactory.cc index 00b064bcd..6d843e9dc 100644 --- a/source/core/rtc_adapter/thread/StaticTaskQueueFactory.cc +++ b/source/core/rtc_adapter/thread/StaticTaskQueueFactory.cc @@ -25,29 +25,37 @@ class TaskQueueDummy final : public webrtc::TaskQueueBase { uint32_t milliseconds) override {} }; -// QueuedTaskProxy only execute when the owner shared_ptr exists -class QueuedTaskProxy : public webrtc::QueuedTask { +// TaskQueueProxy holds a TaskQueueBase* and proxy its method without Delete +class TaskQueueProxy : public webrtc::TaskQueueBase { public: - QueuedTaskProxy(std::unique_ptr task, std::shared_ptr owner) - : m_task(std::move(task)), m_owner(owner) {} + // QueuedTaskProxy only execute when the owner shared_ptr exists + class QueuedTaskProxy : public webrtc::QueuedTask { + public: + QueuedTaskProxy( + std::unique_ptr task, + std::shared_ptr owner, + TaskQueueProxy* parent) + : m_task(std::move(task)) + , m_owner(owner) + , m_parent(parent) {} - // Implements webrtc::QueuedTask - bool Run() override - { - if (auto owner = m_owner.lock()) { + // Implements webrtc::QueuedTask + bool Run() override + { // Only run when owner exists - return m_task->Run(); + if (auto owner = m_owner.lock()) { + // Set current to pass RTC_DCHECK + webrtc::TaskQueueBase::CurrentTaskQueueSetter setCurrent(m_parent); + return m_task->Run(); + } + return true; } - return true; - } -private: - std::unique_ptr m_task; - std::weak_ptr m_owner; -}; + private: + std::unique_ptr m_task; + std::weak_ptr m_owner; + TaskQueueProxy* m_parent; + }; -// TaskQueueProxy holds a TaskQueueBase* and proxy its method without Delete -class TaskQueueProxy : public webrtc::TaskQueueBase { -public: TaskQueueProxy(webrtc::TaskQueueBase* taskQueue) : m_taskQueue(taskQueue), m_sp(std::make_shared(1)) { @@ -70,14 +78,14 @@ class TaskQueueProxy : public webrtc::TaskQueueBase { void PostTask(std::unique_ptr task) override { m_taskQueue->PostTask( - std::make_unique(std::move(task), m_sp)); + std::make_unique(std::move(task), m_sp, this)); } // Implements webrtc::TaskQueueBase void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) override { m_taskQueue->PostDelayedTask( - std::make_unique(std::move(task), m_sp), milliseconds); + std::make_unique(std::move(task), m_sp, this), milliseconds); } private: webrtc::TaskQueueBase* m_taskQueue;