Subject: Re: [OMPI users] Problem with OpenMPI (MX btl and mtl) and threads
From: François Trahay (francois.trahay_at_[hidden])
Date: 2009-06-12 11:54:27


The machines have 4 cores. The THREADS_DEFAULT corresponds to a limit:
the program spawns threads one at a time. So at the beginning, only one
thread performs the ping pong test, then a thread is created and the two
threads run the ping pong test, then a thread is created and 3 threads
run the test, etc.

If your machines have 2 cores, you should set THREADS_DEFAULT to 2, so
that the machines are not overloaded.

In my case (machines with 4 cores), THREADS_DEFAULT is set to 16 in
order to study the behavior of the MPI implementation in an extreme
case. However, TCP seems to handle the concurrency quite fine up to 8
threads. Then, it tends to crash/hang (I guess the problem of
overloading cpu has not been studied deeply in OpenMPI)

Francois

Scott Atchley wrote:
> Francois,
>
> How many cores do your machines have?
>
> The file specifies THREADS_DEFAULT 16. Does this spawn 16 threads per
> MPI rank?
>
> I see crashes when I run this with MX (BTL with mx,sm,self and MTL).
> If I change THREADS_DEFAULT to 4, I see crashes with TCP (BTL with
> tcp,sm,self) as well.
>
> With THREADS_DEFAULT at 16, TCP seems to hang. I only have 2 cores,
> which may be why. :-)
>
> Scott
>
> On Jun 12, 2009, at 3:09 AM, François Trahay wrote:
>
>> Here's the program.
>> It should print something like that:
>>
>> [1 communicating threads]
>> [0] 1 2.484936 0.402 0.384
>> [0] 2 2.478036 0.807 0.770
>> [0] 4 2.501503 1.599 1.525
>> [0] 8 2.497516 3.203 3.055
>> thread #1
>> [2 communicating threads]
>> [0] 1 3.970628 0.252 0.240
>> [1] 1 3.929280 0.254 0.243
>> [1] 2 4.087206 0.489 0.467
>> [0] 2 5.181758 0.386 0.368
>> [1] 4 3.715222 1.077 1.027
>> [0] 4 4.358013 0.918 0.875
>> [1] 8 4.166852 1.920 1.831
>> [0] 8 3.628287 2.205 2.103
>> thread #2
>> [3 communicating threads]
>> [0] 1 5.922241 0.169 0.161
>> [2] 1 6.896299 0.145 0.138
>> [1] 1 8.973834 0.111 0.106
>> ...
>>
>>
>> Francois
>>
>> George Bosilca wrote:
>>> I will take a look at the BTL problem. Can you provide a copy of the
>>> benchmarks please.
>>>
>>> Thanks,
>>> george.
>>>
>>> On Jun 11, 2009, at 16:05 , François Trahay wrote:
>>>
>>>> concurrent_ping
>>>
>>>
>>> _______________________________________________
>>> users mailing list
>>> users_at_[hidden]
>>> http://www.open-mpi.org/mailman/listinfo.cgi/users
>>>
>>
>> /*
>> * NewMadeleine
>> * Copyright (C) 2006 (see AUTHORS file)
>> *
>> * This program is free software; you can redistribute it and/or modify
>> * it under the terms of the GNU General Public License as published by
>> * the Free Software Foundation; either version 2 of the License, or (at
>> * your option) any later version.
>> *
>> * This program is distributed in the hope that it will be useful, but
>> * WITHOUT ANY WARRANTY; without even the implied warranty of
>> * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
>> * General Public License for more details.
>> */
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include "mpi.h"
>>
>> /* This program performs several ping pongs in parallel.
>> * This evaluates the efficiency of accessing nmad from 1, 2, 3, ... n
>> threads simultaneously
>> */
>>
#define LEN_DEFAULT 4            /* default packet length (shown in usage; length is currently fixed) */
#define WARMUPS_DEFAULT 1000     /* untimed iterations before each measurement */
#define LOOPS_DEFAULT 10000      /* timed ping-pong iterations per message size */
#define THREADS_DEFAULT 16       /* max number of concurrent communicating threads */
#define DATA_CONTROL_ACTIVATED 0 /* set to 1 to verify buffer contents after each transfer */

