Source code for orchestrator.utils.restart

import json
from .recorder import Recorder


class Restart(Recorder):
    """
    Utility class for managing reading and writing checkpoint files.

    Checkpoint files are JSON documents whose top-level keys identify the
    module that owns each section. The `restarter` instance can be imported
    for access to the read and write checkpoint methods.
    """

    def __init__(self):
        """
        Initialize the Restart utility class.

        Provides methods to read and write checkpoint files. Tracks whether a
        missing checkpoint file has already been reported, to avoid duplicate
        log messages.
        """
        super().__init__()
        # Set once the "does not exist" message has been logged by a read;
        # cleared again by every successful write.
        self.file_missing_known = False

    def write_checkpoint_file(self, checkpoint_file, checkpoint_dict):
        """
        Add (or update) a checkpoint dictionary in the checkpoint file.

        If the checkpoint file already exists, the new dictionary is merged
        with the existing content; any top-level key present in both is
        overwritten with the latest data.

        The merged content is serialized *before* the file is opened for
        writing, so a dictionary that cannot be serialized raises without
        truncating an existing checkpoint file.

        :param checkpoint_file: Path and filename (should be JSON) to read
            and write to.
        :type checkpoint_file: str
        :param checkpoint_dict: Checkpoint information to add or update in
            the file. The dictionary should be nested, with a single root key
            associated with the module, and the value being a dictionary of
            quantities to checkpoint. A dictionary that does not have exactly
            one root key is still written; it is only reported as ambiguous
            in the log message.
        :type checkpoint_dict: dict
        :raises TypeError: If the merged content is not JSON serializable.
        """
        root_keys = list(checkpoint_dict)
        if len(root_keys) == 1:
            checkpoint_name = root_keys[0]
        else:
            checkpoint_name = 'ambiguous, not a single key in the dict'
        self.logger.info(f'Checkpointing: {checkpoint_name}')

        try:
            with open(checkpoint_file, 'r') as fin:
                previous_checkpoint = json.load(fin)
        except FileNotFoundError:
            # No existing file: this dict is the only thing to save
            output_dict = checkpoint_dict
        else:
            # Merge the two dictionaries, overwriting the module's section
            output_dict = previous_checkpoint | checkpoint_dict

        # Serialize first: opening with 'w' truncates the file immediately,
        # so a failure inside json.dump would destroy the previous checkpoint.
        serialized = json.dumps(output_dict, indent=4)
        with open(checkpoint_file, 'w') as fout:
            fout.write(serialized)
        self.file_missing_known = False

    def read_checkpoint_file(self, checkpoint_file, module_key):
        """
        Read a checkpoint file and extract a specific module dictionary.

        If the checkpoint file or module key does not exist, an empty
        dictionary is returned.

        :param checkpoint_file: JSON filename (including full path) to read
            from.
        :type checkpoint_file: str
        :param module_key: Module key for the specific checkpoint dictionary
            to return.
        :type module_key: str
        :returns: Module dictionary associated with the module_key.
        :rtype: dict
        """
        try:
            with open(checkpoint_file, 'r') as fin:
                restart_dict = json.load(fin)
        except FileNotFoundError:
            # Report the missing file only once; the flag is cleared again
            # by the next successful write.
            if not self.file_missing_known:
                self.logger.info(f'{checkpoint_file} does not exist, starting '
                                 'from scratch')
                self.file_missing_known = True
            return {}

        try:
            return restart_dict[module_key]
        except KeyError:
            self.logger.info(f'Checkpoint file exists, but no section for '
                             f'{module_key}')
            return {}
#: Restart instance which can be imported for use in other modules. restarter = Restart()