Managing a pool of MPI processes with Python and Pypar

MPI (Message Passing Interface) is a standard for communication between multiple processes in parallel computing. The processes can run on different cores, different CPUs, or entirely different computers in a grid. Many implementations of the standard are available, several of them open source, and MPI can be used from Python through packages such as pypar and mpi4py.  Here I present a Python script that implements a “process pool” design pattern using pypar.  I have run into this pattern often enough in my own work that I wrote this framework to avoid reinventing the wheel every time I come across it.
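To give a feel for the API before the full script, here is a minimal pypar sketch showing the calls the pool is built from: rank(), size(), send(), receive(), and finalize(). The two-process echo below is purely illustrative and is not part of the pool itself.

#!/usr/bin/env python
# Minimal pypar example: every process reports its rank, and process 0
# exchanges a message with process 1. Run under mpirun with 2+ processes.
import pypar

my_id = pypar.rank()    # ID (rank) of this process: 0, 1, 2, ...
nproc = pypar.size()    # total number of MPI processes
print "I am process %d of %d" % (my_id, nproc)

if my_id == 0 and nproc > 1:
    pypar.send("hello", 1)       # blocking send to process 1
    reply = pypar.receive(1)     # blocking receive from process 1
    print "Process 0 received: " + reply
elif my_id == 1:
    msg = pypar.receive(0)       # echo the message back to process 0
    pypar.send(msg, 0)

pypar.finalize()
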
This pattern is useful for any embarrassingly parallel problem: a computing task that can be accelerated simply by running multiple parallel processes that do not need to interact with one another.  For example, I have large scientific data sets from several runs of an experiment that need to be analyzed.  Since the data from each run can be analyzed independently of the other runs, I can analyze all the data sets at once on a parallel machine.   The code below implements a “master-worker” paradigm that requires at least three processes to accelerate the calculation.  The first process becomes the master, which does no calculation itself but hands out tasks to the workers.  The remaining processes are workers, which each receive a chunk of work, finish it, return the result to the master process, and then wait for more work.

#!/usr/bin/env python
from numpy import *
import pypar
import time
# Constants
MASTER_PROCESS = 0
WORK_TAG = 1
DIE_TAG = 2
MPI_myID = pypar.rank()
### Master Process ###
if MPI_myID == MASTER_PROCESS:
    num_processors = pypar.size()
    print "Master process found " + str(num_processors) + " worker processors."
    # Create a list of dummy data arrays to pass to the worker processes
    work_size = 10
    work_array = [arange(0.0, 10.0) for i in range(work_size)]
    # Dispatch jobs to worker processes
    work_index = 0
    num_completed = 0
    # Seed the workers: give each one an initial chunk of work
    for i in range(1, min(num_processors, work_size + 1)):
        pypar.send(work_index, i, tag=WORK_TAG)
        pypar.send(work_array[work_index], i)
        print "Sent work index " + str(work_index) + " to processor " + str(i)
        work_index += 1
    # Receive a result from any worker, then hand that worker the next chunk
    while work_index < work_size:
        results, status = pypar.receive(source=pypar.any_source, tag=pypar.any_tag, return_status=True)
        index = status.tag    # the tag identifies which work item this result belongs to
        proc = status.source  # rank of the worker that just finished
        num_completed += 1
        pypar.send(work_index, proc, tag=WORK_TAG)
        pypar.send(work_array[work_index], proc)
        print "Sent work index " + str(work_index) + " to processor " + str(proc)
        work_index += 1
    # Collect the results that are still outstanding
    while num_completed < work_size:
        results, status = pypar.receive(source=pypar.any_source, tag=pypar.any_tag, return_status=True)
        num_completed += 1
    # Shut down worker processes
    for proc in range(1, num_processors):
        print "Stopping worker process " + str(proc)
        pypar.send(-1, proc, tag=DIE_TAG)
else:
    ### Worker Processes ###
    continue_working = True
    while continue_working:
        # The first message from the master is either a work index or a shutdown signal
        work_index, status = pypar.receive(source=MASTER_PROCESS, tag=pypar.any_tag,
                return_status=True)
        if status.tag == DIE_TAG:
            continue_working = False
        else:
            # The second message carries the actual chunk of data to work on
            work_array, status = pypar.receive(source=MASTER_PROCESS, tag=pypar.any_tag,
                return_status=True)
            # Simulate a task that takes a few seconds to complete
            time.sleep(random.random_integers(low=0, high=5))
            result_array = work_array.copy()
            # Tag the result with the work index so the master knows which
            # chunk it corresponds to
            pypar.send(result_array, destination=MASTER_PROCESS, tag=work_index)
    #### while
#### if worker
pypar.finalize()
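
To try the script, save it (I will assume the file name mpi_pool.py here) and launch it with your MPI implementation's launcher, asking for at least two processes, for example:

mpirun -np 4 python mpi_pool.py

With four processes, rank 0 becomes the master and ranks 1 through 3 become workers, and the master's output shows which work index was sent to which processor. To adapt the framework to a real problem, replace the dummy work_array with your own list of tasks and replace the time.sleep() call in the worker section with the actual computation.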
