Workflow Module

Abstract Base Classes

class orchestrator.workflow.workflow_base.Workflow(root_directory='./orchestrator_workflow', checkpoint_file='./orchestrator_checkpoint.json', checkpoint_name=None, job_record_file=None, **kwargs)[source]

Bases: Recorder, ABC

Abstract base class to manage workflows

Responsibilities include directory creation, job creation, and job status checking. A given workflow class uses root_directory as the base for all calculation inputs and outputs, in a directory hierarchy managed by make_path_base() and make_path(). workflow_args provide a vehicle for modulating the workflow behavior, though they are not strictly required. The counters and jobs dictionaries, which are used internally to track workflow components, are also initialized at instantiation.

__init__(root_directory='./orchestrator_workflow', checkpoint_file='./orchestrator_checkpoint.json', checkpoint_name=None, job_record_file=None, **kwargs)[source]

set variables and initialize the recorder

Parameters:
  • root_directory (str) – name of the directory under which all files and subdirectories will sit

    Default: ‘./orchestrator_workflow’

  • checkpoint_file (str) – name of the checkpoint file to write restart information to

    Default: ‘./orchestrator_checkpoint.json’

  • checkpoint_name (str) – name of the restart block for this module in the checkpoint file

    Default: ‘workflow’

  • job_record_file (str) – name of the file to save the pickled jobs dict

    Default: ‘./job_record.pkl’

make_path_base(module, path_type)[source]

Create the base directory for data and run outputs, excluding counter

make_path_base generates a hierarchical directory structure within the root directory. The path hierarchy is root/module/path_type, where module is the orchestrator module responsible for the call and path_type is the type of calculation or job within that module. This is a shortened version of make_path(), omitting the count at the end of the directory path. The method returns the path as a string.

Parameters:
  • module (str) – module requesting the path

  • path_type (str) – “type” or purpose of the path, e.g. training_data, trajectories, ground_truth, etc. This is typically supplied in an input file

Returns:

created path name

Return type:

str

make_path(module, path_type)[source]

Create the directory for data and run outputs to be located

make_path generates a hierarchical directory structure within the root directory. The path hierarchy is root/module/path_type/count, where module is the orchestrator module responsible for the call, path_type is the type of calculation or job within that module, and count is the increment of that specific calculation type. The method returns the generated path as a string.

Parameters:
  • module (str) – the module requesting the path

  • path_type (str) – “type” of the path, i.e. the purpose

Returns:

created path name

Return type:

str
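As a rough illustration of the root/module/path_type/count hierarchy described above, the sketch below builds such paths with the standard library. The function names sketch_make_path_base and sketch_make_path, the module-level counter dictionary, and the rule that the count starts at 0 are all assumptions made for this example; they are not taken from the Orchestrator source.

```python
import os
import tempfile
from collections import defaultdict

# Hypothetical stand-in for the workflow's internal counters dictionary.
_sketch_counters = defaultdict(int)


def sketch_make_path_base(root, module, path_type):
    """Create and return root/module/path_type (no trailing count)."""
    path = os.path.join(root, module, path_type)
    os.makedirs(path, exist_ok=True)
    return path


def sketch_make_path(root, module, path_type):
    """Create and return root/module/path_type/count.

    Assumption: the count starts at 0 and increases by one on every call
    for the same (module, path_type) pair.
    """
    base = sketch_make_path_base(root, module, path_type)
    key = (module, path_type)
    count = _sketch_counters[key]
    _sketch_counters[key] += 1
    path = os.path.join(base, str(count))
    os.makedirs(path, exist_ok=True)
    return path


# Usage: two calls for the same calculation type give sibling directories.
root = tempfile.mkdtemp()
first = sketch_make_path(root, "oracle", "ground_truth")
second = sketch_make_path(root, "oracle", "ground_truth")
```

Keeping the counter keyed on the (module, path_type) pair means each calculation type is numbered independently, which matches the description of count as "the increment of that specific calculation type".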

get_job_status(job_handle)[source]

Queries the status of a job handle.

Returns JobStatus, which has (minimally) the attributes path, state, and exit_code. These correspond to the location of the job, the job state (e.g. completed), and an exit code that is 0 if successful and a flag with information if not.

Parameters:

job_handle (int) – job ID originally returned from submit_job()

Returns:

job’s JobStatus

Return type:

JobStatus

get_job_path(job_handle)[source]

returns the path where a specific job was run

Parameters:

job_handle (int) – job ID

Returns:

path where the job inputs/outputs are stored

Return type:

str or PathLike

get_attached_metadata(job_handle)[source]

returns the metadata associated with a specific job

Parameters:

job_handle (int) – job ID

Returns:

dict of metadata associated with the job

Return type:

dict

get_all_statuses()[source]

Returns information about all jobs from this Workflow.

Returns a dictionary with job_handle: status, where job_handle is returned by submit_job() and status is a JobStatus instance

Returns:

a dictionary of JobStatus objects

Return type:

dict

job_done_file_present(job_id)[source]

check if the job directory contains the “job_done” file

Use an empty file to give persistent indication if the job completed to avoid problems where job statuses are purged by the scheduler after a certain amount of time. The job templates will “touch job_done” at the end of the script to provide this persistent indicator.

Parameters:

job_id (int) – job ID to check

Returns:

True if file exists, false otherwise

Return type:

boolean
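The marker-file check can be pictured with the short sketch below. It is only an illustration: the real method takes a job ID and looks up the job's path internally, whereas the hypothetical sketch_job_done_file_present takes the directory directly.

```python
import os
import tempfile


def sketch_job_done_file_present(job_path):
    """Return True if a file named job_done exists in job_path."""
    return os.path.isfile(os.path.join(job_path, "job_done"))


# Usage: the marker is absent until the job script "touches" it.
job_dir = tempfile.mkdtemp()
before = sketch_job_done_file_present(job_dir)
# Python equivalent of `touch job_done` at the end of a job script.
open(os.path.join(job_dir, "job_done"), "a").close()
after = sketch_job_done_file_present(job_dir)
```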

save_job_dict()[source]

Serialize the job dictionary for persistent storage

Write the jobs dict, containing all of the workflow’s JobStatus objects, to the job_record_file. The record file is overwritten each time.

read_job_dict()[source]

Read a serialized job dictionary and set to the internal jobs dict
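A minimal sketch of the save/read round trip is given below, assuming the record is written with the standard pickle module (the parameter description above mentions a "pickled jobs dict"). The helper names are hypothetical, and plain dictionaries stand in for JobStatus objects.

```python
import os
import pickle
import tempfile


def sketch_save_job_dict(jobs, job_record_file):
    """Overwrite job_record_file with the pickled jobs dictionary."""
    with open(job_record_file, "wb") as fout:
        pickle.dump(jobs, fout)


def sketch_read_job_dict(job_record_file):
    """Load and return a previously pickled jobs dictionary."""
    with open(job_record_file, "rb") as fin:
        return pickle.load(fin)


# Usage: plain dicts are used here in place of JobStatus instances.
record = os.path.join(tempfile.mkdtemp(), "job_record.pkl")
jobs = {0: {"path": "./run/0", "state": "done", "exit_code": 0}}
sketch_save_job_dict(jobs, record)
restored = sketch_read_job_dict(record)
```

Because unpickling can execute arbitrary code, a record file should only be read back if it comes from a trusted source.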

abstract checkpoint_workflow()[source]

checkpoint the workflow module into the checkpoint file

save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities

abstract restart_workflow()[source]

restart the workflow module from the checkpoint file

check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so
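The checkpoint and restart pair described above can be sketched as follows. This is an illustrative sketch only: the function names, and the choice to preserve blocks written by other modules in the same file, are assumptions rather than the documented behavior of any concrete Workflow.

```python
import json
import os
import tempfile


def sketch_checkpoint(checkpoint_file, checkpoint_name, state):
    """Write state under the key checkpoint_name in a JSON file.

    Assumption: blocks stored under other keys are left untouched.
    """
    data = {}
    if os.path.isfile(checkpoint_file):
        with open(checkpoint_file) as fin:
            data = json.load(fin)
    data[checkpoint_name] = state
    with open(checkpoint_file, "w") as fout:
        json.dump(data, fout, indent=4)


def sketch_restart(checkpoint_file, checkpoint_name):
    """Return the saved block for checkpoint_name, or None if absent."""
    if not os.path.isfile(checkpoint_file):
        return None
    with open(checkpoint_file) as fin:
        return json.load(fin).get(checkpoint_name)


# Usage: save a block, then read it back by name.
ckpt = os.path.join(tempfile.mkdtemp(), "orchestrator_checkpoint.json")
sketch_checkpoint(ckpt, "workflow", {"counters": {"ground_truth": 2}})
restored_block = sketch_restart(ckpt, "workflow")
missing_block = sketch_restart(ckpt, "not_saved")
```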

abstract block_until_completed(calc_ids)[source]

Function for enforcing synchronous execution

Helper function to run when blocking behavior is desired. Will repeatedly check the queue until the specified jobs have completed. The implementation for synchronous workflows is just to pass.

Parameters:

calc_ids (int or list) – list of job IDs of the calculations to check for completion. Can also pass a single ID.

abstract submit_job(command, run_path, job_details=None)[source]

Submits a job for running

submit_job handles job submission for the modules and is the main interface to the workflows. Inputs define the command to be executed for the job, the location for the run, and details of the job resources. job_details include the dependencies of the job as a list of job IDs, a boolean indicating whether the job is blocking (synchronous) or not, and an extra dictionary, extra_args, which gives concrete implementations flexibility to further parameterize the job. The dependencies are a list of job IDs which must have a successfully completed JobStatus for the present job to run. If one of the dependencies returns an error, this job will not run and its status will report an error. If synchronous is True, submit_job will not return until the job is completed. This may be the only behavior of some implementations. Returns a job handle that can be used to query status and to retrieve the present job’s JobStatus, which is typically created by this method.

Parameters:
  • command (implementation dependent) – command that defines the job to be executed

  • run_path (str) – location in the file system where inputs and outputs are to be accessed and stored

  • job_details (dict) – optional parameters for running the job, such as number of nodes, queue, etc.

    Default: None

Returns:

return job ID to query this job status and location

Return type:

int
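The dependency rule stated above (a job runs only if every dependency completed successfully) can be sketched as a small check. The key names 'dependencies', 'synchronous', and 'extra_args' are inferred from the prose and should be treated as assumptions, as should the use of plain dictionaries in place of JobStatus objects.

```python
def sketch_dependencies_ok(jobs, dependencies):
    """Return True only if every dependency is done with exit code 0."""
    return all(
        jobs[dep]["state"] == "done" and jobs[dep]["exit_code"] == 0
        for dep in dependencies
    )


# Hypothetical record of earlier jobs: job 2 finished with an error.
jobs = {
    1: {"state": "done", "exit_code": 0},
    2: {"state": "done", "exit_code": 1},
}

# Assumed shape of a job_details dictionary.
job_details = {"dependencies": [1], "synchronous": True, "extra_args": {}}

can_run = sketch_dependencies_ok(jobs, job_details["dependencies"])
blocked = sketch_dependencies_ok(jobs, [1, 2])
```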

class orchestrator.workflow.workflow_base.HPCWorkflow(queue, account, walltime=60, nodes=1, tasks=1, tasks_per_node=1, qos='normal', wait_freq=60, **kwargs)[source]

Bases: Workflow, ABC

Generic (and abstract class) for workflows leveraging HPC schedulers

HPCWorkflow defines the shared init args and restart functionality that LSF, Slurm, and other HPC schedulers require. It is not instantiated directly, but inherited.

__init__(queue, account, walltime=60, nodes=1, tasks=1, tasks_per_node=1, qos='normal', wait_freq=60, **kwargs)[source]

set variables and initialize the recorder

The provided input arguments set the default parameters for the workflow, but can be overridden by values passed into job_details dict provided to submit_job().

Parameters:
  • queue (str) – default name of the queue to submit to

  • account (str) – default name of the account for the job

  • walltime (int or float) – default walltime for the job in minutes

    Default: 60

  • nodes (int) – default number of nodes to request

    Default: 1

  • tasks (int) – default number of tasks for a job

    Default: 1

  • tasks_per_node (int) – default number of tasks per node for a job. Will not be used if tasks is explicitly set.

    Default: 1

  • wait_freq (int) – the frequency with which squeue is called to get job status updates, in seconds

    Default: 60

  • kwargs (dict) – remaining keywords passed to parent: root_directory, checkpoint_file, checkpoint_name, and job_record_file

static format_walltime(minutes, include_seconds)[source]

utility function to create a time string based on input minutes

different schedulers require time specifications with or without seconds, so this utility converts a number of minutes (integer or fractional) into an HH:MM or HH:MM:ss string

Parameters:
  • minutes (float or int) – number of minutes (can be fractional) to convert

  • include_seconds (bool) – whether to print out the seconds or not

Returns:

the formatted time string

Return type:

str
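One way such a conversion could work is sketched below. The rounding to the nearest second, the zero-padded two-digit hours field, and the name sketch_format_walltime are assumptions for illustration; the actual static method may format edge cases differently.

```python
def sketch_format_walltime(minutes, include_seconds):
    """Convert a (possibly fractional) number of minutes to a time string."""
    # Work in whole seconds so fractional minutes are handled exactly once.
    total_seconds = int(round(minutes * 60))
    hours, remainder = divmod(total_seconds, 3600)
    mins, secs = divmod(remainder, 60)
    if include_seconds:
        return f"{hours:02d}:{mins:02d}:{secs:02d}"
    # Seconds are simply dropped when they are not requested.
    return f"{hours:02d}:{mins:02d}"


# Usage: 90 minutes without seconds, 90.5 minutes with seconds.
short_form = sketch_format_walltime(90, False)
long_form = sketch_format_walltime(90.5, True)
```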

checkpoint_workflow()[source]

checkpoint the workflow module into the checkpoint file

save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities

restart_workflow()[source]

restart the workflow module from the checkpoint file

check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so

block_until_completed(calc_ids)[source]

Function for enforcing synchronous execution

Helper function to run when blocking behavior is desired. Will repeatedly check the queue until the specified jobs have completed.

Parameters:

calc_ids (int or list) – list of job IDs of the calculations to check for completion. Can also pass a single ID.
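The blocking behavior amounts to a polling loop, sketched below under stated assumptions: get_state is a hypothetical callable returning a job's state string, a job counts as finished once its state starts with 'done' (covering decorated states such as 'done_timeout'), and the sleep function is injectable so the example runs without waiting.

```python
import time


def sketch_block_until_completed(calc_ids, get_state, wait_freq=60,
                                 sleep=time.sleep):
    """Poll get_state(job_id) until every job reports a 'done' state."""
    # Accept a single ID as well as a list of IDs.
    if isinstance(calc_ids, int):
        calc_ids = [calc_ids]
    pending = set(calc_ids)
    while pending:
        pending = {
            job_id for job_id in pending
            if not get_state(job_id).startswith("done")
        }
        if pending:
            sleep(wait_freq)


# Usage with a fake scheduler: job 7 needs three polls, job 8 only one.
fake_states = {
    7: iter(["pending", "running", "done"]),
    8: iter(["done_timeout"]),
}
naps = []
sketch_block_until_completed(
    [7, 8],
    lambda job_id: next(fake_states[job_id]),
    wait_freq=60,
    sleep=naps.append,  # record the waits instead of sleeping
)
```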

generate_batch_file(command, run_path, job_details=None, extra_args=None)[source]

Construct a batch file for job submission

This is a helper function that submit_job calls to construct a batch file for the proper scheduler.

Parameters:
  • command (str) – command that defines the job to be executed

  • run_path (str) – directory for the job to be executed in

  • job_details (dict) – optional parameters for running the job. Parameters specified in this dict are: ‘nodes’, ‘tasks’, ‘tasks_per_node’, ‘queue’, ‘account’, ‘walltime’, and ‘custom_preamble’

    Default: None

  • extra_args (dict) – dictionary of extra args. It can include lines to prepend or append to the job command in the batch script (given by the ‘preamble’ and ‘postamble’ keys), additional #SCHEDULER commands (given by the ‘extra_header’ key, where the value is a formatted string including the #SBATCH/#BSUB keyword), and an alternative batch template location, specified by the ‘template’ key

    Default: None

Returns:

name of the batch file

Return type:

str
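To make the roles of job_details and extra_args concrete, the sketch below fills a small Slurm-style template with string.Template. The template text, the placeholder names, and the helper sketch_batch_text are hypothetical; the real default templates and the way they are populated are not shown in this reference.

```python
from string import Template

# Hypothetical template; the package's default templates may differ.
SKETCH_TEMPLATE = Template(
    "#!/bin/bash\n"
    "#SBATCH -N $nodes\n"
    "#SBATCH -n $tasks\n"
    "#SBATCH -p $queue\n"
    "#SBATCH -A $account\n"
    "#SBATCH -t $walltime\n"
    "$extra_header\n"
    "$preamble\n"
    "$command\n"
    "$postamble\n"
    "touch job_done\n"
)


def sketch_batch_text(command, defaults, job_details=None, extra_args=None):
    """Return batch script text; job_details override workflow defaults."""
    settings = {**defaults, **(job_details or {})}
    extras = {"extra_header": "", "preamble": "", "postamble": ""}
    extras.update(extra_args or {})
    return SKETCH_TEMPLATE.substitute(command=command, **settings, **extras)


# Usage: override the node count and add a preamble line.
defaults = {"nodes": 1, "tasks": 1, "queue": "pbatch", "account": "iap",
            "walltime": "01:00:00"}
text = sketch_batch_text(
    "echo hello",
    defaults,
    job_details={"nodes": 2},
    extra_args={"preamble": "module load python"},
)
```

Merging the defaults first and the per-job values second mirrors the documented rule that constructor arguments set defaults which job_details can override.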

Concrete Implementations

Local

class orchestrator.workflow.local.LocalWF(**kwargs)[source]

Bases: Workflow

Workflow manager for execution in the local environment (e.g. a login node)

Responsibilities include directory creation, job creation, and job status checking. Can run with MPI tasks if tasks are set in job_details, but no default parallel behavior is set.

__init__(**kwargs)[source]

set variables and initialize the recorder

Parameters:

kwargs – parameters passed to parent for init. Keys include: root_directory, checkpoint_file, checkpoint_name, and job_record_file

checkpoint_workflow()[source]

checkpoint the workflow module into the checkpoint file

save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities

restart_workflow()[source]

restart the workflow module from the checkpoint file

check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so

block_until_completed(calc_ids)[source]

Function for enforcing synchronous execution

Implementation is just to pass since submit_job won’t return until the job is completed.

Parameters:

calc_ids (int or list) – list of job IDs of the jobs to check for completion. Can also pass a single ID.

submit_job(command, run_path, job_details=None)[source]

Submits a job for running.

submit_job handles job submission for the modules and is the main interface to the workflows. For the LocalWF implementation, this method uses os.system to execute the command via the command line interface. Inputs define the command to be executed for the job, the location for the run, and details of the job. job_details include dependencies of the job but no other keys. The dependencies are a list of job IDs which must have a successfully completed JobStatus for the present job to run. If one of the dependencies returns an error, this job will not run and its status will report an error. Creates the JobStatus for this job, where the job state is always ‘done’ since jobs are run immediately on the command line. Returns a job handle (ID) that can be used to query status and to retrieve the present job’s JobStatus.

Parameters:
  • command (implementation dependent) – command that defines the job to be executed

  • run_path (str) – location in the file system where inputs and outputs are to be accessed and stored

  • job_details (dict) – optional parameters for running the job, such as number of nodes, queue, etc.

    Default: None

Returns:

return job ID to query this job status and location

Return type:

int

Slurm (sbatch)

class orchestrator.workflow.slurm.SlurmWF(default_template=None, **kwargs)[source]

Bases: HPCWorkflow, ABC

Workflow manager for full execution using a batch file and slurm scheduler

SlurmWF is a fully featured workflow module for submitting jobs to the slurm scheduler using batch files. It can handle asynchronous job submission but still provides the option for blocking (synchronous) behavior. Responsibilities include directory creation, job creation, and job status checking.

__init__(default_template=None, **kwargs)[source]

set variables and initialize the recorder

Parameters:
  • default_template (str) – path to the template file to use for submission scripts. If none provided, uses the default template present in ./default_templates

    Default: None

  • kwargs (dict) – remaining parameters passed to parent for init. Keys include: queue, account, walltime, nodes, tasks, tasks_per_node, wait_freq, root_directory, checkpoint_file, checkpoint_name, and job_record_file

extract_slurm_id(str_output)[source]

From the command line output, extract the slurm job id

Parameters:

str_output (str) – full output string to extract ID from

Returns:

slurm ID

Return type:

int
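For illustration, sbatch normally reports a successful submission with a line of the form "Submitted batch job <id>", so the ID can be pulled out with a regular expression. The sketch below is an assumption about how such parsing could look, not the method's actual implementation; in particular it returns None on failure, whereas submit_job is documented to fall back to an internal tracking number.

```python
import re


def sketch_extract_slurm_id(str_output):
    """Return the job ID from sbatch output, or None if none is found."""
    match = re.search(r"Submitted batch job (\d+)", str_output)
    return int(match.group(1)) if match else None


# Usage with a typical sbatch response and an unparseable one.
slurm_id = sketch_extract_slurm_id("Submitted batch job 123456\n")
no_id = sketch_extract_slurm_id("sbatch: error: invalid partition\n")
```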

generate_job_preamble(job_details)[source]

Set slurm arguments from job_details or from defaults defined by the WF

This is a helper function for constructing the preamble of the srun command. Values set are nodes and (optionally) tasks.

Parameters:

job_details (dict) – dict passed through submit_job() including any desired alterations from the workflow defaults

Returns:

populated preamble string

Return type:

str

check_completed_job_status(slurm_id)[source]

Use scontrol to extract the status of a completed job

Parameters:

slurm_id (int) – job ID of the job to query

Returns:

job status

Return type:

str

update_job_status(slurm_ids)[source]

Query the scheduler and extract the job_status

This helper function uses squeue to check the slurm queue and extracts updates about a job’s progress, modifying the corresponding job_status object. Status options are: ‘done’, ‘pending’, ‘running’, ‘dependency’, and ‘completing’. The current status is returned for convenience.

Parameters:

slurm_ids (list) – list of slurm IDs of the jobs to check for completion

Returns:

list of job states

Return type:

list (of str)

submit_job(command, run_path, job_details=None)[source]

Submits a job for running using a submission script and sbatch.

submit_job handles job submission for the modules and is the main interface to the workflows. For the SlurmWF implementation, fully articulated batch scripts are generated for each job and submitted to the slurm scheduler via sbatch. Method inputs define the command to be executed for the job, the location for the run, and details about the job’s resources (job_details). job_details includes the dependencies of the job as a list of job IDs, whether the job is blocking (synchronous) or not, and an extra dictionary, extra_args, which adds flexibility for parameterizing the job (such as pre- or postambles to include in the batch file). The dependencies are a list of job IDs which must have a successfully completed JobStatus for the present job to run. However, jobs can still be submitted to the queue with outstanding dependencies if run asynchronously. The after key in extra_args can be specified to define the slurm dependency behavior (e.g. ‘afterany’ or ‘afterok’). Note that while default job resources (nodes, account, walltime, etc.) are present, they can be overridden by providing these keywords in the job_details dict for any specific calculation. Creates the JobStatus for this job, where the job state is initially ‘submitted’ and can be updated to ‘pending’, ‘dependency’, ‘running’, ‘completing’, ‘done’, or ‘unknown’. The ‘done’ state means the calculation has completed, but it can be decorated with suffixes that add more information if the job didn’t complete successfully (e.g. ‘done_timeout’). Status checks are performed by update_job_status(). Returns the slurm ID, which can be used to retrieve the present job’s JobStatus. If the slurm ID cannot be identified, an internal tracking number (starting at 1000) is used instead.

Parameters:
  • command (str) – command that defines the job to be executed

  • run_path (str) – directory for the job to be executed in

  • job_details (dict) – specifics for running the job, such as number of nodes, queue, etc., as well as an optional dependency list, whether the job should be synchronous or asynchronous, and any other optional arguments, such as pre- or postambles

    Default: None

Returns:

return job ID to query this job status and location

Return type:

int
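The effect of the after key can be pictured with Slurm's own dependency syntax, --dependency=<type>:<id>[:<id>...]. The helper below is a sketch of how a dependency list might be turned into that flag; the function name and the default of 'afterok' are assumptions, not documented behavior of SlurmWF.

```python
def sketch_dependency_flag(dependencies, after="afterok"):
    """Build an sbatch --dependency flag from a list of job IDs."""
    if not dependencies:
        return ""
    ids = ":".join(str(dep) for dep in dependencies)
    return f"--dependency={after}:{ids}"


# Usage: wait for two jobs to succeed, or for one job to end in any state.
ok_flag = sketch_dependency_flag([101, 102])
any_flag = sketch_dependency_flag([101], after="afterany")
```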

LSF (bsub)

class orchestrator.workflow.lsf.LSFWF(default_template=None, **kwargs)[source]

Bases: HPCWorkflow, ABC

Workflow manager for full execution using a batch file and LSF scheduler

LSFWF is a fully featured workflow module for submitting jobs to the LSF scheduler using batch files. It can handle asynchronous job submission but still provides the option for blocking (synchronous) behavior. Responsibilities include directory creation, job creation, and job status checking.

__init__(default_template=None, **kwargs)[source]

set variables and initialize the recorder

Parameters:
  • default_template (str) – path to the template file to use for submission scripts. If none provided, uses the default template present in ./default_templates

    Default: None

  • kwargs (dict) – remaining parameters passed to parent for init. Keys include: queue, account, walltime, nodes, tasks, tasks_per_node, wait_freq, root_directory, checkpoint_file, checkpoint_name, and job_record_file

extract_lsf_id(str_output)[source]

From the command line output, extract the LSF job id

Parameters:

str_output (str) – full output string to extract ID from

Returns:

LSF ID

Return type:

int
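For comparison with the Slurm case, bsub typically acknowledges a submission with a line such as "Job <id> is submitted to queue <name>.", so the ID sits inside the first pair of angle brackets after "Job". The sketch below assumes that output format and uses a hypothetical helper name; it is not the method's actual implementation.

```python
import re


def sketch_extract_lsf_id(str_output):
    """Return the job ID from bsub output, or None if none is found."""
    match = re.search(r"Job <(\d+)>", str_output)
    return int(match.group(1)) if match else None


# Usage with a typical bsub response.
lsf_id = sketch_extract_lsf_id("Job <4242> is submitted to queue <pbatch>.\n")
```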

generate_job_preamble(job_details)[source]

Set LSF arguments from job_details or from defaults defined by the WF

This is a helper function for constructing the preamble of the lrun command. Values set are nodes, tasks (optional).

Parameters:

job_details (dict) – dict passed through submit_job() including any desired alterations from the workflow defaults

Returns:

populated preamble string

Return type:

str

update_job_status(lsf_ids)[source]

Query the scheduler and extract the job_status

This helper function uses bquery to check the LSF queue and extracts updates about a job’s progress, modifying the corresponding job_status object. Status options are: ‘done’, ‘pending’, ‘running’, ‘dependency’, and ‘completing’. The current status is returned for convenience.

Parameters:

lsf_ids (list) – list of LSF IDs of the jobs to check for completion

Returns:

list of job states

Return type:

list (of str)

submit_job(command, run_path, job_details=None)[source]

Submits a job for running using a submission script and bsub.

submit_job handles job submission for the modules and is the main interface to the workflows. For the LSFWF implementation, fully articulated batch scripts are generated for each job and submitted to the LSF scheduler via bsub. Method inputs define the command to be executed for the job, the location for the run, and details about the job’s resources (job_details). job_details includes the dependencies of the job as a list of job IDs, whether the job is blocking (synchronous) or not, and an extra dictionary, extra_args, which adds flexibility for parameterizing the job (such as pre- or postambles to include in the batch file). The dependencies are a list of job IDs which must have a successfully completed JobStatus for the present job to run. However, jobs can still be submitted to the queue with outstanding dependencies if run asynchronously. The after key in extra_args can be specified to define the LSF dependency behavior (e.g. ‘exit’ or ‘done’). Note that while default job resources (nodes, account, walltime, etc.) are present, they can be overridden by providing these keywords in the job_details dict for any specific calculation. Creates the JobStatus for this job, where the job state is initially ‘submitted’ and can be updated to ‘pending’, ‘dependency’, ‘running’, ‘completing’, ‘done’, or ‘unknown’. The ‘done’ state means the calculation has completed, but it can be decorated with suffixes that add more information if the job didn’t complete successfully (e.g. ‘done_timeout’). Status checks are performed by update_job_status(). Returns the LSF ID, which can be used to retrieve the present job’s JobStatus. If the LSF ID cannot be identified, an internal tracking number (starting at 1000) is used instead.

Parameters:
  • command (str) – command that defines the job to be executed

  • run_path (str) – directory for the job to be executed in

  • job_details (dict) – specifics for running the job, such as number of nodes, queue, etc., as well as an optional dependency list, whether the job should be synchronous or asynchronous, and any other optional arguments, such as pre- or postambles

    Default: None

Returns:

return job ID to query this job status and location

Return type:

int
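In LSF, dependency conditions are written as expressions such as done(101) && done(102) and passed to bsub with the -w option. The sketch below shows how the ‘exit’ or ‘done’ value of the after key might be combined with a dependency list; the helper name and the default of 'done' are assumptions for illustration only.

```python
def sketch_lsf_dependency(dependencies, after="done"):
    """Build an LSF dependency expression for use with `bsub -w`."""
    if not dependencies:
        return ""
    return " && ".join(f"{after}({dep})" for dep in dependencies)


# Usage: require two jobs to finish successfully, or one job to fail.
done_expr = sketch_lsf_dependency([101, 102])
exit_expr = sketch_lsf_dependency([101], after="exit")
```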

Slurm to LSF (bsub)

class orchestrator.workflow.slurm_to_lsf.SlurmtoLSFWF(default_template=None, **kwargs)[source]

Bases: HPCWorkflow, ABC

Workflow manager for full execution using a batch file and LSF scheduler

SlurmtoLSFWF is a fully featured workflow module for submitting jobs to the LSF scheduler using batch files while Orchestrator is running on SLURM. It can handle asynchronous job submission but still provides the option for blocking (synchronous) behavior. Responsibilities include directory creation, job creation, and job status checking.

__init__(default_template=None, **kwargs)[source]

set variables and initialize the recorder

Parameters:
  • default_template (str) – path to the template file to use for submission scripts. If none provided, uses the default template present in ./default_templates

    Default: None

  • kwargs (dict) – remaining parameters passed to parent for init. Keys include: queue, account, walltime, nodes, tasks, tasks_per_node, wait_freq, root_directory, checkpoint_file, checkpoint_name, and job_record_file

extract_lsf_id(str_output)[source]

From the command line output, extract the LSF job id

Parameters:

str_output (str) – full output string to extract ID from

Returns:

LSF ID

Return type:

int

generate_job_preamble(job_details)[source]

Set LSF arguments from job_details or from defaults defined by the WF

This is a helper function for constructing the preamble of the lrun command. Values set are nodes, tasks (optional).

Parameters:

job_details (dict) – dict passed through submit_job() including any desired alterations from the workflow defaults

Returns:

populated preamble string

Return type:

str

update_job_status(lsf_ids)[source]

Query the scheduler and extract the job_status

This helper function uses bquery to check the LSF queue and extracts updates about a job’s progress, modifying the corresponding job_status object. Status options are: ‘done’, ‘pending’, ‘running’, ‘dependency’, and ‘completing’. The current status is returned for convenience.

Parameters:

lsf_ids (list) – list of LSF IDs of the jobs to check for completion

Returns:

list of job states

Return type:

list (of str)

submit_job(command, run_path, job_details=None)[source]

Submits a job for running using a submission script and bsub.

submit_job handles job submission for the modules and is the main interface to the workflows. For the SlurmtoLSFWF implementation, fully articulated batch scripts are generated for each job and submitted to the LSF scheduler via bsub from another machine. Method inputs define the command to be executed for the job, the location for the run, and details about the job’s resources (job_details). job_details includes the dependencies of the job as a list of job IDs, whether the job is blocking (synchronous) or not, and an extra dictionary, extra_args, which adds flexibility for parameterizing the job (such as pre- or postambles to include in the batch file). The dependencies are a list of job IDs which must have a successfully completed JobStatus for the present job to run. However, jobs can still be submitted to the queue with outstanding dependencies if run asynchronously. The after key in extra_args can be specified to define the LSF dependency behavior (e.g. ‘exit’ or ‘done’). Note that while default job resources (nodes, account, walltime, etc.) are present, they can be overridden by providing these keywords in the job_details dict for any specific calculation. Creates the JobStatus for this job, where the job state is initially ‘submitted’ and can be updated to ‘pending’, ‘dependency’, ‘running’, ‘completing’, ‘done’, or ‘unknown’. The ‘done’ state means the calculation has completed, but it can be decorated with suffixes that add more information if the job didn’t complete successfully (e.g. ‘done_timeout’). Status checks are performed by update_job_status(). Returns the LSF ID, which can be used to retrieve the present job’s JobStatus. If the LSF ID cannot be identified, an internal tracking number (starting at 1000) is used instead.

Parameters:
  • command (str) – command that defines the job to be executed

  • run_path (str) – directory for the job to be executed in

  • job_details (dict) – specifics for running the job, such as number of nodes, queue, etc., as well as an optional dependency list, whether the job should be synchronous or asynchronous, and any other optional arguments, such as pre- or postambles

    Default: None

Returns:

return job ID to query this job status and location

Return type:

int

AiiDA

class orchestrator.workflow.aiida.AiidaWF(**kwargs)[source]

Bases: HPCWorkflow, ABC

Workflow class for using AiiDA

Note that kwargs set the default parameters for the workflow, but can be overridden by values passed into job_details provided to submit_job().

__init__(**kwargs)[source]

Initialize the values needed for the AiiDA WF.

Parameters:

kwargs (dict) – arguments to control workflow behavior. Keys may include root_directory, checkpoint_file, checkpoint_name, and job_record, which default to ‘./orchestrator_workflow’, ‘./orchestrator_checkpoint.json’, ‘workflow’, and ‘./job_record.pkl’, respectively. AiiDA additionally uses the defaults ‘queue’ -> ‘pbatch’, ‘account’ -> ‘iap’, ‘walltime’ -> ‘1:00’, ‘nodes’ -> 1, ‘tasks’ -> 1, and ‘tasks_per_node’ -> 1

check_daemon_status()[source]

Check the status of the daemon used for AiiDA. If the daemon is not currently running, will attempt to start the daemon.

check_daemon_workload(workload_target=0.9)[source]

Will check the workload of the daemon and set to the designated workload target.

Parameters:

workload_target (float) – Target value to set the workload. Used to decide the number of daemon workers needed. Default value is set to 90%.

get_aiida_daemon_client()[source]

Retrieve the daemon client used within AiiDA.

Return type:

DaemonClient

checkpoint_workflow()[source]

Checkpoint the workflow module into the checkpoint file.

Save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities.

restart_workflow()[source]

Restart the workflow module from the checkpoint file.

Check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so.

update_job_status(pks)[source]

Query the scheduler and extract the job_status.

This helper function uses the AiiDA QueryBuilder to check updates about a job’s progress, modifying the corresponding job_status object. Status options are: ‘CREATED’, ‘EXCEPTED’, ‘FINISHED’, ‘KILLED’, ‘RUNNING’, and ‘WAITING’. The current status is returned for convenience.

Parameters:

pks (list[int]) – list of AiiDA PKs of the jobs to check for completion

Return type:

list[str]

Returns:

list of job states

block_until_completed(pks)[source]

Function will periodically check on the job status in AiiDA. The time between checks is based on the wait_freq variable in the workflow. The default value is 60 seconds.

Parameters:

pks (Union[int, list[int]]) – list of AiiDA PKs of the jobs to check for completion. Can also pass a single ID.

submit_job(builder, job_details)[source]

Submits a job to AiiDA.

submit_job handles job submission to AiiDA for the Oracle calculations. As most information should have already been defined in the computer and code items of AiiDA, this function will primarily set details about the job’s resources (job_details). Note that while default job resources (nodes, account, walltime, etc.) are present, they can be overridden by providing these keywords in the job_details dict for any specific calculation. Creates the JobStatus for this job, where the job state is initially ‘CREATED’ and can be updated to ‘EXCEPTED’, ‘FINISHED’, ‘KILLED’, ‘RUNNING’, or ‘WAITING’. The ‘FINISHED’ state means the calculation has completed, but it can be decorated with suffixes that add more information if the job didn’t complete successfully (e.g. ‘FINISHED_TIMEOUT’). Status checks are performed by update_job_status(). Returns the AiiDA pk, which can be used to retrieve the present job’s JobStatus.

Parameters:
  • builder (ProcessBuilder) – AiiDA builder object containing the required information to submit the calculation.

  • job_details (dict) – specifics for running the job, such as number of nodes, queue, etc., as well as an optional dependency list, whether the job should be synchronous or asynchronous, and any other optional arguments, such as pre- or postambles

    Default: None

Return type:

int

Returns:

return job ID to query this job status and location

static get_job_path(pk)[source]

Given the parent pk value that an Oracle returns, will get the absolute path on the remote server.

Parameters:

pk (int) – AiiDA PK

Return type:

Union[str, PathLike]

Returns:

Path on the remote server where the calculation occurred.

Workflow Builder

orchestrator.workflow.factory.workflow_factory = <orchestrator.utils.module_factory.ModuleFactory object>

default factory for workflows, includes LOCAL

class orchestrator.workflow.factory.WorkflowBuilder(factory=<orchestrator.utils.module_factory.ModuleFactory object>)[source]

Bases: ModuleBuilder

Constructor for workflows added in the factory

set the factory to be used for the builder. The default is to use the workflow_factory generated at the end of this module. A user defined ModuleFactory can optionally be supplied instead.

Parameters:

factory (ModuleFactory) – a workflow factory

Default: workflow_factory

__init__(factory=<orchestrator.utils.module_factory.ModuleFactory object>)[source]

constructor for the WorkflowBuilder, sets the factory to build from

Parameters:

factory (ModuleFactory) – a workflow factory

Default: workflow_factory

build(workflow_type, workflow_args)[source]

Return an instance of the specified workflow

The build method takes the specifier and input arguments to construct a concrete workflow instance.

Parameters:
  • workflow_type (str) – token of a workflow which has been added to the factory

  • workflow_args (dict) – arguments to control workflow behavior

    Default: None

Returns:

instantiated concrete Workflow

Return type:

Workflow
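The token-based construction described for build can be illustrated with a small registry pattern. Everything in the sketch below is hypothetical: SketchFactory, SketchBuilder, their method names, and the toy workflow class are stand-ins written for this example and do not reproduce the ModuleFactory or ModuleBuilder interfaces.

```python
class SketchFactory:
    """Toy registry mapping string tokens to workflow classes."""

    def __init__(self):
        self._registry = {}

    def register(self, token, cls):
        self._registry[token] = cls

    def lookup(self, token):
        # Raises KeyError if the token was never registered.
        return self._registry[token]


class SketchBuilder:
    """Toy builder that instantiates whatever the factory returns."""

    def __init__(self, factory):
        self.factory = factory

    def build(self, workflow_type, workflow_args=None):
        cls = self.factory.lookup(workflow_type)
        return cls(**(workflow_args or {}))


class SketchLocalWorkflow:
    """Stand-in for a concrete workflow; stores only root_directory."""

    def __init__(self, root_directory="./orchestrator_workflow"):
        self.root_directory = root_directory


# Usage: register a token, then build an instance from a dict of args.
sketch_factory = SketchFactory()
sketch_factory.register("LOCAL", SketchLocalWorkflow)
sketch_builder = SketchBuilder(sketch_factory)
built = sketch_builder.build("LOCAL", {"root_directory": "./demo"})
default_built = sketch_builder.build("LOCAL")
```

Separating the registry from the builder is what allows a user-defined factory to be swapped in, as the factory parameter above permits.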

orchestrator.workflow.factory.workflow_builder = <orchestrator.workflow.factory.WorkflowBuilder object>

workflow builder object which can be imported for use in other modules

Job Status

class orchestrator.workflow.workflow_base.JobStatus(path, state=None, exit_code=None)[source]

Bases: object

data class for collecting meta-data about a job

The minimum attribute set for a JobStatus object is path, state, and exit_code, where path defines the location where inputs and outputs are stored, state defines the job’s status (e.g. done, pending, running), and exit_code shows the result of the job once completed, with 0 indicating success. Note that state can change over time and exit_code may not be known when the object is first created.

__init__(path, state=None, exit_code=None)[source]
Parameters:
  • path (str) – directory path where inputs and outputs are stored for the job

  • state (str) – job’s status (e.g. done, pending, running)

  • exit_code (int or str) – result of the job
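
A minimal stand-in with the same three attributes is sketched below to show how state and exit_code are filled in over a job's lifetime. SketchJobStatus is a hypothetical dataclass written for this example; the real JobStatus class may be implemented differently and may carry additional attributes.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SketchJobStatus:
    """Illustrative record with the documented minimum attributes."""

    path: str
    state: Optional[str] = None
    exit_code: Optional[Union[int, str]] = None


# Usage: only the path is known at creation; the rest is updated later.
status = SketchJobStatus("./orchestrator_workflow/oracle/ground_truth/0")
initial = (status.state, status.exit_code)
status.state = "done"
status.exit_code = 0
```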