Python’s multiprocessing and threading libraries both have a timeout
parameter for joining threads, limiting how long the calling process
will wait for them to finish. This provides an easy way to launch
multi-threaded jobs while ensuring that no single thread runs for
longer than a specified timeout. This is very useful in implementing a
standard “timeout on a function call” operation, as detailed in this
Stack Overflow question of that title, which offers a bewildering
variety of approaches to that problem. Among the easiest of those is
the recommendation to rely on the multiprocessing library’s join()
operation, which accepts a timeout parameter, as described in the
library’s documentation. There is also an equivalent parameter for
Python’s other main parallelisation library, threading.
A nice example of this timeout parameter in action is given in the
fitter package by @cokelaer, for fitting probability distributions to
observed data. The main function fits a wide range of different
distributions, and can even automagically select the best distribution
according to specified criteria. This is done by fitting different
distributions in parallel on different threads, generally greatly
speeding up calculations. Distributional fitting is, however, often an
iterative procedure, meaning the duration required to generate a fit
within some specified tolerance cannot be known in advance. By
default, joining parallel threads blocks until all of them have
terminated. To ensure distributional fits are generated within a
reasonable duration, fitter has a _timed_run function to:

spawn a thread and run the given function … and return the given default value if the timeout is exceeded.
The bit of that function which controls the timeout consists of the following lines (with code for exception handling removed here):
import threading

def _timed_run(self, func, args=(), default=None):
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default

        def run(self):
            self.result = func(args)

    it = InterruptableThread()
    it.start()
    it.join(self.timeout)  # return after at most self.timeout seconds
    return it.result

That represents a succinct way to run a multi-threaded job in which each thread obeys a specified timeout parameter. This post describes two approaches to implementing equivalent functionality in R.
R’s {parallel} package offers one way to implement a timeout
parameter, via the mccollect() function, which is (almost) equivalent
to Python’s .join() method. This can be illustrated with the following
arbitrarily slow function:
fn <- function (x = 10L) {
    vapply (seq (x), function (i) {
        Sys.sleep (0.2)
        runif (1)
    }, numeric (1))
}

Calculating this in parallel is straightforward with the mcparallel()
and mccollect() functions. This code generates 10 random inputs to
fn() which will take random durations up to 20 * 0.2 = 4 seconds
each.
set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
library (parallel)
jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
    res <- mccollect (jobs)
)

## user system elapsed
## 0.006 0.000 3.615

That took much less than the expected serial duration (in seconds) of,

sum (n) / 5

## [1] 17.4

The mccollect() function has a timeout parameter “to check for job
results”. Specifying that in the above call then gives the following,
noting that the parameter wait must also be passed with its
non-default value of FALSE to activate the timeout.
jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
    res <- mccollect (jobs, wait = FALSE, timeout = 2)
)

## user system elapsed
## 0.000 0.000 0.003

That seems much too quick! What does the result look like?
res

## $`24053`
## [1] 0.6096623

It seems that mccollect() has only returned one result. The reason can
be seen by tracing the implementation of the timeout parameter from
the mccollect()
function
through to the selectChildren()
function
into the C function,
select_children(),
and finally to the lines which implement the waiting
procedure.
These lines show that the function returns as soon as it collects a
value from any of the “child” processes (via the R_ext/R_SelectEx()
function
which is implemented
here).
So mccollect() with timeout set will return as soon as the first
result has been generated. That of course means that
the remaining jobs continue to be processed, and can be returned by
subsequent calls to mccollect(). Two consecutive calls will then
naturally return the first two results to be processed. To check this,
we need to note that the jobs list contains process ID (pid) values,
one of which is detached by the first call to mccollect(), and so has
to be removed from the jobs list.
jobs <- lapply (n, function (i) mcparallel (fn (i)))
pids <- vapply (jobs, function (i) i$pid, integer (1))
system.time (
    res1 <- mccollect (jobs, wait = FALSE, timeout = 2)
)

## user system elapsed
## 0.000 0.000 0.007

jobs <- jobs [which (!pids %in% names (res1))]
system.time (
    res2 <- mccollect (jobs, wait = FALSE, timeout = 2)
)

## user system elapsed
## 0.000 0.000 0.003

The two returned values are then,
res1; res2

## $`26140`
## [1] 0.05318079
##
## $`26146`
## [1] 0.7513229

So R has a timeout parameter on parallel jobs, but it doesn’t work
like the equivalent Python parameters, and arguably doesn’t work how
one might expect. That code exploration is nevertheless sufficient to
understand how a Pythonic version could be implemented:
par_timeout <- function (f, n, timeout) {
    jobs <- lapply (n, function (i) mcparallel (f (i)))
    Sys.sleep (timeout) # wait for the full timeout period ...
    mccollect (jobs, wait = FALSE) # ... then collect whatever has finished
}
par_timeout (fn, n, 2)

## $`26913`
## [1] 0.008293313
##
## $`26908`
## [1] 0.2473093 0.9442306
##
## $`26907`
## [1] 0.8032608
##
## $`26906`
## [1] 0.1900972 0.8134690 0.2745623 0.3148808 0.3954601 0.7415558 0.9394560
##
## $`26905`
## [1] 0.7566425 0.2494607 0.4848817 0.3469343

And we get five out of the expected 10 results returning within our
specified timeout of 2 seconds. Given that fn() takes 0.2 s per unit
of its input, x, we can estimate which jobs should have returned by
repeating the initial code used to generate those values of n.

set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
timeout <- 2 # in seconds
data.frame (n = n, should_work = n / 5 <= timeout)

##     n should_work
## 1   4        TRUE
## 2   7        TRUE
## 3   1        TRUE
## 4   2        TRUE
## 5  11       FALSE
## 6  14       FALSE
## 7  18       FALSE
## 8  19       FALSE
## 9   1        TRUE
## 10 10        TRUE

And we might have expected 6 values to have returned, of which we
actually got only 5, but perhaps the value of n = 10 extended just
beyond the timeout? We’ll nevertheless compare this result with an
alternative approach below. But first, there are some notable drawbacks
to the approach illustrated here:
- The documentation for the mcparallel() and mccollect() functions
  states at the very first line, “These functions are based on forking
  and so are not available on Windows.” While that might not concern
  those who develop packages on other systems, it will greatly reduce
  the use of any code implementing parallel timeouts in this way.
- There are many “wrapper” packages around R’s core {parallel}
  functionality, notably including the “futureverse” family of
  packages, the primary aim of which is to make parallelisation in R
  simpler by enabling any call to be wrapped in parallelisation
  functions like future(). These packages offer no direct way of
  controlling the timeout parameter of mccollect(), nor any equivalent
  functionality.
The next section explores a different approach that is operating-system independent.
The callr package by Gábor Csárdi and Winston Chang is designed for “calling R from R” – that is, for
performing computation in a separate R process, without affecting the current R process
The package offers two main modes of calling processes: as blocking,
foreground processes via
callr::r(), or as
non-blocking, background processes via
callr::r_bg(). The
foreground r() function has an explicit timeout parameter, and throws
an error of class system_command_timeout_error if the specified
timeout (in seconds) is exceeded. The following code calls the fn()
function from above to demonstrate this functionality, wrapping the
main call in tryCatch() to catch the timeout errors:
timeout_fn <- function (x = 1L, timeout = 2) {
    tryCatch (
        callr::r (fn, args = list (x = x), timeout = timeout),
        error = function (e) NA
    )
}

Passing a value of x larger than around 5 should then time out at 1
second, as this code demonstrates:
system.time (
    x <- timeout_fn (x = 10, timeout = 1)
)

## user system elapsed
## 0.152 0.035 0.959

The returned value is then:

x

## [1] NA

That function timed out as expected. Compare what happens when the
timeout is extended well beyond that limit:
timeout_fn (x = 5, timeout = 10)

## [1] 0.7176185 0.9919061 0.3800352 0.7774452 0.9347052

The timeout parameter of callr::r() can thus be used to directly
implement timed function calls. The following sub-section demonstrates
how to extend this to parallel jobs.
To illustrate a different approach from the previous mcparallel()
function, the following code uses the mclapply() function of the
{parallel} package, which unfortunately also does not work on Windows,
but suffices to demonstrate the principles.
set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
nc <- parallel::detectCores () - 1L
system.time (
    res <- parallel::mclapply (mc.cores = nc, n, function (i)
        timeout_fn (x = i, timeout = 2))
)

## user system elapsed
## 1.754 0.544 3.008

print (res)

## [[1]]
## [1] 0.20134728 0.09508085 0.75240848 0.30041337
##
## [[2]]
## [1] 0.5837042 0.6133771 0.3121486 0.2943205 0.4455983 0.5102744 0.8867751
##
## [[3]]
## [1] 0.9381157
##
## [[4]]
## [1] 0.9201705 0.9656466
##
## [[5]]
## [1] NA
##
## [[6]]
## [1] NA
##
## [[7]]
## [1] NA
##
## [[8]]
## [1] NA
##
## [[9]]
## [1] 0.7515151
##
## [[10]]
## [1] NA

And that returned 5 out of the 10 jobs, as for the previous example
using mccollect(). (The actual values differ due to random number
generators being seeded differently in the two sets of jobs.) This
approach of using {callr} to control function timeouts enables
parallel jobs to be implemented with timeouts on all operating
systems, by replacing the mclapply() or mcparallel() functions with,
for example, equivalent functions from the {snow} package. These
{snow} functions (such as the parApply family of functions) also do
not implement a timeout parameter, and so this {callr} approach offers
one practical way to add one, as sketched below.
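As a minimal sketch of that idea, assuming the socket-cluster
interface which {parallel} adopts from {snow} (and an arbitrary choice
of two worker processes), the timeout_fn() defined above can be mapped
over a cluster with parLapply(), after exporting both it and fn() to
the workers:

library (parallel)
cl <- makeCluster (2L) # socket cluster, which also works on Windows
clusterExport (cl, c ("fn", "timeout_fn")) # copy both functions to workers
res <- parLapply (cl, n, function (i) timeout_fn (x = i, timeout = 2))
stopCluster (cl)

Each worker then spawns its own {callr} process for each job, so every
job still obeys the two-second timeout regardless of operating system.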
Processes triggered by the {callr} package do not generally play
nicely with the core {future} package, which was likely one motivation
for Henrik Bengtsson to develop the {future.callr}
package which explicitly uses
{callr} to run each process. The processes are nevertheless triggered
as callr::r_bg() processes which do not have a timeout parameter.
While it is possible to implement a timeout directly on r_bg
processes, by monitoring them until timeout and then using their kill
method (as sketched below), the future.callr package does not directly
expose the r_bg processes necessary to enable this. There is therefore
currently no safe way to implement a timeout parameter along the lines
demonstrated here within any futureverse packages.
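For completeness, that monitor-and-kill pattern can be sketched
directly on a callr::r_bg() process, outside of any futureverse
packages. The function name rbg_timeout and the default return value
here are arbitrary illustrative choices; note that the wait() method
of the returned process object expects its timeout in milliseconds:

rbg_timeout <- function (f, args = list (), timeout = 2, default = NA) {
    p <- callr::r_bg (f, args = args) # non-blocking background process
    p$wait (timeout = 1000 * timeout) # wait() expects milliseconds
    if (p$is_alive ()) { # still running, so the timeout was exceeded
        p$kill ()
        return (default)
    }
    p$get_result ()
}
rbg_timeout (fn, args = list (x = 10), timeout = 1) # returns NA after one second

Such a wrapper could then be mapped over parallel jobs in the same way
as timeout_fn() above.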
Copyright © 2019--22 mark padgham