## Distributed Arrays¶

We use dask.array, a small cluster on EC2, and distributed together to manipulate large nd-arrays in memory.

This is a writeup of a preliminary experiment and nothing to get excited about. We'll mostly talk about things that are broken, not about fancy things that solve the world's problems.

I've used data from the European Center for Meteorological Weather Forecasting before in this blogpost about dask.array. Previously we've interacted with large volumes of data using a single laptop by streaming data from disk. Now we dump all of the data into distributed memory. This has some advantages and some drawbacks.

ECMWF only has about 1MB/s upload bandwidth, so this takes a while.

In [1]:
cat /home/ubuntu/download.py

import pandas as pd
dates = pd.date_range('2014-01-01', '2014-12-31', freq='D')
dates = [str(d).split()[0] for d in dates]

from ecmwfapi import ECMWFDataServer

server = ECMWFDataServer()

for date in dates:
server.retrieve({
'stream'    : "oper",
'levtype'   : "sfc",
'param'     : "165.128/166.128/167.128",
'dataset'   : "interim",
'step'      : "00",
'grid'      : "0.25/0.25",
'time'      : "00/06/12/18",
'date'      : date,
'type'      : "an",
'class'     : "ei",
'target'    : date + ".nc",
'format'    : "netcdf" })


### Connect to distributed workers¶

I've set up a small cluster on EC2 using anaconda cluster and the following profile.

name: medium
node_id: ami-08faa660
node_type: m3.2xlarge
num_nodes: 5
provider: aws_east
user: ubuntu
plugins:
- notebook
- distributed


Note that anaconda cluster is now happy to setup a distributed network for me, thanks to nice work by Daniel Rodriguez. If you want to repeat this experiment on your own without acluster don't worry, distributed is simple to set up.

In [2]:
from distributed.client import clear
clear('54.172.83.119:8787')  # clean up any old data lying around

In [3]:
from distributed import Executor
e = Executor('54.172.83.119:8787')  # connect to cluster's center node

In [4]:
e.ncores  # track number of workers

Out[4]:
{('172.31.8.61', 8788): 8,
('172.31.8.62', 8788): 8,
('172.31.8.63', 8788): 8,
('172.31.8.64', 8788): 8}

So we have four machines, each with eight cores and 30GB of memory.

### Scatter arrays from head node to workers' memory¶

We've downloaded the ECMWF data onto the head node of our cluster as NetCDF files. We iteratively load these from disk to local memory and then scatter them throughout the memory of the the worker nodes of the cluster.

This is our first major problem; it would be much nicer if this data was stored in a distributed format so that we could read from all of our hard drives, rather than just the hard drive on the head node.

Sadly there is no commonly accepted standard on how to do this. Common approaches include the following:

1. Store data on a storage node, pull from that disk and scatter out to the workers (what we do here)
2. Store data on a network file system. This is essentially the same thing performance-wise but abstracts away the storage node.
3. Use a distributed file system like HDFS (not easy to use with NetCDF) or GlusterFS.
In [5]:
from glob import glob
filenames = sorted(glob('/home/ubuntu/2014-*-*.nc'))
len(filenames)

Out[5]:
365

Each file contains a few variables, we pull out t2m, the temperature two meters above the ground. This variable is 33MB per day and has the shape (4, 721, 1400) meaning that it has the following measurements:

• 4: every six hours
• 721: Every quarter degree of latitude
• 1440: Every quarter degree of longitude
In [6]:
import netCDF4
import numpy as np

f = netCDF4.Dataset(filenames[0])  # load single day's data from disk to memory
sample = f['t2m'][:]
{'shape': sample.shape, 'MegaBytes': sample.nbytes / 1e6}

Out[6]:
{'MegaBytes': 33.22368, 'shape': (4, 721, 1440)}

### Scatter¶

So we roll through our files, load t2m from disk, and scatter out to the workers.

In [7]:
e.who_has  # no data in the cluster

Out[7]:
defaultdict(set, {})
In [8]:
%%time
futures = []
for fn in filenames:
with netCDF4.Dataset(fn) as f:
t2m = f['t2m'][:]  # load from disk
futures.append(e.scatter([t2m])[0])

CPU times: user 49.4 s, sys: 1min 37s, total: 2min 26s
Wall time: 4min 34s

In [9]:
e.who_has  # lots of data in the cluster

Out[9]:
defaultdict(set,
{'27d53712-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'284fec64-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'28cbec2e-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'294b4ea6-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'29c7d598-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'2a483e2c-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'2abfa624-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'2b2fd282-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.61', 8788)},
'2b995eb4-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.64', 8788)},
'2c0d24a2-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.64', 8788)},
'2c7a1652-8999-11e5-87c9-0aa19f43a229-0': {('172.31.8.64', 8788)},
...)


We now stack these little NumPy arrays into a logical dask.array.

It's not yet clear what the right API is for this. People will have many different arrangements of how they want to organize their many small numpy arrays into larger dask.arrays. This space is green-field.

So for now, rather than build a dask.array.construct_distributed_array(...) function, we just construct the dask.array by hand. This is easy to do if you understand the structure of dask arrays but not user-friendly otherwise.

We'll make convenient user-facing functions after more experience with actual problems. (Send me your problems!)

In [10]:
import dask.array as da

dsk = {('x', i, 0, 0): f for i, f in enumerate(futures)}
chunks = ((4,) * len(futures), (721,), (1440,))

x = da.Array(dsk, 'x', chunks, np.float64)
x

