Skip to content

Commit e788f66

Browse files
committed
Fix double free in loader destructor and race condition in consumer/producer (#678)
Signed-off-by: Joaquin Anton <[email protected]>
1 parent e95470e commit e788f66

21 files changed

+125
-181
lines changed

dali/pipeline/operators/reader/caffe2_reader_op.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,7 @@ class Caffe2Reader : public DataReader<CPUBackend, Tensor<CPUBackend>> {
3030
}
3131

3232
void RunImpl(SampleWorkspace* ws, const int i) override {
33-
const int idx = ws->data_idx();
34-
35-
auto* raw_data = GetSample(idx);
36-
37-
parser_->Parse(*raw_data, ws);
38-
39-
return;
33+
parser_->Parse(GetSample(ws->data_idx()), ws);
4034
}
4135

4236
protected:

dali/pipeline/operators/reader/caffe_reader_op.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,7 @@ class CaffeReader : public DataReader<CPUBackend, Tensor<CPUBackend>> {
3030
}
3131

3232
void RunImpl(SampleWorkspace* ws, const int i) override {
33-
const int idx = ws->data_idx();
34-
35-
auto* raw_data = GetSample(idx);
36-
37-
parser_->Parse(*raw_data, ws);
38-
39-
return;
33+
parser_->Parse(GetSample(ws->data_idx()), ws);
4034
}
4135

4236
protected:

dali/pipeline/operators/reader/coco_reader_op.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,7 @@ class COCOReader : public DataReader<CPUBackend, ImageLabelWrapper> {
6161
}
6262

6363
void RunImpl(SampleWorkspace* ws, const int i) override {
64-
const int idx = ws->data_idx();
65-
66-
auto* image_label = GetSample(idx);
67-
68-
parser_->Parse(*image_label, ws);
69-
70-
return;
64+
parser_->Parse(GetSample(ws->data_idx()), ws);
7165
}
7266

7367
protected:

dali/pipeline/operators/reader/file_reader_op.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,24 @@ class FileReader : public DataReader<CPUBackend, ImageLabelWrapper> {
3030
void RunImpl(SampleWorkspace *ws, const int i) override {
3131
const int idx = ws->data_idx();
3232

33-
auto* image_label = GetSample(idx);
33+
const auto& image_label = GetSample(idx);
3434

3535
// copy from raw_data -> outputs directly
3636
auto &image_output = ws->Output<CPUBackend>(0);
3737
auto &label_output = ws->Output<CPUBackend>(1);
3838

39-
Index image_size = image_label->image.size();
39+
Index image_size = image_label.image.size();
4040

4141
image_output.Resize({image_size});
4242
image_output.mutable_data<uint8_t>();
4343
label_output.Resize({1});
4444

4545
std::memcpy(image_output.raw_mutable_data(),
46-
image_label->image.raw_data(),
46+
image_label.image.raw_data(),
4747
image_size);
48-
image_output.SetSourceInfo(image_label->image.GetSourceInfo());
48+
image_output.SetSourceInfo(image_label.image.GetSourceInfo());
4949

50-
label_output.mutable_data<int>()[0] = image_label->label;
51-
return;
50+
label_output.mutable_data<int>()[0] = image_label.label;
5251
}
5352

5453
protected:

dali/pipeline/operators/reader/loader/file_loader.cc

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ vector<std::pair<string, int>> filesystem::traverse_directories(const std::strin
9090
return file_label_pairs;
9191
}
9292

93-
void FileLoader::PrepareEmpty(ImageLabelWrapper *image_label) {
94-
PrepareEmptyTensor(&image_label->image);
93+
void FileLoader::PrepareEmpty(ImageLabelWrapper &image_label) {
94+
PrepareEmptyTensor(image_label.image);
9595
}
9696

97-
void FileLoader::ReadSample(ImageLabelWrapper* image_label) {
97+
void FileLoader::ReadSample(ImageLabelWrapper &image_label) {
9898
auto image_pair = image_label_pairs_[current_index_++];
9999

100100
// handle wrap-around
@@ -104,25 +104,22 @@ void FileLoader::ReadSample(ImageLabelWrapper* image_label) {
104104
Index image_size = current_image->Size();
105105

106106
if (copy_read_data_) {
107-
image_label->image.Resize({image_size});
107+
image_label.image.Resize({image_size});
108108
// copy the image
109-
current_image->Read(image_label->image.mutable_data<uint8_t>(), image_size);
109+
current_image->Read(image_label.image.mutable_data<uint8_t>(), image_size);
110110
} else {
111111
auto p = current_image->Get(image_size);
112112
// Wrap the raw data in the Tensor object.
113-
image_label->image.ShareData(p, image_size, {image_size});
114-
115-
TypeInfo type;
116-
type.SetType<uint8_t>();
117-
image_label->image.set_type(type);
113+
image_label.image.ShareData(p, image_size, {image_size});
114+
image_label.image.set_type(TypeInfo::Create<uint8_t>());
118115
}
119116

120-
image_label->image.SetSourceInfo(image_pair.first);
117+
image_label.image.SetSourceInfo(image_pair.first);
121118
// close the file handle
122119
current_image->Close();
123120

124121
// copy the label
125-
image_label->label = image_pair.second;
122+
image_label.label = image_pair.second;
126123
}
127124

128125
Index FileLoader::Size() {

dali/pipeline/operators/reader/loader/file_loader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ class FileLoader : public Loader<CPUBackend, ImageLabelWrapper> {
8989
Reset(true);
9090
}
9191

92-
void PrepareEmpty(ImageLabelWrapper *tensor) override;
93-
void ReadSample(ImageLabelWrapper *tensor) override;
92+
void PrepareEmpty(ImageLabelWrapper &tensor) override;
93+
void ReadSample(ImageLabelWrapper &tensor) override;
9494

9595
Index Size() override;
9696

dali/pipeline/operators/reader/loader/indexed_file_loader.h

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class IndexedFileLoader : public Loader<CPUBackend, Tensor<CPUBackend>> {
3737
Init(options);
3838
}
3939

40-
void ReadSample(Tensor<CPUBackend>* tensor) override {
40+
void ReadSample(Tensor<CPUBackend>& tensor) override {
4141
MoveToNextShard(current_index_);
4242

4343
int64 seek_pos, size;
@@ -53,21 +53,18 @@ class IndexedFileLoader : public Loader<CPUBackend, Tensor<CPUBackend>> {
5353
auto p = current_file_->Get(size);
5454
DALI_ENFORCE(p != nullptr, "Error reading from a file " + uris_[current_file_index_]);
5555
// Wrap the raw data in the Tensor object.
56-
tensor->ShareData(p, size, {size});
57-
58-
TypeInfo type;
59-
type.SetType<uint8_t>();
60-
tensor->set_type(type);
56+
tensor.ShareData(p, size, {size});
57+
tensor.set_type(TypeInfo::Create<uint8_t>());
6158
} else {
62-
tensor->Resize({size});
63-
tensor->mutable_data<uint8_t>();
59+
tensor.set_type(TypeInfo::Create<uint8_t>());
60+
tensor.Resize({size});
6461

65-
int64 n_read = current_file_->Read(reinterpret_cast<uint8_t*>(tensor->raw_mutable_data()),
62+
int64 n_read = current_file_->Read(reinterpret_cast<uint8_t*>(tensor.raw_mutable_data()),
6663
size);
6764
DALI_ENFORCE(n_read == size, "Error reading from a file " + uris_[current_file_index_]);
6865
}
6966

70-
tensor->SetSourceInfo(uris_[current_file_index_] + " at index " + to_string(seek_pos));
67+
tensor.SetSourceInfo(uris_[current_file_index_] + " at index " + to_string(seek_pos));
7168
++current_index_;
7269
return;
7370
}

dali/pipeline/operators/reader/loader/lmdb.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,18 @@ class LMDBReader : public Loader<CPUBackend, Tensor<CPUBackend>> {
9191
mdb_env_ = nullptr;
9292
}
9393

94-
void ReadSample(Tensor<CPUBackend>* tensor) override {
94+
void ReadSample(Tensor<CPUBackend>& tensor) override {
9595
// assume cursor is valid, read next, loop to start if necessary
9696
lmdb::SeekLMDB(mdb_cursor_, MDB_NEXT, &key_, &value_);
9797
++current_index_;
9898

9999
MoveToNextShard(current_index_);
100100

101-
tensor->Resize({static_cast<Index>(value_.mv_size)});
102-
tensor->mutable_data<uint8_t>();
103-
tensor->SetSourceInfo(db_path_ + " at key " + to_string(reinterpret_cast<char*>(key_.mv_data)));
101+
tensor.Resize({static_cast<Index>(value_.mv_size)});
102+
tensor.set_type(TypeInfo::Create<uint8_t>());
103+
tensor.SetSourceInfo(db_path_ + " at key " + to_string(reinterpret_cast<char*>(key_.mv_data)));
104104

105-
std::memcpy(tensor->raw_mutable_data(),
105+
std::memcpy(tensor.raw_mutable_data(),
106106
reinterpret_cast<uint8_t*>(value_.mv_data),
107107
value_.mv_size*sizeof(uint8_t));
108108

dali/pipeline/operators/reader/loader/loader.h

Lines changed: 37 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
#ifndef DALI_PIPELINE_OPERATORS_READER_LOADER_LOADER_H_
1616
#define DALI_PIPELINE_OPERATORS_READER_LOADER_LOADER_H_
1717

18-
#include <list>
1918
#include <map>
19+
#include <memory>
2020
#include <mutex>
2121
#include <random>
2222
#include <string>
23+
#include <type_traits>
2324
#include <utility>
2425
#include <vector>
25-
#include <type_traits>
2626

2727
#include "dali/common.h"
2828
#include "dali/error_handling.h"
@@ -44,7 +44,7 @@ DLL_PUBLIC size_t start_index(const size_t shard_id,
4444
template <typename Backend, typename LoadTarget>
4545
class Loader {
4646
public:
47-
using LoadTarget_t = LoadTarget;
47+
using LoadTargetPtr = std::unique_ptr<LoadTarget>;
4848
explicit Loader(const OpSpec& options)
4949
: shuffle_(options.GetArgument<bool>("random_shuffle")),
5050
initial_buffer_fill_(shuffle_ ? options.GetArgument<int>("initial_fill") : 1),
@@ -65,105 +65,93 @@ class Loader {
6565
e_ = std::default_random_engine(seq);
6666
}
6767

68-
virtual ~Loader() {
69-
// delete all the temporary tensors
70-
while (!sample_buffer_.empty()) {
71-
LoadTarget * t = sample_buffer_.back();
72-
delete t;
73-
sample_buffer_.pop_back();
74-
}
75-
while (!empty_tensors_.empty()) {
76-
LoadTarget * t = empty_tensors_.back();
77-
delete t;
78-
empty_tensors_.pop_back();
79-
}
80-
}
68+
virtual ~Loader() = default;
8169

82-
virtual void PrepareEmpty(LoadTarget *tensor) {
70+
virtual void PrepareEmpty(LoadTarget& tensor) {
8371
PrepareEmptyTensor(tensor);
8472
}
8573

8674
template <typename T>
8775
typename std::enable_if<std::is_same<T, Tensor<CPUBackend>>::value>::type
88-
PrepareEmptyTensor(T *tensor) {
89-
tensor->set_pinned(false);
76+
PrepareEmptyTensor(T& tensor) {
77+
tensor.set_pinned(false);
9078
// Initialize tensors to a set size to limit expensive reallocations
91-
tensor->Resize({tensor_init_bytes_});
92-
tensor->template mutable_data<uint8_t>();
79+
tensor.Resize({tensor_init_bytes_});
80+
tensor.template mutable_data<uint8_t>();
9381
}
9482

9583
template <typename T>
9684
typename std::enable_if<!std::is_same<T, Tensor<CPUBackend>>::value>::type
97-
PrepareEmptyTensor(T *) {
85+
PrepareEmptyTensor(T&) {
9886
constexpr bool T_is_Tensor = std::is_same<T, Tensor<CPUBackend>>::value;
9987
DALI_ENFORCE(T_is_Tensor,
10088
"Please overload PrepareEmpty for custom LoadTarget type other than Tensor");
10189
}
10290

10391

10492
// Get a random read sample
105-
LoadTarget* ReadOne() {
93+
LoadTargetPtr ReadOne() {
10694
TimeRange tr("[Loader] ReadOne", TimeRange::kGreen1);
10795
// perform an iniital buffer fill if it hasn't already happened
10896
if (!initial_buffer_filled_) {
10997
TimeRange tr("[Loader] Filling initial buffer", TimeRange::kBlue1);
98+
11099
// Read an initial number of samples to fill our
111100
// sample buffer
112101
for (int i = 0; i < initial_buffer_fill_; ++i) {
113-
LoadTarget* tensor = new LoadTarget();
114-
PrepareEmpty(tensor);
115-
116-
ReadSample(tensor);
117-
sample_buffer_.push_back(tensor);
102+
auto tensor_ptr = LoadTargetPtr(new LoadTarget());
103+
PrepareEmpty(*tensor_ptr);
104+
ReadSample(*tensor_ptr);
105+
sample_buffer_.push_back(std::move(tensor_ptr));
118106
}
119107

120-
TimeRange tr2("[Loader] Filling empty list", TimeRange::kOrange);
121108
// need some entries in the empty_tensors_ list
109+
TimeRange tr2("[Loader] Filling empty list", TimeRange::kOrange);
110+
std::lock_guard<std::mutex> lock(empty_tensors_mutex_);
122111
for (int i = 0; i < initial_empty_size_; ++i) {
123-
LoadTarget* tensor = new LoadTarget();
124-
PrepareEmpty(tensor);
125-
126-
empty_tensors_.push_back(tensor);
112+
auto tensor_ptr = LoadTargetPtr(new LoadTarget());
113+
PrepareEmpty(*tensor_ptr);
114+
empty_tensors_.push_back(std::move(tensor_ptr));
127115
}
128116

129117
initial_buffer_filled_ = true;
130118
}
131119
// choose the random index
132120
int idx = shuffle_ ? dis(e_) % sample_buffer_.size() : 0;
133-
LoadTarget* elem = sample_buffer_[idx];
134121

135122
// swap end and idx, return the tensor to empties
136-
std::swap(sample_buffer_[idx], sample_buffer_[sample_buffer_.size()-1]);
123+
std::swap(sample_buffer_[idx], sample_buffer_.back());
137124
// remove last element
125+
LoadTargetPtr sample_ptr = std::move(sample_buffer_.back());
138126
sample_buffer_.pop_back();
139127

140128
// now grab an empty tensor, fill it and add to filled buffers
141-
// empty_tensors_ needs to be thread-safe w.r.t. ReturnTensor()
129+
// empty_tensors_ needs to be thread-safe w.r.t. RecycleTensor()
142130
// being called by multiple consumer threads
143-
LoadTarget* t;
131+
LoadTargetPtr tensor_ptr;
144132
{
145-
std::lock_guard<std::mutex> lock(return_mutex_);
133+
std::lock_guard<std::mutex> lock(empty_tensors_mutex_);
146134
DALI_ENFORCE(empty_tensors_.size() > 0, "No empty tensors - did you forget to return them?");
147-
t = empty_tensors_.back();
135+
tensor_ptr = std::move(empty_tensors_.back());
148136
empty_tensors_.pop_back();
149137
}
150-
ReadSample(t);
151-
sample_buffer_.push_back(t);
138+
ReadSample(*tensor_ptr);
139+
sample_buffer_.push_back(std::move(tensor_ptr));
152140

153-
return elem;
141+
return sample_ptr;
154142
}
155143

156144
// return a tensor to the empty pile
157145
// called by multiple consumer threads
158-
void ReturnTensor(LoadTarget* tensor) {
159-
std::lock_guard<std::mutex> lock(return_mutex_);
160-
empty_tensors_.push_back(tensor);
146+
void RecycleTensor(LoadTargetPtr&& tensor_ptr) {
147+
std::lock_guard<std::mutex> lock(empty_tensors_mutex_);
148+
empty_tensors_.push_back(std::move(tensor_ptr));
161149
}
162150

163151
// Read an actual sample from the FileStore,
164152
// used to populate the sample buffer for "shuffled"
165153
// reads.
166-
virtual void ReadSample(LoadTarget* tensor) = 0;
154+
virtual void ReadSample(LoadTarget& tensor) = 0;
167155

168156
// Give the size of the data accessed through the Loader
169157
virtual Index Size() = 0;
@@ -183,9 +171,9 @@ class Loader {
183171
(stick_to_shard_ && shard_id_ + 1 < num_shards_ &&
184172
current_index >= static_cast<Index>(start_index(shard_id_ + 1, num_shards_, Size())));
185173
}
186-
std::vector<LoadTarget*> sample_buffer_;
174+
std::vector<LoadTargetPtr> sample_buffer_;
187175

188-
std::list<LoadTarget*> empty_tensors_;
176+
std::vector<LoadTargetPtr> empty_tensors_;
189177

190178
// number of samples to initialize buffer with
191179
// ~1 minibatch seems reasonable
@@ -201,7 +189,7 @@ class Loader {
201189
Index seed_;
202190

203191
// control return of tensors
204-
std::mutex return_mutex_;
192+
std::mutex empty_tensors_mutex_;
205193

206194
// sharding
207195
const int shard_id_;

0 commit comments

Comments
 (0)