from abc import ABC
from os import path, PathLike
import subprocess as sp
from typing import Optional, Union
from .workflow_base import HPCWorkflow, JobStatus
from ..utils.exceptions import (
ProblematicJobStateError,
JobSubmissionError,
UnfullfillableDependenciesError,
)
from ..utils.data_standard import METADATA_KEY
[docs]
class LSFWF(HPCWorkflow, ABC):
"""
Workflow manager for full execution using a batch file and LSF scheduler
LSF is a fully featured workflow module for submitting jobs to the
LSF scheduler using batch files. It can handle asynchronus job submission
but still provides the option for blocking (synchronous) behavior.
Responsibilities include directory creation, job creation, job status
checking.
"""
[docs]
def __init__(self, default_template: Optional[str] = None, **kwargs):
"""
set variables and initialize the recorder
:param default_template: path to the template file to use for
submission scripts. If none provided, uses the default template
present in ./default_templates |default| ``None``
:type default_template: str
:param kwargs: remaining parameters passed to parent for init. Keys
include: queue, account, walltime, nodes, tasks, tasks_per_node,
wait_freq, root_directory, checkpoint_file, checkpoint_name, and
job_record_file
:type kwargs: dict
"""
super().__init__(**kwargs)
if default_template is None:
source_file_location = path.dirname(path.abspath(__file__))
self.default_template = (f'{source_file_location}/'
'default_templates/lsf.sh')
else:
self.default_template = default_template
# determines print format of walltime strings
self.USE_SEC = False
self.run_string = 'lrun'
[docs]
def generate_job_preamble(
self,
job_details: dict[str, Union[float, str]],
) -> str:
"""
Set LSF arguments from job_details or from defaults defined by the WF
This is a helper function for constructing the preamble of the lrun
command. Values set are nodes, tasks (optional).
:param job_details: dict passed through :meth:`~submit_job` including
any desired alterations from the workflow defaults
:type job_details: dict
:returns: populated preable string
:rtype: str
"""
node_val = job_details.get('nodes', self.default_nodes)
task_val = job_details.get('tasks', self.default_tasks)
custom_preamble = job_details.get('custom_preamble', None)
tasks_per_node_val = job_details.get('tasks_per_node',
self.default_tasks_per_node)
if custom_preamble is not None:
job_arg_string = custom_preamble
elif task_val > 1:
if tasks_per_node_val > 1:
self.logger.info((f'Warning: tasks and tasks-per-node are '
f'both specified. Using tasks = {task_val}'))
job_arg_string = (f'-N{node_val} -n{task_val}')
else:
# either tasks_per_node_val > 1, or both tasks-per-node and tasks
# = 1. In both cases, desireable to use tasks-per-node so if nodes
# > 1, can benefit from the distributed memory setup
job_arg_string = (f'-N{node_val} -T{tasks_per_node_val}')
return job_arg_string
[docs]
def update_job_status(self, lsf_ids: list[int]) -> list[str]:
"""
Query the scheduler and extract the job_status
This helper function uses bquery to check the LSF queue and extracts
updates about a job's progress, modifying the corresponding job_status
object. Status options are: 'done', 'pending', 'running', 'dependency',
and 'completing'. The current status is returned for convenience.
:param lsf_ids: list of LSF IDs of the jobs to check for completion
:type lsf_ids: list
:returns: list of job states
:rtype: list (of str)
"""
status_changed = False
job_str = ' '.join([str(x) for x in lsf_ids])
queue_output = sp.run(
f'bquery -o "id stat pendstate dependency exit_reason" {job_str}',
capture_output=True,
shell=True,
encoding='UTF-8')
updated_states = []
if queue_output.returncode != 0 or queue_output.stderr:
self.logger.info((f'Problem checking for jobs {lsf_ids}, '
f'exit code: {queue_output.returncode} '
f'stderr: {queue_output.stderr.strip()}'))
else:
split_output = queue_output.stdout.split()
for lsf_id in lsf_ids:
known_status = self.get_job_status(lsf_id)
try:
lsf_str_index = split_output.index(str(lsf_id))
lsf_state = split_output[lsf_str_index + 1]
if lsf_state == 'DONE':
new_state = 'done'
elif lsf_state == 'RUN':
new_state = 'running'
elif lsf_state == 'PEND':
pend_state = split_output[lsf_str_index + 2]
if pend_state == 'IPEND':
dependency = split_output[lsf_str_index + 3]
if dependency == '-':
new_state = 'pending_blocked'
else:
new_state = 'dependency'
else:
new_state = 'pending'
elif lsf_state == 'EXIT':
exit_reason = split_output[lsf_str_index + 4][:-1]
kill_word = split_output[lsf_str_index + 6]
if exit_reason == 'TERM_RUNLIMIT':
new_state = 'done_timeout'
elif (exit_reason == 'TERM_OWNER'
and kill_word == 'killed'):
new_state = 'done_cancelled'
else:
new_state = 'done_other'
else:
new_state = 'unknown'
except ValueError:
# LSF id not in list, so state unknown
new_state = 'unknown'
self.logger.info((f'Cannot find {lsf_id} with bquery, '
f'set state to unknown'))
except Exception:
new_state = 'error'
self.logger.info((f'Job {lsf_id} state parsing had an '
f' unknown error, set state to "error"'))
if new_state != known_status.state:
self.logger.info((f'Updating job {lsf_id} state '
f'from {known_status.state} to '
f'{new_state}'))
known_status.state = new_state
status_changed = True
if new_state in self.problematic_states:
raise ProblematicJobStateError(
(f'Orchestrator does not currently have set behavior '
f'for "{new_state}" job state. Check your queue for '
f'any remaining pending jobs.'))
updated_states.append(known_status.state)
if status_changed:
# we only update the job dict if any statuses have changed
self.checkpoint_workflow()
return updated_states
[docs]
def submit_job(
self,
command: str,
run_path: Union[str, PathLike],
job_details: Optional[dict[str, Union[float, str]]] = None,
) -> int:
"""
Submits a job for running using a submission script and bsub.
submit_job handles job submission for the modules and is the main
interface for the workflows to be used. For the :class:`LSFWF`
implementation, fully articulated batch scripts are generated each job
and submitted to the LSF scheduler via bsub. Method inputs define
the ``command`` to be executed for the job, location for the run, and
details about the job's resources (``job_details``). ``job_details``
inlcudes ``dependencies`` of the job in the form of a list of job_ids,
if the job is blocking (``synchronus``) or not, and an extra dictionary
, ``extra_args``, to add flexibility for parameterizing the job (such
as pre- or postambles to include in the batch file). ``dependencies``
are a list of job IDs which must have a successfully completed
:class:`~JobStatus` for the present job to run. However, jobs can still
be submitted to the queue with outstanding dependencies if run
asynchronously. The ``after`` key in ``extra_args`` can be specified to
define the LSF dependency behavior (i.e. 'exit' or 'done').
Note that while default job resources (nodes, account, walltime, etc.)
are present, they can be overridden by providing these keywords in the
``job_details`` dict for any specific calculation. Creates the
:class:`~.workflow_base.JobStatus` for this job, where the job state is
initially 'submitted' and can be updated to 'pending', 'dependency',
'running', 'completing', 'done', or 'unknown'. The 'done' state means
the calculation has completed, but can be decorated with suffixes that
add more information if the job didn't successfully complete (i.e.
'done_timeout'). Status checks are preformed by
:meth:`~update_job_status`. Returns the LSF ID, which can be used to
retrieve the present job's :class:`~.workflow_base.JobStatus`. If the
LSF ID cannot be identified, an internal tracking number
(starting at 1000) is used instead.
:param command: command that defines the job to be executed
:type command: str
:param run_path: directory for the job to be executed in
:type run_path: str
:param job_details: specifics for running the job, such as
number of nodes, queue, etc., as well as optional dependency list,
if the job should be synchronous or asychronous, and any other
optional arguments, such as pre- or postambles |default| ``None``
:type job_details: dict
:returns: return job ID to query this job status and location
:rtype: int
"""
# check inputs and provide information to user
if job_details is None:
job_details = {}
self.logger.info((f'No job details specified, will use defaults:\n'
f' nnodes = {self.default_nodes}, G = '
f'{self.default_account}, W = '
f'{self.default_walltime}, q = '
f'{self.default_queue}'))
synchronous = job_details.get('synchronous', False)
dependencies = job_details.get('dependencies', [])
extra_args = job_details.get('extra_args', {})
job_can_run = True
calc_id = -1
exit_code = 'undefined error'
if command is None or command == '':
job_can_run = False
exit_code = 'empty command'
self.logger.info('Job will not run: no command')
if job_can_run:
# generate the batch script
batch_file = self.generate_batch_file(
command,
run_path,
job_details,
extra_args,
)
# submit the batch script, with dependencies
if dependencies:
self.logger.info(
f'Including dependencies: {str(dependencies)[1:-1]}')
# after type for LSF should be exit (= afterany)
# or done (= afterok)
after_type = extra_args.get('after', 'exit')
# build up the depend string
depend_str = f'-w "{after_type}({dependencies[0]})'
if len(dependencies) > 1:
for remaining_ids in dependencies[1:]:
depend_str += f' && {after_type}({remaining_ids})'
depend_str += '"'
else:
depend_str = ''
self.logger.info('Spawning job, ID to be defined')
submit_command = f'cd {run_path}; bsub {depend_str} {batch_file}'
process_output = sp.run(submit_command,
capture_output=True,
shell=True,
encoding='UTF-8')
exit_code = process_output.returncode
# get the LSF ID
if exit_code != 0:
self.logger.info((f'Something wrong with submission [exit '
f'code = {exit_code}]'))
calc_id = self.extract_lsf_id(process_output.stderr)
else:
calc_id = self.extract_lsf_id(process_output.stdout)
# create job_status and add to self.jobs
job_status = JobStatus(run_path, 'submitted', exit_code)
# if synch, wait and check for completion
if synchronous:
self.block_until_completed(calc_id)
else:
calc_id = self.unknown_job_id
self.unknown_job_id += 1
self.new_unknown_id = True
job_status = JobStatus(run_path, 'done_cancelled', exit_code)
if dependencies:
raise UnfullfillableDependenciesError()
job_status.metadata = extra_args.get(METADATA_KEY, {})
self.jobs[calc_id] = job_status
self.checkpoint_workflow()
return calc_id