Commit 8298353

Tasking module (NVIDIA#5436)
# Tasking module

`dali::tasking` is a task execution engine which supports inter-task dependencies, semaphores and data passing.

Main concepts of the tasking API are:
* Task - a single-use object that encapsulates a function; it can depend on zero or more Waitable objects
* Waitable - an object for which a task can wait
* Scheduler - manages tasks and resolves their readiness state

## Main components

A `Task` is also `Waitable` - that is, a task can depend on the completion of other task(s).
There are two kinds of `Waitable` objects - completion events and releasable objects.
A `CompletionEvent` (e.g. a `Task`) is an object which remains acquirable after it's "completed". Performing an `Acquire` operation on it does not alter the state of the object.
A `Semaphore` can be externally released (it's `Releasable`), but `Acquire` lowers the semaphore count, so the number of tasks that can acquire a semaphore before it's released is limited.

Acquisition of `Waitable` objects on behalf of waiting tasks is the duty of the `Scheduler`. By contrast, the `Release` operation on `Releasable` objects can be performed in any context.

## Data passing

Data can be passed between tasks. It's wrapped in a `TaskResult` type which stores either the value as `std::any` or the error as `std::exception_ptr`. An attempt to access the value with an exception present results in the exception being rethrown.

A `Task` wraps a function taking 0 or 1 argument (in the latter case it's a this-like pointer to the `Task`) and returning a value or values. Upon creation of the task, the caller defines the number of return values. There are two options:
- Scalar return value - in which case the return value can be any object convertible to `std::any`. If the task returns `std::any`, it's stripped and the stored value is copied.
- Multiple return values - in this case the return value of the function must be either a collection or a `tuple`.
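
The value-or-exception storage described above can be illustrated with a small, self-contained sketch. This is not the `TaskResult` class from the commit: `ResultSketch`, `RunAndCapture` and `ExampleUsage` are hypothetical names invented here, and only the documented behaviour (value kept as `std::any`, error kept as `std::exception_ptr`, rethrow on access) is modelled.

```cpp
#include <any>
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the TaskResult idea; not DALI's actual class.
class ResultSketch {
 public:
  // Stores a value. Assigning a std::any copies its contents instead of
  // nesting one std::any inside another.
  template <typename T>
  void SetValue(T &&value) {
    value_ = std::forward<T>(value);
    error_ = nullptr;
  }

  // Stores an error and discards any previously stored value.
  void SetException(std::exception_ptr e) {
    value_.reset();
    error_ = std::move(e);
  }

  bool HasError() const { return error_ != nullptr; }

  // Returns the stored value, or rethrows the stored exception.
  template <typename T>
  T Value() const {
    if (error_)
      std::rethrow_exception(error_);
    return std::any_cast<T>(value_);
  }

 private:
  std::any value_;
  std::exception_ptr error_;
};

// Runs a callable and captures either its return value or whatever it throws.
template <typename Func>
ResultSketch RunAndCapture(Func &&func) {
  ResultSketch result;
  try {
    result.SetValue(func());
  } catch (...) {
    result.SetException(std::current_exception());
  }
  return result;
}

// Small usage example: a successful producer and a failing one.
inline void ExampleUsage() {
  ResultSketch ok = RunAndCapture([] { return 42; });
  assert(ok.Value<int>() == 42);

  ResultSketch bad = RunAndCapture([]() -> int { throw std::runtime_error("boom"); });
  assert(bad.HasError());  // calling bad.Value<int>() would rethrow the error
}
```

Keeping the error next to the value means a consumer needs no separate error channel: the failure surfaces at the point where the value is requested.
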
A `Task` can `Subscribe` to the result of a producer task (which has to be done before the producer is submitted to the scheduler). The result can be obtained by calling `task->GetInputValue<T>(index)`.

## Obtaining results

When a task is added to the scheduler, a `TaskFuture` object can be obtained. `Scheduler::AddSilentTask` should be used if the result of the task is not needed.
Calling `TaskFuture::Value` will wait for the task to complete and return the value produced by the task (or rethrow the exception).

## Error handling

All errors thrown by the task functions are stored in the task results. An attempt to obtain results of a task that threw an exception will result in the exception being rethrown.

---------

Signed-off-by: Michal Zientkiewicz <[email protected]>
1 parent 15f5912 commit 8298353

11 files changed: +2142 -2 lines changed

Doxyfile

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ SHORT_NAMES = NO
 # description.)
 # The default value is: NO.
 
-JAVADOC_AUTOBRIEF = NO
+JAVADOC_AUTOBRIEF = YES
 
 # If the QT_AUTOBRIEF tag is set to YES then doxygen will interpret the first
 # line (until the first dot) of a Qt-style comment as the brief description. If

dali/core/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2018, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ project(dali_core CUDA CXX C)
 
 add_subdirectory(mm)
 add_subdirectory(os)
+add_subdirectory(exec)
 
 # Get all the source files
 collect_headers(DALI_INST_HDRS PARENT_SCOPE)

dali/core/exec/CMakeLists.txt

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_subdirectory(tasking)
+
+# Get all the source files
+collect_headers(DALI_INST_HDRS PARENT_SCOPE)
+collect_sources(DALI_CORE_SRCS PARENT_SCOPE)
+collect_test_sources(DALI_CORE_TEST_SRCS PARENT_SCOPE)

dali/core/exec/tasking/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Get all the source files
+collect_headers(DALI_INST_HDRS PARENT_SCOPE)
+collect_sources(DALI_CORE_SRCS PARENT_SCOPE)
+collect_test_sources(DALI_CORE_TEST_SRCS PARENT_SCOPE)

dali/core/exec/tasking/scheduler.cc

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <cassert>
+#include <mutex>
+#include <iostream>
+#include "dali/core/exec/tasking/scheduler.h"
+
+namespace dali::tasking {
+
+bool Scheduler::AcquireAllAndMoveToReady(SharedTask &task) noexcept {
+  assert(task->state_ <= TaskState::Pending);
+
+  // All or nothing - first we check that all preconditions are met
+  for (auto &w : task->preconditions_)
+    if (!w->IsAcquirable())
+      return false;  // at least one unmet
+  // If they are, we acquire them - this must succeed
+  for (auto &w : task->preconditions_)
+    if (!w->TryAcquire(task)) {
+      std::cerr
+        << "Internal error - resource acquisition failed for a resource known to be available"
+        << std::endl;
+      std::abort();
+    }
+
+  task->preconditions_.clear();
+  task->state_ = TaskState::Ready;
+  pending_.Remove(task);
+  ready_.push(std::move(task));
+  return true;
+}
+
+void Scheduler::Notify(Waitable *w) {
+  bool is_completion_event = dynamic_cast<CompletionEvent *>(w) != nullptr;
+  bool is_task = is_completion_event && dynamic_cast<Task *>(w);
+
+  int new_ready = 0;
+  {
+    std::lock_guard g(mtx_);
+    if (is_task)
+      task_done_.notify_all();
+
+    SmallVector<SharedTask, 8> waiting;
+    int n = w->waiting_.size();
+    waiting.reserve(n);
+    for (int i = 0; i < n; i++)
+      waiting.emplace_back(w->waiting_[i]);
+
+    for (auto &task : waiting) {
+      // If the waitable is a completion event, it will never become unacquirable again.
+      // Otherwise, we have to re-check it.
+      if (!is_completion_event && !w->IsAcquirable())
+        break;
+      if (task->Ready())
+        continue;
+
+      // If the task has only one precondition or the waitable is a completion event,
+      // then we can just try to acquire that waitable on behalf of the task.
+      // A completion event, once complete, is never un-completed and all waiting threads
+      // will be able to acquire it. This means that we can eagerly acquire it without risking
+      // deadlocks. This imposes less overhead than re-checking all preconditions each time.
+      if (is_completion_event ||
+          (task->preconditions_.size() == 1 && task->preconditions_.begin()->get() == w)) {
+        // try acquire - the only way this can fail is that the task was
+        // re-checked in another thread and marked as ready...
+        if (!w->TryAcquire(task)) {
+          assert(task->preconditions_.size() != 1 || task->preconditions_.begin()->get() != w);
+          continue;  // ... if so, nothing to do
+        }
+        auto it = std::find_if(task->preconditions_.begin(), task->preconditions_.end(),
+                               [w](auto &pre) { return pre.get() == w; });
+        assert(it != task->preconditions_.end());
+        task->preconditions_.erase(it);
+        if (task->Ready()) {
+          pending_.Remove(task);
+          task->state_ = TaskState::Ready;
+          ready_.push(std::move(task));
+          new_ready++;
+          // OK, the task is ready, we're done with it
+          continue;
+        }
+      }
+
+      if (AcquireAllAndMoveToReady(task))
+        new_ready++;
+    }
+  }
+
+
+  if (new_ready == 1)
+    this->task_ready_.notify_one();
+  else if (new_ready > 1)
+    this->task_ready_.notify_all();
+}
+
+}  // namespace dali::tasking
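
The all-or-nothing acquisition performed by `Scheduler::AcquireAllAndMoveToReady` can be shown in isolation with a simplified, single-threaded sketch. The types below (`WaitableSketch`, `EventSketch`, `SemaphoreSketch`) and the function `AcquireAll` are hypothetical stand-ins written for this illustration; they are not the classes from the commit. The sketch also assumes that each waitable appears at most once in a precondition list and that the caller provides any locking needed (in the diff, `Notify` makes the call while holding `mtx_`).

```cpp
#include <cassert>
#include <vector>

// Hypothetical minimal waitable interface, loosely modelled on the diff.
struct WaitableSketch {
  virtual ~WaitableSketch() = default;
  virtual bool IsAcquirable() const = 0;
  virtual bool TryAcquire() = 0;
};

// Completion event: once done, it stays acquirable and acquiring does not change it.
struct EventSketch : WaitableSketch {
  bool done = false;
  bool IsAcquirable() const override { return done; }
  bool TryAcquire() override { return done; }
};

// Counting semaphore: every successful acquire consumes one unit.
struct SemaphoreSketch : WaitableSketch {
  int count = 0;
  explicit SemaphoreSketch(int initial) : count(initial) {}
  bool IsAcquirable() const override { return count > 0; }
  bool TryAcquire() override {
    if (count <= 0)
      return false;
    --count;
    return true;
  }
};

// Phase 1 checks every precondition without touching any state;
// phase 2 acquires them all. Nothing is acquired if any check fails,
// so a task never holds a semaphore while still waiting for something else.
inline bool AcquireAll(std::vector<WaitableSketch *> &preconditions) {
  for (WaitableSketch *w : preconditions)
    if (!w->IsAcquirable())
      return false;  // at least one unmet, nothing acquired

  for (WaitableSketch *w : preconditions) {
    bool ok = w->TryAcquire();
    assert(ok && "a waitable reported as acquirable must be acquirable");
    (void)ok;  // avoids an unused-variable warning when asserts are disabled
  }
  preconditions.clear();
  return true;
}
```

With a pending event and a semaphore holding one unit, `AcquireAll` returns `false` and leaves the semaphore count untouched; once the event is marked done, the same call succeeds and consumes the unit.
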
