Parallel programming has gained significant traction in the era of multicore and multi-node computing systems, requiring programmers to write efficient code that can leverage these resources. Our previous article, “Introduction to Parallel Programming with OpenMP” [1], focused on OpenMP, a powerful API for shared-memory parallel programming. Although OpenMP allows programmers to exploit multicore architectures effectively, it is not designed for distributed memory systems where communication between different nodes is necessary [2]. This limitation is addressed by the Message Passing Interface (MPI), another crucial tool for parallel programming.
The key distinction between OpenMP and MPI is the memory model each supports. OpenMP is designed to exploit shared-memory architectures where all threads can access the entire memory space [3]. In contrast, MPI supports distributed memory systems, where each process has its own private memory, and processes communicate by explicitly passing messages [4]. This distinction makes MPI a critical tool for large-scale high-performance computing (HPC) applications that need to run on clusters of computers, each having its own memory [5].
This article provides an introduction to parallel programming with MPI. We will explain the MPI model, various constructs, and advanced features while drawing comparisons with OpenMP when necessary to provide a clear understanding of where each shines. This knowledge can aid programmers in writing efficient parallel code tailored to their specific hardware architecture and problem requirements [6].
Parallel programming is a powerful method for leveraging multiple cores within a single machine, as we discussed in our previous article, “Introduction to Parallel Programming with OpenMP” [1]. However, as computational needs continue to grow, a single machine’s processing power often becomes insufficient. This is where distributed programming enters the picture.
In distributed programming, we expand beyond the boundaries of a single machine and use a cluster of machines, or nodes, each having its own memory and processors [7]. This model aligns with MPI, designed for distributed memory systems, in contrast to OpenMP, which targets shared memory systems [4].
There are several compelling reasons to engage with distributed programming: it gives access to far more processing power and memory than any single machine can offer, and it allows applications to scale out as problem sizes grow.
Understanding the distributed programming model and how to exploit it effectively is necessary for anyone working with large-scale computational problems. In this article, we will delve into the nooks and crannies of distributed programming using MPI.
In order to run and test the parallel computing capabilities of MPI, an appropriate build environment needs to be configured. This involves selecting an operating system, installing a suitable MPI implementation, and validating the environment with a straightforward program.
It’s crucial to understand that, unlike OpenMP, which relies on compiler directives, MPI is not a compiler-level solution but a library that facilitates the creation and management of parallel processes.
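One quick way to see this in practice, once the Open MPI tools from the next section are installed, is to ask the compiler wrapper what it actually does. Open MPI’s mpicc accepts a --showme flag that prints the underlying compiler invocation, including the include paths and MPI libraries it adds on your behalf; the MPI program itself is ordinary C that simply calls library functions.

$ mpicc --showme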
The operating system selection can significantly influence the development and performance of parallel applications. In this tutorial, we’ll be working with both Pop!_OS 22.04, an Ubuntu-based Linux distribution by System76, and macOS. Pop!_OS offers a solid environment for software development and is particularly effective when working with compilers and libraries such as MPI. macOS, with its UNIX-based architecture, offers similar robustness for software development.
Please note that the following steps and code examples can be replicated on any Ubuntu/Debian-based Linux distribution or macOS, given their structural and functional similarities.
The subsequent crucial step involves the installation of the MPI library. We’ll be using the Open MPI implementation in this case, due to its wide acceptance and excellent support.
On Ubuntu/Debian-based systems like Pop!_OS, Open MPI can be installed using the following command:
$ sudo apt install libopenmpi-dev
On macOS, you can use the Homebrew package manager to install Open MPI. If you haven’t installed Homebrew yet, you can do so by executing the following command in the Terminal:
$ /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
After Homebrew is installed, you can install Open MPI:
$ brew install open-mpi
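Before writing any code, you can optionally confirm that the runtime tools are available on your PATH. Assuming the Open MPI launcher was installed alongside the library, the following command should report the installed version:

$ mpirun --version

Open MPI also ships an ompi_info utility that prints a more detailed report of the installation.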
With the operating system selected and the MPI library installed, it is now time to verify the functionality of the environment. For this, we will use a parallel computing variant of the classic “Hello, World!” program.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    printf("Γειά σου, Κόσμε, from process %d out of %d processes.\n", rank, size);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
Just like the traditional “Hello, World!” program, this program prints a greeting message (in Greek, because why not?). However, this program will spawn multiple processes and print the greeting message from each process, illustrating the distributed parallel execution model inherent in MPI.
Regardless of the operating system and the installed MPI implementation, the code can be compiled and executed using the following commands:
For Open MPI on Ubuntu/Debian-based systems or macOS:
$ mpicc hello.c -o hello
$ mpirun -np 4 ./hello
After executing the above commands, the output should resemble the following:
$ mpirun -np 4 ./hello
Γειά σου, Κόσμε, from process 0 out of 4 processes.
Γειά σου, Κόσμε, from process 1 out of 4 processes.
Γειά σου, Κόσμε, from process 2 out of 4 processes.
Γειά σου, Κόσμε, from process 3 out of 4 processes.
The output confirms that the environment is correctly configured, with MPI successfully creating a distributed computing environment and spawning multiple processes.
In summary, setting up a build environment for MPI involves choosing a suitable operating system, installing the MPI library, and running an initial program to verify the setup. Pop!_OS and macOS, together with Open MPI, offer a reliable and robust environment for parallel programming with MPI.
In MPI, the unit of execution is referred to as a process. Unlike the threads discussed in our previous article, “Introduction to Parallel Programming with OpenMP”, processes in MPI operate in separate memory spaces. This means that, by default, data is not shared among them. Instead, processes collaborate by explicitly sending and receiving messages.
In MPI, communication occurs within a context, known as a communicator. A communicator encompasses a group of processes. We refer to a specific communicator when we send or receive messages using MPI.
The most well-known communicator is MPI_COMM_WORLD, which contains all the processes launched at the start of the MPI program. Here is a simple MPI code snippet that demonstrates its usage:
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    printf("Γειά σου, Κόσμε, from process %d out of %d processes.\n", rank, size);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
When executed, each process will print its rank (unique identifier) within the MPI_COMM_WORLD communicator and the total number of processes.
In MPI, the primary mechanism of data exchange between two processes is point-to-point communication, mainly using MPI_Send() and MPI_Recv() (and their many derivatives) [5]:

MPI_Send(): Used to send messages to a specific process.
MPI_Recv(): Used to receive messages from a specific process.

Here is a basic example of these operations:
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
        data = 42;
        MPI_Send(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Recv(&data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Process %d received data: %d.\n", rank, data);
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In the above example, process 0 sends the integer 42 to process 1. Process 1 then receives the integer and prints it.
In parallel programming with MPI, communication between processes is crucial for data exchange and synchronisation. MPI provides two main communication modes: blocking and non-blocking [5].

Blocking communication: With a blocking MPI_Send(), the function call does not return until the data in the send buffer can be safely reused. This means the data has either been sent or buffered by the MPI system, so that modifications to the buffer will not affect the ongoing send operation. Similarly, a blocking MPI_Recv() does not return until the data has been received and the buffer is populated. While blocking operations keep communication simple, they can lead to idle time, especially if one process has to wait for another.

Non-blocking communication: Non-blocking calls return immediately and allow computation to proceed while the transfer happens in the background; MPI_Isend() and MPI_Irecv() are examples of non-blocking send and receive operations. While this approach can enhance performance by overlapping computation with communication, it requires careful synchronisation (for example with MPI_Wait() or MPI_Test()) to ensure that the operations have completed before the data is accessed, preserving data integrity.

By understanding the distinction between these communication modes, developers can make informed choices about how to structure their MPI-based parallel applications for maximum efficiency.
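To make the non-blocking pattern concrete, here is a minimal sketch mirroring the earlier blocking example; the two-rank setup and the placeholder for overlapped computation are assumptions made purely for illustration:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, data = 0;
    MPI_Request request;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
        data = 42;
        // start the send and return immediately
        MPI_Isend(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
    } else if (rank == 1) {
        // start the receive and return immediately
        MPI_Irecv(&data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
    }

    if (rank == 0 || rank == 1) {
        // ... unrelated computation could overlap with the transfer here ...

        // block until the non-blocking operation has completed
        MPI_Wait(&request, MPI_STATUS_IGNORE);

        if (rank == 1) {
            printf("Process %d received data: %d.\n", rank, data);
        }
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}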
Parallel programming with MPI can be abstracted into common patterns that frequently appear in various problem domains. Understanding these patterns is pivotal as they provide a foundation for constructing more complex parallel applications. In this section, we will introduce two of the most fundamental patterns in MPI: Single Program, Multiple Data (SPMD) and the Master/Worker Paradigm.
SPMD is arguably the most common parallel programming pattern in MPI. In SPMD, all processes execute the same program but operate on different data subsets. The process’s rank typically controls the difference in behaviour from one process to another.
Imagine a scenario where we have a large array of numbers and want each process to compute the sum of a portion of this array.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // create and initialise the 'data' array with some values
    int data[100];
    for (int i = 0; i < 100; i++) {
        data[i] = i + 1;
    }

    // divide the work amongst the processes
    int number_of_elements = sizeof(data) / sizeof(data[0]);
    int data_size_per_process = number_of_elements / size;
    int start_index = rank * data_size_per_process;
    int end_index = start_index + data_size_per_process;

    int local_sum = 0;

    // each process calculates the sum of its portion
    for (int i = start_index; i < end_index; i++) {
        local_sum += data[i];
    }

    printf("Process %d computed sum: %d.\n", rank, local_sum);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
The SPMD model is intrinsic to MPI; even without explicitly intending to use it, you are likely employing SPMD when programming with MPI.
In the Master/Worker model, one process (usually the one with rank 0) acts as the master, responsible for delegating tasks to the other processes, termed workers. The workers compute their tasks and then return the results to the master. This pattern is particularly useful for dynamic load balancing, where tasks vary in computational complexity.
Let us consider a scenario where we need to calculate the factorial of several numbers. The master process assigns numbers to worker processes, which compute the factorial and return the result.
#include <stdio.h>
#include <mpi.h>

int compute_factorial(int number) {
    if (number == 0) return 1;
    return number * compute_factorial(number - 1);
}

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // rank 0 is the master process while the rest are worker processes
    if (rank == 0) {
        for (int i = 1; i < size; i++) {
            MPI_Send(&i, 1, MPI_INT, i, 0, MPI_COMM_WORLD);
        }

        int results[size];
        for (int i = 1; i < size; i++) {
            MPI_Recv(&results[i - 1], 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("Process %d factorial: %d.\n", i, results[i - 1]);
        }
    } else {
        int number;
        MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        int result = compute_factorial(number);
        MPI_Send(&result, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this simplified example, we assume that five processes are available (for example, mpirun -np 5). The master (rank 0) distributes the numbers 1 to 4 to the four worker processes. In real-world scenarios, the master should handle dynamic worker availability and task assignment more effectively, as sketched below.
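One way to make the assignment dynamic is sketched below: the master hands out one number per worker, then sends a new number to whichever worker reports back first, identified through MPI_ANY_SOURCE and the status object, and finally uses a dedicated tag to tell workers to stop. The task count, the tag values, and the reuse of the factorial function are illustrative assumptions, and the sketch assumes there are at least as many tasks as workers.

#include <stdio.h>
#include <mpi.h>

#define WORK_TAG 0
#define STOP_TAG 1

// same illustrative task as above: compute a factorial
int compute_factorial(int number) {
    if (number == 0) return 1;
    return number * compute_factorial(number - 1);
}

int main(int argc, char *argv[]) {
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int total_tasks = 12; // assumed task count (12! still fits in an int)

    if (rank == 0) {
        MPI_Status status;
        int next_task = 1, result;

        // seed every worker with one task (assumes total_tasks >= size - 1)
        for (int worker = 1; worker < size && next_task <= total_tasks; worker++) {
            MPI_Send(&next_task, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
            next_task++;
        }

        // whenever a result arrives, hand that worker the next task, or tell it to stop
        for (int received = 0; received < total_tasks; received++) {
            MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            printf("Master received result %d from worker %d.\n", result, status.MPI_SOURCE);

            if (next_task <= total_tasks) {
                MPI_Send(&next_task, 1, MPI_INT, status.MPI_SOURCE, WORK_TAG, MPI_COMM_WORLD);
                next_task++;
            } else {
                MPI_Send(&next_task, 1, MPI_INT, status.MPI_SOURCE, STOP_TAG, MPI_COMM_WORLD);
            }
        }
    } else {
        MPI_Status status;
        int number, result;

        // keep processing tasks until the master sends the stop tag
        while (1) {
            MPI_Recv(&number, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == STOP_TAG) break;
            result = compute_factorial(number);
            MPI_Send(&result, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
        }
    }

    MPI_Finalize();
    return 0;
}

Because faster workers simply come back for more work sooner, uneven task costs tend to balance themselves out without the master needing to know them in advance.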
Both the SPMD and Master/Worker patterns form the bedrock of MPI programming. As you venture further into HPC, you will frequently find these patterns combined and adapted to fit various computational scenarios.
High-performance computing often requires complex data distribution patterns beyond simple point-to-point communication. MPI provides collective communication functions that allow processes to collaborate when sharing data. Understanding these patterns can significantly improve the efficiency of your parallel applications.
Collective communication operations involve a group of processes that collectively participate in the communication. This section will cover four critical patterns: broadcast, scatter, gather, and reduction operations [11].
Broadcasting is the operation of sending a piece of data from one process (known as the root) to all other processes in a communicator.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // the root process initiates the broadcast
    if (rank == 0) {
        data = 42;
    }

    MPI_Bcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("Process %d received data: %d.\n", rank, data);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In the code above, the root process (rank 0) broadcasts the value 42 to all other processes.
Scattering distributes chunks of a data array from the root process to every process in a communicator, including the root itself.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int send_data[size], receive_data;

    // the root process initiates the scatter
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            send_data[i] = i * i;
        }
    }

    MPI_Scatter(send_data, 1, MPI_INT, &receive_data, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("Process %d received data: %d.\n", rank, receive_data);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this scenario, send_data on rank 0 contains {0, 1, 4, 9} (assuming size = 4), and each rank receives one of these values.
Gathering is the opposite of scattering. Data from all processes in a communicator is gathered into a single data array at the root process.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int receive_data[size];
    int send_data = rank * rank;

    MPI_Gather(&send_data, 1, MPI_INT, receive_data, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // the root process prints the gathered data
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            printf("Process %d received data[%d]: %d.\n", rank, i, receive_data[i]);
        }
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this example, each rank sends its square (rank * rank) to rank 0, which gathers the results in the receive_data array.
Reduction operations perform a specific operation (like summation, multiplication, etc.) on all processes’ data and store the result in one of the processes. All-reduce is similar but shares the result with all processes.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, data, summation;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    data = rank * rank;

    // this reduce operation will only put the data on process 0
    MPI_Reduce(&data, &summation, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Process %d received data: %d.\n", rank, summation);
    }

    // this allreduce operation will put the data on all processes
    MPI_Allreduce(&data, &summation, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    printf("Process %d received data: %d.\n", rank, summation);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this code, each rank calculates the square of its rank. Using MPI_Reduce, the squares are summed and rank 0 prints the result. The MPI_Allreduce operation also sums the squares, but every rank prints the result.
In many real-world problems, data is often more complex than simple integers, floats, or even arrays of such types; it might involve structures that combine multiple types of data. For such scenarios, MPI offers the ability to define derived data types [11].
When working with scientific or complex datasets, it is common to deal with structures that contain a mix of data types. For example, you may need to manage a particle’s position and velocity in an n-body simulation or a grid point’s coordinates and temperature in a computational fluid dynamics simulation. MPI’s basic data types cannot represent these complex structures directly, which necessitates derived data types.
MPI allows for the creation of custom data structures that can represent these more complex data types. These custom structures are often derived from existing MPI data types, allowing for the efficient communication of complex data.
Let us delve into an n-body problem as an example.
Consider a more advanced n-body problem where each body, or particle, is represented by its position (x, y, z), its velocity (vx, vy, vz), and its mass m. In a C struct, it might look like:
typedef struct {
    float x, y, z;    // position
    float vx, vy, vz; // velocity
    float m;          // mass
} Particle;
To communicate this structure using MPI, we must define a custom MPI datatype for it:
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include <mpi.h>

#define number_of_particles 10
#define gravitational_constant 0.001 // gravitational constant for our simplified simulation

// particle structure with position, velocity, and mass
typedef struct {
    float x, y, z;    // position
    float vx, vy, vz; // velocity
    float m;          // mass
} Particle;

// function to update a particle's position and velocity based on gravitational interactions with other particles
void update_particle(Particle *p, Particle particles[]) {
    for (int i = 0; i < number_of_particles; i++) {
        // ensure we don't compare the particle with itself
        if (p != &particles[i]) {
            float dx = particles[i].x - p->x;
            float dy = particles[i].y - p->y;
            float dz = particles[i].z - p->z;

            float distance = sqrt(dx * dx + dy * dy + dz * dz);
            float force = gravitational_constant * p->m * particles[i].m / (distance * distance);

            p->vx += force * dx / distance;
            p->vy += force * dy / distance;
            p->vz += force * dz / distance;
        }
    }

    // update position based on velocity
    p->x += p->vx;
    p->y += p->vy;
    p->z += p->vz;
}

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // define the custom MPI datatype for Particle
    MPI_Datatype MPI_PARTICLE;
    int array_of_blocklengths[3] = {3, 3, 1};
    MPI_Aint array_of_displacements[3] = {offsetof(Particle, x), offsetof(Particle, vx), offsetof(Particle, m)};
    MPI_Datatype array_of_types[3] = {MPI_FLOAT, MPI_FLOAT, MPI_FLOAT};
    MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements, array_of_types, &MPI_PARTICLE);
    MPI_Type_commit(&MPI_PARTICLE);

    Particle particles[number_of_particles];

    // initialise particles on rank 0 with random values
    if (rank == 0) {
        srand((unsigned int) time(NULL));
        for (int i = 0; i < number_of_particles; i++) {
            particles[i].x = (float) rand() / RAND_MAX * 100.0;
            particles[i].y = (float) rand() / RAND_MAX * 100.0;
            particles[i].z = (float) rand() / RAND_MAX * 100.0;
            particles[i].vx = (float) rand() / RAND_MAX * 5.0;
            particles[i].vy = (float) rand() / RAND_MAX * 5.0;
            particles[i].vz = (float) rand() / RAND_MAX * 5.0;
            particles[i].m = (float) rand() / RAND_MAX * 10.0 + 1.0; // mass between 1 and 11
        }
    }

    // broadcast the particle data from rank 0 to all processes
    MPI_Bcast(particles, number_of_particles, MPI_PARTICLE, 0, MPI_COMM_WORLD);

    // simulate movement for some steps
    for (int step = 0; step < 100; ++step) {
        for (int i = 0; i < number_of_particles; i++) {
            update_particle(&particles[i], particles);
        }

        if (rank == 0) {
            printf("Process %d time step: %d.\n", rank, step);
            for (int i = 0; i < number_of_particles; i++) {
                printf("Process %d particle %d position: (%f, %f, %f).\n", rank, i,
                       particles[i].x, particles[i].y, particles[i].z);
            }
        }
    }

    // cleanup
    MPI_Type_free(&MPI_PARTICLE);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this n-body simulation, we represent each particle by its position (x, y, z), velocity (vx, vy, vz), and mass m. The program initialises 10 particles with random attributes on rank 0 and broadcasts this data to all processes using MPI’s broadcasting capabilities. Every rank then simulates the particles’ gravitational interactions and movement for 100 steps. As the particles move, they influence each other based on a simplified gravitational force calculation.
The code provided offers a basis for expanding to handle a larger number of particles and their more intricate interactions. For more sophisticated communication patterns, one can explore other MPI communication methods, such as MPI_Send and MPI_Recv. It’s important to note that this code is simplified for demonstration purposes. In a full n-body simulation, nuances like interactions between all particles, boundary conditions, and intricate communication patterns between processes would need further attention.
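As one possible next step, sketched below under the simplifying assumptions that the particle count divides evenly by the number of processes, that the initial values are deterministic, and that a trivial drift stands in for the gravitational update, the same derived datatype can be combined with collective operations so that each rank updates only its own slice of the particle array and MPI_Allgather then shares the updated slices with every rank:

#include <stddef.h>
#include <stdio.h>
#include <mpi.h>

#define number_of_particles 12 // assumed to divide evenly by the number of processes

typedef struct {
    float x, y, z;    // position
    float vx, vy, vz; // velocity
    float m;          // mass
} Particle;

int main(int argc, char *argv[]) {
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // the same derived datatype as in the full simulation above
    MPI_Datatype MPI_PARTICLE;
    int blocklengths[3] = {3, 3, 1};
    MPI_Aint displacements[3] = {offsetof(Particle, x), offsetof(Particle, vx), offsetof(Particle, m)};
    MPI_Datatype types[3] = {MPI_FLOAT, MPI_FLOAT, MPI_FLOAT};
    MPI_Type_create_struct(3, blocklengths, displacements, types, &MPI_PARTICLE);
    MPI_Type_commit(&MPI_PARTICLE);

    Particle particles[number_of_particles];

    // rank 0 initialises all particles with simple deterministic values
    if (rank == 0) {
        for (int i = 0; i < number_of_particles; i++) {
            particles[i] = (Particle){ .x = i, .y = i, .z = i,
                                       .vx = 0.1f, .vy = 0.1f, .vz = 0.1f,
                                       .m = 1.0f };
        }
    }
    MPI_Bcast(particles, number_of_particles, MPI_PARTICLE, 0, MPI_COMM_WORLD);

    // each rank owns a contiguous slice of the particle array
    int particles_per_rank = number_of_particles / size;
    int start = rank * particles_per_rank;
    int end = start + particles_per_rank;

    for (int step = 0; step < 10; ++step) {
        // update only the owned slice; the gravitational update from the
        // full program above could be slotted in here instead of this drift
        for (int i = start; i < end; i++) {
            particles[i].x += particles[i].vx;
            particles[i].y += particles[i].vy;
            particles[i].z += particles[i].vz;
        }

        // share every rank's updated slice with all other ranks
        MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                      particles, particles_per_rank, MPI_PARTICLE, MPI_COMM_WORLD);
    }

    if (rank == 0) {
        printf("Particle 0 final position: (%f, %f, %f).\n",
               particles[0].x, particles[0].y, particles[0].z);
    }

    MPI_Type_free(&MPI_PARTICLE);
    MPI_Finalize();
    return 0;
}

In a real simulation, the all-gather at the end of each step is what keeps every rank’s copy of the particle positions consistent, so that the force calculation in the next step can see all particles.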
Parallel applications often require coordination between processes, ensuring that they execute steps in a particular sequence or that certain processes don’t proceed until others have reached a certain point [5]. This coordination is achieved through synchronisation.
In a parallel computation, operations may occur in an unpredictable order because the execution rates of processes can vary. This unpredictability can lead to race conditions, where the program’s outcome can vary depending on the order of operations. A classic example is when two processes attempt to update a shared variable simultaneously.
Synchronisation mechanisms ensure that operations occur in the desired order, preventing such race conditions and ensuring consistent program outcomes.
The most basic form of synchronisation in MPI is barrier synchronisation. When a process encounters a barrier, it waits until all processes in its communicator have reached the same barrier. Only then will all the processes proceed. This is accomplished using the MPI_Barrier function.
#include <stdio.h>
#include <unistd.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
        printf("Process 0 is doing some work.\n");
        // simulate some work using sleep
        sleep(2);
    }

    // barrier to ensure process 0 completes its work before moving on
    MPI_Barrier(MPI_COMM_WORLD);

    printf("Process %d is synchronised now.\n", rank);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
When run with multiple processes, this code will ensure that the final set of print statements only occurs after Process 0 has completed its work and hit the barrier.
MPI provides other mechanisms to coordinate more intricate interactions between processes:
1. Locks: Just like in traditional multithreaded programming, MPI supports the concept of locks, which can be used to protect critical sections. This is achieved with MPI_Win_lock and MPI_Win_unlock in the context of one-sided communications; a minimal sketch appears after the point-to-point example below.

2. Point-to-point synchronisation: Through the use of MPI_Send and MPI_Recv, processes can be forced to wait for data from another process. This implicitly synchronises the two processes involved.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
        data = 42;
        MPI_Send(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Recv(&data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Process %d received data: %d.\n", rank, data);
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
In this example, Process 1 waits for data from Process 0 using MPI_Recv. Execution of Process 1 proceeds only once the data has been received.
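To illustrate the lock mechanism from point 1, here is a minimal sketch (assuming at least two processes) in which every rank exposes an integer through an MPI window; rank 1 locks rank 0’s window, writes a value into it with MPI_Put, and unlocks it, and a barrier ensures rank 0 only reads the value afterwards:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;
    int shared_value = 0;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // every process exposes its 'shared_value' through a window
    MPI_Win window;
    MPI_Win_create(&shared_value, sizeof(int), sizeof(int),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &window);

    if (rank == 1) {
        int new_value = 42;

        // lock rank 0's window, write into it, and unlock it
        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, window);
        MPI_Put(&new_value, 1, MPI_INT, 0, 0, 1, MPI_INT, window);
        MPI_Win_unlock(0, window);
    }

    // make sure the update has happened before rank 0 reads the value
    MPI_Barrier(MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Process %d sees shared value: %d.\n", rank, shared_value);
    }

    // cleanup
    MPI_Win_free(&window);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}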
When diving deeper into MPI programming, it is crucial to understand the various performance nuances to write efficient parallel programs. In this section, we will discuss some fundamental performance considerations, focusing on communication overhead, load balancing, and minimising communication [11].
Every communication operation in MPI comes with an overhead. This overhead can significantly impact your program’s performance, especially if you have frequent small communications.
Let us consider a simple “ping-pong” communication between two processes, where one sends a message and the other sends it back continuously:
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    MPI_Status status;
    int ping_pong_count = 10000;
    double start_time, end_time;

    // ensure only two processes are used
    if (size != 2) {
        if (rank == 0) {
            printf("This program requires exactly 2 processes.\n");
        }
        MPI_Abort(MPI_COMM_WORLD, 1);
        return 1; // exit with an error code
    }

    // determine the rank of the partner
    int partner_rank = (rank + 1) % 2;

    // measure the time it takes to perform a series of ping-pong operations
    start_time = MPI_Wtime();
    for (int count = 0; count < ping_pong_count; count++) {
        if (rank == count % 2) {
            MPI_Send(&count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD);
        } else {
            MPI_Recv(&count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD, &status);
        }
    }
    end_time = MPI_Wtime();

    // print the time taken for the root process
    if (rank == 0) {
        printf("Ping-pong took %f seconds for %d loops\n", end_time - start_time, ping_pong_count);
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
This program demonstrates the communication overhead in continuously sending and receiving messages.
When parallelising a task, it is essential to decompose the problem so that each process receives an almost equal amount of work. This ensures that all processors are utilised effectively. If the loads are imbalanced, some processes might remain idle, leading to an inefficient use of resources.
Consider a case where the goal is to compute the sum of a large array. A straightforward parallel approach would be to let each process calculate the sum of a specific portion of the array. Below is a simple MPI-based C code example illustrating this concept:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // create and initialise the 'data' array with some values
    int *data = malloc(100000 * sizeof(int));
    for (int i = 0; i < 100000; i++) {
        data[i] = i + 1;
    }

    // divide the work amongst the processes
    int number_of_elements = 100000;
    int data_size_per_process = number_of_elements / size;
    int start_index = rank * data_size_per_process;
    int end_index = start_index + data_size_per_process;

    long local_sum = 0, total_sum = 0;

    // each process calculates the sum of its portion
    for (int i = start_index; i < end_index; i++) {
        local_sum += data[i];
    }

    printf("Process %d computed local sum: %ld.\n", rank, local_sum);

    // gather all local sums into the total sum at the root process
    // (MPI_LONG matches the 'long' type of local_sum and total_sum)
    MPI_Reduce(&local_sum, &total_sum, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Process %d computed total sum: %ld.\n", rank, total_sum);
    }

    // cleanup
    free(data);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
This example showcases a basic decomposition technique to ensure efficient load balancing across processes in parallel computing environments.
Communicating a few larger messages usually incurs less overhead than many smaller ones.
Here is a comparison of sending one message with 10k integers vs. 10k messages with one integer:
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int rank, size;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    MPI_Status status;
    int data[10000];
    double start_time, end_time;

    if (rank == 0) {
        // sending one message with 10k ints
        start_time = MPI_Wtime();
        MPI_Send(data, 10000, MPI_INT, 1, 0, MPI_COMM_WORLD);
        MPI_Recv(data, 10000, MPI_INT, 1, 0, MPI_COMM_WORLD, &status);
        end_time = MPI_Wtime();
        printf("One message with 10k ints took %f seconds\n", end_time - start_time);

        // sending 10k messages with 1 int
        start_time = MPI_Wtime();
        for (int i = 0; i < 10000; i++) {
            MPI_Send(&data[i], 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
            MPI_Recv(&data[i], 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &status);
        }
        end_time = MPI_Wtime();
        printf("10k messages with 1 int took %f seconds\n", end_time - start_time);
    } else if (rank == 1) {
        // receiving one message with 10k ints
        MPI_Recv(data, 10000, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
        MPI_Send(data, 10000, MPI_INT, 0, 0, MPI_COMM_WORLD);

        // receiving 10k messages with 1 int
        for (int i = 0; i < 10000; i++) {
            MPI_Recv(&data[i], 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
            MPI_Send(&data[i], 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
        }
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
This example illustrates the time difference between the two communication methods, emphasising the importance of minimising communication.
Error handling is of paramount importance in parallel computing, especially when using MPI. Unlike sequential programs, where errors usually affect only the single instance running, errors in parallel programs could jeopardise the entire cluster of processes. This section will dive deep into the intricacies of error handling in MPI, ensuring robust and resilient parallel applications.
In distributed systems, even minor unhandled errors can have significant ripple effects. A process might rely on data from another process; if that process encounters an error and does not communicate or handle it effectively, the former could stall indefinitely, leading to decreased performance or even a complete halt.
Several common errors can occur in MPI programs, including mismatched sends and receives that lead to deadlock, messages addressed to invalid ranks or communicators, and mismatched buffer sizes or datatypes between sender and receiver.
MPI provides mechanisms to detect and handle errors during runtime. Here is a look at how MPI deals with errors and how you can leverage its features:
By default, MPI aborts the program if it encounters an error, typically printing an error message.
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // intentional error: rank 0 tries to receive from a non-existent rank
    if (rank == 0) {
        // error here: rank 5 does not exist (likely)
        MPI_Recv(&data, 1, MPI_INT, 5, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
You will get an error when you run this program with two processes, since rank 5 does not exist.
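If you would rather handle the failure yourself than have the program abort, MPI provides the predefined MPI_ERRORS_RETURN error handler, which makes MPI calls return an error code instead. A minimal sketch using the same intentional error might look like this:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // ask MPI to return error codes instead of aborting the program
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

    if (rank == 0) {
        // same intentional error: rank 5 (probably) does not exist
        int error_code = MPI_Recv(&data, 1, MPI_INT, 5, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        if (error_code != MPI_SUCCESS) {
            char error_string[MPI_MAX_ERROR_STRING];
            int length_of_error_string;

            MPI_Error_string(error_code, error_string, &length_of_error_string);
            fprintf(stderr, "Process %d caught MPI error: %s\n", rank, error_string);
        }
    }

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}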
You can also specify custom error handlers using MPI_Comm_set_errhandler.
#include <stdio.h>
#include <mpi.h>

void custom_error_handler(MPI_Comm *comm, int *err_code, ...) {
    // buffer sized to MPI_MAX_ERROR_STRING so the message always fits
    char error_string[MPI_MAX_ERROR_STRING];
    int length_of_error_string;

    MPI_Error_string(*err_code, error_string, &length_of_error_string);
    fprintf(stderr, "%3d: %s\n", *err_code, error_string);
}

int main(int argc, char **argv) {
    int rank, size, data;

    // initialise the MPI environment
    MPI_Init(&argc, &argv);

    // get the rank of the process and the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // set custom error handler for MPI_COMM_WORLD
    MPI_Errhandler handler;
    MPI_Comm_create_errhandler(custom_error_handler, &handler);
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);

    // intentional error: rank 0 tries to receive from a non-existent rank
    if (rank == 0) {
        // error here: rank 5 does not exist (likely)
        MPI_Recv(&data, 1, MPI_INT, 5, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    // cleanup
    MPI_Errhandler_free(&handler);

    // finalise the MPI environment
    MPI_Finalize();

    return 0;
}
The custom error handler allows a more flexible and tailored approach to managing errors in your MPI programs. It’s especially useful for debugging and understanding precisely what went wrong during program execution.
In this article, we explored the Message Passing Interface (MPI), an essential tool for parallel programming that addresses the limitations of OpenMP in distributed memory systems. We have covered core concepts like point-to-point communication, blocking and non-blocking communication, collective communications, derived data types and synchronisation. Parallel programming with MPI allows programmers to write efficient code that fully utilises multi-node computing systems through effective communication between nodes.
However, parallel programming with MPI, like any other programming technique, requires practice and deep understanding. Even though MPI simplifies the process, the onus of implementing correct, efficient, and effective parallelism lies on the programmer.
The study of MPI doesn’t end here. Many resources can further deepen your understanding and provide hands-on experience.