Skip to content

Commit b2bdbfc

Browse files
authored
Merge pull request #8 from imarc/feature/CPA-2350-individual-sync
Feature/cpa 2350 individual sync
2 parents 0d93d18 + e03ad95 commit b2bdbfc

2 files changed

Lines changed: 122 additions & 21 deletions

File tree

src/Mapping.php

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
*/
88
class Mapping
99
{
10+
/**
11+
*
12+
*/
13+
protected $adjuncts = array();
14+
15+
1016
/**
1117
*
1218
*/
@@ -110,6 +116,17 @@ public function __construct($source, $destination, $key)
110116
}
111117

112118

119+
/**
120+
*
121+
*/
122+
public function addAdjunct($adjunct, $keys)
123+
{
124+
$this->adjuncts[$adjunct] = $keys;
125+
126+
return $this;
127+
}
128+
129+
113130
/**
114131
*
115132
*/
@@ -256,6 +273,28 @@ public function compose($string, ...$params)
256273
}
257274

258275

276+
/**
277+
*
278+
*/
279+
public function composeSourceAdjunctKeyQuery(Mapping $adjunct, $ids)
280+
{
281+
$table = $adjunct->getDestination();
282+
$keys = $this->adjuncts[$table];
283+
284+
$sql = $this->compose(
285+
'SELECT %s FROM %s WHERE %s AND (%s)',
286+
$adjunct->makeSourceKey(),
287+
$adjunct->makeSourceFrom(),
288+
$adjunct->makeSourceWheres(),
289+
join(' OR ', array_map(function($id) use ($keys) {
290+
return sprintf("RTRIM(LTRIM(%s)) = '%s'", $keys['source'], $id[$this->key[0]]);
291+
}, $ids))
292+
);
293+
294+
return $sql;
295+
}
296+
297+
259298
/**
260299
*
261300
*/
@@ -315,7 +354,7 @@ public function composeSourceInsertSelectQuery()
315354
/**
316355
*
317356
*/
318-
public function composeSourceDeleteSelectQuery()
357+
public function composeSourceDeleteSelectQuery($ids = array())
319358
{
320359
$keys = [];
321360
foreach ($this->getKey() as $key) {
@@ -324,17 +363,25 @@ public function composeSourceDeleteSelectQuery()
324363

325364
$join_keys = join(' AND ', $keys);
326365

327-
$sql = $this->compose(
328-
'SELECT %s.* from devour_temp_%s RIGHT OUTER JOIN %s ON (%s) WHERE devour_temp_%s.%s IS NULL',
366+
$sql = 'SELECT %s.* from devour_temp_%s RIGHT OUTER JOIN %s ON (%s) WHERE devour_temp_%s.%s IS NULL';
367+
368+
$params = [
329369
$this->getDestination(),
330370
$this->getDestination(),
331371
$this->getDestination(),
332372
$join_keys,
333373
$this->getDestination(),
334374
$this->key[0]
335-
);
375+
];
336376

337-
return $sql;
377+
if (!empty($ids)) {
378+
if ($keys) {
379+
$sql .= ' AND %s';
380+
$params[] = $this->makeDestinationInKeys($ids);
381+
}
382+
}
383+
384+
return $this->compose($sql, ...$params);
338385
}
339386

340387

@@ -392,7 +439,7 @@ public function composeSourceSelectQuery($keys = NULL)
392439
$sql .= ' AND %s';
393440
$params[] = $this->makeSourceInKeys($keys);
394441
}
395-
442+
396443
$sql = $this->compose(
397444
$sql,
398445
...$params
@@ -419,6 +466,31 @@ public function composeSourceUpdatedKeysQuery(array $existing_keys)
419466
}
420467

421468

469+
/**
470+
*
471+
*/
472+
public function composeKeys($ids)
473+
{
474+
return array_map(function($key) {
475+
$keys = [];
476+
foreach ($this->key as $id) {
477+
$keys[$id] = $key;
478+
}
479+
480+
return $keys;
481+
}, $ids);
482+
}
483+
484+
485+
/**
486+
*
487+
*/
488+
public function getAdjuncts()
489+
{
490+
return $this->adjuncts;
491+
}
492+
493+
422494
/**
423495
*
424496
*/
@@ -544,9 +616,9 @@ protected function makeDestinationInKeys(array $keys)
544616

545617
return sprintf($group, implode(' AND ', array_map(function($field) use ($key) {
546618
if (is_string($key[$field])) {
547-
return sprintf("%s = '%s'", $field, str_replace("'", "''", $key[$field]));
619+
return sprintf("%s.%s = '%s'", $this->destination, $field, str_replace("'", "''", $key[$field]));
548620
} else {
549-
return sprintf("%s = %s", $field, $key[$field]);
621+
return sprintf("%s.%s = %s", $this->destination, $field, $key[$field]);
550622
}
551623
}, $this->key)));
552624
}, $keys)));
@@ -651,9 +723,9 @@ protected function makeSourceInKeys(array $keys)
651723

