StarPU Handbook
MPI Support

The integration of MPI transfers within task parallelism is done in a very natural way by the means of asynchronous interactions between the application and StarPU. This is implemented in a separate libstarpumpi library which basically provides "StarPU" equivalents of MPI_* functions, where void * buffers are replaced with starpu_data_handle_t, and all GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to use the usual mpirun command of the MPI implementation to start StarPU on the different MPI nodes.

In case the user wants to run several MPI processes by machine (e.g. one per NUMA node), STARPU_WORKERS_GETBIND should be used to make StarPU take into account the binding set by the MPI launcher (otherwise each StarPU instance would try to bind on all cores of the machine...)

An MPI Insert Task function provides an even more seamless transition to a distributed application, by automatically issuing all required data transfers according to the task graph and an application-provided distribution.

Building with MPI support

If a mpicc compiler is already in your PATH, StarPU will automatically enable MPI support in the build. If mpicc is not in PATH, you can specify its location by passing –with-mpicc=/where/there/is/mpicc to ./configure

It can be useful to enable MPI tests during make check by passing –enable-mpi-check to ./configure. And similarly to mpicc, if mpiexec in not in PATH, you can specify its location by passing –with-mpiexec=/where/there/is/mpiexec to ./configure, but this is not needed if it is next to mpicc, configure will look there in addition to PATH.

Similarly, Fortran examples use mpif90, which can be specified manually with –with-mpifort if it can't be found automatically.

Example Used In This Documentation

The example below will be used as the base for this documentation. It initializes a token on node 0, and the token is passed from node to node, incremented by one on each step. The code is not using StarPU yet.

for (loop = 0; loop < nloops; loop++)
{
int tag = loop*size + rank;
if (loop == 0 && rank == 0)
{
token = 0;
fprintf(stdout, "Start with token value %d\n", token);
}
else
{
MPI_Recv(&token, 1, MPI_INT, (rank+size-1)%size, tag, MPI_COMM_WORLD);
}
token++;
if (loop == last_loop && rank == last_rank)
{
fprintf(stdout, "Finished: token value %d\n", token);
}
else
{
MPI_Send(&token, 1, MPI_INT, (rank+1)%size, tag+1, MPI_COMM_WORLD);
}
}

About Not Using The MPI Support

Although StarPU provides MPI support, the application programmer may want to keep his MPI communications as they are for a start, and only delegate task execution to StarPU. This is possible by just using starpu_data_acquire(), for instance:

for (loop = 0; loop < nloops; loop++)
{
int tag = loop*size + rank;
/* Acquire the data to be able to write to it */
starpu_data_acquire(token_handle, STARPU_W);
if (loop == 0 && rank == 0)
{
token = 0;
fprintf(stdout, "Start with token value %d\n", token);
}
else
{
MPI_Recv(&token, 1, MPI_INT, (rank+size-1)%size, tag, MPI_COMM_WORLD);
}
starpu_data_release(token_handle);
/* Task delegation to StarPU to increment the token. The execution might
* be performed on a CPU, a GPU, etc. */
increment_token();
/* Acquire the update data to be able to read from it */
starpu_data_acquire(token_handle, STARPU_R);
if (loop == last_loop && rank == last_rank)
{
fprintf(stdout, "Finished: token value %d\n", token);
}
else
{
MPI_Send(&token, 1, MPI_INT, (rank+1)%size, tag+1, MPI_COMM_WORLD);
}
starpu_data_release(token_handle);
}
void starpu_data_release(starpu_data_handle_t handle)
int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
@ STARPU_W
Definition: starpu_data.h:58
@ STARPU_R
Definition: starpu_data.h:57

In that case, libstarpumpi is not needed. One can also use MPI_Isend() and MPI_Irecv(), by calling starpu_data_release() after MPI_Wait() or MPI_Test() have notified completion.

It is however better to use libstarpumpi, to save the application from having to synchronize with starpu_data_acquire(), and instead just submit all tasks and communications asynchronously, and wait for the overall completion.

Simple Example

