Fri 13 November 2015
Distributed Array Experiment
by Matthew Rocklin
Distributed Arrays¶
We use dask.array, a small cluster on EC2, and distributed together to manipulate large nd-arrays in memory.
This is a writeup of a preliminary experiment and nothing to get excited about. We'll mostly talk about things that are broken, not about fancy things that solve the world's problems.
Setup: Download data from ECMWF¶
I've used data from the European Centre for Medium-Range Weather Forecasts (ECMWF) before in this blogpost about dask.array. Previously we interacted with large volumes of data on a single laptop by streaming it from disk. Now we dump all of the data into distributed memory. This has some advantages and some drawbacks.
ECMWF only has about 1MB/s upload bandwidth, so this takes a while.
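For a rough sense of scale (the t2m values alone come to about 12 GB, as we'll see below, and the files contain other variables too), "a while" means at least a few hours:
# Back-of-envelope only: ~12 GB of t2m at ~1 MB/s; the full files are larger
print(12e9 / 1e6 / 3600)   # ~3.3 hours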
cat /home/ubuntu/download.py
Connect to distributed workers¶
I've set up a small cluster on EC2 using anaconda cluster and the following profile.
name: medium
node_id: ami-08faa660
node_type: m3.2xlarge
num_nodes: 5
provider: aws_east
user: ubuntu
plugins:
- notebook
- distributed
Note that anaconda cluster is now happy to set up a distributed network for me, thanks to nice work by Daniel Rodriguez. If you want to repeat this experiment on your own without acluster, don't worry; distributed is simple to set up.
from distributed.client import clear
clear('54.172.83.119:8787') # clean up any old data lying around
from distributed import Executor
e = Executor('54.172.83.119:8787') # connect to cluster's center node
e.ncores # track number of workers
So we have four machines, each with eight cores and 30GB of memory.
Scatter arrays from head node to workers' memory¶
We've downloaded the ECMWF data onto the head node of our cluster as NetCDF files. We iteratively load these from disk to local memory and then scatter them throughout the memory of the worker nodes of the cluster.
This is our first major problem; it would be much nicer if this data was stored in a distributed format so that we could read from all of our hard drives, rather than just the hard drive on the head node.
Sadly there is no commonly accepted standard on how to do this. Common approaches include the following:
- Store data on a storage node, pull from that disk and scatter out to the workers (what we do here)
- Store data on a network file system. This is essentially the same thing performance-wise but abstracts away the storage node.
- Use a distributed file system like HDFS (not easy to use with NetCDF) or GlusterFS.
from glob import glob
filenames = sorted(glob('/home/ubuntu/2014-*-*.nc'))
len(filenames)
Each file contains a few variables; we pull out t2m, the temperature two meters above the ground. This variable is 33MB per day and has the shape (4, 721, 1440), meaning that it has the following measurements:
- 4: Every six hours
- 721: Every quarter degree of latitude
- 1440: Every quarter degree of longitude
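As a quick check that this shape and the 33MB figure agree (float64, so eight bytes per value):
# 4 * 721 * 1440 float64 values per day
print(4 * 721 * 1440 * 8 / 1e6)   # ~33.2 MB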
import netCDF4
import numpy as np
f = netCDF4.Dataset(filenames[0]) # load single day's data from disk to memory
sample = f['t2m'][:]
{'shape': sample.shape, 'MegaBytes': sample.nbytes / 1e6}
Scatter¶
So we roll through our files, load t2m from disk, and scatter out to the workers.
e.who_has # no data in the cluster
%%time
futures = []
for fn in filenames:
    with netCDF4.Dataset(fn) as f:
        t2m = f['t2m'][:]  # load from disk
        futures.append(e.scatter([t2m])[0])
e.who_has # lots of data in the cluster
Construct dask.array from futures¶
We now stack these little NumPy arrays into a logical dask.array.
It's not yet clear what the right API is for this. People will have many different arrangements of how they want to organize their many small numpy arrays into larger dask.arrays. This space is green-field.
So for now, rather than build a dask.array.construct_distributed_array(...) function, we just construct the dask.array by hand. This is easy to do if you understand the structure of dask arrays but not user-friendly otherwise.
We'll make convenient user-facing functions after more experience with actual problems. (Send me your problems!)
import dask.array as da
dsk = {('x', i, 0, 0): f for i, f in enumerate(futures)}  # one key per block: ('x', i, j, k) -> future for that NumPy block
chunks = ((4,) * len(futures), (721,), (1440,))           # block sizes along each axis: one day per block, full lat/lon
x = da.Array(dsk, 'x', chunks, np.float64)                # graph, name, chunks, dtype
x
All done. Hopefully that wasn't too bad but feel free to treat this as magic if you like.
Tell dask to use the distributed cluster¶
Dask needs to know that we want to use the distributed cluster operated by our executor. Otherwise it will try to use threads by default and this will fail.
import dask
dask.set_options(get=e.get)
Compute and time¶
We perform a sequence of small operations to tease out performance characteristics of the distributed network. Our full data is 12 GB spread over four nodes, each with eight cores.
x.nbytes / 1e9 # GB
Tiny Computation and Overhead¶
We pull a single element from the array to measure total system overhead.
The entire process takes a few milliseconds. I think that we can reduce this a bit further (there is an unnecessary extra roundtrip in there), but we're within a moderate factor of theoretical optimum.
%time x[0, 0, 0].compute()
Small data transfer¶
Data transfer is non-trivial on this network. Here we move around 8MB from a worker to the local machine in 110ms.
Ignoring latency, this is around 75MB/s. That's decent, but actually much slower than our disk bandwidth when operating out-of-core on a single machine.
%time x[100, :, :].compute()
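For reference, the arithmetic behind those numbers (the slice size follows from the shape above, the 110ms from the timing):
# One (lat, lon) slice of float64 values
slice_mb = 721 * 1440 * 8 / 1e6
print(slice_mb)            # ~8.3 MB
print(slice_mb / 0.110)    # ~75 MB/s if the transfer took 110ms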
These computations are about what we would expect if operating from disk. Neither distributed-memory nor local-disk is notably superior here.
Computation bound¶
Computations over the full dataset use the whole cluster; we're mostly compute bound here. There is little data to transfer around (just single floats), so we're pretty sure that we're stressing computation and network latency.
%time x.sum().compute()
This is a huge win over an on-disk solution. This computation would have taken us minutes to do if we had to pull the data from disk.
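To put a rough number on "minutes": assuming a single local disk reads at something like 100 MB/s (an assumed figure, not a measurement from this machine), the I/O alone dominates:
# Reading 12 GB from one disk at an assumed ~100 MB/s, before any computation
print(12e9 / 100e6)   # ~120 seconds of pure I/O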
Increase Communication¶
The next two computations are similar, but they have different communication patterns, and we see a very large difference in performance.
- The first computation computes the average temperature over the whole planet over time (averaging out spatial dimensions). It executes very quickly. This does not require any communication between worker nodes because it is aligned with how we've chunked our data.
- The second computation computes the average over the time dimension. This does require communication between worker nodes, which ship about 3GB among themselves (quick arithmetic check below).
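As a rough check on that 3GB figure: each of the ~365 daily chunks contributes a partial result of shape (1, 721, 1440), and in a flat (non-tree) reduction those partials all get shipped toward a single aggregation task:
# ~365 partial results of shape (1, 721, 1440) in float64
print(365 * 721 * 1440 * 8 / 1e9)   # ~3.0 GB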
%time x.mean(axis=[1, 2]).compute()
%time x.mean(axis=0).compute()
There are a few solutions to this problem:
- We can use networks with better interconnects
- We can change dask.array's reductions to be more tree-like. The data is already scattered so that nearby days are on the same machine; a more fine-grained reduction would exploit this considerably (rough sketch below).
Until we solve this problem this computation is slower in distributed-memory than with local-disk. Disk bandwidth is better and we can be more clever about keeping results in a single shared memory space.
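To illustrate that second point, here is a minimal hand-rolled sketch of a pairwise (tree-like) reduction over the per-day futures, using the same Executor as above. This is not dask.array's internal machinery, just an illustration; because neighboring days live on the same machines, most of the pairwise additions in the early rounds could stay local.
import numpy as np

def tree_sum(futures, e):
    # Combine neighboring futures pairwise until a single future remains
    while len(futures) > 1:
        reduced = [e.submit(np.add, a, b)
                   for a, b in zip(futures[::2], futures[1::2])]
        if len(futures) % 2:           # odd one out carries over to the next round
            reduced.append(futures[-1])
        futures = reduced
    return futures[0]

total = tree_sum(list(futures), e)     # one future holding the elementwise sum over days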
Fail case¶
The following computation takes a very long time.
(x - x.mean()).compute()
This is because we don't yet track the size in bytes of every variable in the scheduler. After we compute x.mean() on one machine we want to subtract it from every NumPy block $x_i$ of our array. Because we know the relative sizes of the arrays, we know that we want to move the x.mean() result to the machine holding $x_i$: x.mean() is only a few bytes while $x_i$ is 33MB. Unfortunately the scheduler doesn't yet know what we know, so it doesn't always make this decision, and the resulting computation can be very slow over a slow interconnect.
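As a toy illustration of the missing bookkeeping (hypothetical names and sizes, not actual scheduler state): if the scheduler tracked result sizes, the right choice would be obvious.
# Hypothetical size bookkeeping, in bytes
sizes = {'x.mean()': 8, 'x_i': 33e6}

def operand_to_move(a, b):
    # Move whichever operand is smaller to the other operand's machine
    return a if sizes[a] <= sizes[b] else b

operand_to_move('x.mean()', 'x_i')   # -> 'x.mean()'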
TODO¶
In conclusion, we have to solve the following problems:
- Think about n-dimensional distributed array storage
- Use dask.array + distributed more to get a sense of what user-APIs would be broadly helpful
- Support reduction trees in dask.array
- Track size of variables in the distributed scheduler.