28
28
# ' process one future at the time (`workers = 1L`), whereas HPC backends,
29
29
# ' where futures are resolved via separate jobs on a scheduler, can have
30
30
# ' multiple workers. In the latter, the default is `workers = NULL`, which
31
- # ' will resolve to `getOption("future.batchtools.workers")`. If neither
32
- # ' are specified, then the default is `100`.
31
+ # ' will resolve to
32
+ # ' \code{getOption("\link{future.batchtools.workers}")}.
33
+ # ' If neither are specified, then the default is `100`.
33
34
# '
34
35
# ' @param finalize If TRUE, any underlying registries are
35
36
# ' deleted when this object is garbage collected, otherwise not.
@@ -230,6 +231,14 @@ status <- function(future, ...) {
230
231
})
231
232
} # # get_status()
232
233
234
+ # # Known to be in its final state?
235
+ if (getOption(" future.batchtools.status.cache" , TRUE )) {
236
+ status <- future $ .status
237
+ if (identical(status , c(" defined" , " finished" , " started" , " submitted" ))) {
238
+ return (status )
239
+ }
240
+ }
241
+
233
242
config <- future $ config
234
243
reg <- config $ reg
235
244
if (! inherits(reg , " Registry" )) return (NA_character_ )
@@ -251,6 +260,9 @@ status <- function(future, ...) {
251
260
if (result_has_errors(result )) status <- unique(c(" error" , status ))
252
261
}
253
262
263
+ # # Cache result
264
+ future $ .status <- status
265
+
254
266
if (debug ) mdebug(" - status: " , paste(sQuote(status ), collapse = " , " ))
255
267
256
268
status
@@ -335,48 +347,82 @@ loggedOutput.BatchtoolsFuture <- function(future, ...) {
335
347
# ' @export
336
348
# ' @keywords internal
337
349
resolved.BatchtoolsFuture <- function (x , ... ) {
338
- # # Has internal future state already been switched to be resolved
339
- resolved <- NextMethod()
340
- if (resolved ) return (TRUE )
350
+ signalEarly <- import_future(" signalEarly" )
351
+
352
+ # # Is value already collected?
353
+ if (! is.null(x $ result )) {
354
+ # # Signal conditions early?
355
+ signalEarly(x , ... )
356
+ return (TRUE )
357
+ }
358
+
359
+ # # Assert that the process that created the future is
360
+ # # also the one that evaluates/resolves/queries it.
361
+ assertOwner <- import_future(" assertOwner" )
362
+ assertOwner(x )
341
363
342
364
# # If not, checks the batchtools registry status
343
365
resolved <- finished(x )
344
366
if (is.na(resolved )) return (FALSE )
345
-
367
+
368
+ # # Signal conditions early? (happens only iff requested)
369
+ if (resolved ) signalEarly(x , ... )
370
+
346
371
resolved
347
372
}
348
373
349
374
# ' @importFrom future result
350
375
# ' @export
351
376
# ' @keywords internal
352
377
result.BatchtoolsFuture <- function (future , cleanup = TRUE , ... ) {
378
+
379
+ debug <- getOption(" future.debug" , FALSE )
380
+ if (debug ) {
381
+ mdebug(" result() for BatchtoolsFuture ..." )
382
+ on.exit(mdebug(" result() for BatchtoolsFuture ... done" ), add = TRUE )
383
+ }
384
+
353
385
# # Has the value already been collected?
354
386
result <- future $ result
355
- if (inherits(result , " FutureResult" )) return (result )
387
+ if (inherits(result , " FutureResult" )) {
388
+ if (debug ) mdebug(" - FutureResult already collected" )
389
+ return (result )
390
+ }
356
391
357
392
# # Has the value already been collected? - take two
358
393
if (future $ state %in% c(" finished" , " failed" , " interrupted" )) {
394
+ if (debug ) mdebug(" - FutureResult already collected - take 2" )
359
395
return (NextMethod())
360
396
}
361
397
362
398
if (future $ state == " created" ) {
399
+ if (debug ) mdebug(" - starting future ..." )
363
400
future <- run(future )
401
+ if (debug ) mdebug(" - starting future ... done" )
364
402
}
365
403
404
+ if (debug ) mdebug(" - getting batchtools status" )
366
405
stat <- status(future )
367
406
if (is_na(stat )) {
368
407
label <- future $ label
369
408
if (is.null(label )) label <- " <none>"
370
409
stopf(" The result no longer exists (or never existed) for Future ('%s') of class %s" , label , paste(sQuote(class(future )), collapse = " , " )) # nolint
371
410
}
372
411
412
+ if (debug ) mdebug(" - waiting for batchtools job to finish ..." )
373
413
result <- await(future , cleanup = FALSE )
414
+ if (debug ) mdebug(" - waiting for batchtools job to finish ... done" )
374
415
stop_if_not(inherits(result , " FutureResult" ))
375
416
future $ result <- result
376
417
future $ state <- " finished"
377
418
378
- if (cleanup ) delete(future )
419
+ if (cleanup ) {
420
+ if (debug ) mdebugf(" - delete %s ..." , class(future )[1 ])
421
+ delete(future )
422
+ if (debug ) mdebugf(" - delete %s ... done" , class(future )[1 ])
423
+ }
379
424
425
+ if (debug ) mdebug(" - NextMethod()" )
380
426
NextMethod()
381
427
}
382
428
@@ -454,7 +500,9 @@ run.BatchtoolsFuture <- function(future, ...) {
454
500
# # will have the same state of (loaded, attached) packages.
455
501
456
502
reg $ packages <- packages
457
- saveRegistry(reg = reg )
503
+ with_stealth_rng({
504
+ saveRegistry(reg = reg )
505
+ })
458
506
459
507
mdebugf(" Attaching %d packages (%s) ... DONE" ,
460
508
length(packages ), hpaste(sQuote(packages )))
@@ -538,6 +586,11 @@ run.BatchtoolsFuture <- function(future, ...) {
538
586
# # 6. Rerserve worker for future
539
587
registerFuture(future )
540
588
589
+ # # 7. Trigger early signalling
590
+ if (inherits(future , " BatchtoolsUniprocessFuture" )) {
591
+ resolved(future )
592
+ }
593
+
541
594
invisible (future )
542
595
} # # run()
543
596
@@ -553,6 +606,7 @@ await <- function(future, cleanup = TRUE,
553
606
stop_if_not(is.finite(alpha ), alpha > 0 )
554
607
555
608
debug <- getOption(" future.debug" , FALSE )
609
+ if (debug ) mdebug(" future.batchtools:::await() ..." )
556
610
557
611
expr <- future $ expr
558
612
config <- future $ config
@@ -571,10 +625,12 @@ await <- function(future, cleanup = TRUE,
571
625
572
626
res <- waitForJobs(ids = jobid , timeout = timeout , sleep = sleep_fcn ,
573
627
stop.on.error = FALSE , reg = reg )
574
- mdebugf(" - batchtools::waitForJobs(): %s" , res )
628
+ if ( debug ) mdebugf(" - batchtools::waitForJobs(): %s" , res )
575
629
stat <- status(future )
576
- mdebugf(" - status(): %s" , paste(sQuote(stat ), collapse = " , " ))
577
- mdebug(" batchtools::waitForJobs() ... done" )
630
+ if (debug ) {
631
+ mdebugf(" - status(): %s" , paste(sQuote(stat ), collapse = " , " ))
632
+ mdebug(" batchtools::waitForJobs() ... done" )
633
+ }
578
634
579
635
finished <- is_na(stat ) || any(c(" finished" , " error" , " expired" ) %in% stat )
580
636
@@ -587,20 +643,23 @@ await <- function(future, cleanup = TRUE,
587
643
label <- future $ label
588
644
if (is.null(label )) label <- " <none>"
589
645
if (" finished" %in% stat ) {
590
- mdebug(" - batchtools::loadResult() ..." )
646
+ if ( debug ) mdebug(" - batchtools::loadResult() ..." )
591
647
result <- loadResult(reg = reg , id = jobid )
592
- mdebug(" - batchtools::loadResult() ... done" )
648
+ if (debug ) mdebug(" - batchtools::loadResult() ... done" )
649
+
593
650
if (inherits(result , " FutureResult" )) {
594
651
prototype_fields <- c(prototype_fields , " batchtools_log" )
595
- result [[" batchtools_log" ]] <- try({
596
- mdebug(" - batchtools::getLog() ..." )
597
- on.exit(mdebug(" - batchtools::getLog() ... done" ))
652
+ result [[" batchtools_log" ]] <- try(local({
653
+ if (debug ) {
654
+ mdebug(" - batchtools::getLog() ..." )
655
+ on.exit(mdebug(" - batchtools::getLog() ... done" ))
656
+ }
598
657
# # Since we're already collected the results, the log file
599
658
# # should already exist, if it exists. Because of this,
600
659
# # only poll for the log file for a second before giving up.
601
660
reg $ cluster.functions $ fs.latency <- 1.0
602
661
getLog(id = jobid , reg = reg )
603
- }, silent = TRUE )
662
+ }) , silent = TRUE )
604
663
if (result_has_errors(result )) cleanup <- FALSE
605
664
}
606
665
} else if (" error" %in% stat ) {
@@ -644,6 +703,8 @@ await <- function(future, cleanup = TRUE,
644
703
delete(future , delta = 0.5 * delta , ... )
645
704
}
646
705
706
+ if (debug ) mdebug(" future.batchtools:::await() ... done" )
707
+
647
708
result
648
709
} # await()
649
710
@@ -768,6 +829,10 @@ delete.BatchtoolsFuture <- function(future,
768
829
with_stealth_rng({
769
830
interval <- delta
770
831
for (kk in seq_len(times )) {
832
+ try(unlink(path , recursive = TRUE ), silent = FALSE )
833
+ if (! file_test(" -d" , path )) break
834
+ try(removeRegistry(wait = 0.0 , reg = reg ), silent = FALSE )
835
+ if (! file_test(" -d" , path )) break
771
836
try(clearRegistry(reg = reg ), silent = TRUE )
772
837
try(removeRegistry(wait = 0.0 , reg = reg ), silent = FALSE )
773
838
if (! file_test(" -d" , path )) break
0 commit comments