5
5
# ' @param envir The environment in which global environment
6
6
# ' should be located.
7
7
# '
8
- # ' @param substitute Controls whether \code{ expr} should be
9
- # ' \code{ substitute()} :d or not.
8
+ # ' @param substitute Controls whether ` expr` should be
9
+ # ' ` substitute()` :d or not.
10
10
# '
11
11
# ' @param globals (optional) a logical, a character vector, a named list, or
12
- # ' a \link[globals]{ Globals} object. If TRUE, globals are identified by code
13
- # ' inspection based on \code{ expr} and \code{ tweak} searching from environment
14
- # ' \code{ envir} . If FALSE, no globals are used. If a character vector, then
15
- # ' globals are identified by lookup based their names \code{ globals} searching
16
- # ' from environment \code{ envir} . If a named list or a Globals object, the
12
+ # ' a [Globals][globals:: Globals] object. If TRUE, globals are identified by code
13
+ # ' inspection based on ` expr` and ` tweak` searching from environment
14
+ # ' ` envir` . If FALSE, no globals are used. If a character vector, then
15
+ # ' globals are identified by lookup based their names ` globals` searching
16
+ # ' from environment ` envir` . If a named list or a Globals object, the
17
17
# ' globals are used as is.
18
18
# '
19
19
# ' @param label (optional) Label of the future (where applicable, becomes the
20
20
# ' job name for most job schedulers).
21
21
# '
22
22
# ' @param conf A batchtools configuration environment.
23
23
# '
24
- # ' @param cluster.functions A batchtools \link[batchtools]{ ClusterFunctions}
24
+ # ' @param cluster.functions A batchtools [ClusterFunctions][batchtools:: ClusterFunctions]
25
25
# ' object.
26
26
# '
27
27
# ' @param resources A named list passed to the batchtools template (available
28
- # ' as variable \code{ resources} ).
28
+ # ' as variable ` resources` ).
29
29
# '
30
- # ' @param workers (optional) Additional specification for the batchtools
31
- # ' backend.
30
+ # ' @param workers (optional) The maximum number of workers the batchtools
31
+ # ' backend may use at any time. Interactive and "local" backends can only
32
+ # ' process one future at the time, whereas HPC backends where futures are
33
+ # ' resolved via separate jobs on a scheduler, the default is to assume an
34
+ # ' infinite number of workers.
32
35
# '
33
36
# ' @param finalize If TRUE, any underlying registries are
34
37
# ' deleted when this object is garbage collected, otherwise not.
35
38
# '
36
- # ' @param \ldots Additional arguments passed to \code{\link [future]{ Future}()} .
39
+ # ' @param \ldots Additional arguments passed to [future:: Future()] .
37
40
# '
38
41
# ' @return A BatchtoolsFuture object
39
42
# '
40
43
# ' @export
41
- # ' @importFrom future Future
44
+ # ' @importFrom future Future getGlobalsAndPackages
42
45
# ' @importFrom batchtools submitJobs
43
46
# ' @keywords internal
44
47
BatchtoolsFuture <- function (expr = NULL , envir = parent.frame(),
@@ -53,29 +56,27 @@ BatchtoolsFuture <- function(expr = NULL, envir = parent.frame(),
53
56
if (! is.null(label )) label <- as.character(label )
54
57
55
58
if (! is.null(cluster.functions )) {
56
- stopifnot (is.list(cluster.functions ))
59
+ stop_if_not (is.list(cluster.functions ))
57
60
}
58
61
59
62
if (! is.null(workers )) {
60
- stopifnot (length(workers ) > = 1 )
63
+ stop_if_not (length(workers ) > = 1 )
61
64
if (is.numeric(workers )) {
62
- stopifnot (! anyNA(workers ), all(workers > = 1 ))
65
+ stop_if_not (! anyNA(workers ), all(workers > = 1 ))
63
66
} else if (is.character(workers )) {
64
67
} else {
65
- stopifnot (" Argument 'workers' should be either numeric or character: " ,
68
+ stop_if_not (" Argument 'workers' should be either numeric or character: " ,
66
69
mode(workers ))
67
70
}
68
71
}
69
72
70
- stopifnot (is.list(resources ))
73
+ stop_if_not (is.list(resources ))
71
74
72
75
# # Record globals
73
- getGlobalsAndPackages <- import_future(" getGlobalsAndPackages" )
74
76
gp <- getGlobalsAndPackages(expr , envir = envir , globals = globals )
75
77
76
- # # Create BatchtoolsFuture object
77
78
future <- Future(expr = gp $ expr , envir = envir , substitute = FALSE ,
78
- workers = workers , label = label , ... )
79
+ workers = workers , label = label , version = " 1.8 " , ... )
79
80
80
81
future $ globals <- gp $ globals
81
82
future $ packages <- unique(c(packages , gp $ packages ))
@@ -131,6 +132,8 @@ print.BatchtoolsFuture <- function(x, ...) {
131
132
} else {
132
133
printf(" batchtools Registry:\n " )
133
134
print(reg )
135
+ printf(" File dir exists: %s\n " , file_test(" -d" , reg $ file.dir ))
136
+ printf(" Work dir exists: %s\n " , file_test(" -d" , reg $ work.dir ))
134
137
}
135
138
136
139
invisible (x )
@@ -189,6 +192,13 @@ status.BatchtoolsFuture <- function(future, ...) {
189
192
status <- status [status ]
190
193
status <- sort(names(status ))
191
194
status <- setdiff(status , c(" n" ))
195
+
196
+ result <- future $ result
197
+ if (inherits(result , " FutureResult" )) {
198
+ condition <- result $ condition
199
+ if (inherits(condition , " error" )) status <- c(" error" , status )
200
+ }
201
+
192
202
status
193
203
}
194
204
@@ -272,7 +282,7 @@ value.BatchtoolsFuture <- function(future, signal = TRUE,
272
282
onMissing = c(" default" , " error" ),
273
283
default = NULL , cleanup = TRUE , ... ) {
274
284
# # Has the value already been collected?
275
- if (future $ state %in% c(" finished " , " failed" , " interrupted" )) {
285
+ if (future $ state %in% c(" done " , " failed" , " interrupted" )) {
276
286
return (NextMethod(" value" ))
277
287
}
278
288
@@ -289,14 +299,11 @@ value.BatchtoolsFuture <- function(future, signal = TRUE,
289
299
stop(sprintf(" The value no longer exists (or never existed) for Future ('%s') of class %s" , label , paste(sQuote(class(future )), collapse = " , " ))) # nolint
290
300
}
291
301
292
- tryCatch({
293
- future $ value <- await(future , cleanup = FALSE )
294
- future $ state <- " finished"
295
- if (cleanup ) delete(future , ... )
296
- }, simpleError = function (ex ) {
297
- future $ state <- " failed"
298
- future $ value <- ex
299
- })
302
+ result <- await(future , cleanup = FALSE )
303
+ stop_if_not(inherits(result , " FutureResult" ))
304
+ future $ result <- result
305
+ future $ state <- " finished"
306
+ if (cleanup ) delete(future , ... )
300
307
301
308
NextMethod(" value" )
302
309
} # value()
@@ -338,7 +345,7 @@ run.BatchtoolsFuture <- function(future, ...) {
338
345
expr <- substitute(local(expr ), list (expr = expr ))
339
346
340
347
reg <- future $ config $ reg
341
- stopifnot (inherits(reg , " Registry" ))
348
+ stop_if_not (inherits(reg , " Registry" ))
342
349
343
350
# # (ii) Attach packages that needs to be attached
344
351
packages <- future $ packages
@@ -383,9 +390,9 @@ run.BatchtoolsFuture <- function(future, ...) {
383
390
future $ config $ jobid <- jobid
384
391
mdebug(" Created %s future #%d" , class(future )[1 ], jobid $ job.id )
385
392
386
- # # WORKAROUND: (For multicore and OS X only)
393
+ # # WORKAROUND: (For multicore and macOS only)
387
394
if (reg $ cluster.functions $ name == " Multicore" ) {
388
- # # On some OS X systems, a system call to 'ps' may output an error message
395
+ # # On some macOS systems, a system call to 'ps' may output an error message
389
396
# # "dyld: DYLD_ environment variables being ignored because main executable
390
397
# # (/bin/ps) is setuid or setgid" to standard error that is picked up by
391
398
# # batchtools which incorrectly tries to parse it. By unsetting all DYLD_*
@@ -428,14 +435,14 @@ await <- function(...) UseMethod("await")
428
435
# ' @param timeout Total time (in seconds) waiting before generating an error.
429
436
# ' @param delta The number of seconds to wait between each poll.
430
437
# ' @param alpha A factor to scale up the waiting time in each iteration such
431
- # ' that the waiting time in the k:th iteration is \code{ alpha ^ k * delta} .
438
+ # ' that the waiting time in the k:th iteration is ` alpha ^ k * delta` .
432
439
# ' @param \ldots Not used.
433
440
# '
434
441
# ' @return The value of the evaluated expression.
435
442
# ' If an error occurs, an informative Exception is thrown.
436
443
# '
437
444
# ' @details
438
- # ' Note that \code{ await()} should only be called once, because
445
+ # ' Note that ` await()` should only be called once, because
439
446
# ' after being called the actual asynchronous future may be removed
440
447
# ' and will no longer available in subsequent calls. If called
441
448
# ' again, an error may be thrown.
@@ -452,8 +459,8 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
452
459
alpha = getOption(" future.wait.alpha" , 1.01 ),
453
460
... ) {
454
461
mdebug <- import_future(" mdebug" )
455
- stopifnot (is.finite(timeout ), timeout > = 0 )
456
- stopifnot (is.finite(alpha ), alpha > 0 )
462
+ stop_if_not (is.finite(timeout ), timeout > = 0 )
463
+ stop_if_not (is.finite(alpha ), alpha > 0 )
457
464
458
465
debug <- getOption(" future.debug" , FALSE )
459
466
@@ -480,13 +487,23 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
480
487
481
488
finished <- is_na(stat ) || any(c(" done" , " error" , " expired" ) %in% stat )
482
489
483
- res <- NULL
490
+ # # PROTOTYPE RESULTS BELOW:
491
+ prototype_fields <- NULL
492
+
493
+ result <- NULL
484
494
if (finished ) {
485
495
mdebug(" Results:" )
486
496
label <- future $ label
487
497
if (is.null(label )) label <- " <none>"
488
498
if (" done" %in% stat ) {
489
- res <- loadResult(reg = reg , id = jobid )
499
+ result <- loadResult(reg = reg , id = jobid )
500
+ if (inherits(result , " FutureResult" )) {
501
+ prototype_fields <- c(prototype_fields , " stdout" )
502
+ result $ stdout <- getLog(id = jobid , reg = reg )
503
+ if (inherits(result $ condition , " error" )) {
504
+ cleanup <- FALSE
505
+ }
506
+ }
490
507
} else if (" error" %in% stat ) {
491
508
cleanup <- FALSE
492
509
msg <- sprintf(" BatchtoolsError in %s ('%s'): %s" ,
@@ -512,19 +529,23 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
512
529
msg <- sprintf(" BatchtoolsDeleted: Cannot retrieve value. Future ('%s') deleted: %s" , label , reg $ file.dir ) # nolint
513
530
stop(BatchtoolsFutureError(msg , future = future ))
514
531
}
515
- if (debug ) { mstr(res ) }
532
+ if (debug ) { mstr(result ) }
516
533
} else {
517
534
cleanup <- FALSE
518
535
msg <- sprintf(" AsyncNotReadyError: Polled for results for %s seconds every %g seconds, but asynchronous evaluation for future ('%s') is still running: %s" , timeout , delta , label , reg $ file.dir ) # nolint
519
536
stop(BatchtoolsFutureError(msg , future = future ))
520
537
}
521
538
539
+ if (length(prototype_fields ) > 0 ) {
540
+ result $ PROTOTYPE_WARNING <- sprintf(" WARNING: The fields %s should be considered internal and experimental for now, that is, until the Future API for these additional features has been settled. For more information, please see https://github.com/HenrikBengtsson/future/issues/172" , hpaste(sQuote(prototype_fields ), max_head = Inf , collapse = " , " , last_collapse = " and " ))
541
+ }
542
+
522
543
# # Cleanup?
523
544
if (cleanup ) {
524
545
delete(future , delta = 0.5 * delta , ... )
525
546
}
526
547
527
- res
548
+ result
528
549
} # await()
529
550
530
551
@@ -600,13 +621,27 @@ delete.BatchtoolsFuture <- function(future,
600
621
}
601
622
}
602
623
624
+ # # FIXME: Make sure to collect the results before deleting
625
+ # # the internal batchtools registry
626
+ result <- future $ result
627
+ if (is.null(result )) {
628
+ value(future , signal = FALSE )
629
+ result <- future $ result
630
+ }
631
+ stop_if_not(inherits(result , " FutureResult" ))
603
632
604
633
# # To simplify post mortem troubleshooting in non-interactive sessions,
605
634
# # should the batchtools registry files be removed or not?
606
635
mdebug(" delete(): Option 'future.delete = %s" ,
607
636
sQuote(getOption(" future.delete" , " <NULL>" )))
608
637
if (! getOption(" future.delete" , interactive())) {
609
638
status <- status(future )
639
+ res <- future $ result
640
+ if (inherits(res , " FutureResult" )) {
641
+ if (inherits(res $ condition , " error" )) status <- " error"
642
+ }
643
+ mdebug(" delete(): status(<future>) = %s" ,
644
+ paste(sQuote(status ), collapse = " , " ))
610
645
if (any(c(" error" , " expired" ) %in% status )) {
611
646
msg <- sprintf(" Will not remove batchtools registry, because the status of the batchtools was %s and option 'future.delete' is FALSE or running in an interactive session: %s" , paste(sQuote(status ), collapse = " , " ), sQuote(path )) # nolint
612
647
mdebug(" delete(): %s" , msg )
0 commit comments