Tue 27 October 2015
tl;dr: We enable ad-hoc distributed computations with a concurrent.futures interface.
The distributed project prototype provides distributed computing on a cluster
in pure Python. The distributed.Executor interface mimics the
concurrent.futures stdlib package.
```python
>>> from distributed import Executor
>>> executor = Executor('192.168.1.124:8787')  # address of center-node in cluster

>>> def inc(x):
...     return x + 1

>>> x = executor.submit(inc, 1)
>>> x
<Future: status: waiting, key: inc-8bca9db48807c7d8bf613135d01b875f>
```
The submit call executes inc(1) on a remote machine. The returned future
serves as a proxy to the remote result. We can collect this result back to the
local process with the .result() method:

```python
>>> x.result()  # transfers data from remote to local process
2
```
However, we don't want to do this; moving data from remote workers to the local
process is expensive. We avoid calling .result() whenever possible.
By default the result of the computation (2) stays on the remote computer
where the computation occurred. We avoid data transfer by allowing submit
calls to directly accept futures as arguments:
```python
>>> y = executor.submit(inc, x)  # no need to call x.result()
```
The computation for y will happen on the same machine where x lives. This
interface deviates from concurrent.futures, where we would have to wait on
x before submitting y. We no longer have to do the following:
```python
>>> y = executor.submit(inc, x.result())  # old concurrent.futures API
```
This is useful for two reasons:
- Avoids moving data to the local process: We keep data on the distributed network. This avoids unnecessary data transfer.
- Fire and forget: We don't have to reason about which jobs finish before which others. The scheduler takes care of this for us, as the sketch below shows.
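To make both points concrete, here is a minimal sketch of a chain of dependent tasks, assuming the executor from above is still connected; double is a hypothetical helper written for this sketch, not part of the library:

```python
import numpy as np

def double(a):  # hypothetical helper for this sketch
    return a * 2

# Each submit returns immediately; the scheduler tracks the dependencies.
data = executor.submit(np.random.random, 1000000)  # array stays on a worker
doubled = executor.submit(double, data)            # runs where `data` lives
total = executor.submit(np.sum, doubled)           # intermediates never move
print(total.result())                              # only one float comes back
```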
Ad hoc computations
For ad-hoc computations we often want the fine-grained control that a simple
executor.submit(func, *args) function provides. Ad-hoc
computations often have various function calls that depend on each other in
strange ways; they don't fit a particular model or framework.
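For instance, a hypothetical ad-hoc workflow with irregular dependencies might look like the sketch below; load, clean, merge, and summarize are trivial stand-ins for user code, not library functions:

```python
# Trivial stand-ins for real user-defined functions
def load(filename):
    return list(range(10))

def clean(xs):
    return [x for x in xs if x % 2 == 0]

def merge(a, b):
    return a + b

def summarize(merged, raw):
    return len(merged), len(raw)

a = executor.submit(load, 'part-1.csv')
b = executor.submit(load, 'part-2.csv')
c = executor.submit(clean, a)         # depends on a alone
d = executor.submit(merge, c, b)      # joins the two branches
e = executor.submit(summarize, d, a)  # reuses a raw input alongside a result
print(e.result())                     # gather only the final answer
```

No map/reduce pattern captures this shape, but plain submit calls express it directly.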
We don't need this fine-grained control for typical embarrassingly parallel
work, like parsing JSON and loading it into a database. In these typical cases
the bulk operations like map and reduce provided by MapReduce or Spark fit
the problem well.
However, when computations aren't straightforward and don't fit nicely into a
framework, we're stuck performing join gymnastics over
strange key naming schemes to coerce our problem into the MapReduce or Spark
programming models. If we're not operating at the petabyte scale then these
programming models might be overkill.
The .submit function has an overhead of about a millisecond per call (not
counting network latency). This might be crippling at the petabyte scale, but
it's quite convenient and freeing at the medium-data scale. It lets us use a
cluster of machines without badly mangling our code or our conceptual model
of what's going on.
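As a rough, unofficial sanity check of that millisecond figure, you can time a batch of trivial submissions against a running executor; noop is a hypothetical placeholder task:

```python
import time

def noop(i):  # placeholder task; does no real work
    return i

start = time.time()
futures = [executor.submit(noop, i) for i in range(1000)]
elapsed = time.time() - start

# submit returns immediately, so this measures client-side scheduling
# overhead rather than task execution time or network round trips
print('%.3f ms per submit call' % (1000 * elapsed / len(futures)))
```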
Example: Binary Tree Reduction
As an example we perform a binary tree reduction on a sequence of random arrays.
This is the kind of algorithm you would find hard-coded into a library like Spark or dask.array/dask.dataframe, but here we accomplish it by hand with some for loops while still using parallel distributed computing. The difference is that we're not limited to the algorithms chosen for us and can screw around more freely.
```
finish          total              single array output
    ^         /        \
    |       z1          z2         neighbors merge
    |      /  \        /  \
    |    y1    y2    y3    y4      neighbors merge
    ^    / \   / \   / \   / \
start   x1 x2 x3 x4 x5 x6 x7 x8    many array inputs
```
- Make sixteen million-element random arrays on the cluster:

```python
import numpy as np
xs = executor.map(np.random.random, [1000000] * 16, pure=False)
```
- Add neighbors until there is only one left:

```python
from operator import add

while len(xs) > 1:  # each pass merges neighbors, halving the list
    xs = [executor.submit(add, xs[i], xs[i + 1])
          for i in range(0, len(xs), 2)]
```
- Fetch the final result (xs now holds a single future):

```python
>>> xs[0].result()
array([  2.069694  ,   9.01727599,   5.42682524, ...,   0.42372487,
         1.50364966,  13.48674896])
```
This entire computation, from writing the code to receiving the answer, takes well under a second on a small cluster. This isn't surprising; it's a very small computation. What's notable though is the very small startup and overhead time. It's rare to find distributed computing systems that can spin up and finish small computations this quickly.
Various other Python frameworks provide distributed function evaluation. A few
are listed here. Notably we're stepping on the toes of
SCOOP, an excellent library that also
provides a distributed concurrent.futures interface.

The distributed project could use a more distinct name. Any suggestions?
For more information see the following links: