Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DCHECK in debug rtc library #734

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/agent/webrtc/rtcFrame/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
'<(source_rel_dir)/core/rtc_adapter/VideoReceiveAdapter.cc',
'<(source_rel_dir)/core/rtc_adapter/VideoSendAdapter.cc',
'<(source_rel_dir)/core/rtc_adapter/AudioSendAdapter.cc',
'<(source_rel_dir)/core/rtc_adapter/thread/ProcessThreadProxy.cc',
'<(source_rel_dir)/core/rtc_adapter/thread/StaticTaskQueueFactory.cc',
'<(source_rel_dir)/core/owt_base/SsrcGenerator.cc',
'<(source_rel_dir)/core/owt_base/AudioUtilitiesNew.cpp',
Expand Down
37 changes: 11 additions & 26 deletions source/core/rtc_adapter/RtcAdapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,13 @@
#include <memory>
#include <mutex>

#include <rtc_base/event.h>
#include <system_wrappers/include/clock.h>

namespace rtc_adapter {

class RTCProcessThread {
public:
RTCProcessThread(const char* task_name)
: m_processThread(webrtc::ProcessThread::Create(task_name))
{
m_processThread->Start();
}
~RTCProcessThread()
{
m_processThread->Stop();
}

webrtc::ProcessThread* unwrap()
{
return m_processThread.get();
}
private:
std::unique_ptr<webrtc::ProcessThread> m_processThread;
};
namespace rtc_adapter {

static std::unique_ptr<RTCProcessThread> g_moduleThread
= std::make_unique<RTCProcessThread>("ModuleProcessThread");
static std::unique_ptr<RTCProcessThread> g_pacerThread
= std::make_unique<RTCProcessThread>("PacerThread");
static const int kCallDestroyTimeoutMs = 3000;

class RtcAdapterImpl : public RtcAdapter,
public CallOwner {
Expand Down Expand Up @@ -87,6 +66,12 @@ RtcAdapterImpl::RtcAdapterImpl()

RtcAdapterImpl::~RtcAdapterImpl()
{
rtc::Event done;
m_taskQueue->PostTask(webrtc::ToQueuedTask([this, &done] {
m_call.reset();
done.Set();
}));
done.Wait(kCallDestroyTimeoutMs);
}

void RtcAdapterImpl::initCall()
Expand All @@ -98,9 +83,9 @@ void RtcAdapterImpl::initCall()
call_config.task_queue_factory = m_taskQueueFactory.get();

std::unique_ptr<webrtc::ProcessThread> moduleThreadProxy =
std::make_unique<ProcessThreadProxy>(g_moduleThread->unwrap());
std::make_unique<ProcessThreadProxy>("ModuleProcessThread");
std::unique_ptr<webrtc::ProcessThread> pacerThreadProxy =
std::make_unique<ProcessThreadProxy>(g_pacerThread->unwrap());
std::make_unique<ProcessThreadProxy>("PacerThread");
m_call.reset(webrtc::Call::Create(
call_config, webrtc::Clock::GetRealTimeClock(),
std::move(moduleThreadProxy),
Expand Down
87 changes: 87 additions & 0 deletions source/core/rtc_adapter/thread/ProcessThreadProxy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (C) <2020> Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

#include "ProcessThreadProxy.h"
#include "RTCProcessThread.h"
#include <mutex>

namespace rtc_adapter {

static std::mutex g_threadsMutex;
static std::unordered_map<std::string,
std::unique_ptr<RTCProcessThread>> g_processThreads;
static std::unordered_map<std::string, int> g_threadCount;

ProcessThreadProxy::ProcessThreadProxy(const char* taskName)
: m_name(taskName)
{
}

void ProcessThreadProxy::Start()
{
// The lock is unnecessary if called from same thread
if (g_processThreads.count(m_name) == 0) {
g_processThreads[m_name] = std::make_unique<RTCProcessThread>(m_name.c_str());
g_threadCount[m_name] = 0;
}
g_threadCount[m_name]++;
}

void ProcessThreadProxy::Stop()
{
// The lock is unnecessary if called from same thread
if (g_threadCount.count(m_name) > 0) {
g_threadCount[m_name]--;
if (g_threadCount[m_name] == 0) {
g_processThreads.erase(m_name);
}
}

}

void ProcessThreadProxy::WakeUp(webrtc::Module* module)
{
// The lock is unnecessary if called from same thread
if (m_moduleProxyMap.count(module) > 0) {
webrtc::Module* moduleProxy = m_moduleProxyMap[module].get();
if (g_processThreads.count(m_name) > 0) {
g_processThreads[m_name]->unwrap()->WakeUp(moduleProxy);
}
}
}

void ProcessThreadProxy::PostTask(std::unique_ptr<webrtc::QueuedTask> task)
{
// The lock is unnecessary if called from same thread
if (g_processThreads.count(m_name) > 0) {
g_processThreads[m_name]->unwrap()->PostTask(std::move(task));
}
}

void ProcessThreadProxy::RegisterModule(webrtc::Module* module, const rtc::Location& from)
{
// The lock is unnecessary if called from same thread
if (m_moduleProxyMap.count(module) == 0) {
m_moduleProxyMap.emplace(module,
std::make_unique<ModuleProxy>(this, module));
}
webrtc::Module* moduleProxy = m_moduleProxyMap[module].get();
if (g_processThreads.count(m_name) > 0) {
g_processThreads[m_name]->unwrap()->RegisterModule(moduleProxy, from);
}
}

void ProcessThreadProxy::DeRegisterModule(webrtc::Module* module)
{
// The lock is unnecessary if called from same thread
if (m_moduleProxyMap.count(module) > 0) {
webrtc::Module* moduleProxy = m_moduleProxyMap[module].get();
if (g_processThreads.count(m_name) > 0) {
g_processThreads[m_name]->unwrap()->DeRegisterModule(moduleProxy);
}
m_moduleProxyMap.erase(module);
}
}

} // namespace rtc_adapter
66 changes: 38 additions & 28 deletions source/core/rtc_adapter/thread/ProcessThreadProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,58 +6,68 @@
#define RTC_ADAPTER_THREAD_PROCESS_THREAD_PROXY_

#include <modules/utility/include/process_thread.h>
#include <modules/include/module.h>
#include <rtc_base/checks.h>
#include <unordered_set>
#include <unordered_map>

namespace rtc_adapter {

// ProcessThreadProxy holds a pointer to actual ProcessThread
class ProcessThreadProxy : public webrtc::ProcessThread {
// Proxy the ProcessThreadAttached method to actual thread
class ModuleProxy : public webrtc::Module {
public:
ProcessThreadProxy(webrtc::ProcessThread* processThread)
: m_processThread(processThread)
ModuleProxy(webrtc::ProcessThread* thread, webrtc::Module* module)
: m_thread(thread)
, m_module(module)
{}

virtual int64_t TimeUntilNextProcess() override
{
return m_module->TimeUntilNextProcess();
}

virtual void Process() override
{
RTC_DCHECK(m_processThread);
m_module->Process();
}

virtual void ProcessThreadAttached(webrtc::ProcessThread* processThread) override
{
m_module->ProcessThreadAttached(m_thread);
}
private:
webrtc::ProcessThread* m_thread;
webrtc::Module* m_module;
};

// ProcessThreadProxy is a proxy for global ProcessThread
class ProcessThreadProxy : public webrtc::ProcessThread {
public:
ProcessThreadProxy(const char* taskName);

// Implements ProcessThread
virtual void Start() override {}
virtual void Start() override;

// Implements ProcessThread
// Stop() has no effect on proxy
virtual void Stop() override {}
virtual void Stop() override;

// Implements ProcessThread
// Call actual ProcessThread's WakeUp
virtual void WakeUp(webrtc::Module* module) override
{
m_processThread->WakeUp(module);
}
virtual void WakeUp(webrtc::Module* module) override;

// Implements ProcessThread
// Call actual ProcessThread's PostTask
virtual void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override
{
m_processThread->PostTask(std::move(task));
}
virtual void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;

// Implements ProcessThread
// Call actual ProcessThread's RegisterModule
virtual void RegisterModule(webrtc::Module* module, const rtc::Location& from) override
{
m_processThread->RegisterModule(module, from);
}
virtual void RegisterModule(webrtc::Module* module, const rtc::Location& from) override;

// Implements ProcessThread
// Call actual ProcessThread's DeRegisterModule
virtual void DeRegisterModule(webrtc::Module* module) override
{
m_processThread->DeRegisterModule(module);
}

virtual void DeRegisterModule(webrtc::Module* module) override;
private:
webrtc::ProcessThread* m_processThread;
// std::unordered_set<webrtc::Module*> m_modules;
std::string m_name;
std::unordered_map<webrtc::Module*, std::unique_ptr<ModuleProxy>> m_moduleProxyMap;
};

} // namespace rtc_adapter
Expand Down
34 changes: 34 additions & 0 deletions source/core/rtc_adapter/thread/RTCProcessThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) <2020> Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

#ifndef RTC_ADAPTER_THREAD_RTC_PROCESS_THREAD__
#define RTC_ADAPTER_THREAD_RTC_PROCESS_THREAD__

#include <modules/utility/include/process_thread.h>

namespace rtc_adapter {

class RTCProcessThread {
public:
RTCProcessThread(const char* task_name)
: m_processThread(webrtc::ProcessThread::Create(task_name))
{
m_processThread->Start();
}
~RTCProcessThread()
{
m_processThread->Stop();
}

webrtc::ProcessThread* unwrap()
{
return m_processThread.get();
}
private:
std::unique_ptr<webrtc::ProcessThread> m_processThread;
};

} // namespace rtc_adapter

#endif
48 changes: 28 additions & 20 deletions source/core/rtc_adapter/thread/StaticTaskQueueFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,37 @@ class TaskQueueDummy final : public webrtc::TaskQueueBase {
uint32_t milliseconds) override {}
};

// QueuedTaskProxy only execute when the owner shared_ptr exists
class QueuedTaskProxy : public webrtc::QueuedTask {
// TaskQueueProxy holds a TaskQueueBase* and proxy its method without Delete
class TaskQueueProxy : public webrtc::TaskQueueBase {
public:
QueuedTaskProxy(std::unique_ptr<webrtc::QueuedTask> task, std::shared_ptr<int> owner)
: m_task(std::move(task)), m_owner(owner) {}
// QueuedTaskProxy only execute when the owner shared_ptr exists
class QueuedTaskProxy : public webrtc::QueuedTask {
public:
QueuedTaskProxy(
std::unique_ptr<webrtc::QueuedTask> task,
std::shared_ptr<int> owner,
TaskQueueProxy* parent)
: m_task(std::move(task))
, m_owner(owner)
, m_parent(parent) {}

// Implements webrtc::QueuedTask
bool Run() override
{
if (auto owner = m_owner.lock()) {
// Implements webrtc::QueuedTask
bool Run() override
{
// Only run when owner exists
return m_task->Run();
if (auto owner = m_owner.lock()) {
// Set current to pass RTC_DCHECK
webrtc::TaskQueueBase::CurrentTaskQueueSetter setCurrent(m_parent);
return m_task->Run();
}
return true;
}
return true;
}
private:
std::unique_ptr<webrtc::QueuedTask> m_task;
std::weak_ptr<int> m_owner;
};
private:
std::unique_ptr<webrtc::QueuedTask> m_task;
std::weak_ptr<int> m_owner;
TaskQueueProxy* m_parent;
};

// TaskQueueProxy holds a TaskQueueBase* and proxy its method without Delete
class TaskQueueProxy : public webrtc::TaskQueueBase {
public:
TaskQueueProxy(webrtc::TaskQueueBase* taskQueue)
: m_taskQueue(taskQueue), m_sp(std::make_shared<int>(1))
{
Expand All @@ -70,14 +78,14 @@ class TaskQueueProxy : public webrtc::TaskQueueBase {
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override
{
m_taskQueue->PostTask(
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp));
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp, this));
}
// Implements webrtc::TaskQueueBase
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override
{
m_taskQueue->PostDelayedTask(
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp), milliseconds);
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp, this), milliseconds);
}
private:
webrtc::TaskQueueBase* m_taskQueue;
Expand Down