3
3
# ****************** CANADIAN ASTRONOMY DATA CENTRE *******************
4
4
# ************* CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES **************
5
5
#
6
- # (c) 2016 . (c) 2016 .
6
+ # (c) 2021 . (c) 2021 .
7
7
# Government of Canada Gouvernement du Canada
8
8
# National Research Council Conseil national de recherches
9
9
# Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6
80
80
import sys
81
81
from datetime import datetime
82
82
83
- from cadcutils import net
84
- from cadcutils import util
83
+ from cadcutils import net , util , exceptions
85
84
from caom2 .obs_reader_writer import ObservationReader , ObservationWriter
86
85
from caom2 import obs_reader_writer
87
86
from caom2 .version import version as caom2_version
@@ -236,8 +235,8 @@ def visit(self, plugin, collection, start=None, end=None, obs_file=None,
236
235
while len (observations ) > 0 :
237
236
if nthreads is None :
238
237
results = [
239
- self ._process_observation_id (collection , observationID ,
240
- halt_on_error )
238
+ self .process_observation_id (collection , observationID ,
239
+ halt_on_error )
241
240
for observationID in observations ]
242
241
for v , u , s , f in results :
243
242
if v :
@@ -284,24 +283,39 @@ def visit(self, plugin, collection, start=None, end=None, obs_file=None,
284
283
break
285
284
return visited , updated , skipped , failed
286
285
287
- def _process_observation_id (self , collection , observationID ,
288
- halt_on_error ):
289
- visited = None
286
+ def process_observation_id (self , collection , observation_id ,
287
+ halt_on_error ):
288
+ """
289
+ Reads an observation, calls a plugin to update it and, if modified,
290
+ uploads it to the repo.
291
+
292
+ :param collection: name of the collection
293
+ :param observation_id: identifier of the observation to process
294
+ :param halt_on_error: if true, raise an exception when an error is
295
+ encountered; otherwise log the error.
296
+ :return: (visited, updated, skipped, failed) tuple with values
297
+ equal to None or observation_id depending on the outcome
298
+ (visited is always observation_id)
299
+ """
300
+ visited = observation_id
290
301
updated = None
291
302
skipped = None
292
303
failed = None
293
- self .logger .info ('Process observation: ' + observationID )
304
+ self .logger .info ('Process observation: ' + observation_id )
294
305
try :
295
- observation = self .get_observation (collection , observationID )
306
+ observation = self .get_observation (collection , observation_id )
307
+ orig_checksum = observation .acc_meta_checksum
308
+ if orig_checksum :
309
+ orig_checksum = orig_checksum .uri
296
310
if self .plugin .update (observation = observation ,
297
311
subject = self ._subject ) is False :
298
312
self .logger .info ('SKIP {}' .format (observation .observation_id ))
299
313
skipped = observation .observation_id
300
314
else :
301
- self .post_observation (observation )
315
+ self .post_observation (observation , orig_checksum )
302
316
self .logger .debug (
303
317
'UPDATED {}' .format (observation .observation_id ))
304
- updated = observation . observation_id
318
+ updated = observation_id
305
319
except TypeError as e :
306
320
if "unexpected keyword argument" in str (e ):
307
321
raise RuntimeError (
@@ -311,17 +325,30 @@ def _process_observation_id(self, collection, observationID,
311
325
else :
312
326
# other unexpected TypeError
313
327
raise e
328
+ except exceptions .UnexpectedException as e :
329
+ if e .orig_exception .response .status_code == 412 :
330
+ self .logger .info (
331
+ 'Race condition: observation {} updated on the server '
332
+ 'while being visited. Re-trying.' .format (observation_id ))
333
+ return self .process_observation_id (collection , observation_id ,
334
+ halt_on_error )
335
+ else :
336
+ failed = observation_id
337
+ self ._handle_error (e , observation_id , halt_on_error )
314
338
except Exception as e :
315
- failed = observationID
316
- self .logger .error (
317
- 'FAILED {} - Reason: {}' .format (observationID , e ))
318
- if halt_on_error :
319
- raise e
320
-
321
- visited = observationID
322
-
339
+ failed = observation_id
340
+ self ._handle_error (e , observation_id , halt_on_error )
341
+ visited = observation_id
323
342
return visited , updated , skipped , failed
324
343
344
+ def _handle_error (self , exception , observation_id , halt_on_error ):
345
+ self .logger .error (
346
+ 'FAILED {} - Reason: {}' .format (observation_id , str (exception )))
347
+ if halt_on_error :
348
+ raise exception
349
+ else :
350
+ self .logger .debug (exception )
351
+
325
352
def _get_obs_from_file (self , obs_file , start , end , halt_on_error ):
326
353
obs = []
327
354
failed = []
@@ -452,10 +479,13 @@ def get_observation(self, collection, observation_id):
452
479
raise Exception ('Got empty response for resource: {}' .format (path ))
453
480
return obs_reader .read (BytesIO (content ))
454
481
455
- def post_observation (self , observation ):
482
+ def post_observation (self , observation , orig_checksum = None ):
456
483
"""
457
484
Updates an observation in the CAOM2 repo
458
485
:param observation: observation to update
486
+ :param orig_checksum: the checksum of the observation to be updated.
487
+ Posting this value prevents race conditions when observations are
488
+ updated concurrently
459
489
:return: updated observation
460
490
"""
461
491
assert observation .collection is not None
@@ -469,6 +499,8 @@ def post_observation(self, observation):
469
499
observation , ibuffer )
470
500
obs_xml = ibuffer .getvalue ()
471
501
headers = {'Content-Type' : 'application/xml' }
502
+ if orig_checksum :
503
+ headers ['If-Match' ] = orig_checksum
472
504
self ._repo_client .post (
473
505
(self .capability_id , path ), headers = headers , data = obs_xml )
474
506
@@ -535,9 +567,9 @@ def multiprocess_observation_id(collection, observationID, plugin, subject,
535
567
log_level , resource_id , host , agent ,
536
568
halt_on_error ):
537
569
"""
538
- Multi-process version of CAOM2RepoClient._process_observation_id ().
570
+ Multi-process version of CAOM2RepoClient.process_observation_id ().
539
571
Each process handles Control-C via KeyboardInterrupt, which is not needed
540
- in CAOM2RepoClient._process_observation_id ().
572
+ in CAOM2RepoClient.process_observation_id ().
541
573
:param collection: Name of the collection
542
574
:param observationID: Observation identifier
543
575
:param plugin: path to python file that contains the algorithm to be
@@ -551,13 +583,6 @@ def multiprocess_observation_id(collection, observationID, plugin, subject,
551
583
:return: Tuple of observationID representing visited, updated, skipped
552
584
and failed
553
585
"""
554
- visited = None
555
- updated = None
556
- skipped = None
557
- failed = None
558
- observation = None
559
- # set up logging for each process
560
- subject = subject
561
586
logging .basicConfig (
562
587
format = '%(asctime)s %(process)d %(levelname)-8s %(name)-12s ' +
563
588
'%(funcName)s %(message)s' ,
@@ -566,34 +591,10 @@ def multiprocess_observation_id(collection, observationID, plugin, subject,
566
591
'multiprocess_observation_id(): {}' .format (observationID ))
567
592
568
593
client = CAOM2RepoClient (subject , log_level , resource_id , host , agent )
569
- try :
570
- observation = client .get_observation (collection , observationID )
571
- if plugin .update (observation = observation ,
572
- subject = subject ) is False :
573
- rootLogger .info ('SKIP {}' .format (observation .observation_id ))
574
- skipped = observation .observation_id
575
- else :
576
- client .post_observation (observation )
577
- rootLogger .debug ('UPDATED {}' .format (observation .observation_id ))
578
- updated = observation .observation_id
579
- except TypeError as e :
580
- if "unexpected keyword argument" in str (e ):
581
- raise RuntimeError (
582
- "{} - To fix the problem, please add the **kwargs "
583
- "argument to the list of arguments for the update"
584
- " method of your plugin." .format (str (e )))
585
- else :
586
- # other unexpected TypeError
587
- raise e
588
- except Exception as e :
589
- failed = observationID
590
- rootLogger .error ('FAILED {} - Reason: {}' .format (observationID , e ))
591
- if halt_on_error :
592
- raise e
593
-
594
- visited = observationID
595
-
596
- return visited , updated , skipped , failed
594
+ client .plugin = plugin
595
+ client .logger = rootLogger
596
+ return \
597
+ client .process_observation_id (collection , observationID , halt_on_error )
597
598
598
599
599
600
def main_app ():
0 commit comments