Timeout on parallel jobs in R

Python’s multiprocessing and threading libraries both have a timeout parameter for re-joining threads after they’ve finished. This provides an easy way to launch multi-threaded jobs while ensuing that no single thread runs for longer than a specified timeout. This is very useful in implementing a standard “timeout on a function call” operation, as detailed in this Stack Overflow question of that title which offers a bewildering variety of approaches to that problem. Among the easiest of those is the recommendation to rely on the multiprocessing libraries’s join() operation which accepts a timeout parameter, as described in the library’s documentation. There is also an equivalent parameter for python’s other main parallelisation library, threading.

A nice example of the usefulness of this timeout parameter in action is given in the fitter package by @cokelaer for fitting probability distributions to observed data. The main function fits a wide range of different distributions, and can even automagically select the best distribution according to specified criteria. This is done through fitting different distributions in parallel on different threads, generally greatly speeding up calculations. Distributional fitting is, however, often an iterative procedure, meaning the duration required to generate a fit within some specified tolerance can not be known in advance. Parallel threads by default must wait for all to terminate before individual results can be joined. To ensure distributional fits are generated within a reasonable duration, fitter has a _timed_run function to:

spawn a thread and run the given function … and return the given default value if the timeout is exceeded.

The bit of that function which controls the timeout consists of the following lines (with code for exception handling removed here):

def _timed_run (self, func, args=()):
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default

        def run(self):
            self.result = func(args)

    it = InterruptableThread()
    it.start()
    it.join(self.timeout)
    return it.result

That represents a succinct way to run a multi-threaded job in which each thread obeys a specified timeout parameter. This post describes two approaches to implementing equivalent functionality in R.

Timeout in R’s ‘parallel’ package

R’s {parallel} package offers one way to implement a timeout parameter, via the mccollect() function, which is (almost) equivalent to Python’s .join() operator. This can be illustrated with this arbitrarily slow function:

fn <- function (x = 10L) {
    vapply (seq (x), function (i) {
                Sys.sleep (0.2)
                runif (1)
        }, numeric (1))
}

Calculating this in parallel is straightforward with the mcparallel() and mccollect() functions. This code generates 10 random inputs to fn() which will take random durations up to 20 * 0.2 = 4 seconds each.

set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
library (parallel)
jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
    res <- mccollect (jobs)
)

##    user  system elapsed 
##   0.006   0.000   3.615

That took much less than the expected duration of,

sum (n) / 5

## [1] 17.4

The mccollect() function has a timeout parameter “to check for job results”. Specifying that in the above function then gives the following, noting that the parameter wait also has to be passed with its non-default value of FALSE to activate timeout.

jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
    res <- mccollect (jobs, wait = FALSE, timeout = 2)
)

##    user  system elapsed 
##   0.000   0.000   0.003

That seems much too quick! What does the result look like?

res

## $`24053`
## [1] 0.6096623

It seems that mccollect() has only returned one result. The reason can be seen by tracing the implementation of the timeout parameter from the mccollect() function through to the selectChildren() function into the C function, select_children(), and finally to the lines which implement the waiting procedure. These lines show that the function returns as soon as it collects a value from any of the “child” processes (via the R_ext/R_SelectEx() function which is implemented here). So setting timeout in mccollect() will then return results as soon as the first result has been been generated. That of course means that the remaining jobs continue to be processed, and can be returned by subsequent calls to mccollect(). Two consecutive calls will then naturally return the first two results to be processed. To check this, we need to note that the jobs list contains process ID (pid) values, one of which is detached by the first call to mccollect(), and so has to be removed from the jobs list.

jobs <- lapply (n, function (i) mcparallel (fn (i)))
pids <- vapply (jobs, function (i) i$pid, integer (1))
system.time (
    res1 <- mccollect (jobs, wait = FALSE, timeout = 2)
)

##    user  system elapsed 
##   0.000   0.000   0.007

jobs <- jobs [which (!pids %in% names (res1))]
system.time (
    res2 <- mccollect (jobs, wait = FALSE, timeout = 2)
)
##    user  system elapsed 
##   0.000   0.000   0.003

The two returned values are then,

res1; res2

## $`26140`
## [1] 0.05318079
## 
## $`26146`
## [1] 0.7513229

So R has a timeout parameter on parallel jobs, but it doesn’t work like the equivalent Python parameters, and arguably doesn’t work how one might expect. That code exploration is nevertheless sufficient to understand how a pythonic version could be implemented:

par_timeout <- function (f, n, timeout) {
    jobs <- lapply (n, function (i) mcparallel (f (i)))
    Sys.sleep (timeout)
    mccollect (jobs, wait = FALSE)
}
par_timeout (fn, n, 2)

## $`26913`
## [1] 0.008293313
## 
## $`26908`
## [1] 0.2473093 0.9442306
## 
## $`26907`
## [1] 0.8032608
## 
## $`26906`
## [1] 0.1900972 0.8134690 0.2745623 0.3148808 0.3954601 0.7415558 0.9394560
## 
## $`26905`
## [1] 0.7566425 0.2494607 0.4848817 0.3469343

