3.1-rc3 (revision d9ca08bb)
OTF2_MPI_Collectives.h
Go to the documentation of this file.
1 /*
2  * This file is part of the Score-P software (http://www.score-p.org)
3  *
4  * Copyright (c) 2013-2014,
5  * Technische Universitaet Dresden, Germany
6  *
7  * This software may be modified and distributed under the terms of
8  * a BSD-style license. See the COPYING file in the package base
9  * directory for details.
10  *
11  */
12 
13 
356 #ifndef OTF2_MPI_COLLECTIVES_H
357 #define OTF2_MPI_COLLECTIVES_H
358 
359 #include <stdint.h>
360 #include <stdlib.h>
361 
362 #include <otf2/otf2.h>
363 
364 
365 #include <mpi.h>
366 
367 
378 static OTF2_ErrorCode
380  MPI_Comm globalComm,
381  MPI_Comm localComm );
382 
383 
394 static OTF2_ErrorCode
396  MPI_Comm globalComm,
397  uint32_t numberOfFiles );
398 
399 
407 static OTF2_ErrorCode
409  MPI_Comm globalComm );
410 
411 
422 #ifdef OTF2_MPI_USE_PMPI
423 # define CALL_MPI( name ) P ## name
424 #else
425 # define CALL_MPI( name ) name
426 # define OTF2_MPI_USE_PMPI
427 # define OTF2_MPI_USE_PMPI_undef_me
428 #endif
429 
430 
438 #ifndef OTF2_MPI_UINT8_T
439 # if MPI_VERSION >= 3
440 # define OTF2_MPI_UINT8_T MPI_UINT8_T
441 # else
442 # define OTF2_MPI_UINT8_T MPI_UNSIGNED_CHAR
443 # endif
444 #endif
445 
453 #ifndef OTF2_MPI_INT8_T
454 # if MPI_VERSION >= 3
455 # define OTF2_MPI_INT8_T MPI_INT8_T
456 # else
457 # define OTF2_MPI_INT8_T MPI_CHAR
458 # endif
459 #endif
460 
461 
469 #ifndef OTF2_MPI_UINT16_T
470 # if MPI_VERSION >= 3
471 # define OTF2_MPI_UINT16_T MPI_UINT16_T
472 # else
473 # define OTF2_MPI_UINT16_T MPI_UNSIGNED_SHORT
474 # endif
475 #endif
476 
484 #ifndef OTF2_MPI_INT16_T
485 # if MPI_VERSION >= 3
486 # define OTF2_MPI_INT16_T MPI_INT16_T
487 # else
488 # define OTF2_MPI_INT16_T MPI_SHORT
489 # endif
490 #endif
491 
492 
500 #ifndef OTF2_MPI_UINT32_T
501 # if MPI_VERSION >= 3
502 # define OTF2_MPI_UINT32_T MPI_UINT32_T
503 # else
504 # define OTF2_MPI_UINT32_T MPI_UNSIGNED
505 # endif
506 #endif
507 
515 #ifndef OTF2_MPI_INT32_T
516 # if MPI_VERSION >= 3
517 # define OTF2_MPI_INT32_T MPI_INT32_T
518 # else
519 # define OTF2_MPI_INT32_T MPI_INT
520 # endif
521 #endif
522 
523 
531 #ifndef OTF2_MPI_UINT64_T
532 # define OTF2_MPI_UINT64_T MPI_UINT64_T
533 # if MPI_VERSION < 3
534 # error Please define OTF2_MPI_UINT64_T to a suitable MPI datatype for uint64_t.
535 # endif
536 #endif
537 
545 #ifndef OTF2_MPI_INT64_T
546 # define OTF2_MPI_INT64_T MPI_INT64_T
547 # if MPI_VERSION < 3
548 # error Please define OTF2_MPI_INT64 to a suitable MPI datatype for int64_t.
549 # endif
550 #endif
551 
552 
559 #ifndef OTF2_MPI_FLOAT
560 # define OTF2_MPI_FLOAT MPI_FLOAT
561 #endif
562 
563 
570 #ifndef OTF2_MPI_DOUBLE
571 # define OTF2_MPI_DOUBLE MPI_DOUBLE
572 #endif
573 
574 
583 {
584  MPI_Comm comm;
585  int size;
586  int rank;
587  int displacements[ 1 ];
588 };
589 
590 
593 typedef struct OTF2_MPI_UserData
594 {
595  OTF2_CollectiveCallbacks callbacks;
596  OTF2_CollectiveContext* global;
597  OTF2_CollectiveContext* local;
598 } OTF2_MPI_UserData;
599 
600 
601 static void
602 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks );
603 
604 
606 otf2_mpi_create_context( MPI_Comm comm,
607  bool duplicate );
608 
609 
610 static void
611 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext );
612 
613 
615 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
616  uint32_t numberOfFiles );
617 
618 
619 static OTF2_ErrorCode
621  MPI_Comm globalComm,
622  MPI_Comm localComm )
623 {
624  OTF2_ErrorCode status = OTF2_SUCCESS;
625  OTF2_MPI_UserData* user_data = NULL;
626 
629 
630  if ( !archive )
631  {
633  }
634 
635  if ( MPI_COMM_NULL == globalComm )
636  {
638  }
639 
640  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
641  if ( !user_data )
642  {
644  }
645 
646  otf2_mpi_get_collectives( &user_data->callbacks );
647 
648  user_data->global = otf2_mpi_create_context( globalComm, true );
649  if ( !user_data->global )
650  {
652  goto out;
653  }
654 
655  if ( MPI_COMM_NULL != localComm )
656  {
657  user_data->local = otf2_mpi_create_context( localComm, true );
658  if ( !user_data->local )
659  {
661  goto out;
662  }
663  }
664 
665  status = OTF2_Archive_SetCollectiveCallbacks( archive,
666  &user_data->callbacks,
667  user_data,
668  user_data->global,
669  user_data->local );
670 
671 out:
672  if ( OTF2_SUCCESS != status )
673  {
674  otf2_mpi_destroy_context( user_data->local );
675  otf2_mpi_destroy_context( user_data->global );
676  free( user_data );
677  }
678 
679  return status;
680 }
681 
682 
683 static OTF2_ErrorCode
685  MPI_Comm globalComm,
686  uint32_t numberOfFiles )
687 {
688  OTF2_ErrorCode status = OTF2_SUCCESS;
689  OTF2_MPI_UserData* user_data = NULL;
690 
693 
694  if ( !archive )
695  {
697  }
698 
699  if ( MPI_COMM_NULL == globalComm )
700  {
702  }
703 
704  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
705  if ( !user_data )
706  {
708  }
709 
710  otf2_mpi_get_collectives( &user_data->callbacks );
711 
712  user_data->global = otf2_mpi_create_context( globalComm, true );
713  if ( !user_data->global )
714  {
716  goto out;
717  }
718 
719  user_data->local = otf2_mpi_split_context_by_number( user_data->global,
720  numberOfFiles );
721  if ( !user_data->local )
722  {
724  goto out;
725  }
726 
727  status = OTF2_Archive_SetCollectiveCallbacks( archive,
728  &user_data->callbacks,
729  user_data,
730  user_data->global,
731  user_data->local );
732 
733 out:
734  if ( OTF2_SUCCESS != status )
735  {
736  otf2_mpi_destroy_context( user_data->local );
737  otf2_mpi_destroy_context( user_data->global );
738  free( user_data );
739  }
740 
741  return status;
742 }
743 
744 
745 static OTF2_ErrorCode
747  MPI_Comm globalComm )
748 {
749  OTF2_ErrorCode status = OTF2_SUCCESS;
750  OTF2_MPI_UserData* user_data = NULL;
751 
754 
755  if ( !reader )
756  {
758  }
759 
760  if ( MPI_COMM_NULL == globalComm )
761  {
763  }
764 
765  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
766  if ( !user_data )
767  {
769  }
770 
771  otf2_mpi_get_collectives( &user_data->callbacks );
772 
773  user_data->global = otf2_mpi_create_context( globalComm, true );
774  if ( !user_data->global )
775  {
777  goto out;
778  }
779 
780  status = OTF2_Reader_SetCollectiveCallbacks( reader,
781  &user_data->callbacks,
782  user_data,
783  user_data->global,
784  NULL );
785 
786 out:
787  if ( OTF2_SUCCESS != status )
788  {
789  otf2_mpi_destroy_context( user_data->global );
790  free( user_data );
791  }
792 
793  return status;
794 }
795 
796 
798 otf2_mpi_create_context( MPI_Comm comm,
799  bool duplicate )
800 {
801  int ret;
802  int size;
803 
804  ret = CALL_MPI( MPI_Comm_size ) ( comm, &size );
805  if ( MPI_SUCCESS != ret )
806  {
807  return NULL;
808  }
809 
810  OTF2_CollectiveContext* new_context =
811  ( OTF2_CollectiveContext* )malloc( sizeof( *new_context )
812  + ( ( size - 1 ) * sizeof( int ) ) );
813  if ( !new_context )
814  {
815  return NULL;
816  }
817 
818  new_context->size = size;
819  ret = CALL_MPI( MPI_Comm_rank ) ( comm, &new_context->rank );
820  if ( MPI_SUCCESS != ret )
821  {
822  free( new_context );
823  return NULL;
824  }
825 
826  if ( duplicate )
827  {
828  ret = CALL_MPI( MPI_Comm_dup ) ( comm, &new_context->comm );
829  if ( MPI_SUCCESS != ret )
830  {
831  free( new_context );
832  return NULL;
833  }
834  }
835  else
836  {
837  new_context->comm = comm;
838  }
839 
840  return new_context;
841 }
842 
843 
844 static void
845 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext )
846 {
847  if ( !collectiveContext )
848  {
849  return;
850  }
851 
852  CALL_MPI( MPI_Comm_free ) ( &collectiveContext->comm );
853 
854  free( collectiveContext );
855 }
856 
857 
859 otf2_mpi_split_context( OTF2_CollectiveContext* commContext,
860  int color,
861  int key )
862 {
863  OTF2_CollectiveContext* new_context;
864  MPI_Comm new_comm;
865  int ret;
866  ret = CALL_MPI( MPI_Comm_split ) ( commContext->comm,
867  color,
868  key,
869  &new_comm );
870  if ( MPI_SUCCESS != ret )
871  {
872  return NULL;
873  }
874 
875  new_context = otf2_mpi_create_context( new_comm, false );
876  if ( !new_context )
877  {
878  CALL_MPI( MPI_Comm_free ) ( &new_comm );
879  return NULL;
880  }
881 
882  return new_context;
883 }
884 
885 
887 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
888  uint32_t numberOfFiles )
889 {
890  int file_number = 0;
891  int rem = commContext->size % numberOfFiles;
892  int local_size = commContext->size / numberOfFiles + !!rem;
893  int local_rank = 0;
894  int local_root = 0;
895  int i;
896  for ( i = 0; i < commContext->rank; i++ )
897  {
898  local_rank++;
899  if ( local_root + local_size == i + 1 )
900  {
901  local_root += local_size;
902  file_number++;
903  local_size -= file_number == rem;
904  local_rank = 0;
905  }
906  }
907 
908  return otf2_mpi_split_context( commContext,
909  file_number,
910  local_rank );
911 }
912 
913 
914 static MPI_Datatype
915 otf2_mpi_get_type( OTF2_Type type )
916 {
917 #define case_return( TYPE, MPI_SUFFIX ) \
918  case OTF2_TYPE_ ## TYPE: \
919  return OTF2_MPI_ ## TYPE ## MPI_SUFFIX
920  switch ( type )
921  {
922  case_return( UINT8, _T );
923  case_return( INT8, _T );
924  case_return( UINT16, _T );
925  case_return( INT16, _T );
926  case_return( UINT32, _T );
927  case_return( INT32, _T );
928  case_return( UINT64, _T );
929  case_return( INT64, _T );
930  case_return( FLOAT, );
931  case_return( DOUBLE, );
932  default:
933  return MPI_DATATYPE_NULL;
934  }
935 #undef case_return
936 }
937 
938 
939 static void
940 otf2_mpi_collectives_release( void* userData,
941  OTF2_CollectiveContext* globalCommContext,
942  OTF2_CollectiveContext* localCommContext )
943 {
944  OTF2_MPI_UserData* user_data = ( OTF2_MPI_UserData* )userData;
945 
946  ( void )globalCommContext;
947  ( void )localCommContext;
948 
949  otf2_mpi_destroy_context( user_data->global );
950  otf2_mpi_destroy_context( user_data->local );
951  free( user_data );
952 }
953 
954 
955 static OTF2_CallbackCode
956 otf2_mpi_collectives_create_local_comm( void* userData,
957  OTF2_CollectiveContext** localCommContextOut,
958  OTF2_CollectiveContext* globalCommContext,
959  uint32_t globalRank,
960  uint32_t globalSize,
961  uint32_t localRank,
962  uint32_t localSize,
963  uint32_t fileNumber,
964  uint32_t numberOfFiles )
965 {
966  ( void )userData;
967  ( void )globalRank;
968  ( void )globalSize;
969  ( void )localSize;
970  ( void )numberOfFiles;
971 
972  *localCommContextOut = otf2_mpi_split_context( globalCommContext,
973  fileNumber,
974  localRank );
975 
976  return *localCommContextOut
979 }
980 
981 
982 static OTF2_CallbackCode
983 otf2_mpi_collectives_free_local_comm( void* userData,
984  OTF2_CollectiveContext* localCommContext )
985 {
986  ( void )userData;
987 
988  otf2_mpi_destroy_context( localCommContext );
989 
990  return OTF2_CALLBACK_SUCCESS;
991 }
992 
993 
994 static OTF2_CallbackCode
995 otf2_mpi_collectives_get_size( void* userData,
996  OTF2_CollectiveContext* commContext,
997  uint32_t* size )
998 {
999  ( void )userData;
1000 
1001  *size = commContext->size;
1002 
1003  return OTF2_CALLBACK_SUCCESS;
1004 }
1005 
1006 
1007 static OTF2_CallbackCode
1008 otf2_mpi_collectives_get_rank( void* userData,
1009  OTF2_CollectiveContext* commContext,
1010  uint32_t* rank )
1011 {
1012  ( void )userData;
1013 
1014  *rank = commContext->rank;
1015 
1016  return OTF2_CALLBACK_SUCCESS;
1017 }
1018 
1019 
1020 static OTF2_CallbackCode
1021 otf2_mpi_collectives_barrier( void* userData,
1022  OTF2_CollectiveContext* commContext )
1023 {
1024  int ret;
1025 
1026  ( void )userData;
1027 
1028  ret = CALL_MPI( MPI_Barrier ) ( commContext->comm );
1029 
1030  return MPI_SUCCESS == ret
1033 }
1034 
1035 
1036 static OTF2_CallbackCode
1037 otf2_mpi_collectives_bcast( void* userData,
1038  OTF2_CollectiveContext* commContext,
1039  void* data,
1040  uint32_t numberElements,
1041  OTF2_Type type,
1042  uint32_t root )
1043 {
1044  int ret;
1045 
1046  ( void )userData;
1047 
1048  ret = CALL_MPI( MPI_Bcast ) ( data,
1049  numberElements,
1050  otf2_mpi_get_type( type ),
1051  root,
1052  commContext->comm );
1053 
1054  return MPI_SUCCESS == ret
1057 }
1058 
1059 
1060 static OTF2_CallbackCode
1061 otf2_mpi_collectives_gather( void* userData,
1062  OTF2_CollectiveContext* commContext,
1063  const void* inData,
1064  void* outData,
1065  uint32_t numberElements,
1066  OTF2_Type type,
1067  uint32_t root )
1068 {
1069  int ret;
1070 
1071  ( void )userData;
1072 
1073  ret = CALL_MPI( MPI_Gather ) ( ( void* )inData,
1074  numberElements,
1075  otf2_mpi_get_type( type ),
1076  outData,
1077  numberElements,
1078  otf2_mpi_get_type( type ),
1079  root,
1080  commContext->comm );
1081 
1082  return MPI_SUCCESS == ret
1085 }
1086 
1087 
1088 static OTF2_CallbackCode
1089 otf2_mpi_collectives_gatherv( void* userData,
1090  OTF2_CollectiveContext* commContext,
1091  const void* inData,
1092  uint32_t inElements,
1093  void* outData,
1094  const uint32_t* outElements,
1095  OTF2_Type type,
1096  uint32_t root )
1097 {
1098  int ret;
1099  int* displs = NULL;
1100 
1101  ( void )userData;
1102 
1103  if ( ( int )root == commContext->rank )
1104  {
1105  int i;
1106  int displ = 0;
1107  for ( i = 0; i < commContext->rank; ++i )
1108  {
1109  commContext->displacements[ i ] = displ;
1110  displ += outElements[ i ];
1111  }
1112  displs = commContext->displacements;
1113  }
1114 
1115  ret = CALL_MPI( MPI_Gatherv ) ( ( void* )inData,
1116  inElements,
1117  otf2_mpi_get_type( type ),
1118  outData,
1119  ( int* )outElements,
1120  displs,
1121  otf2_mpi_get_type( type ),
1122  root,
1123  commContext->comm );
1124 
1125  return MPI_SUCCESS == ret
1128 }
1129 
1130 
1131 static OTF2_CallbackCode
1132 otf2_mpi_collectives_scatter( void* userData,
1133  OTF2_CollectiveContext* commContext,
1134  const void* inData,
1135  void* outData,
1136  uint32_t numberElements,
1137  OTF2_Type type,
1138  uint32_t root )
1139 {
1140  ( void )userData;
1141 
1142  int ret = CALL_MPI( MPI_Scatter ) ( ( void* )inData,
1143  numberElements,
1144  otf2_mpi_get_type( type ),
1145  outData,
1146  numberElements,
1147  otf2_mpi_get_type( type ),
1148  root,
1149  commContext->comm );
1150 
1151  return MPI_SUCCESS == ret
1154 }
1155 
1156 
1157 static OTF2_CallbackCode
1158 otf2_mpi_collectives_scatterv( void* userData,
1159  OTF2_CollectiveContext* commContext,
1160  const void* inData,
1161  const uint32_t* inElements,
1162  void* outData,
1163  uint32_t outElements,
1164  OTF2_Type type,
1165  uint32_t root )
1166 {
1167  int* displs = NULL;
1168 
1169  ( void )userData;
1170 
1171  if ( ( int )root == commContext->rank )
1172  {
1173  int i;
1174  int displ = 0;
1175  for ( i = 0; i < commContext->rank; ++i )
1176  {
1177  commContext->displacements[ i ] = displ;
1178  displ += inElements[ i ];
1179  }
1180  displs = commContext->displacements;
1181  }
1182 
1183  int ret = CALL_MPI( MPI_Scatterv ) ( ( void* )inData,
1184  ( int* )inElements,
1185  displs,
1186  otf2_mpi_get_type( type ),
1187  outData,
1188  outElements,
1189  otf2_mpi_get_type( type ),
1190  root,
1191  commContext->comm );
1192 
1193  return MPI_SUCCESS == ret
1196 }
1197 
1198 
1199 static void
1200 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks )
1201 {
1202  collectiveCallbacks->otf2_release = otf2_mpi_collectives_release;
1203  collectiveCallbacks->otf2_get_size = otf2_mpi_collectives_get_size;
1204  collectiveCallbacks->otf2_get_rank = otf2_mpi_collectives_get_rank;
1205  collectiveCallbacks->otf2_create_local_comm = otf2_mpi_collectives_create_local_comm;
1206  collectiveCallbacks->otf2_free_local_comm = otf2_mpi_collectives_free_local_comm;
1207  collectiveCallbacks->otf2_barrier = otf2_mpi_collectives_barrier;
1208  collectiveCallbacks->otf2_bcast = otf2_mpi_collectives_bcast;
1209  collectiveCallbacks->otf2_gather = otf2_mpi_collectives_gather;
1210  collectiveCallbacks->otf2_gatherv = otf2_mpi_collectives_gatherv;
1211  collectiveCallbacks->otf2_scatter = otf2_mpi_collectives_scatter;
1212  collectiveCallbacks->otf2_scatterv = otf2_mpi_collectives_scatterv;
1213 }
1214 
1215 
1216 #undef CALL_MPI
1217 #ifdef OTF2_MPI_USE_PMPI_undef_me
1218 #undef OTF2_MPI_USE_PMPI
1219 #undef OTF2_MPI_USE_PMPI_undef_me
1220 #endif
1221 
1222 
1228 #endif /* OTF2_MPI_COLLECTIVES_H */
Record reading can continue.
Definition: OTF2_GeneralDefinitions.h:352
Main include file for applications using OTF2.
uint8_t OTF2_Type
Wrapper for enum OTF2_Type_enum.
Definition: OTF2_GeneralDefinitions.h:596
Definition: OTF2_ErrorCodes.h:65
Definition: OTF2_ErrorCodes.h:230
Definition: OTF2_ErrorCodes.h:244
Struct which holds all collective callbacks.
Definition: OTF2_Callbacks.h:497
OTF2_ErrorCode OTF2_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the reader.
OTF2_ErrorCode
Definition: OTF2_ErrorCodes.h:53
OTF2_CallbackCode
Return value to indicate that the record reading should be interrupted.
Definition: OTF2_GeneralDefinitions.h:349
Signaling an error in the callback.
Definition: OTF2_GeneralDefinitions.h:363
struct OTF2_Archive_struct OTF2_Archive
Keeps all meta-data for an OTF2 archive.
Definition: OTF2_Archive.h:220
struct OTF2_Reader_struct OTF2_Reader
Keeps all necessary information for the reader.
Definition: OTF2_Reader.h:193
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, MPI_Comm globalComm, MPI_Comm localComm)
Register an MPI collective context to an OTF2 archive.
static OTF2_ErrorCode OTF2_MPI_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, MPI_Comm globalComm)
Register an MPI collective context to an OTF2 reader.
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacksSplit(OTF2_Archive *archive, MPI_Comm globalComm, uint32_t numberOfFiles)
Register an MPI collective context to an OTF2 archive.
struct OTF2_CollectiveContext OTF2_CollectiveContext
User provided type for collective groups.
Definition: OTF2_Callbacks.h:310
OTF2_ErrorCode OTF2_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the archive.
Definition: OTF2_ErrorCodes.h:246