652724
return sprintf($group, implode(' AND ', array_map(function($field) use ($key) {
653725
if (is_string($key[$field])) {
654-
return sprintf("%s = '%s'", $this->fields[$field], str_replace("'", "''", $key[$field]));
726+
return sprintf("RTRIM(LTRIM(%s)) = '%s'", $this->fields[$field], str_replace("'", "''", $key[$field]));
655727
} else {
656-
return sprintf("%s = %s", $this->fields[$field], $key[$field]);
728+
return sprintf("RTRIM(LTRIM(%s)) = %s", $this->fields[$field], $key[$field]);
657729
}
658730
}, $this->key)));
659731
}, $keys)));

src/Synchronizer.php

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,23 @@ public function getLastSyncTime(): ?string
361361
return strtotime($result->fetch(PDO::FETCH_ASSOC)['start_time']);
362362
}
363363

364+
/**
365+
*
366+
*/
367+
public function getMapping($name)
368+
{
369+
return $this->mappings[$name] ?? NULL;
370+
}
371+
372+
373+
/**
374+
*
375+
*/
376+
public function getMappings(): array
377+
{
378+
return $this->mappings;
379+
}
380+
364381

365382
/**
366383
*
@@ -441,7 +458,7 @@ public function schedule(array $mappings = array(), $scheduled_by = NULL): array
441458
/**
442459
*
443460
*/
444-
public function run(array $mappings = array(), $force_update = FALSE): array
461+
public function run(array $mappings = array(), $ids = array(), $force_update = FALSE): array
445462
{
446463
$this->stat();
447464

@@ -479,7 +496,7 @@ public function run(array $mappings = array(), $force_update = FALSE): array
479496

480497
foreach ($mappings as $mapping) {
481498
try {
482-
$this->syncMapping($mapping, $force_update);
499+
$this->syncMapping($mapping, $ids[$mapping] ?? [], $force_update);
483500

484501
} catch (\Exception $e) {
485502
$this->log($e->getMessage());
@@ -848,7 +865,7 @@ protected function log($message)
848865
/**
849866
*
850867
*/
851-
protected function syncMapping($name, $force_update)
868+
protected function syncMapping($name, $ids, $force_update)
852869
{
853870
if (!isset($this->mappings[$name])) {
854871
throw new RuntimeException(sprintf(
@@ -875,7 +892,7 @@ protected function syncMapping($name, $force_update)
875892
//
876893

877894
foreach ($mapping->getDependencies() as $dependency) {
878-
$this->syncMapping($dependency, $force_update);
895+
$this->syncMapping($dependency, [], $force_update);
879896
}
880897

881898
if ($this->strictTime) {
@@ -889,17 +906,17 @@ protected function syncMapping($name, $force_update)
889906
$this->log(sprintf('Syncing %s', $name));
890907

891908
$this->createTemporaryTable($mapping);
892-
$this->syncMappingTemporary($mapping);
909+
$this->syncMappingTemporary($mapping, $ids);
893910

894911
$start_sync_time = date('Y-m-d H:i:s');
895912

896-
if ($this->truncate[$mapping->getDestination()]) {
913+
if ($this->truncate[$mapping->getDestination()] && empty($ids)) {
897914
$this->truncateTable($mapping);
898915
$this->syncMappingInserts($mapping);
899916

900917
} else {
901918
if ($mapping->canDelete()) {
902-
$this->syncMappingDeletes($mapping);
919+
$this->syncMappingDeletes($mapping, $ids);
903920
$this->log('...completed deletions');
904921
}
905922

@@ -920,20 +937,32 @@ protected function syncMapping($name, $force_update)
920937
$this->updateSet($name, $start_sync_time);
921938

922939
$this->synced[array_pop($this->stack)] = TRUE;
940+
941+
if ($ids) {
942+
foreach ($mapping->getAdjuncts() as $adjunct => $config) {
943+
$adjunct = $this->mappings[$adjunct];
944+
$key_query = $mapping->composeSourceAdjunctKeyQuery($adjunct, $ids);
945+
$keys = $this->source->query($key_query)->fetchAll();
946+
947+
$keys = $this->filterKeys($adjunct, $keys, 'select');
948+
949+
$this->syncMapping($adjunct->getDestination(), $keys, $force_update);
950+
}
951+
}
923952
}
924953

925954

926955
/**
927956
*
928957
*/
929-
protected function syncMappingDeletes(Mapping $mapping)
958+
protected function syncMappingDeletes(Mapping $mapping, $ids = array())
930959
{
931960
if (!$mapping->canDelete()) {
932961
return;
933962
}
934963

935964
try {
936-
$delete_select_query = $mapping->composeSourceDeleteSelectQuery();
965+
$delete_select_query = $mapping->composeSourceDeleteSelectQuery($ids);
937966
$delete_results = $this->destination->query($delete_select_query)->fetchAll();
938967
} catch (\Exception $e) {
939968
$this->log(sprintf(
@@ -1044,9 +1073,9 @@ protected function syncMappingInserts(Mapping $mapping)
10441073
/**
10451074
*
10461075
*/
1047-
protected function syncMappingTemporary(Mapping $mapping)
1076+
protected function syncMappingTemporary(Mapping $mapping, $ids = array())
10481077
{
1049-
$source_select_query = $mapping->composeSourceSelectQuery();
1078+
$source_select_query = $mapping->composeSourceSelectQuery($ids);
10501079
try {
10511080
$source_results = $this->source->query($source_select_query, PDO::FETCH_ASSOC)->fetchAll();
10521081
$generated = array_keys($mapping->getGenerators());

0 commit comments

Comments
 (0)