And we get five out of the expected 10 results returning within our specified timeout of 2 seconds. We can estimate from the generated values of n which ones should have returned, given that fn takes 0.2s per unit of the input, x, repeating the initial code used to generate those values.

set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
timeout <- 2 # in seconds
data.frame (n = n, should_work = n / 5 <= 2)

##     n should_work
## 1   4        TRUE
## 2   7        TRUE
## 3   1        TRUE
## 4   2        TRUE
## 5  11       FALSE
## 6  14       FALSE
## 7  18       FALSE
## 8  19       FALSE
## 9   1        TRUE
## 10 10        TRUE

And we might have expected 6 values to have returned, of which we actually got only 5, but perhaps the value of n = 10 extended just beyond the timeout? We’ll nevertheless compare this result with an alternative approach below. But first, there are some notable drawbacks to the approach illustrated here:

  1. The documentation for the mcparallel() and mccollect() functions state at the very first line, “These functions are based on forking and so are not available on Windows.” While that might not concern those who develop packages on other systems, it will greatly reduce the use of any code implementing parallel timeouts in this way.

  2. There are many “wrapper” packages around R’s core {parallel} functionality, notably including the “futureverse” family of packages, the primary aim of which is to make parallelisation in R simpler, through enabling any calls to be simply wrapped in parallelisation functions like future(). These packages offer no direct way of controlling the timeout parameter of mccollect(), or any equivalent functionality.

The next section explores a different approach that is operating-system independent.

Timeout via ‘callr’

The callr package by Gábor Csárdi and Winston Chang is designed for ‘calling R from R’ – that is, for,

performing computation in a separate R process, without affecting the current R process

The package offers two main modes of calling processes: as blocking, foreground processes via callr::r(), or as non-blocking, background processes via callr::r_bg(). The foreground r() function has an explicit timeout parameter, which returns a system_command_timeout_error if the specified timeout (in seconds) is exceeded. The following code calls the fn() function from above to demonstrate this functionality, wrapping the main call in tryCatch() to process the timeout errors:

timeout_fn <- function (x = 1L, timeout = 2) {
    tryCatch (
        callr::r (fn, args = list (x = x), timeout = timeout),
        error = function (e) NA
        )
}

Passing a value of x larger than around 5 should then timeout at 1 second, as this code demonstrates:

system.time (
    x <- timeout_fn (x = 10, timeout = 1)
    )

##    user  system elapsed 
##   0.152   0.035   0.959

The returned value is then:

x

## [1] NA

That function timed out as expected. Compare what happens when the timeout is extended well beyond that limit:

timeout_fn (x = 5, timeout = 10)

## [1] 0.7176185 0.9919061 0.3800352 0.7774452 0.9347052

The timeout parameter of callr::r() can thus be used to directly implement a timeout parameter. The following sub-section demonstrates how to extend this to parallel jobs.

Parallel timeout via ‘callr’

To illustrate a different approach than the previous mcparallel() function, the following code uses the mclapply function of the parallel package, which unfortunately also does not work on Windows, but suffices to demonstrate the principles.

set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
nc <- parallel::detectCores () - 1L
system.time (
    res <- parallel::mclapply (mc.cores = nc, n, function (i)
                               timeout_fn (x = i, timeout = 2))
    )

##    user  system elapsed 
##   1.754   0.544   3.008

print (res)

## [[1]]
## [1] 0.20134728 0.09508085 0.75240848 0.30041337
## 
## [[2]]
## [1] 0.5837042 0.6133771 0.3121486 0.2943205 0.4455983 0.5102744 0.8867751
## 
## [[3]]
## [1] 0.9381157
## 
## [[4]]
## [1] 0.9201705 0.9656466
## 
## [[5]]
## [1] NA
## 
## [[6]]
## [1] NA
## 
## [[7]]
## [1] NA
## 
## [[8]]
## [1] NA
## 
## [[9]]
## [1] 0.7515151
## 
## [[10]]
## [1] NA

And that returned 5 out of the 10 jobs, as for the previous example using mccollect(). (The actual values differ due to random number generators being seeded differently in the two lots of jobs.) This approach, of using callr to control function timeout parameters, enables parallel jobs to be implemented on all operating systems through replacing the mclapply() or mcparallel() functions with, for example, equivalent functions from the {snow} package. These {snow} functions (such as the parApply family of functions) also do not implement a timeout parameter, and so this {callr} approach offers one practical way to do so via those packages.

Timeout parameters and ‘future’ packages

Processes triggered by the {callr} package do not generally play nicely with the core {future} package, which was likely one motivation for Henrik Bengtsson to develop the {future.callr} package which explicitly uses {callr} to run each process. The processes are nevertheless triggered as callr::r_bg() processes which do not have a timeout parameter. While it is possible to directly implement a timeout parameter of r_bg processes by monitoring until timeout and then using the kill method, the future.callr package does not directly expose the r_bg processes necessary to enable this. There is therefore currently no safe way to implement a timeout parameter along the lines demonstrated here within any futureverse packages.

Originally posted: 14 Jan 22

Copyright © 2019--22 mark padgham