Skip to content

Commit

Permalink
Use database for performing check for intersecting periods being proc…
Browse files Browse the repository at this point in the history
…essed
  • Loading branch information
sgiehl committed Jul 15, 2024
1 parent ba2a23b commit 9178d93
Show file tree
Hide file tree
Showing 3 changed files with 475 additions and 347 deletions.
49 changes: 24 additions & 25 deletions core/CronArchive/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public function getNextArchivesToProcess()
}

$reason = $this->shouldSkipArchiveBecauseLowerPeriodOrSegmentIsInProgress($invalidatedArchive);
if ($reason) {
if ($reason !== null) {
$this->logger->debug("Skipping invalidated archive, $reason: $invalidationDesc");
$invalidationsToExcludeInBatch[$invalidatedArchive['idinvalidation']] = true;
$this->addInvalidationToExclude($invalidatedArchive);
Expand Down Expand Up @@ -430,50 +430,50 @@ public function canSkipArchiveBecauseNoPoint(array $invalidatedArchive)
return $loader->canSkipThisArchive(); // if no point in archiving, skip
}

public function shouldSkipArchiveBecauseLowerPeriodOrSegmentIsInProgress(array $archiveToProcess)
public function shouldSkipArchiveBecauseLowerPeriodOrSegmentIsInProgress(array $archiveToProcess): ?string
{
$inProgressArchives = $this->cliMultiRequestParser->getInProgressArchivingCommands();
$inProgressArchives = $this->model->getInvalidationsInProgress(
(int) $archiveToProcess['idsite']
);

$periods = array_flip(Piwik::$idPeriods);

foreach ($inProgressArchives as $archiveBeingProcessed) {
if (
empty($archiveBeingProcessed['period'])
|| empty($archiveBeingProcessed['date'])
) {
continue;
}
$this->findSegmentForArchive($archiveBeingProcessed);

if (
empty($archiveBeingProcessed['idSite'])
|| $archiveBeingProcessed['idSite'] != $archiveToProcess['idsite']
) {
if ($archiveBeingProcessed['idsite'] != $archiveToProcess['idsite']) {
continue; // different site
}

// we don't care about lower periods being concurrent if they are for different segments (that are not "all visits")
if (
!empty($archiveBeingProcessed['segment'])
&& !empty($archiveToProcess['segment'])
!empty($archiveBeingProcessed['segment']) && !empty($archiveToProcess['segment'])
&& $archiveBeingProcessed['segment'] != $archiveToProcess['segment']
&& urldecode($archiveBeingProcessed['segment']) != $archiveToProcess['segment']
) {
continue;
}

$archiveBeingProcessed['periodObj'] = PeriodFactory::build($archiveBeingProcessed['period'], $archiveBeingProcessed['date']);
$archiveBeingProcessed['periodObj'] = PeriodFactory::build($periods[$archiveBeingProcessed['period']], $archiveBeingProcessed['date1']);

if (!$this->isArchiveOfLowerPeriod($archiveToProcess, $archiveBeingProcessed)) {
continue;
}

if ($this->isArchiveOfLowerPeriod($archiveToProcess, $archiveBeingProcessed)) {
return "lower or same period in progress in another local climulti process (period = {$archiveBeingProcessed['period']}, date = {$archiveBeingProcessed['date']})";
if (empty($archiveToProcess['segment']) && !empty($archiveBeingProcessed['segment'])) {
return "segment archive in progress for same site with lower or same period ({$archiveBeingProcessed['segment']}, period = {$periods[$archiveBeingProcessed['period']]}, date = {$archiveBeingProcessed['date1']})";
}

if ($this->isArchiveNonSegmentAndInProgressArchiveSegment($archiveToProcess, $archiveBeingProcessed)) {
return "segment archive in progress for same site/period ({$archiveBeingProcessed['segment']})";
if (!empty($archiveToProcess['segment']) && empty($archiveBeingProcessed['segment'])) {
return "all visits archive in progress for same site with lower or same period (period = {$periods[$archiveBeingProcessed['period']]}, date = {$archiveBeingProcessed['date1']})";
}

return "lower or same period in progress (period = {$periods[$archiveBeingProcessed['period']]}, date = {$archiveBeingProcessed['date1']})";
}

return false;
return null;
}

private function isArchiveOfLowerPeriod(array $archiveToProcess, $archiveBeingProcessed)
private function isArchiveOfLowerPeriod(array $archiveToProcess, $archiveBeingProcessed): bool
{
/** @var Period $archiveToProcessPeriodObj */
$archiveToProcessPeriodObj = $archiveToProcess['periodObj'];
Expand All @@ -494,8 +494,7 @@ private function isArchiveNonSegmentAndInProgressArchiveSegment(array $archiveTo
{
// archive is for different site/period
if (
empty($archiveBeingProcessed['idSite'])
|| $archiveToProcess['idsite'] != $archiveBeingProcessed['idSite']
$archiveToProcess['idsite'] != $archiveBeingProcessed['idsite']
|| $archiveToProcess['periodObj']->getId() != $archiveBeingProcessed['periodObj']->getId()
|| $archiveToProcess['periodObj']->getDateStart()->toString() != $archiveBeingProcessed['periodObj']->getDateStart()->toString()
) {
Expand Down
10 changes: 10 additions & 0 deletions core/DataAccess/Model.php
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,16 @@ public function isSimilarArchiveInProgress($invalidation)
return !empty($result);
}

public function getInvalidationsInProgress(int $idSite): array
{
$table = Common::prefixTable('archive_invalidations');

$bind = [$idSite, ArchiveInvalidator::INVALIDATION_STATUS_IN_PROGRESS];

$sql = "SELECT idsite, period, date1, date2, name FROM `$table` WHERE idsite = ? AND `status` = ? AND ts_started IS NOT NULL";
return Db::fetchAll($sql, $bind);
}

/**
* Gets the next invalidated archive that should be archived in a table.
*
Expand Down
Loading

0 comments on commit 9178d93

Please sign in to comment.