The flags required to compile or link against the MPI layer are accessible with the following commands:

$ pkg-config --cflags starpumpi-1.3  # options for the compiler
$ pkg-config --libs starpumpi-1.3    # options for the linker
void increment_token(void)
{
struct starpu_task *task = starpu_task_create();
task->cl = &increment_cl;
task->handles[0] = token_handle;
}
int main(int argc, char **argv)
{
int rank, size;
starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));
unsigned nloops = NITER;
unsigned loop;
unsigned last_loop = nloops - 1;
unsigned last_rank = size - 1;
for (loop = 0; loop < nloops; loop++)
{
int tag = loop*size + rank;
if (loop == 0 && rank == 0)
{
starpu_data_acquire(token_handle, STARPU_W);
token = 0;
fprintf(stdout, "Start with token value %d\n", token);
starpu_data_release(token_handle);
}
else
{
starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL, NULL);
}
increment_token();
if (loop == last_loop && rank == last_rank)
{
starpu_data_acquire(token_handle, STARPU_R);
fprintf(stdout, "Finished: token value %d\n", token);
starpu_data_release(token_handle);
}
else
{
starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL, NULL);
}
}
if (rank == last_rank)
{
fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
STARPU_ASSERT(token == nloops*size);
}
struct starpu_codelet * cl
Definition: starpu_task.h:570
starpu_data_handle_t handles[STARPU_NMAXBUFS]
Definition: starpu_task.h:647
struct starpu_task * starpu_task_create(void) STARPU_ATTRIBUTE_MALLOC
#define STARPU_MAIN_RAM
Definition: starpu_task.h:119
int starpu_task_submit(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT
int starpu_task_wait_for_all(void)
Definition: starpu_task.h:553
void starpu_vector_data_register(starpu_data_handle_t *handle, int home_node, uintptr_t ptr, uint32_t nx, size_t elemsize)
int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, void(*callback)(void *), void *arg)
int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, void(*callback)(void *), void *arg)
int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
int starpu_mpi_comm_size(MPI_Comm comm, int *size)
int starpu_mpi_shutdown(void)
int starpu_mpi_init_conf(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm, struct starpu_conf *conf)
#define STARPU_ASSERT(x)
Definition: starpu_util.h:205

We have here replaced MPI_Recv() and MPI_Send() with starpu_mpi_irecv_detached() and starpu_mpi_isend_detached(), which just submit the communication to be performed. The implicit sequential consistency dependencies provide synchronization between mpi reception and emission and the corresponding tasks. The only remaining synchronization with starpu_data_acquire() is at the beginning and the end.

How to Initialize StarPU-MPI

As seen in the previous example, one has to call starpu_mpi_init_conf() to initialize StarPU-MPI. The third parameter of the function indicates if MPI should be initialized by StarPU or if the application did it itself. If the application initializes MPI itself, it must call MPI_Init_thread() with MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE, since StarPU-MPI uses a separate thread to perform the communications. MPI_THREAD_MULTIPLE is necessary if the application also performs some MPI communications.

Point To Point Communication

The standard point to point communications of MPI have been implemented. The semantic is similar to the MPI one, but adapted to the DSM provided by StarPU. A MPI request will only be submitted when the data is available in the main memory of the node submitting the request.

There are two types of asynchronous communications: the classic asynchronous communications and the detached communications. The classic asynchronous communications (starpu_mpi_isend() and starpu_mpi_irecv()) need to be followed by a call to starpu_mpi_wait() or to starpu_mpi_test() to wait for or to test the completion of the communication. Waiting for or testing the completion of detached communications is not possible, this is done internally by StarPU-MPI, on completion, the resources are automatically released. This mechanism is similar to the pthread detach state attribute which determines whether a thread will be created in a joinable or a detached state.

For send communications, data is acquired with the mode STARPU_R. When using the configure option --enable-mpi-pedantic-isend, the mode STARPU_RW is used to make sure there is no more than 1 concurrent MPI_Isend() call accessing a data and StarPU does not read from it from tasks during the communication.

