3.0-rc2 (revision 337012f1)
OTF2_MPI_Collectives.h
Go to the documentation of this file.
1 /*
2  * This file is part of the Score-P software (http://www.score-p.org)
3  *
4  * Copyright (c) 2013-2014,
5  * Technische Universitaet Dresden, Germany
6  *
7  * This software may be modified and distributed under the terms of
8  * a BSD-style license. See the COPYING file in the package base
9  * directory for details.
10  *
11  */
12 
13 
389 #ifndef OTF2_MPI_COLLECTIVES_H
390 #define OTF2_MPI_COLLECTIVES_H
391 
392 #include <stdint.h>
393 #include <stdlib.h>
394 
395 #include <otf2/otf2.h>
396 
397 
398 #include <mpi.h>
399 
400 
411 static OTF2_ErrorCode
413  MPI_Comm globalComm,
414  MPI_Comm localComm );
415 
416 
427 static OTF2_ErrorCode
429  MPI_Comm globalComm,
430  uint32_t numberOfFiles );
431 
432 
440 static OTF2_ErrorCode
442  MPI_Comm globalComm );
443 
444 
455 #ifdef OTF2_MPI_USE_PMPI
456 # define CALL_MPI( name ) P ## name
457 #else
458 # define CALL_MPI( name ) name
459 # define OTF2_MPI_USE_PMPI
460 # define OTF2_MPI_USE_PMPI_undef_me
461 #endif
462 
463 
471 #ifndef OTF2_MPI_UINT8_T
472 # if MPI_VERSION >= 3
473 # define OTF2_MPI_UINT8_T MPI_UINT8_T
474 # else
475 # define OTF2_MPI_UINT8_T MPI_UNSIGNED_CHAR
476 # endif
477 #endif
478 
486 #ifndef OTF2_MPI_INT8_T
487 # if MPI_VERSION >= 3
488 # define OTF2_MPI_INT8_T MPI_INT8_T
489 # else
490 # define OTF2_MPI_INT8_T MPI_CHAR
491 # endif
492 #endif
493 
494 
502 #ifndef OTF2_MPI_UINT16_T
503 # if MPI_VERSION >= 3
504 # define OTF2_MPI_UINT16_T MPI_UINT16_T
505 # else
506 # define OTF2_MPI_UINT16_T MPI_UNSIGNED_SHORT
507 # endif
508 #endif
509 
517 #ifndef OTF2_MPI_INT16_T
518 # if MPI_VERSION >= 3
519 # define OTF2_MPI_INT16_T MPI_INT16_T
520 # else
521 # define OTF2_MPI_INT16_T MPI_SHORT
522 # endif
523 #endif
524 
525 
533 #ifndef OTF2_MPI_UINT32_T
534 # if MPI_VERSION >= 3
535 # define OTF2_MPI_UINT32_T MPI_UINT32_T
536 # else
537 # define OTF2_MPI_UINT32_T MPI_UNSIGNED
538 # endif
539 #endif
540 
548 #ifndef OTF2_MPI_INT32_T
549 # if MPI_VERSION >= 3
550 # define OTF2_MPI_INT32_T MPI_INT32_T
551 # else
552 # define OTF2_MPI_INT32_T MPI_INT
553 # endif
554 #endif
555 
556 
564 #ifndef OTF2_MPI_UINT64_T
565 # define OTF2_MPI_UINT64_T MPI_UINT64_T
566 # if MPI_VERSION < 3
567 # error Please define OTF2_MPI_UINT64_T to a suitable MPI datatype for uint64_t.
568 # endif
569 #endif
570 
578 #ifndef OTF2_MPI_INT64_T
579 # define OTF2_MPI_INT64_T MPI_INT64_T
580 # if MPI_VERSION < 3
581 # error Please define OTF2_MPI_INT64 to a suitable MPI datatype for int64_t.
582 # endif
583 #endif
584 
585 
592 #ifndef OTF2_MPI_FLOAT
593 # define OTF2_MPI_FLOAT MPI_FLOAT
594 #endif
595 
596 
603 #ifndef OTF2_MPI_DOUBLE
604 # define OTF2_MPI_DOUBLE MPI_DOUBLE
605 #endif
606 
607 
616 {
617  MPI_Comm comm;
618  int size;
619  int rank;
620  int displacements[ 1 ];
621 };
622 
623 
626 typedef struct OTF2_MPI_UserData
627 {
628  OTF2_CollectiveCallbacks callbacks;
629  OTF2_CollectiveContext* global;
630  OTF2_CollectiveContext* local;
631 } OTF2_MPI_UserData;
632 
633 
634 static void
635 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks );
636 
637 
639 otf2_mpi_create_context( MPI_Comm comm,
640  bool duplicate );
641 
642 
643 static void
644 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext );
645 
646 
648 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
649  uint32_t numberOfFiles );
650 
651 
652 static OTF2_ErrorCode
654  MPI_Comm globalComm,
655  MPI_Comm localComm )
656 {
657  OTF2_ErrorCode status = OTF2_SUCCESS;
658  OTF2_MPI_UserData* user_data = NULL;
659 
662 
663  if ( !archive )
664  {
666  }
667 
668  if ( MPI_COMM_NULL == globalComm )
669  {
671  }
672 
673  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
674  if ( !user_data )
675  {
677  }
678 
679  otf2_mpi_get_collectives( &user_data->callbacks );
680 
681  user_data->global = otf2_mpi_create_context( globalComm, true );
682  if ( !user_data->global )
683  {
685  goto out;
686  }
687 
688  if ( MPI_COMM_NULL != localComm )
689  {
690  user_data->local = otf2_mpi_create_context( localComm, true );
691  if ( !user_data->local )
692  {
694  goto out;
695  }
696  }
697 
698  status = OTF2_Archive_SetCollectiveCallbacks( archive,
699  &user_data->callbacks,
700  user_data,
701  user_data->global,
702  user_data->local );
703 
704 out:
705  if ( OTF2_SUCCESS != status )
706  {
707  otf2_mpi_destroy_context( user_data->local );
708  otf2_mpi_destroy_context( user_data->global );
709  free( user_data );
710  }
711 
712  return status;
713 }
714 
715 
716 static OTF2_ErrorCode
718  MPI_Comm globalComm,
719  uint32_t numberOfFiles )
720 {
721  OTF2_ErrorCode status = OTF2_SUCCESS;
722  OTF2_MPI_UserData* user_data = NULL;
723 
726 
727  if ( !archive )
728  {
730  }
731 
732  if ( MPI_COMM_NULL == globalComm )
733  {
735  }
736 
737  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
738  if ( !user_data )
739  {
741  }
742 
743  otf2_mpi_get_collectives( &user_data->callbacks );
744 
745  user_data->global = otf2_mpi_create_context( globalComm, true );
746  if ( !user_data->global )
747  {
749  goto out;
750  }
751 
752  user_data->local = otf2_mpi_split_context_by_number( user_data->global,
753  numberOfFiles );
754  if ( !user_data->local )
755  {
757  goto out;
758  }
759 
760  status = OTF2_Archive_SetCollectiveCallbacks( archive,
761  &user_data->callbacks,
762  user_data,
763  user_data->global,
764  user_data->local );
765 
766 out:
767  if ( OTF2_SUCCESS != status )
768  {
769  otf2_mpi_destroy_context( user_data->local );
770  otf2_mpi_destroy_context( user_data->global );
771  free( user_data );
772  }
773 
774  return status;
775 }
776 
777 
778 static OTF2_ErrorCode
780  MPI_Comm globalComm )
781 {
782  OTF2_ErrorCode status = OTF2_SUCCESS;
783  OTF2_MPI_UserData* user_data = NULL;
784 
787 
788  if ( !reader )
789  {
791  }
792 
793  if ( MPI_COMM_NULL == globalComm )
794  {
796  }
797 
798  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
799  if ( !user_data )
800  {
802  }
803 
804  otf2_mpi_get_collectives( &user_data->callbacks );
805 
806  user_data->global = otf2_mpi_create_context( globalComm, true );
807  if ( !user_data->global )
808  {
810  goto out;
811  }
812 
813  status = OTF2_Reader_SetCollectiveCallbacks( reader,
814  &user_data->callbacks,
815  user_data,
816  user_data->global,
817  NULL );
818 
819 out:
820  if ( OTF2_SUCCESS != status )
821  {
822  otf2_mpi_destroy_context( user_data->global );
823  free( user_data );
824  }
825 
826  return status;
827 }
828 
829 
831 otf2_mpi_create_context( MPI_Comm comm,
832  bool duplicate )
833 {
834  int ret;
835  int size;
836 
837  ret = CALL_MPI( MPI_Comm_size ) ( comm, &size );
838  if ( MPI_SUCCESS != ret )
839  {
840  return NULL;
841  }
842 
843  OTF2_CollectiveContext* new_context =
844  ( OTF2_CollectiveContext* )malloc( sizeof( *new_context )
845  + ( ( size - 1 ) * sizeof( int ) ) );
846  if ( !new_context )
847  {
848  return NULL;
849  }
850 
851  new_context->size = size;
852  ret = CALL_MPI( MPI_Comm_rank ) ( comm, &new_context->rank );
853  if ( MPI_SUCCESS != ret )
854  {
855  free( new_context );
856  return NULL;
857  }
858 
859  if ( duplicate )
860  {
861  ret = CALL_MPI( MPI_Comm_dup ) ( comm, &new_context->comm );
862  if ( MPI_SUCCESS != ret )
863  {
864  free( new_context );
865  return NULL;
866  }
867  }
868  else
869  {
870  new_context->comm = comm;
871  }
872 
873  return new_context;
874 }
875 
876 
877 static void
878 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext )
879 {
880  if ( !collectiveContext )
881  {
882  return;
883  }
884 
885  CALL_MPI( MPI_Comm_free ) ( &collectiveContext->comm );
886 
887  free( collectiveContext );
888 }
889 
890 
892 otf2_mpi_split_context( OTF2_CollectiveContext* commContext,
893  int color,
894  int key )
895 {
896  OTF2_CollectiveContext* new_context;
897  MPI_Comm new_comm;
898  int ret;
899  ret = CALL_MPI( MPI_Comm_split ) ( commContext->comm,
900  color,
901  key,
902  &new_comm );
903  if ( MPI_SUCCESS != ret )
904  {
905  return NULL;
906  }
907 
908  new_context = otf2_mpi_create_context( new_comm, false );
909  if ( !new_context )
910  {
911  CALL_MPI( MPI_Comm_free ) ( &new_comm );
912  return NULL;
913  }
914 
915  return new_context;
916 }
917 
918 
920 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
921  uint32_t numberOfFiles )
922 {
923  int file_number = 0;
924  int rem = commContext->size % numberOfFiles;
925  int local_size = commContext->size / numberOfFiles + !!rem;
926  int local_rank = 0;
927  int local_root = 0;
928  int i;
929  for ( i = 0; i < commContext->rank; i++ )
930  {
931  local_rank++;
932  if ( local_root + local_size == i + 1 )
933  {
934  local_root += local_size;
935  file_number++;
936  local_size -= file_number == rem;
937  local_rank = 0;
938  }
939  }
940 
941  return otf2_mpi_split_context( commContext,
942  file_number,
943  local_rank );
944 }
945 
946 
947 static MPI_Datatype
948 otf2_mpi_get_type( OTF2_Type type )
949 {
950 #define case_return( TYPE, MPI_SUFFIX ) \
951  case OTF2_TYPE_ ## TYPE: \
952  return OTF2_MPI_ ## TYPE ## MPI_SUFFIX
953  switch ( type )
954  {
955  case_return( UINT8, _T );
956  case_return( INT8, _T );
957  case_return( UINT16, _T );
958  case_return( INT16, _T );
959  case_return( UINT32, _T );
960  case_return( INT32, _T );
961  case_return( UINT64, _T );
962  case_return( INT64, _T );
963  case_return( FLOAT, );
964  case_return( DOUBLE, );
965  default:
966  return MPI_DATATYPE_NULL;
967  }
968 #undef case_return
969 }
970 
971 
972 static void
973 otf2_mpi_collectives_release( void* userData,
974  OTF2_CollectiveContext* globalCommContext,
975  OTF2_CollectiveContext* localCommContext )
976 {
977  OTF2_MPI_UserData* user_data = ( OTF2_MPI_UserData* )userData;
978 
979  ( void )globalCommContext;
980  ( void )localCommContext;
981 
982  otf2_mpi_destroy_context( user_data->global );
983  otf2_mpi_destroy_context( user_data->local );
984  free( user_data );
985 }
986 
987 
988 static OTF2_CallbackCode
989 otf2_mpi_collectives_create_local_comm( void* userData,
990  OTF2_CollectiveContext** localCommContextOut,
991  OTF2_CollectiveContext* globalCommContext,
992  uint32_t globalRank,
993  uint32_t globalSize,
994  uint32_t localRank,
995  uint32_t localSize,
996  uint32_t fileNumber,
997  uint32_t numberOfFiles )
998 {
999  ( void )userData;
1000  ( void )globalRank;
1001  ( void )globalSize;
1002  ( void )localSize;
1003  ( void )numberOfFiles;
1004 
1005  *localCommContextOut = otf2_mpi_split_context( globalCommContext,
1006  fileNumber,
1007  localRank );
1008 
1009  return *localCommContextOut
1012 }
1013 
1014 
1015 static OTF2_CallbackCode
1016 otf2_mpi_collectives_free_local_comm( void* userData,
1017  OTF2_CollectiveContext* localCommContext )
1018 {
1019  ( void )userData;
1020 
1021  otf2_mpi_destroy_context( localCommContext );
1022 
1023  return OTF2_CALLBACK_SUCCESS;
1024 }
1025 
1026 
1027 static OTF2_CallbackCode
1028 otf2_mpi_collectives_get_size( void* userData,
1029  OTF2_CollectiveContext* commContext,
1030  uint32_t* size )
1031 {
1032  ( void )userData;
1033 
1034  *size = commContext->size;
1035 
1036  return OTF2_CALLBACK_SUCCESS;
1037 }
1038 
1039 
1040 static OTF2_CallbackCode
1041 otf2_mpi_collectives_get_rank( void* userData,
1042  OTF2_CollectiveContext* commContext,
1043  uint32_t* rank )
1044 {
1045  ( void )userData;
1046 
1047  *rank = commContext->rank;
1048 
1049  return OTF2_CALLBACK_SUCCESS;
1050 }
1051 
1052 
1053 static OTF2_CallbackCode
1054 otf2_mpi_collectives_barrier( void* userData,
1055  OTF2_CollectiveContext* commContext )
1056 {
1057  int ret;
1058 
1059  ( void )userData;
1060 
1061  ret = CALL_MPI( MPI_Barrier ) ( commContext->comm );
1062 
1063  return MPI_SUCCESS == ret
1066 }
1067 
1068 
1069 static OTF2_CallbackCode
1070 otf2_mpi_collectives_bcast( void* userData,
1071  OTF2_CollectiveContext* commContext,
1072  void* data,
1073  uint32_t numberElements,
1074  OTF2_Type type,
1075  uint32_t root )
1076 {
1077  int ret;
1078 
1079  ( void )userData;
1080 
1081  ret = CALL_MPI( MPI_Bcast ) ( data,
1082  numberElements,
1083  otf2_mpi_get_type( type ),
1084  root,
1085  commContext->comm );
1086 
1087  return MPI_SUCCESS == ret
1090 }
1091 
1092 
1093 static OTF2_CallbackCode
1094 otf2_mpi_collectives_gather( void* userData,
1095  OTF2_CollectiveContext* commContext,
1096  const void* inData,
1097  void* outData,
1098  uint32_t numberElements,
1099  OTF2_Type type,
1100  uint32_t root )
1101 {
1102  int ret;
1103 
1104  ( void )userData;
1105 
1106  ret = CALL_MPI( MPI_Gather ) ( ( void* )inData,
1107  numberElements,
1108  otf2_mpi_get_type( type ),
1109  outData,
1110  numberElements,
1111  otf2_mpi_get_type( type ),
1112  root,
1113  commContext->comm );
1114 
1115  return MPI_SUCCESS == ret
1118 }
1119 
1120 
1121 static OTF2_CallbackCode
1122 otf2_mpi_collectives_gatherv( void* userData,
1123  OTF2_CollectiveContext* commContext,
1124  const void* inData,
1125  uint32_t inElements,
1126  void* outData,
1127  const uint32_t* outElements,
1128  OTF2_Type type,
1129  uint32_t root )
1130 {
1131  int ret;
1132  int* displs = NULL;
1133 
1134  ( void )userData;
1135 
1136  if ( ( int )root == commContext->rank )
1137  {
1138  int i;
1139  int displ = 0;
1140  for ( i = 0; i < commContext->rank; ++i )
1141  {
1142  commContext->displacements[ i ] = displ;
1143  displ += outElements[ i ];
1144  }
1145  displs = commContext->displacements;
1146  }
1147 
1148  ret = CALL_MPI( MPI_Gatherv ) ( ( void* )inData,
1149  inElements,
1150  otf2_mpi_get_type( type ),
1151  outData,
1152  ( int* )outElements,
1153  displs,
1154  otf2_mpi_get_type( type ),
1155  root,
1156  commContext->comm );
1157 
1158  return MPI_SUCCESS == ret
1161 }
1162 
1163 
1164 static OTF2_CallbackCode
1165 otf2_mpi_collectives_scatter( void* userData,
1166  OTF2_CollectiveContext* commContext,
1167  const void* inData,
1168  void* outData,
1169  uint32_t numberElements,
1170  OTF2_Type type,
1171  uint32_t root )
1172 {
1173  ( void )userData;
1174 
1175  int ret = CALL_MPI( MPI_Scatter ) ( ( void* )inData,
1176  numberElements,
1177  otf2_mpi_get_type( type ),
1178  outData,
1179  numberElements,
1180  otf2_mpi_get_type( type ),
1181  root,
1182  commContext->comm );
1183 
1184  return MPI_SUCCESS == ret
1187 }
1188 
1189 
1190 static OTF2_CallbackCode
1191 otf2_mpi_collectives_scatterv( void* userData,
1192  OTF2_CollectiveContext* commContext,
1193  const void* inData,
1194  const uint32_t* inElements,
1195  void* outData,
1196  uint32_t outElements,
1197  OTF2_Type type,
1198  uint32_t root )
1199 {
1200  int* displs = NULL;
1201 
1202  ( void )userData;
1203 
1204  if ( ( int )root == commContext->rank )
1205  {
1206  int i;
1207  int displ = 0;
1208  for ( i = 0; i < commContext->rank; ++i )
1209  {
1210  commContext->displacements[ i ] = displ;
1211  displ += inElements[ i ];
1212  }
1213  displs = commContext->displacements;
1214  }
1215 
1216  int ret = CALL_MPI( MPI_Scatterv ) ( ( void* )inData,
1217  ( int* )inElements,
1218  displs,
1219  otf2_mpi_get_type( type ),
1220  outData,
1221  outElements,
1222  otf2_mpi_get_type( type ),
1223  root,
1224  commContext->comm );
1225 
1226  return MPI_SUCCESS == ret
1229 }
1230 
1231 
1232 static void
1233 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks )
1234 {
1235  collectiveCallbacks->otf2_release = otf2_mpi_collectives_release;
1236  collectiveCallbacks->otf2_get_size = otf2_mpi_collectives_get_size;
1237  collectiveCallbacks->otf2_get_rank = otf2_mpi_collectives_get_rank;
1238  collectiveCallbacks->otf2_create_local_comm = otf2_mpi_collectives_create_local_comm;
1239  collectiveCallbacks->otf2_free_local_comm = otf2_mpi_collectives_free_local_comm;
1240  collectiveCallbacks->otf2_barrier = otf2_mpi_collectives_barrier;
1241  collectiveCallbacks->otf2_bcast = otf2_mpi_collectives_bcast;
1242  collectiveCallbacks->otf2_gather = otf2_mpi_collectives_gather;
1243  collectiveCallbacks->otf2_gatherv = otf2_mpi_collectives_gatherv;
1244  collectiveCallbacks->otf2_scatter = otf2_mpi_collectives_scatter;
1245  collectiveCallbacks->otf2_scatterv = otf2_mpi_collectives_scatterv;
1246 }
1247 
1248 
1249 #undef CALL_MPI
1250 #ifdef OTF2_MPI_USE_PMPI_undef_me
1251 #undef OTF2_MPI_USE_PMPI
1252 #undef OTF2_MPI_USE_PMPI_undef_me
1253 #endif
1254 
1255 
1261 #endif /* OTF2_MPI_COLLECTIVES_H */
Record reading can continue.
Definition: OTF2_GeneralDefinitions.h:352
Main include file for applications using OTF2.
uint8_t OTF2_Type
Wrapper for enum OTF2_Type_enum.
Definition: OTF2_GeneralDefinitions.h:596
Definition: OTF2_ErrorCodes.h:66
Definition: OTF2_ErrorCodes.h:231
Definition: OTF2_ErrorCodes.h:245
Struct which holds all collective callbacks.
Definition: OTF2_Callbacks.h:497
OTF2_ErrorCode OTF2_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the reader.
OTF2_ErrorCode
Definition: OTF2_ErrorCodes.h:54
OTF2_CallbackCode
Return value to indicate that the record reading should be interrupted.
Definition: OTF2_GeneralDefinitions.h:349
Signaling an error in the callback.
Definition: OTF2_GeneralDefinitions.h:363
struct OTF2_Archive_struct OTF2_Archive
Keeps all meta-data for an OTF2 archive.
Definition: OTF2_Archive.h:220
struct OTF2_Reader_struct OTF2_Reader
Keeps all necessary information for the reader.
Definition: OTF2_Reader.h:193
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, MPI_Comm globalComm, MPI_Comm localComm)
Register an MPI collective context to an OTF2 archive.
static OTF2_ErrorCode OTF2_MPI_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, MPI_Comm globalComm)
Register an MPI collective context to an OTF2 reader.
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacksSplit(OTF2_Archive *archive, MPI_Comm globalComm, uint32_t numberOfFiles)
Register an MPI collective context to an OTF2 archive.
struct OTF2_CollectiveContext OTF2_CollectiveContext
User provided type for collective groups.
Definition: OTF2_Callbacks.h:310
OTF2_ErrorCode OTF2_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the archive.
Definition: OTF2_ErrorCodes.h:247