Source code for otf2.reader
from contextlib import closing
import _otf2
from .event_reader import BufferedEventReader
from .error import TraceReaderError
from .registry import DefinitionRegistry
# The contextlib.closing documentation actually suggests to name this thing open
# so we do this despite the shadowing
[docs]def open(*args, **kwargs):
"""
Returns a :py:class:`otf2.reader.Reader` object usable within a :py:obj:`with` statement.
:param str anchor_file_path: the path to the anchor file (mostly 'some/path/trace.otf2')
:param int batch_events: the number of events, which are batched while reading the events
from the trace (default: 100)
"""
return closing(Reader(*args, **kwargs))
[docs]class Reader(object):
"""
This class is used to read existing traces
:param str anchor_file_path: the path to the anchor file (mostly 'some/path/trace.otf2')
:param int batch_events: the number of events, which are batched while reading the events
from the trace (default: 100)
"""
def __init__(self, anchor_file_path, batch_events=100, collective_callbacks=None,
collective_data=None, global_comm_context=None, local_comm_context=None):
self._handle = _otf2.Reader_Open(anchor_file_path)
if collective_callbacks is None:
_otf2.Reader_SetSerialCollectiveCallbacks(self._handle)
else:
_otf2.Reader_SetCollectiveCallbacks(self._handle, collective_callbacks,
collective_data, global_comm_context,
local_comm_context)
self._definitions = DefinitionRegistry()
self.batch_events = batch_events
global_def_reader = _otf2.Reader_GetGlobalDefReader(self._handle)
def_reader_callbacks = _otf2.GlobalDefReaderCallbacks_New()
self.definitions._set_global_def_reader_callbacks(def_reader_callbacks)
_otf2.Reader_RegisterGlobalDefCallbacks(self._handle, global_def_reader,
def_reader_callbacks, self.definitions)
_otf2.GlobalDefReaderCallbacks_Delete(def_reader_callbacks)
_otf2.Reader_ReadAllGlobalDefinitions(self._handle, global_def_reader)
_otf2.Reader_CloseGlobalDefReader(self._handle, global_def_reader)
# <--- self.definitions contains all definitions from trace --->
# Copy locations in case someone modifies the definitions before we need them
# In this variable we keep all locations that were part of the initially read definitions
self._locations = [location for location in self.definitions.locations]
self._event_reader = BufferedEventReader(self, self.batch_events)
self._global_evt_reader_handle = None
if self.definitions.clock_properties is None:
raise TraceReaderError("Trace has no ClockProperties definition.")
def _get_global_evt_reader_handle(self, locations):
if locations is None:
locations = self._locations
for location in locations:
if location not in self._locations:
raise ValueError("Trying to select invalid location while reading trace.")
_otf2.Reader_SelectLocation(self._handle, location._ref)
# The error handling follows the official example of the otf2_mpi_reader_example
def_files_open = False
try:
_otf2.Reader_OpenDefFiles(self._handle)
def_files_open = True
except _otf2.Error:
pass
try:
_otf2.Reader_OpenEvtFiles(self._handle)
except _otf2.Error:
pass
for location in locations:
if def_files_open:
def_reader = _otf2.Reader_GetDefReader(self._handle, location._ref)
if def_reader:
_otf2.Reader_ReadAllLocalDefinitions(self._handle, def_reader)
_otf2.Reader_CloseDefReader(self._handle, def_reader)
_otf2.Reader_GetEvtReader(self._handle, location._ref)
if def_files_open:
_otf2.Reader_CloseDefFiles(self._handle)
assert self._global_evt_reader_handle is None
self._global_evt_reader_handle = _otf2.Reader_GetGlobalEvtReader(self._handle)
return self._global_evt_reader_handle
def close(self):
if self._handle is None:
return
if self._global_evt_reader_handle is not None:
_otf2.Reader_CloseGlobalEvtReader(self._handle, self._global_evt_reader_handle)
_otf2.Reader_CloseEvtFiles(self._handle)
self._global_evt_reader_handle = None
_otf2.Reader_Close(self._handle)
self._handle = None
@property
def timer_resolution(self):
"""
Returns the resolution of the timer of the read trace
"""
return self.definitions.clock_properties.timer_resolution
@property
def events(self):
"""
Returns an iterable object representing all events in the opened trace.
Example:
.. code-block:: python
for location, event in trace.events(location_foo):
print("Encountered event at {}".format(event.time))
:param Iterable locations: all locations, for which events will be read, (None means for every location)
"""
return self._event_reader
@property
def definitions(self):
"""
Returns a :py:class:`otf2.registry.DefinitionRegistry` object representing all
definitions in the trace.
"""
return self._definitions
@property
def handle(self):
"""
Returns the handle to the underlaying OTF2 Reader.
"""
if self._handle is None:
raise TraceReaderError("Trying to access handle, but reader is already closed.")
return self._handle