Internally, all communication are divided in 2 communications, a first message is used to exchange an envelope describing the data (i.e its tag and its size), the data itself is sent in a second message. All MPI communications submitted by StarPU uses a unique tag which has a default value, and can be accessed with the functions starpu_mpi_get_communication_tag() and starpu_mpi_set_communication_tag(). The matching of tags with corresponding requests is done within StarPU-MPI.

For any userland communication, the call of the corresponding function (e.g starpu_mpi_isend()) will result in the creation of a StarPU-MPI request, the function starpu_data_acquire_cb() is then called to asynchronously request StarPU to fetch the data in main memory; when the data is ready and the corresponding buffer has already been received by MPI, it will be copied in the memory of the data, otherwise the request is stored in the early requests list. Sending requests are stored in the ready requests list.

While requests need to be processed, the StarPU-MPI progression thread does the following:

  1. it polls the ready requests list. For all the ready requests, the appropriate function is called to post the corresponding MPI call. For example, an initial call to starpu_mpi_isend() will result in a call to MPI_Isend(). If the request is marked as detached, the request will then be added in the detached requests list.
  2. it posts a MPI_Irecv() to retrieve a data envelope.
  3. it polls the detached requests list. For all the detached requests, it tests its completion of the MPI request by calling MPI_Test(). On completion, the data handle is released, and if a callback was defined, it is called.
  4. finally, it checks if a data envelope has been received. If so, if the data envelope matches a request in the early requests list (i.e the request has already been posted by the application), the corresponding MPI call is posted (similarly to the first step above).

    If the data envelope does not match any application request, a temporary handle is created to receive the data, a StarPU-MPI request is created and added into the ready requests list, and thus will be processed in the first step of the next loop.

MPIPtpCommunication gives the list of all the point to point communications defined in StarPU-MPI.

Exchanging User Defined Data Interface

New data interfaces defined as explained in Defining A New Data Interface can also be used within StarPU-MPI and exchanged between nodes. Two functions needs to be defined through the type starpu_data_interface_ops. The function starpu_data_interface_ops::pack_data takes a handle and returns a contiguous memory buffer allocated with

starpu_malloc_flags(ptr, size, 0)
int starpu_malloc_flags(void **A, size_t dim, int flags)

along with its size where data to be conveyed to another node should be copied.

static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
*count = complex_get_size(handle);
*ptr = starpu_malloc_on_node_flags(node, *count, 0);
memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary, complex_interface->nx*sizeof(double));
return 0;
}
void * starpu_data_get_interface_on_node(starpu_data_handle_t handle, unsigned memory_node)
uintptr_t starpu_malloc_on_node_flags(unsigned dst_node, size_t size, int flags)
struct _starpu_data_state * starpu_data_handle_t
Definition: starpu_data.h:44

The inverse operation is implemented in the function starpu_data_interface_ops::unpack_data which takes a contiguous memory buffer and recreates the data handle.

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
memcpy(complex_interface->imaginary, ptr+complex_interface->nx*sizeof(double), complex_interface->nx*sizeof(double));
return 0;
}
static struct starpu_data_interface_ops interface_complex_ops =
{
...
.pack_data = complex_pack_data,
.unpack_data = complex_unpack_data
};
int(* pack_data)(starpu_data_handle_t handle, unsigned node, void **ptr, starpu_ssize_t *count)
Definition: starpu_data_interfaces.h:544
Definition: starpu_data_interfaces.h:366

Instead of defining pack and unpack operations, users may want to attach a MPI type to their user-defined data interface. The function starpu_mpi_interface_datatype_register() allows to do so. This function takes 3 parameters: the interface ID for which the MPI datatype is going to be defined, a function's pointer that will create the MPI datatype, and a function's pointer that will free the MPI datatype. If for some data an MPI datatype can not be built (e.g. complex data structure), the creation function can return -1, StarPU-MPI will then fallback to using pack/unpack.

The functions to create and free the MPI datatype are defined and registered as follows.

