Source code for orchestrator.workflow.local

import shutil
from os import system, PathLike
from typing import Optional, Union
from .workflow_base import Workflow, JobStatus
from ..utils.restart import restarter
from ..utils.data_standard import METADATA_KEY


[docs] class LocalWF(Workflow): """ Workflow manager for execution in the local environment (i.e. login node) Responsibilities include directory creation, job creation, job status checking. Can run with mpi tasks if tasks are set in job_details, but no default parallel behavior is set. """
[docs] def __init__(self, **kwargs): """ set variables and initialize the recorder :param kwargs: parameters passed to parent for init. Keys include: root_directory, checkpoint_file, checkpoint_name, and job_record_file :type kwargs: """ self.current_job_id = 0 super().__init__(**kwargs)
[docs] def checkpoint_workflow(self): """ checkpoint the workflow module into the checkpoint file save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities """ self.save_job_dict() save_dict = { self.checkpoint_name: { 'current_job': self.current_job_id, 'counters': self.counters, } } restarter.write_checkpoint_file(self.checkpoint_file, save_dict)
[docs] def restart_workflow(self): """ restart the workflow module from the checkpoint file check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so """ # set the jobs dict from the pickle file self.read_job_dict() # see if any internal variables were checkpointed restart_dict = restarter.read_checkpoint_file( self.checkpoint_file, self.checkpoint_name, ) self.current_job_id = restart_dict.get('current_job', self.current_job_id) self.counters = restart_dict.get('counters', self.counters)
[docs] def block_until_completed(self, calc_ids: Union[list, int]): """ Function for enforcing synchronous execution Implementation is just to pass since submit_job won't return until the job is completed. :param calc_ids: list of slurm IDs of the jobs to check for completion. Can also pass a single ID. :type calc_ids: int or list """ pass
[docs] def submit_job( self, command: str, run_path: Union[str, PathLike], job_details: Optional[dict[str, Union[float, str]]] = None, ) -> int: """ Submits a job for running. submit_job handles job submission for the modules and is the main interface for the workflows to be used. For the :class:`LocalWF` implementation, this method uses ``os.system`` to execute the command on via command line interface. Inputs define the command to be executed for the job, location for the run, and details of the job. ``job_details`` inlcude ``dependencies`` of the job but no other keys. The ``dependencies`` are a list of job IDs which must have a successfully completed :class:`~JobStatus` for the present job to run. If one of the dependencies returns an error, this job will not run and the status will return an error. Creates the :class:`~.workflow_base.JobStatus` for this job, where the job state is always 'done' since jobs are run instantly on the command line. Returns a job handle (ID) that can be used to query status and to retrieve the present job's :class:`~.workflow_base.JobStatus`. :param command: command that defines the job to be executed :type command: implementation dependent :param run_path: location in the file system where inputs and outputs are to be accessed and stored :type run_path: str :param job_details: optional parameters for running the job, such as number of nodes, queue, etc. |default| ``None`` :type job_details: dict :returns: return job ID to query this job status and location :rtype: int """ if job_details is None: job_details = {} calc_id = self.current_job_id self.current_job_id += 1 # check inputs and provide information to user synchronous = job_details.get('synchronous', True) dependencies = job_details.get('dependencies', []) tasks = job_details.get('tasks') extra_args = job_details.get('extra_args', {}) if not synchronous: self.logger.info(('LocalWF cannot be run asynchronously, running ' 'in a blocking manner')) if extra_args: self.logger.info('Note that extra_args are not read by LocalWF') job_can_run = True exit_code = 'undefined error' if dependencies: self.logger.info(f'Checking dependencies for: {calc_id}') for depend_id in dependencies: job_status = self.get_job_status(depend_id) if job_status.state != 'done' or job_status.exit_code != 0: job_can_run = False exit_code = 'dependency not satisfied' self.logger.info((f'[{depend_id}]: state = ' f'{job_status.completed}, exit_code = ' f'{job_status.exit_code}')) break if command is None or command == '': job_can_run = False exit_code = 'empty command' if tasks is not None: mpi = None mpi_list = ['mpirun', 'mpiexec', 'srun'] for mpi_exec in mpi_list: if shutil.which(mpi_exec): mpi = mpi_exec if mpi: command = f'{mpi} -n {tasks} {command}' elif mpi is None: raise RuntimeError('`tasks` was set in job_details but could ' f'not find {mpi_list} in the environment.') if job_can_run: self.logger.info(f'Spawning job with ID: {calc_id}') exit_code = system( f'(cd {run_path}; {command} 2>> local_wf_stdout.log)') self.logger.info(f'Job {calc_id} execution completed') job_status = JobStatus(run_path, 'done', exit_code) job_status.metadata = extra_args.get(METADATA_KEY, {}) self.jobs[calc_id] = job_status self.save_job_dict() self.checkpoint_workflow() return calc_id