Python’s multiprocessing and threading libraries both have a timeout parameter for re-joining threads after they’ve finished. This provides an easy way to launch multi-threaded jobs while ensuring that no single thread runs for longer than a specified timeout. This is very useful in implementing a standard “timeout on a function call” operation, as detailed in this Stack Overflow question of that title, which offers a bewildering variety of approaches to the problem. Among the easiest of those is the recommendation to rely on the multiprocessing library’s join() method, which accepts a timeout parameter, as described in the library’s documentation. There is an equivalent parameter in Python’s other main parallelisation library, threading.
A nice example of this timeout parameter in action is given in the fitter package by @cokelaer, for fitting probability distributions to observed data. The main function fits a wide range of different distributions, and can even automagically select the best distribution according to specified criteria. It does so by fitting different distributions in parallel on different threads, generally greatly speeding up calculations. Distributional fitting is, however, often an iterative procedure, meaning the duration required to generate a fit within some specified tolerance cannot be known in advance. By default, parallel threads must all terminate before their individual results can be joined. To ensure distributional fits are generated within a reasonable duration, fitter has a _timed_run function to:
spawn a thread and run the given function … and return the given default value if the timeout is exceeded.
The bit of that function which controls the timeout consists of the following lines (with code for exception handling removed here):
def _timed_run (self, func, args=(), default=None):
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            # the result returned if the timeout is exceeded:
            self.result = default
        def run(self):
            self.result = func(args)
    it = InterruptableThread()
    it.start()
    it.join(self.timeout)  # wait no longer than self.timeout seconds
    return it.result
That represents a succinct way to run a multi-threaded job in which each thread obeys a specified timeout parameter. This post describes two approaches to implementing equivalent functionality in R.
R’s {parallel} package offers one way to implement a timeout parameter, via the mccollect() function, which is (almost) equivalent to Python’s .join() method. This can be illustrated with this arbitrarily slow function:
fn <- function (x = 10L) {
    # return 'x' random values, pausing 0.2 s before generating each one:
    vapply (seq (x), function (i) {
        Sys.sleep (0.2)
        runif (1)
    }, numeric (1))
}
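As a quick check of that timing (an illustrative call only; exact elapsed times will vary slightly between runs), a single call with x = 5 should need around 5 * 0.2 = 1 second:
system.time (fn (5L)) # 'elapsed' should be around one second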
Calculating this in parallel is straightforward with the mcparallel() and mccollect() functions. This code generates 10 random inputs to fn(), each of which will take a random duration of up to 20 * 0.2 = 4 seconds.
set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
library (parallel)
jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
res <- mccollect (jobs)
)
## user system elapsed
## 0.006 0.000 3.615
That took much less than the expected serial duration (in seconds) of:
sum (n) / 5
## [1] 17.4
The mccollect() function has a timeout parameter “to check for job results”. Specifying that in the above call then gives the following, noting that the parameter wait must also be passed with its non-default value of FALSE to activate the timeout.
jobs <- lapply (n, function (i) mcparallel (fn (i)))
system.time (
res <- mccollect (jobs, wait = FALSE, timeout = 2)
)
## user system elapsed
## 0.000 0.000 0.003
That seems much too quick! What does the result look like?
res
## $`24053`
## [1] 0.6096623
It seems that mccollect() has only returned one result. The reason can be seen by tracing the implementation of the timeout parameter from the mccollect() function through to the selectChildren() function, into the C function select_children(), and finally to the lines which implement the waiting procedure. Those lines show that the function returns as soon as it collects a value from any of the “child” processes (via the R_ext/R_SelectEx() function which is implemented here).
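That behaviour can be sketched directly with the low-level selectChildren() function, which is documented in {parallel} but not exported, and so is accessed via ::: in the following illustration (which assumes one quick and one slow job):
# launch one quick (0.2 s) and one slow (4 s) job:
jobs <- lapply (c (1L, 20L), function (i) mcparallel (fn (i)))
# with a 2 s timeout, this returns the pid of the quick job as soon as its
# result is available, rather than waiting the full two seconds for both:
parallel:::selectChildren (jobs, timeout = 2)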
Setting timeout in mccollect() will thus return results as soon as the first result has been generated. That of course means that the remaining jobs continue to be processed, and can be returned by subsequent calls to mccollect(). Two consecutive calls will then naturally return the first two results to be processed. To check this, we need to note that the jobs list contains process ID (pid) values, one of which is detached by the first call to mccollect(), and so has to be removed from the jobs list.
jobs <- lapply (n, function (i) mcparallel (fn (i)))
pids <- vapply (jobs, function (i) i$pid, integer (1))
system.time (
res1 <- mccollect (jobs, wait = FALSE, timeout = 2)
)
## user system elapsed
## 0.000 0.000 0.007
jobs <- jobs [which (!pids %in% names (res1))]
system.time (
res2 <- mccollect (jobs, wait = FALSE, timeout = 2)
)
## user system elapsed
## 0.000 0.000 0.003
The two returned values are then,
res1; res2
## $`26140`
## [1] 0.05318079
##
## $`26146`
## [1] 0.7513229
So R has a timeout parameter on parallel jobs, but it doesn’t work like the equivalent Python parameters, and arguably doesn’t work how one might expect. That code exploration is nevertheless sufficient to understand how a pythonic version could be implemented:
par_timeout <- function (f, n, timeout) {
    jobs <- lapply (n, function (i) mcparallel (f (i)))
    Sys.sleep (timeout) # wait for the full timeout ...
    mccollect (jobs, wait = FALSE) # ... then collect whatever has finished
}
par_timeout (fn, n, 2)
## $`26913`
## [1] 0.008293313
##
## $`26908`
## [1] 0.2473093 0.9442306
##
## $`26907`
## [1] 0.8032608
##
## $`26906`
## [1] 0.1900972 0.8134690 0.2745623 0.3148808 0.3954601 0.7415558 0.9394560
##
## $`26905`
## [1] 0.7566425 0.2494607 0.4848817 0.3469343
And we get five out of the expected 10 results returning within our specified timeout of 2 seconds. We can estimate from the generated values of n which ones should have returned, given that fn takes 0.2 s per unit of the input, x, repeating the initial code used to generate those values.
set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
timeout <- 2 # in seconds
data.frame (n = n, should_work = n / 5 <= timeout)
## n should_work
## 1 4 TRUE
## 2 7 TRUE
## 3 1 TRUE
## 4 2 TRUE
## 5 11 FALSE
## 6 14 FALSE
## 7 18 FALSE
## 8 19 FALSE
## 9 1 TRUE
## 10 10 TRUE
We might have expected 6 values to have returned, of which we actually got only 5, but perhaps the job with n = 10, which needs exactly 2 seconds, extended just beyond the timeout? We’ll nevertheless compare this result with an alternative approach below. But first, there are some notable drawbacks to the approach illustrated here:
- The documentation for the mcparallel() and mccollect() functions states at the very first line, “These functions are based on forking and so are not available on Windows.” While that might not concern those who develop packages on other systems, it will greatly reduce the use of any code implementing parallel timeouts in this way.
- There are many “wrapper” packages around R’s core {parallel} functionality, notably including the “futureverse” family of packages, the primary aim of which is to make parallelisation in R simpler, by enabling any calls to be simply wrapped in parallelisation functions like future(). These packages offer no direct way of controlling the timeout parameter of mccollect(), or any equivalent functionality.
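A further, more minor, drawback is that the par_timeout() function above always sleeps for the full timeout, even when every job finishes well before then. A hypothetical polling variant, re-using the pid-matching trick demonstrated above (and with a purely illustrative polling interval parameter), could instead return as soon as all jobs have completed:
par_timeout_poll <- function (f, n, timeout, interval = 0.1) {
    jobs <- lapply (n, function (i) mcparallel (f (i)))
    res <- list ()
    t0 <- Sys.time ()
    while (length (jobs) > 0L &&
           difftime (Sys.time (), t0, units = "secs") < timeout) {
        Sys.sleep (interval)
        res_i <- mccollect (jobs, wait = FALSE)
        if (!is.null (res_i)) {
            res <- c (res, res_i)
            # remove collected jobs from the list, as demonstrated above:
            pids <- vapply (jobs, function (i) i$pid, integer (1))
            jobs <- jobs [which (!pids %in% names (res_i))]
        }
    }
    res
}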
The next section explores a different approach that is operating-system independent.
The callr package by Gábor Csárdi and Winston Chang is designed for ‘calling R from R’; that is, for:
performing computation in a separate R process, without affecting the current R process
The package offers two main modes of calling processes: as blocking, foreground processes via callr::r(), or as non-blocking, background processes via callr::r_bg(). The foreground r() function has an explicit timeout parameter, and throws an error of class system_command_timeout_error if the specified timeout (in seconds) is exceeded. The following code calls the fn() function from above to demonstrate this functionality, wrapping the main call in tryCatch() to process the timeout errors:
timeout_fn <- function (x = 1L, timeout = 2) {
    tryCatch (
        callr::r (fn, args = list (x = x), timeout = timeout),
        # convert any errors, including timeout errors, into a default of NA:
        error = function (e) NA
    )
}
Since fn() takes 0.2 seconds per unit of x, passing a value of x larger than 5 should then time out when timeout = 1 second, as this code demonstrates:
system.time (
x <- timeout_fn (x = 10, timeout = 1)
)
## user system elapsed
## 0.152 0.035 0.959
The returned value is then:
x
## [1] NA
That function timed out as expected. Compare what happens when the timeout is extended well beyond that limit:
timeout_fn (x = 5, timeout = 10)
## [1] 0.7176185 0.9919061 0.3800352 0.7774452 0.9347052
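To confirm that these are indeed timeout errors of the class named above, the condition can be inspected directly (a sketch only; the exact class vector may differ between {callr} versions):
tryCatch (
    callr::r (fn, args = list (x = 10L), timeout = 1),
    error = function (e) inherits (e, "system_command_timeout_error")
)
# should return TRUE for a timeout error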
The timeout parameter of callr::r() can thus be used to implement a timeout directly on any function call. The following sub-section demonstrates how to extend this to parallel jobs.
To illustrate a different approach from the previous mcparallel() example, the following code uses the mclapply() function of the {parallel} package, which unfortunately also does not work on Windows, but suffices to demonstrate the principles.
set.seed (1)
n <- sample (1:20, size = 10, replace = TRUE)
nc <- parallel::detectCores () - 1L
system.time (
res <- parallel::mclapply (mc.cores = nc, n, function (i)
timeout_fn (x = i, timeout = 2))
)
## user system elapsed
## 1.754 0.544 3.008
print (res)
## [[1]]
## [1] 0.20134728 0.09508085 0.75240848 0.30041337
##
## [[2]]
## [1] 0.5837042 0.6133771 0.3121486 0.2943205 0.4455983 0.5102744 0.8867751
##
## [[3]]
## [1] 0.9381157
##
## [[4]]
## [1] 0.9201705 0.9656466
##
## [[5]]
## [1] NA
##
## [[6]]
## [1] NA
##
## [[7]]
## [1] NA
##
## [[8]]
## [1] NA
##
## [[9]]
## [1] 0.7515151
##
## [[10]]
## [1] NA
And that returned 5 out of the 10 jobs, as for the previous example using mccollect(). (The actual values differ because the random number generators are seeded differently in the two lots of jobs.) This approach, of using {callr} to control function timeouts, enables parallel jobs with timeouts to be implemented on all operating systems, by replacing the mclapply() or mcparallel() functions with, for example, equivalent functions from the {snow} package. Those {snow} functions (such as the parApply family of functions) also do not implement a timeout parameter, and so this {callr} approach offers one practical way to do so, as in the following sketch.
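As a sketch of that idea (noting that the {snow} cluster functions are also bundled in {parallel} itself, and that the cluster size of two here is purely illustrative), timeout_fn() can be run on a PSOCK cluster, which also works on Windows:
library (parallel)
cl <- makeCluster (2L) # a PSOCK cluster by default
clusterExport (cl, c ("fn", "timeout_fn")) # copy both functions to the workers
res <- parLapply (cl, n, function (i) timeout_fn (x = i, timeout = 2))
stopCluster (cl)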
Processes triggered by the {callr} package do not generally play nicely with the core {future} package, which was likely one motivation for Henrik Bengtsson to develop the {future.callr} package, which explicitly uses {callr} to run each process. The processes are nevertheless triggered as callr::r_bg() processes, which do not have a timeout parameter. While it is possible to directly implement a timeout on r_bg() processes by monitoring until timeout and then using the kill() method (as sketched below), the {future.callr} package does not directly expose the r_bg() processes necessary to enable this. There is therefore currently no safe way to implement a timeout parameter along the lines demonstrated here within any futureverse packages.
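For completeness, the monitor-and-kill approach just described can be sketched directly on a callr::r_bg() process (outside of any futureverse machinery; the two-second timeout here is illustrative):
rp <- callr::r_bg (fn, args = list (x = 10L)) # start a background process
rp$wait (timeout = 2000) # wait up to 2,000 milliseconds for it to finish
if (rp$is_alive ()) {
    rp$kill () # the process exceeded the timeout, so terminate it
    res <- NA
} else {
    res <- rp$get_result ()
}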