void starpu_complex_interface_datatype_allocate(starpu_data_handle_t handle, MPI_Datatype *mpi_datatype)
{
int ret;
int blocklengths[2];
MPI_Aint displacements[2];
MPI_Datatype types[2] = {MPI_DOUBLE, MPI_DOUBLE};
struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
MPI_Get_address(complex_interface, displacements);
MPI_Get_address(&complex_interface->imaginary, displacements+1);
displacements[1] -= displacements[0];
displacements[0] = 0;
blocklengths[0] = complex_interface->nx;
blocklengths[1] = complex_interface->nx;
ret = MPI_Type_create_struct(2, blocklengths, displacements, types, mpi_datatype);
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_contiguous failed");
ret = MPI_Type_commit(mpi_datatype);
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
}
void starpu_complex_interface_datatype_free(MPI_Datatype *mpi_datatype)
{
MPI_Type_free(mpi_datatype);
}
static struct starpu_data_interface_ops interface_complex_ops =
{
...
};
starpu_mpi_interface_datatype_register(interface_complex_ops.interfaceid, starpu_complex_interface_datatype_allocate, starpu_complex_interface_datatype_free);
starpu_data_interface handle;
starpu_complex_data_register(&handle, STARPU_MAIN_RAM, real, imaginary, 2);
...
enum starpu_data_interface_id interfaceid
Definition: starpu_data_interfaces.h:504
int starpu_data_interface_get_next_id(void)
int starpu_mpi_interface_datatype_register(enum starpu_data_interface_id id, starpu_mpi_datatype_allocate_func_t allocate_datatype_func, starpu_mpi_datatype_free_func_t free_datatype_func)
#define STARPU_ASSERT_MSG(x, msg,...)
Definition: starpu_util.h:226

It is also possible to use starpu_mpi_datatype_register() to register the functions through a handle rather than the interface ID, but note that in that case it is important to make sure no communication is going to occur before the function starpu_mpi_datatype_register() is called. This would otherwise produce an undefined result as the data may be received before the function is called, and so the MPI datatype would not be known by the StarPU-MPI communication engine, and the data would be processed with the pack and unpack operations. One would thus need to synchronize all nodes:

starpu_data_interface handle;
starpu_complex_data_register(&handle, STARPU_MAIN_RAM, real, imaginary, 2);
starpu_mpi_datatype_register(handle, starpu_complex_interface_datatype_allocate, starpu_complex_interface_datatype_free);
starpu_mpi_barrier(MPI_COMM_WORLD);
int starpu_mpi_barrier(MPI_Comm comm)
int starpu_mpi_datatype_register(starpu_data_handle_t handle, starpu_mpi_datatype_allocate_func_t allocate_datatype_func, starpu_mpi_datatype_free_func_t free_datatype_func)

MPI Insert Task Utility

To save the programmer from having to explicit all communications, StarPU provides an "MPI Insert Task Utility". The principe is that the application decides a distribution of the data over the MPI nodes by allocating it and notifying StarPU of this decision, i.e. tell StarPU which MPI node "owns" which data. It also decides, for each handle, an MPI tag which will be used to exchange the content of the handle. All MPI nodes then process the whole task graph, and StarPU automatically determines which node actually execute which task, and trigger the required MPI transfers.

The list of functions is described in MPIInsertTask.

Here an stencil example showing how to use starpu_mpi_task_insert(). One first needs to define a distribution function which specifies the locality of the data. Note that the data needs to be registered to MPI by calling starpu_mpi_data_register(). This function allows to set the distribution information and the MPI tag which should be used when communicating the data. It also allows to automatically clear the MPI communication cache when unregistering the data.

/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes)
{
/* Block distrib */
return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
// /* Other examples useful for other kinds of computations */
// /* / distrib */
// return (x+y) % nb_nodes;
// /* Block cyclic distrib */
// unsigned side = sqrt(nb_nodes);
// return x % side + (y % side) * size;
}

Now the data can be registered within StarPU. Data which are not owned but will be needed for computations can be registered through the lazy allocation mechanism, i.e. with a home_node set to -1. StarPU will automatically allocate the memory when it is used for the first time.

One can note an optimization here