Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions port/cpl_vsil_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4992,25 +4992,30 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,

struct JobQueue
{
IVSIS3LikeFSHandler *poFS;
IVSIS3LikeFSHandlerWithMultipartUpload *poTargetFSMultipartHandler;
// Immutable variables
IVSIS3LikeFSHandler *const poFS;
IVSIS3LikeFSHandlerWithMultipartUpload *const
poTargetFSMultipartHandler;
const std::vector<ChunkToCopy> &aoChunksToCopy;
const std::vector<size_t> &anIndexToCopy;
std::map<std::string, MultiPartDef> &oMapMultiPartDefs;
volatile int iCurIdx = 0;
volatile bool ret = true;
volatile bool stop = false;
std::string osSourceDir{};
std::string osTargetDir{};
std::string osSource{};
std::string osTarget{};
std::mutex sMutex{};
uint64_t nTotalCopied = 0;
bool bSupportsParallelMultipartUpload = false;
size_t nMaxChunkSize = 0;
const std::string osSourceDir;
const std::string osTargetDir;
const std::string osSource;
const std::string osTarget;
const bool bSupportsParallelMultipartUpload;
const size_t nMaxChunkSize;
const CPLHTTPRetryParameters &oRetryParameters;
const CPLStringList &aosObjectCreationOptions;

// All variables below are modified in threads and their access must
// be protected by sMutex.
std::mutex sMutex{};
std::map<std::string, MultiPartDef> &oMapMultiPartDefs;
uint64_t nTotalCopied = 0;
int iCurIdx = 0;
bool ret = true;
bool stop = false;

JobQueue(IVSIS3LikeFSHandler *poFSIn,
IVSIS3LikeFSHandlerWithMultipartUpload
*poTargetFSMultipartHandlerIn,
Expand All @@ -5027,14 +5032,14 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
: poFS(poFSIn),
poTargetFSMultipartHandler(poTargetFSMultipartHandlerIn),
aoChunksToCopy(aoChunksToCopyIn), anIndexToCopy(anIndexToCopyIn),
oMapMultiPartDefs(oMapMultiPartDefsIn),
osSourceDir(osSourceDirIn), osTargetDir(osTargetDirIn),
osSource(osSourceIn), osTarget(osTargetIn),
bSupportsParallelMultipartUpload(
bSupportsParallelMultipartUploadIn),
nMaxChunkSize(nMaxChunkSizeIn),
oRetryParameters(oRetryParametersIn),
aosObjectCreationOptions(aosObjectCreationOptionsIn)
aosObjectCreationOptions(aosObjectCreationOptionsIn),
oMapMultiPartDefs(oMapMultiPartDefsIn)
{
}

Expand All @@ -5057,23 +5062,34 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
static_cast<ProgressData *>(pProgressDataIn);
const auto nInc = static_cast<uint64_t>(
(pct - pProgress->dfLastPct) * pProgress->nFileSize + 0.5);
pProgress->queue->sMutex.lock();
pProgress->queue->nTotalCopied += nInc;
pProgress->queue->sMutex.unlock();
{
std::lock_guard oLock(pProgress->queue->sMutex);
pProgress->queue->nTotalCopied += nInc;
}
pProgress->dfLastPct = pct;
return TRUE;
}
};

JobQueue *queue = static_cast<JobQueue *>(pDataIn);
while (!queue->stop)
while (true)
{
const int idx = CPLAtomicInc(&(queue->iCurIdx)) - 1;
if (static_cast<size_t>(idx) >= queue->anIndexToCopy.size())
auto [idx, bStop] = [queue]()
{
queue->stop = true;
std::lock_guard oLock(queue->sMutex);
const int newIdx = queue->iCurIdx;
if (static_cast<size_t>(newIdx) >= queue->anIndexToCopy.size())
{
queue->stop = true;
}
else
{
++queue->iCurIdx;
}
return std::pair(newIdx, queue->stop);
}();
if (bStop)
break;
}
const auto &chunk =
queue->aoChunksToCopy[queue->anIndexToCopy[idx]];
const std::string osSubSource(
Expand Down Expand Up @@ -5156,6 +5172,7 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
}
else
{
std::lock_guard oLock(queue->sMutex);
queue->ret = false;
queue->stop = true;
}
Expand All @@ -5169,6 +5186,7 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
queue->aosObjectCreationOptions.List(),
ProgressData::progressFunc, &progressData) != 0)
{
std::lock_guard oLock(queue->sMutex);
queue->ret = false;
queue->stop = true;
}
Expand All @@ -5190,6 +5208,7 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
auto hThread = CPLCreateJoinableThread(threadFunc, &sJobQueue);
if (!hThread)
{
std::lock_guard oLock(sJobQueue.sMutex);
sJobQueue.ret = false;
sJobQueue.stop = true;
break;
Expand All @@ -5199,20 +5218,32 @@ bool IVSIS3LikeFSHandler::Sync(const char *pszSource, const char *pszTarget,
if (pProgressFunc)
{
const uint64_t nTotalSizeDenom = std::max<uint64_t>(1, nTotalSize);
while (!sJobQueue.stop)
while (![&sJobQueue]()
{
std::lock_guard oLock(sJobQueue.sMutex);
return sJobQueue.stop;
}())
{
CPLSleep(0.1);
sJobQueue.sMutex.lock();
const auto nTotalCopied = sJobQueue.nTotalCopied;
sJobQueue.sMutex.unlock();
const auto nTotalCopied = [&sJobQueue]()
{
std::lock_guard oLock(sJobQueue.sMutex);
return sJobQueue.nTotalCopied;
}();
if (!pProgressFunc(double(nTotalCopied) / nTotalSizeDenom, "",
pProgressData))
{
std::lock_guard oLock(sJobQueue.sMutex);
sJobQueue.ret = false;
sJobQueue.stop = true;
}
}
if (sJobQueue.ret)
if (
[&sJobQueue]()
{
std::lock_guard oLock(sJobQueue.sMutex);
return sJobQueue.ret;
}())
{
pProgressFunc(1.0, "", pProgressData);
}
Expand Down
Loading