Parallel Python#

Spatial libraries with parallel support#

If starting with new code, the first option is to look for spatial libraries that already have parallelization built in (a small dask-geopandas sketch follows the list):

  • Dask-geopandas for vector data analysis, still a lot more limited than geopandas

  • xarray for basic raster data analysis

  • xarray-spatial for common raster analysis functions

  • rioxarray for reading data via GDAL-supported formats and for basic merging, clipping, etc.

  • osmnx for routing
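As a taste of how these libraries look in practice, here is a minimal dask-geopandas sketch; the file name is hypothetical:

import dask_geopandas

# Read a vector file into 4 partitions that can be processed in parallel
# ("buildings.gpkg" is a hypothetical example file).
ddf = dask_geopandas.read_file("buildings.gpkg", npartitions=4)

# Operations are lazy; compute() triggers the parallel computation.
total_area = ddf.geometry.area.sum().compute()
print(total_area)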

Python parallel libraries#

The parallel spatial libraries are still developing and do not yet support a very wide range of functionality, so often they do not fit all requirements; the same applies when changing existing serial code to parallel. In these cases the next option is to write the parallel code yourself. Basic Python code runs in serial mode, so usually some changes to the code are needed to benefit from parallel computing.

Python has many libraries to support parallelization:

  • Multi-core: multiprocessing and joblib

  • Multi-core or multi-node: dask and mpi4py

If unsure, start with Dask: it is one of the newest options, versatile and easy to use. But as Dask has many options and alternatives, multiprocessing might be easier to learn first. All of the above-mentioned spatial libraries use Dask, except osmnx, which uses multiprocessing.
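For comparison, here is a minimal multiprocessing sketch; the square function is just an illustrative stand-in for real work:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    # Run square() for each input value in 4 worker processes
    with Pool(processes=4) as pool:
        print(pool.map(square, range(8)))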

How many cores can I use?

If you need to check in code how many cores are available, use:

import os
len(os.sched_getaffinity(0))

Do not use multiprocessing.cpu_count(): it counts the hardware cores of the whole machine and does not take batch job reservations into account.

Dask#

Dask is a versatile Python library for scalable analytics.

Delayed computing

One general feature of Dask is that it delays computation until the result is actually needed, for example for plotting or saving to a file. When running code in Jupyter, this means that cells which define time-consuming computations may appear to run instantly, while a later cell that actually uses the result may take a long time.
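A small sketch of this behaviour, using a Dask array purely as an illustration:

import dask.array as da

# Creating the array and defining the mean are lazy and return instantly
x = da.random.random((10000, 10000), chunks=(1000, 1000))
mean = x.mean()

# The actual computation happens only here
print(mean.compute())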

When using Dask, two main questions have to be answered for running code in parallel, which we will cover next:

  1. How to run the parallel code?

  2. How to make the code parallel?

How to run the parallel code?#

Dask supports different set-ups for parallel computing. From the supercomputing point of view, the main options are:

  • Default scheduler, which runs within a single node

  • LocalCluster, which also runs within a single node but uses the distributed scheduler

  • SLURMCluster, which can run on one or more nodes

While developing the code, it might be good to start with the default scheduler or LocalCluster and then, if needed, change to SLURMCluster. The required code changes between parallelization set-ups are small.

One of the advantages of using LocalCluster is that in Jupyter the Dask extension is then able to show progress and resource usage.

The default scheduler is started automatically when Dask objects or functions are used.
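For example, with delayed functions the default scheduler is used when no Client has been created; a minimal sketch:

import dask

@dask.delayed
def square(x):
    return x * x

# No Client is created, so dask.compute() uses the default scheduler,
# which for delayed functions is a local thread pool.
results = dask.compute(*[square(i) for i in range(4)])
print(results)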

LocalCluster

# With default settings
from dask.distributed import Client
client = Client()

# With for example 30 cores available, Dask by default would likely create 6 workers.
# Depending on your analysis, that may or may not be optimal.
# To select the number of workers explicitly:
import os
no_of_cores = len(os.sched_getaffinity(0))
client = Client(n_workers=no_of_cores)

SLURMCluster

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# project_name, no_of_cores and number_of_jobs are placeholders;
# set them according to your own project and analysis.
cluster = SLURMCluster(
    queue="small",
    account=project_name,
    cores=no_of_cores,
    processes=no_of_cores,
    memory="12G",
    walltime="00:10:00",
    interface="ib0"
)

cluster.scale(number_of_jobs)
client = Client(cluster)

How to make the code parallel?#

Dask provides several options, including Dask DataFrames, Dask Arrays, Dask Bags, Dask Delayed and Dask Futures. The choice depends on the type of the analyzed data and on already existing code. Additionally, Dask supports scalable machine learning with Dask-ML.
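As a small illustration of one of these options, here is a minimal Dask DataFrame sketch; the file pattern and column names are hypothetical:

import dask.dataframe as dd

# Read many CSV files lazily into one partitioned dataframe
ddf = dd.read_csv("measurements-*.csv")

# The group-by is computed in parallel over the partitions
print(ddf.groupby("station")["value"].mean().compute())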

In this course we use Dask Delayed functions. Delayed functions are useful for parallelizing existing code. This approach delays the function calls and builds a graph of the whole computation. From the graph, Dask can then divide the tasks between different workers whenever parallel computing is possible. Keep in mind that the other ways of parallelizing code might suit other use cases better.

The changes to the code are exactly the same for all parallelization set-ups. The simplest changes could be:

  • For-loops:

    • Change to Dask’s delayed functions

  • map() -> Dask's client.map()

# Example of changing a for-loop and map() to Dask
import time

# Just a demo slow function that waits for 2 seconds
def slow_function(i):
    time.sleep(2)
    return i

# Input data vector; the slow function is run for each element.
input = range(0, 7)

Serial

# Basic FOR loop
a = []
for i in input: 
  a.append(slow_function(i))

print(a)

# Basic map
a = map(slow_function, input)
print(list(a))

Parallel, Dask delayed functions

from dask import compute, delayed

# Collect the delayed function calls; nothing is computed yet
list_of_delayed_functions = []
for i in input:
    list_of_delayed_functions.append(delayed(slow_function)(i))

# compute() runs the delayed tasks in parallel and returns the results
a = compute(*list_of_delayed_functions)
print(a)

Parallel, with Dask futures

from dask.distributed import Client
client = Client()

# client.map() submits slow_function for each input element as a future;
# gather() collects the results once they are ready.
futures = client.map(slow_function, input)
a = client.gather(futures)
print(a)

Variables with Dask

  • Dask exports the needed variables and libraries automatically to the parallel processes.

  • The variables must be serializable.

  • Avoid moving big variables from the main process to the parallel processes. Spatial data analysis often involves significant amounts of data, so it is better to read the data inside the parallel function and give as input only the file name, the coordinates of the compute area, etc., as sketched below.
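A minimal sketch of this pattern, assuming delayed functions and geopandas; the file names are hypothetical:

from dask import compute, delayed
import geopandas as gpd

@delayed
def process_file(file_path):
    # Read the data inside the parallel function instead of passing
    # a large GeoDataFrame from the main process.
    data = gpd.read_file(file_path)
    return data.geometry.area.sum()

# Hypothetical input files
files = ["area1.gpkg", "area2.gpkg", "area3.gpkg"]
results = compute(*[process_file(f) for f in files])
print(results)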

Batch job scripts#

In batch job scripts it is important to set the following correctly:

  • nodes - How many nodes to reserve? 1 for the default scheduler or LocalCluster, more than 1 for SLURMCluster.

  • cpus-per-task - How many cores to reserve? Depending on needs, 1 to n, where n is the number of available CPU cores per node.

#SBATCH --nodes=1
#SBATCH --cpus-per-task=4  

(...)

srun python dask_script.py