Out[10]:
dask.array<x, shape=(1460, 721, 1440), dtype=float64, chunksize=(4, 721, 1440)>

All done. Hopefully that wasn't too bad but feel free to treat this as magic if you like.

### Tell dask to use the distributed cluster¶

Dask needs to know that we want to use the distributed cluster operated by our executor. Otherwise it will try to use threads by default and this will fail.

In [11]:
import dask

Out[11]:
<dask.context.set_options at 0x7fa62423b950>

### Compute and time¶

We perform a sequence of small operations to tease out performance characteristics of the distributed network. Our full data is 12 GB spread over four nodes, each with eight cores.

In [12]:
x.nbytes / 1e9  # GB

Out[12]:
12.1266432

We pull a single element from the array to measure total system overhead.

The entire process takes a few milliseconds. I think that we can reduce this a bit further (there is an unnecessary extra roundtrip in there), but we're within a moderate factor of theoretical optimum.

In [13]:
%time x[0, 0, 0].compute()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 8.81 ms

Out[13]:
253.96238623555584

### Small data transfer¶

Data transfer is non-trivial on this network. Here we move around 8MB from a worker to the local machine in 110ms.

Ignoring latency, this is around 75MB/s. That's decent, but actually much slower than our disk bandwidth when operating out-of-core on a single machine.

In [14]:
%time x[100, :, :].compute()

CPU times: user 64 ms, sys: 28 ms, total: 92 ms
Wall time: 110 ms

Out[14]:
array([[ 264.11380568,  264.11380568,  264.11380568, ...,  264.11380568,
264.11380568,  264.11380568],
[ 265.37222634,  265.36602722,  265.35982811, ...,  265.38307479,
265.37997524,  265.3753259 ],
[ 265.46521309,  265.45901397,  265.45126508, ...,  265.47761132,
265.47296198,  265.46986243],
...,
[ 243.97287605,  243.96977649,  243.96512716, ...,  243.98527428,
243.98062495,  243.97752539],
[ 243.97287605,  243.96977649,  243.96512716, ...,  243.98527428,
243.98062495,  243.97752539],
[ 243.10190018,  243.10190018,  243.10190018, ...,  243.10190018,
243.10190018,  243.10190018]])

These computations are about what we would expect if operating from disk. Neither distributed-memory nor local-disk is notably superior here.

### Computation bound¶

Full computations use the whole cluster, we're mostly compute bound here. There is little data to transfer around (just single floats) so we're pretty sure that we're stressing computation and network latency.

In [15]:
%time x.sum().compute()

CPU times: user 460 ms, sys: 36 ms, total: 496 ms
Wall time: 774 ms

Out[15]:
422680260719.94751

This is a huge win over an on-disk solution. This computation would have taken us minutes to do if we had to pull the data from disk.

### Increase Communication¶

The next two computations do similar computations but with different communication patterns, we see a very large change in performance.

• The first computation computes the average temperature over the whole planet over time (averaging out spatial dimensions). It executes very quickly. This does not require any communication between worker nodes because it is aligned with how we've chunked our data.
• The second computation computes the average over the time dimension. This does require communication between worker nodes as they share about 3GB between each other.
In [16]:
%time x.mean(axis=[1, 2]).compute()

CPU times: user 644 ms, sys: 32 ms, total: 676 ms
Wall time: 1.17 s

Out[16]:
array([ 277.29761461,  277.50818372,  277.77023522, ...,  277.26209424,
277.61283083,  277.38320223])
In [17]:
%time x.mean(axis=0).compute()

CPU times: user 688 ms, sys: 160 ms, total: 848 ms
Wall time: 5min 27s

Out[17]:
array([[ 260.67545742,  260.67545742,  260.67545742, ...,  260.67545742,
260.67545742,  260.67545742],
[ 260.80978656,  260.81082765,  260.8119064 , ...,  260.80643692,
260.80756166,  260.80863382],
[ 260.85199166,  260.85328043,  260.85450587, ...,  260.84783287,
260.84923888,  260.85058685],
...,
[ 228.02812417,  228.02513365,  228.0221535 , ...,  228.0347369 ,
228.03253842,  228.03033575],
[ 228.02848855,  228.02552743,  228.02253079, ...,  228.03511764,
228.03293455,  228.03067397],
[ 227.19711672,  227.19711672,  227.19711672, ...,  227.19711672,
227.19711672,  227.19711672]])

There are a few solutions to this problem:

1. We can use networks with better interconnects
2. We can change dask.array's reductions to be more tree-like. The data is already scattered so that nearby days are on the same machine. A more fine grained reduction will exploit this considerably.

Until we solve this problem this computation is slower in distributed-memory than with local-disk. Disk bandwidth is better and we can be more clever about keeping results in a single shared memory space.

### Fail case¶

The following computation takes a very long time.

(x - x.mean())



This is because we don't track the size in bytes of every variable in the scheduler.

After we compute x.mean() on one machine we want to subtract it from every numpy block $x_{i}$ of our array. If we know the relative size of the arrays then we know that we want to move the x.mean() result to the machine with $x_i$ because x.mean() is only a few bytes while $x_i$ is 33MB. Unfortunately the scheduler doesn't know what we know yet, so it doesn't always make this decision, and so the resulting computation can be very slow over a slow interconnect.

### TODO¶

In conclusion, we have to solve the following problems:

1. Think about n-dimensional distributed array storage
2. Use dask.array + distributed more to get a sense of what user-APIs would be broadly helpful
3. Support reduction trees in dask.array
4. Track size of variables in the distributed scheduler.