static int comm_rank = -1;       /* this process's rank in MPI_COMM_WORLD */
static int comm_size = -1;       /* number of ranks in MPI_COMM_WORLD */
static char host_name[1024] = ""; /* NOTE(review): never written in this file */

static int max_len = 16;         /* message sizes sweep 1..max_len/2, doubling each step */
static int loops;                /* timed iterations (-N) */
static int threads;              /* number of threads to spawn (-T) */
static int warmups;              /* warmup iterations (-W) */

static sem_t ready_sem;          /* each thread posts once per completed size sweep */

static int go;                   /* round counter threads spin on; written by main.
                                  * NOTE(review): shared across threads with no
                                  * synchronization — data race, confirm intent. */
>>
/* Return the next message length in the sweep: len*multiplier+increment,
 * or 1+increment when starting from zero. */
static __inline__
uint32_t _next(uint32_t len, uint32_t multiplier, uint32_t increment)
{
    return len ? len * multiplier + increment : 1 + increment;
}
>>
>> void usage_ping() {
>> fprintf(stderr, "-L len - packet length [%d]\n", LEN_DEFAULT);
>> fprintf(stderr, "-N iterations - iterations [%d]\n", LOOPS_DEFAULT);
>> fprintf(stderr, "-T thread - number of communicating threads
>> [%d]\n", THREADS_DEFAULT);
>> fprintf(stderr, "-W warmup - number of warmup iterations [%d]\n",
>> WARMUPS_DEFAULT);
>> }
>>
/* Fill buffer with the repeating pattern 'a'..'z' so the receiver can
 * optionally verify data integrity (see control_buffer). */
static void fill_buffer(char *buffer, int len) {
    int idx;

    for (idx = 0; idx < len; idx++)
        buffer[idx] = (char)('a' + (idx % 26));
}
>>
/* Zero the first len bytes of buffer. */
static void clear_buffer(char *buffer, int len) {
    memset(buffer, 0, (size_t)len);
}
>>
#if DATA_CONTROL_ACTIVATED
/* Verify that buffer holds the 'a'..'z' pattern written by fill_buffer;
 * abort via TBX_FAILURE on any mismatch. msg labels the failing direction. */
static void control_buffer(char *msg, char *buffer, int len) {
    tbx_bool_t ok = tbx_true;
    unsigned int i;

    for (i = 0; i < len; i++) {
        unsigned char expected_char = 'a' + (i % 26);

        if (buffer[i] != expected_char) {
            printf("Bad data at byte %d: expected %c, received %c\n",
                   i, expected_char, buffer[i]);
            ok = tbx_false;
        }
    }

    if (!ok) {
        printf("Controling %s - ", msg);
        printf("%d bytes reception failed\n", len);

        TBX_FAILURE("data corruption");
    } else {
        printf("ok\n");
    }
}
#endif
>>
>>
>> void
>> server(void* arg) {
>> int my_pos = (uint8_t)arg;
>> char *buf = NULL;
>> uint8_t tag = (uint8_t)arg;
>> int i, k;
>> int len;
>>
>> buf = malloc(max_len);
>> clear_buffer(buf, max_len);
>> for(i = my_pos; i <= threads; i++) {
>> /* Be sure all the communicating threads have been created before
>> we start */
>> while(go < i )
>> sched_yield();
>>
>> for(len=1; len < max_len; len*=2){
>> for(k = 0; k < loops + warmups; k++) {
>>
>> MPI_Request request;
>>
>> MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD, MPI_STATUS_IGNORE);
>>
>> #if DATA_CONTROL_ACTIVATED
>> control_buffer("received", buf, len);
>> #endif
>> MPI_Send(buf, len , MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD);
>>
>> }
>> }
>>
>> sem_post(&ready_sem);
>> }
>> }
>>
>> int
>> client(void *arg) {
>> int my_pos = (uint8_t)arg;
>> uint8_t tag = (uint8_t)my_pos;
>> char *buf = NULL;
>> double t1, t2;
>> double sum, lat, bw_million_byte, bw_mbyte;
>> int i, k;
>> int len;
>>
>> fprintf(stderr, "thread #%d\n", my_pos);
>> buf = malloc(max_len);
>> clear_buffer(buf, max_len);
>>
>> fill_buffer(buf, len);
>> for(i = my_pos; i <= threads; i++) {
>> /* Be sure all the communicating threads have been created before
>> we start */
>> while(go < i )
>> sched_yield();
>>
>> for(len=1; len < max_len; len*=2){
>> for(k = 0; k < warmups; k++) {
>> MPI_Request request;
>> #if DATA_CONTROL_ACTIVATED
>> control_buffer("sending", buf, len);
>> #endif
>> MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD);
>>
>> MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD, MPI_STATUS_IGNORE);
>> #if DATA_CONTROL_ACTIVATED
>> control_buffer("received", buf, len);
>> #endif
>> }
>>
>> t1= MPI_Wtime();
>>
>> for(k = 0; k < loops; k++) {
>> MPI_Request request;
>> #if DATA_CONTROL_ACTIVATED
>> control_buffer("sending", buf, len);
>> #endif
>> MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD);
>> MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag,
>> MPI_COMM_WORLD, MPI_STATUS_IGNORE);
>> #if DATA_CONTROL_ACTIVATED
>> control_buffer("received", buf, len);
>> #endif
>> }
>>
>> t2 = MPI_Wtime();
>>
>> sum = (t2 - t1)*1e6;
>>
>> lat = sum / (2 * loops);
>> bw_million_byte = len * (loops / (sum / 2));
>> bw_mbyte = bw_million_byte / 1.048576;
>>
>> printf("[%d]\t%d\t%lf\t%8.3f\t%8.3f\n", my_pos, len, lat,
>> bw_million_byte, bw_mbyte);
>> fflush(stdout);
>> }
>>
>> sem_post(&ready_sem);
>> }
>> }
>> int
>> main(int argc,
>> char **argv) {
>> int i, j;
>> pthread_t * pid;
>> static sem_t bourrin_ready;
>> pthread_attr_t attr;
>>
>> //len = LEN_DEFAULT;
>> loops = LOOPS_DEFAULT;
>> threads = THREADS_DEFAULT;
>> warmups = WARMUPS_DEFAULT;
>>
>> int provided;
>> int needed = MPI_THREAD_MULTIPLE;
>> MPI_Init_thread(&argc, &argv, needed, &provided);
>> if(provided < needed){
>> fprintf(stderr, "needed: %d, provided: %d\n", needed, provided);
>> exit(0);
>> }
>> MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
>> MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
>>
>>
>> if (argc > 1 && !strcmp(argv[1], "--help")) {
>> usage_ping();
>> exit(0);
>> }
>>
>> for(i=1 ; i<argc ; i+=2) {
>> if (!strcmp(argv[i], "-N")) {
>> loops = atoi(argv[i+1]);
>> }
>> else if (!strcmp(argv[i], "-L")) {
>> //len = atoi(argv[i+1]);
>> }
>> else if (!strcmp(argv[i], "-T")) {
>> threads = atoi(argv[i+1]);
>> }
>> else if (!strcmp(argv[i], "-W")) {
>> warmups = atoi(argv[i+1]);
>> }
>> else {
>> fprintf(stderr, "Illegal argument %s\n", argv[i]);
>> usage_ping();
>> exit(0);
>> }
>> }
>>
>> pthread_attr_init(&attr);
>> pid = malloc(sizeof(pthread_t) * threads);
>> sem_init(&ready_sem, 0, 0);
>>
>> go = 0;
>> for (i = 0 ; i< threads ; i++) {
>> printf("[%d communicating threads]\n", i+1);
>> if (comm_rank == 0) {
>> pthread_create(&pid[i], &attr, (void*)server, (uint8_t)i);
>> } else {
>> pthread_create(&pid[i], &attr, (void*)client, (uint8_t)i);
>> }
>>
>> for( j = 0; j <= i; j++){
>> sem_wait(&ready_sem);
>> go=j;
>> }
>> go++;
>> }
>>
>> for(i=0;i<threads;i++)
>> pthread_join(pid[i],NULL);
>>
>> MPI_Finalize();
>> exit(0);
>> }
>> _______________________________________________
>> users mailing list
>> users_at_[hidden]
>> http://www.open-mpi.org/mailman/listinfo.cgi/users
>
>
> _______________________________________________
> users mailing list
> users_at_[hidden]
> http://www.open-mpi.org/mailman/listinfo.cgi/users
>