Source code for orchestrator.target_property.melting_point

import numpy as np
import random
import ase.io
from ase import Atoms
from glob import glob
from os.path import getmtime, split, join
from typing import Union, Optional, List, Dict, Any, Tuple
from ..simulator import simulator_builder
from ..potential import Potential
from ..utils.exceptions import AnalysisError, DensityOOBError
from . import TargetProperty
from ..workflow import Workflow
from ..storage import Storage
from orchestrator.target_property.analysis import AnalyzeLammpsLog
from ..utils.restart import restarter
from ..utils.isinstance import isinstance_no_import


[docs] class MeltingPoint(TargetProperty): """ Class for determining melting point of a material Explore temperature range and determine final temperature that results in solid/liquid phase. Simulation parameters and required paths are read from json file. :param target_property_args: dict with the input parameters. The parameters include: :param path_type: path to perform target property calculations :type path_type: str :param model_path: path to store the potential file :type model_path: str :param init_config: path to pull configuration files :type init_config: str :param init_config_use: option to use an initial config file :type init_config_use: boolean :param gpu_use: option to use gpu for running simulations :type gpu_use: boolean :param random_seed_use: option to use random seed in the simulation :type random_seed_use: boolean :param melting_calc_params: melting point estimation specific parameters :type melting_calc_params: dict :param sim_params: simulation specific parameters :type sim_params: dict :param simulator_type: name of the simulator to perform simulations :type simulator_type: str :param simulator_path: path to the simulator executable :type simulator_path: str :param elements: list of elements which are present in the simulation :type elements: list :param input_template: LAMMPS template input files. This is a dictionary of key-value pairs that can be used to define any simulation input template files required for conducting simulations (e.g. one input simulations). :type input_template: dict """
[docs] def __init__(self, **target_property_args): """ Initialization of the MeltingPoint class with args dict :param target_property_args: dict with the input parameters. The parameters include: :param path_type: path to perform target property calculations :type path_type: str :param model_path: path to store the potential file :type model_path: str :param init_config: path to pull configuration files :type init_config: str :param init_config_use: option to use an initial config file :type init_config_use: boolean :param gpu_use: option to use gpu for running simulations :type gpu_use: boolean :param random_seed_use: option to use random seed in the simulation :type random_seed_use: boolean :param melting_calc_params: melting point estimation specific parameters :type melting_calc_params: dict :param sim_params: simulation specific parameters :type sim_params: dict :param job_details: optional parameters for running the job :type job_details: dict :param simulator_type: name of the simulator to perform simulations :type simulator_type: str :param simulator_path: path to the simulator executable :type simulator_path: str :param elements: list of elements which are present in the simulation :type elements: list :param input_template: LAMMPS template input file. This is a dictionary of key-value pairs that can be used to define any simulation input template files required for conducting simulations (e.g. one input template for npt simulations and another inpute template for nph simulations). :type input_template: dict """ self.path_type = target_property_args['path_type'] self.model_path = target_property_args['model_path'] self.init_config = target_property_args['init_config'] self.init_config_use = target_property_args['init_config_use'] self.random_seed_use = target_property_args['random_seed_use'] self.melting_calc_params = target_property_args['melting_calc_params'] self.sim_params = target_property_args['sim_params'] job_details = target_property_args['job_details'] npt_job_details = target_property_args.get('npt_job_details', {}) self.npt_job_details = {**job_details, **npt_job_details} nph_job_details = target_property_args.get('nph_job_details', {}) self.nph_job_details = {**job_details, **nph_job_details} self.num_calculations = target_property_args.get('num_calculations', 1) simulator_type = target_property_args['simulator_type'] simulator_path = target_property_args['simulator_path'] self.elements = target_property_args['elements'] self.input_template_npt = target_property_args['input_template']['NPT'] self.input_template_nph = target_property_args['input_template']['NPH'] npt_simulator_args = { 'code_path': simulator_path, 'elements': self.elements, 'input_template': self.input_template_npt, } self.built_simulator_npt = simulator_builder.build( simulator_type, npt_simulator_args, ) nph_simulator_args = { 'code_path': simulator_path, 'elements': self.elements, 'input_template': self.input_template_nph, } self.built_simulator_nph = simulator_builder.build( simulator_type, nph_simulator_args, ) self.progress_flag = 'init' self.current_state = {} self.outstanding_npt = [] self.outstanding_nph = None self.npt_calcs = [] self.nph_calcs = [] super().__init__(**target_property_args)
[docs] def checkpoint_property(self) -> None: """ checkpoint the property module into the checkpoint file save necessary internal variables into a dict with key checkpoint_name and write to the (json) checkpoint file for restart capabilities """ save_dict = { self.checkpoint_name: { 'progress_flag': self.progress_flag, 'current_state': self.current_state, 'npt_calcs': self.npt_calcs, 'nph_calcs': self.nph_calcs } } restarter.write_checkpoint_file(self.checkpoint_file, save_dict)
[docs] def restart_property(self) -> None: """ restart the property module from the checkpoint file check if the checkpoint_file has an entry matching the checkpoint_name and set internal variables accordingly if so """ restart_dict = restarter.read_checkpoint_file( self.checkpoint_file, self.checkpoint_name, ) self.progress_flag = restart_dict.get('progress_flag', self.progress_flag) self.current_state = restart_dict.get('current_state', self.current_state) self.npt_calcs = restart_dict.get('npt_calcs', self.npt_calcs) self.nph_calcs = restart_dict.get('nph_calcs', self.nph_calcs) if len(self.npt_calcs) > 0: self.outstanding_npt = self.npt_calcs[-1] if len(self.nph_calcs) > 0: self.outstanding_nph = self.nph_calcs[-1] if self.progress_flag != 'init': if self.progress_flag == 'done': # restart information exists but the last calculation ended self.restart = False self.npt_calcs = [] self.nph_calcs = [] else: # restart information exists and we actually want to restart self.restart = True else: # no restart information exists self.restart = False
[docs] def calculate_property( self, iter_num: int = 0, modified_params: Optional[Dict[str, Any]] = None, potential: Optional[Union[str, Potential]] = None, workflow: Optional[Workflow] = None, storage: Optional[Storage] = None, **kwargs, ) -> Dict[str, Union[float, Tuple[List[int], List[int]]]]: """ Find final equilibrium temperature for the melting point Iterate over range of temperatures between temp_min and temp_max, and adjust the temperature range depending on the analysis of liquid and solid phases. Continue spawning simulations with different temperatures until difference between maximum and minimum temperatures reduces below a set threshold (temp_threshold). After finding equilibrium temperature, perform NPH simulations and check if two phases exist by analyzing q_x parameter. Adjust the temperature until getting two phases.The extract_q and extract_msd methods from AnalyzeLammpsLog calculate local bond parameter q_x and mean square displacement, respectively. Additional parameters used by this method are temp_threshold (float, threshold temperature difference between temp_max and temp_min to determine the convergence of the final equilibriumm temperature), temp_min (float, min temperature guess), temp_max (float, max temperature guess), temp_incr (float, temperature increment for screening temperature in NPH stage),num_temp (int, number of temperatures to test between minimum and maximum temperature guesses, max_iter (int, maximum number of iterations for screening temperature in NPH stage). These are defined by self.melting_calc_params and set at class instantiation. This method also uses temp (float, simulation temperature), press (float, simulation pressure), which are defined by self.sim_params and set at class instantiation. :param iter_num: iteration number of the calculation of melting point if executed over multiple iterations :type iter_num: int :param modified_params: simulation parameters to modify the initially provided values, including interatomic potentials, simulation temperature and pressure :type modified_params: dict :param workflow: the workflow for managing job submission :type workflow: Workflow :param potential: interatomic potential to be used in LAMMPS. Can be either the string of a KIM potential available via the KIM API or a Potential object created by the Orchestrator. :type potential: str or Potential :returns: dictionary with the final temperature that results in solid/ liquid phases as the property_value, std based on time-averaging for the property_std, and a tuple of the NPT calculation ID list and the final NPH calculation ID as the calc_ids :rtype: dict """ if workflow is None: workflow = self.default_wf if modified_params is not None: self.sim_params['temp'] = modified_params['temp'] self.sim_params['press'] = modified_params['press'] if isinstance_no_import(potential, 'Potential'): if hasattr(potential, 'install_potential_in_kim_api') and callable( potential.install_potential_in_kim_api): module_name = self.__class__.__name__ save_root = workflow.make_path( module_name, 'potential_for_melting_point', ) potential_name = potential.kim_id potential.install_potential_in_kim_api( potential_name=potential_name, save_path=save_root) self.model_path = f'{save_root}/{potential_name}' self.sim_params['potential'] = potential_name else: raise NotImplementedError('Melting point calculations only ' 'supports Potentials with working ' 'install_potential_in_kim_api ' 'methods at this time') else: if potential is not None: self.sim_params['potential'] = potential timestep = self.sim_params.get('timestep', 0.001) lattice_param = self.sim_params.get('lattice_param', 'unknown') assert lattice_param != 'unknown' and lattice_param > 0, \ "Lattice parameter is not given or assigned to a wrong value" temp_threshold = self.melting_calc_params.get('temp_thresh', 1000.0) assert temp_threshold > 0, \ "Temperature threshold must be a positive value" temp_min = self.melting_calc_params.get('temp_min', 0.0) assert isinstance(temp_min, float) or isinstance(temp_min, int), \ "Minimum temperature must be a number" temp_max = self.melting_calc_params.get('temp_max', 12000.0) assert isinstance(temp_max, float) or isinstance(temp_max, int), \ "Maximum temperature must be a number" num_temp = self.melting_calc_params.get('num_temp', 4) assert num_temp > 0, "Number of temperatures must be a positive value" temp_incr = self.melting_calc_params.get('temp_incr', 100.0) assert temp_incr > 0, "Temperature threshold must be a positive value" exp_den_solid = self.melting_calc_params.get('exp_den_solid', 'none') assert exp_den_solid != 'none' and exp_den_solid > 0, \ "Exp. solid density is not given or assigned to a wrong value" exp_den_liquid = self.melting_calc_params.get('exp_den_liquid', 'none') assert exp_den_liquid != 'none' and exp_den_liquid > 0, \ "Exp. liquid density is not given or assigned to a wrong value" den_tol = self.melting_calc_params.get('den_tol', 10.0) assert den_tol > 0, "Density tolerance must be a positive value" max_iter = self.melting_calc_params.get('max_iter', 4) assert max_iter > 0, "Maximum iterations must be a positive value" eps_dbscan = self.melting_calc_params.get('eps_dbscan', 0.025) assert eps_dbscan != 'none' and eps_dbscan > 0, \ "Eps parameter is not set or assigned to a wrong value" temp_final = None calc_id_error = [] if not self.restart or self.progress_flag == 'npt': if self.progress_flag == 'npt': # if progress flag is npt, current state is guaranteed to have # temp_{min,max} keys temp_min = self.current_state['temp_min'] temp_max = self.current_state['temp_max'] self.logger.info('Temp bounds set from checkpoint, waiting for' f' calcs {self.npt_calcs} to finish') self.logger.info(f'Initial min and max temperatures are: ' f'{temp_min} K and {temp_max} K') phase = 'unknown' while (temp_max - temp_min) > temp_threshold: temp_list = np.linspace(temp_min, temp_max, num_temp + 2) self.logger.info(f'Temperatures to test : {temp_list}') if len(self.outstanding_npt) == 0: for temp in temp_list: self.sim_params['temp'] = temp calc_id = self.conduct_sim( self.sim_params, workflow, self.path_type + '/' + str(iter_num) + '/NPT') self.outstanding_npt.append(calc_id) # checkpoint after the batch of calcs are submitted self.npt_calcs.append(self.outstanding_npt) self.current_state = self.current_state | { 'temp_min': temp_min, 'temp_max': temp_max, } self.progress_flag = 'npt' self.checkpoint_property() workflow.block_until_completed(self.outstanding_npt) phase_list = [] q_list = [] for c_id in self.outstanding_npt: run_path_npt = workflow.get_job_path(c_id) log_npt = run_path_npt + '/' + 'lammps.out' if 'ERROR: Lost atoms:' in open(log_npt).read(): self.logger.info(f'Lost atoms error for: {c_id}') calc_id_error.append(c_id) if len(calc_id_error) > 0: traj_files = glob(f'{run_path_npt}/*.lammpstrj') file_times = [getmtime(f) for f in traj_files] newest_file = split(traj_files[np.argmax(file_times)])[1] results_dict = { 'property_value': newest_file, 'property_std': None, 'calc_ids': calc_id_error, 'success': False, } return results_dict for c_id in self.outstanding_npt: run_path_npt = workflow.get_job_path(c_id) log_msd_npt = run_path_npt + '/' + 'log_msd.dat' q_profile_npt = run_path_npt + '/' + 'q_profile.dat' two_phase_flag, two_phase_temp, two_phase_temp_std, \ label_unique, average_q_npt, \ total_atoms = AnalyzeLammpsLog.extract_q( [run_path_npt, log_msd_npt, q_profile_npt, str(eps_dbscan)]) phase = AnalyzeLammpsLog.extract_msd( [log_msd_npt, str(timestep), str(lattice_param)]) density_npt = AnalyzeLammpsLog.extract_density( [log_msd_npt]) self.check_density(phase, density_npt, exp_den_solid, exp_den_liquid, den_tol, c_id) phase_list.append(phase) q_list.append(np.mean(average_q_npt)) # done with the npt calcs on this iteration, reset for looping self.outstanding_npt = [] assert 'solid' in phase_list, ('All resulting phases are ' 'liquid. Please adjust Tmin and' ' Tmax') assert 'liquid' in phase_list, ('All resulting phases are ' 'solid. Please adjust Tmin and' ' Tmax') index_solid = [ i for i in range(len(phase_list)) if phase_list[i] == 'solid' ] index_liquid = [ i for i in range(len(phase_list)) if phase_list[i] == 'liquid' ] max_index_solid = max(index_solid) min_index_solid = min(index_solid) min_index_liquid = min(index_liquid) max_index_liquid = max(index_liquid) if min_index_liquid > max_index_solid: temp_max = temp_list[min_index_liquid] temp_min = temp_list[max_index_solid] q_liquid = q_list[max_index_liquid] q_solid = q_list[min_index_solid] self.current_state = self.current_state | { 'temp_min': temp_min, 'temp_max': temp_max, } # checkpoint after analysis is done self.checkpoint_property() else: raise AnalysisError('Phases are not in right order, please' ' check msd analysis') self.logger.info(f'New minimum and maximum temperatures are: ' f'{temp_min} K and {temp_max} K ') # this block will execute if restart == True and progress == nph # or just naturally once the previous block ends two_phase_flag = False if self.progress_flag == 'nph': # if progress flag is nph, current state is guaranteed to have # n_iter and temp_estimate keys n_iter = self.current_state['n_iter'] temp_estimate = self.current_state['temp_estimate'] self.logger.info(f'Restarting at n_iter = {n_iter} (temp_estimate ' f' = {temp_estimate})') else: n_iter = 0 temp_estimate = temp_min while not two_phase_flag: if self.outstanding_nph is None: if n_iter == 0: temp_estimate = temp_estimate average_q_nph = 0 else: if abs(np.mean(average_q_nph) - q_solid) < abs(np.mean(average_q_nph) - q_liquid): temp_estimate += temp_incr else: temp_estimate -= temp_incr self.sim_params['temp'] = temp_estimate self.logger.info(f'Iteration {n_iter}: Starting NPH simulation' ' with an estimated equilibrium temperature: ' f' {temp_estimate} K') self.outstanding_nph = self.conduct_sim( self.sim_params, workflow, self.path_type + '/' + str(iter_num) + '/NPH', ) self.nph_calcs.append(self.outstanding_nph) # checkpoint after the batch of calc is submitted self.current_state = self.current_state | { 'n_iter': n_iter, 'temp_estimate': temp_estimate, } self.progress_flag = 'nph' self.checkpoint_property() workflow.block_until_completed(self.outstanding_nph) run_path_nph = workflow.get_job_path(self.outstanding_nph) log_nph = run_path_nph + '/' + 'lammps.out' if 'ERROR: Lost atoms:' in open(log_nph).read(): calc_id_error.append(self.outstanding_nph) self.logger.info( f'Lost atoms error for: {self.outstanding_nph}') if len(calc_id_error) > 0: traj_files = glob(f'{run_path_nph}/*.lammpstrj') file_times = [getmtime(f) for f in traj_files] newest_file = split(traj_files[np.argmax(file_times)])[1] results_dict = { 'property_value': newest_file, 'property_std': None, 'calc_ids': calc_id_error, 'success': False, } return results_dict self.outstanding_nph = None q_profile_nph = run_path_nph + '/' + 'q_profile.dat' log_msd_nph = run_path_nph + '/' + 'log_msd.dat' two_phase_flag, two_phase_temp, two_phase_temp_std, \ label_unique, average_q_nph, \ total_atoms = AnalyzeLammpsLog.extract_q( [run_path_nph, log_msd_nph, q_profile_nph, str(eps_dbscan)]) density_nph = AnalyzeLammpsLog.extract_density([log_msd_nph]) density_upper = (exp_den_solid + exp_den_liquid) / 2 + den_tol density_lower = (exp_den_solid + exp_den_liquid) / 2 - den_tol if density_nph < density_upper and density_nph > density_lower: self.logger.info( f'Overall density ({density_nph} g/cm^3) is in expected' f' range, continue melting point calculations ') else: raise AnalysisError( 'Overall density for two phase is out of expected range') for i in range(0, len(label_unique)): self.logger.info( f'Phase {label_unique[i]} includes {total_atoms[i]} atoms ' f'with an average order parameter of {average_q_nph[i]}') if not two_phase_flag: n_iter += 1 if n_iter > max_iter or temp_estimate >= temp_max: self.logger.info('Two phase cannot be generated ' 'please check your settings. ' 'Exiting the melting calculations') temp_final = -1 temp_std_final = -1 break else: temp_final = two_phase_temp temp_std_final = two_phase_temp_std self.logger.info(f'Final equilibrium temperature that results ' f'in solid/liquid phases is: {temp_final} K ' f'with a std of {two_phase_temp_std} K ') for key in ['temp_min', 'temp_max', 'n_iter', 'temp_estimate']: _ = self.current_state.pop(key, None) self.progress_flag = 'done' return_npt = self.npt_calcs self.npt_calcs = [] return_nph = self.nph_calcs self.nph_calcs = [] self.checkpoint_property() # return results results_dict = { 'property_value': temp_final, 'property_std': temp_std_final, 'calc_ids': (return_npt, return_nph), 'success': True, } return results_dict
[docs] def conduct_sim( self, sim_params: Dict[str, Any], workflow: Workflow, sim_path: str, ) -> int: """ Perform simulations for the target property calculations Additional parameters used by this method are temp (float, simulation temperature), press (float, simulation pressure), ice_temp (float, ice temperature), below_temp_diff (float, temperature to subtract to define below temperature), above_temp_diff (float, temperature to add to define above temperature), farabove_temp_diff (float, temperature to add to define far above temperature), units (string, units to be used in LAMMPS simulations), atom_style (string, atom style name), pair_style (string, pair style name), potential (string, potential name), element (string, type of element), mass (float, mass of the element), lattice (string, lattice type), lattice_param (float, lattice spacing), lx, ly, lz (int, lattice numbers in x, y and z directions), timestep (float, timestep size in time units), q_num_neigh (int, number of nearest neighbors for q_x calculations), nph_steps (int, number of steps for the NPH stage) which are defined by self.sim_params and set at class instantiation. These parameters are modified by calculate_property method on-the-fly and supplied to this method for setting simulation temperature and pressure. :param workflow: the workflow for managing job submission :type workflow: Workflow :param sim_path: path to perform simulations for target property calculations :type sim_path: str :returns: path corresponding to a spawned simulation :rtype: string """ press = sim_params.get('press', 0.0) temp = sim_params.get('temp', 298.15) ice_temp = sim_params.get('ice_temp', 0.0) below_temp_diff = sim_params.get('below_temp_diff', 100.0) above_temp_diff = sim_params.get('above_temp_diff', 100.0) farabove_temp_diff = sim_params.get('farabove_temp_diff', 2000.0) units = sim_params.get('units', 'metal') atom_style = sim_params.get('atom_style', 'atomic') pair_style = sim_params.get('pair_style', 'unknown') assert pair_style != 'unknown' and isinstance(pair_style, str), \ "Pair style is not given or assigned to wrong type" potential = sim_params.get('potential', 'unknown') assert potential != 'unknown' and isinstance(potential, str), \ "Potential is not given or assigned to wrong type" element = sim_params.get('element', 'unknown') assert element != 'unknown' and isinstance(element, str), \ "Element is not given or assigned to wrong type" mass = sim_params.get('mass', 'unknown') assert mass != 'unknown' and mass > 0, \ "Mass is not given or assigned to a wrong value" lattice = sim_params.get('lattice', 'unknown') assert lattice != 'unknown' and isinstance(lattice, str), \ "Lattice is not given or assigned to wrong type" lattice_param = sim_params.get('lattice_param', 'unknown') assert lattice_param != 'unknown' and lattice_param > 0, \ "Lattice parameter is not given or assigned to a wrong value" l_x_npt = sim_params.get('l_x_npt', 10) l_y_npt = sim_params.get('l_y_npt', 10) l_z_npt = sim_params.get('l_z_npt', 10) l_x_nph = sim_params.get('l_x_nph', 10) l_y_nph = sim_params.get('l_y_nph', 10) l_z_nph = sim_params.get('l_z_nph', 70) timestep = sim_params.get('timestep', 0.001) order_parameter = sim_params.get('order_parameter', 6) q_num_neigh = sim_params.get('q_num_neigh', 'unknown') assert q_num_neigh != 'unknown' and q_num_neigh > 0, \ "NNN for q parameter is not given or assigned to a wrong value" npt_steps = sim_params.get('npt_steps', 500000) nph_steps = sim_params.get('nph_steps', 100000) if self.random_seed_use is True: random_seed = random.randint(1, 10000) else: random_seed = 999 input_args = { 'temperature': temp, 'pressure': press, 'ice_temp': ice_temp, 'below_temp_diff': below_temp_diff, 'above_temp_diff': above_temp_diff, 'farabove_temp_diff': farabove_temp_diff, 'units': units, 'atom_style': atom_style, 'seed': random_seed, 'pair_style': pair_style, 'potential': potential, 'element': element, 'mass': mass, 'lattice': lattice, 'lattice_param': lattice_param, 'l_x_npt': l_x_npt, 'l_y_npt': l_y_npt, 'l_z_npt': l_z_npt, 'l_x_nph': l_x_nph, 'l_y_nph': l_y_nph, 'l_z_nph': l_z_nph, 'timestep': timestep, 'order_parameter': order_parameter, 'q_num_neigh': q_num_neigh, 'npt_steps': npt_steps, 'nph_steps': nph_steps } init_config_args = { 'make_config': self.init_config_use, 'config_handle': self.init_config, 'storage': 'path', 'random_seed': random_seed } if 'NPT' in sim_path: self.npt_job_details[ 'input_file_name'] = self.input_template_npt.rsplit('/', 1)[1] calc_id = self.built_simulator_npt.run( sim_path, self.model_path, input_args, init_config_args, workflow=workflow, job_details=self.npt_job_details, ) elif 'NPH' in sim_path: self.nph_job_details[ 'input_file_name'] = self.input_template_nph.rsplit('/', 1)[1] calc_id = self.built_simulator_nph.run( sim_path, self.model_path, input_args, init_config_args, workflow=workflow, job_details=self.nph_job_details, ) else: self.logger.info('Ensemble type has not been implemented ' 'for melting point calculations') exit() return calc_id
[docs] def calculate_with_error( self, n_calc: int, modified_params: Optional[Dict[str, Any]] = None, potential: Optional[Union[str, Potential]] = None, workflow: Optional[Workflow] = None, ): """ Calculate a target property with mean and standard deviation Mean and standard deviation will be obtained from multiple number of calculations (n_calc) :param n_calc: total number of calculations to perform :type n_calc: int :param modified_params: simulation parameters to modify the initially provided values, including interatomic potentials, simulation temperature and pressure :type modified_params: dict :param potential: interatomic potential to be used in LAMMPS :type potential: str :param workflow: the workflow for managing job submission :type workflow: Workflow :returns: dictionary with the average final temperature that results in solid/liquid phases as the property_value, std from the n_calc outputs for the property_std, and a tuple with no NPT and all the final NPH calculation IDs as the calc_ids :rtype: dict """ if workflow is None: workflow = self.default_wf if not self.random_seed_use: self.random_seed_use = True self.logger.warning('random_seed_use was false, but is required ' 'for calculate_with_error(). Setting to True') melt_temp_ave = None melt_temp_std = None n_calc = self.num_calculations # if progress had been made, populate these values from restart dict # otherwise set to default/starting values start_iteration = self.current_state.get('iter_num', 0) calc_id_nph_list = self.current_state.get('calc_list', []) melt_temp_list = self.current_state.get('temp_list', []) if self.progress_flag == 'error_loop': # we're restarting without any outstanding calculate_property # running, so we don't want to have it try to restart from the last # calculation it saved self.restart = False for i in range(start_iteration, n_calc): results_dict = self.calculate_property( iter_num=i, modified_params=None, potential=None, workflow=workflow, ) melt_temp = results_dict['property_value'] melt_std = results_dict['property_std'] c_ids = results_dict['calc_ids'] if isinstance(melt_temp, float): melt_temp_list.append((melt_temp, melt_std)) # note the final calculation from NPH runs calc_id_nph_list.append(c_ids[1][-1]) self.progress_flag = 'error_loop' self.npt_calcs = None self.nph_calc = None # don't need to merge state dict with conduct_sim quantities self.current_state = { 'temp_list': melt_temp_list, 'calc_list': calc_id_nph_list, 'iter_num': i, } self.checkpoint_property() raise Exception('try restart from here') self.logger.info(f'Final melt/std list is: {melt_temp_list}') if len(melt_temp_list) > 1: melt_temp_ave = np.mean([t[0] for t in melt_temp_list]) melt_temp_std = np.std([t[0] for t in melt_temp_list]) else: self.logger.info( 'Number of successful melting temperature calculations is ' 'less than two. Mean and std cannot be estimated') exit() self.logger.info( f'Ave melting temp:{melt_temp_ave} with std:{melt_temp_std}') self.progress_flag = 'init' self.npt_calcs = None self.nph_calc = None self.current_state = {} self.checkpoint_property() # return results results_dict = { 'property_value': melt_temp_ave, 'property_std': melt_temp_std, 'calc_ids': ([], calc_id_nph_list), } return results_dict
[docs] def save_configurations( self, calc_ids: Union[int, List[int]], storage: Storage, dataset_handle: str, workflow: Workflow, ) -> str: """ save configurations generated by the melting point module :param path_ids: single or list of ``calc_ids`` associated with simulator jobs. The path is extracted from the :class:`~orchestrator.workflow.workflow_base.JobStatus`. :type path_ids: list of int or int :param storage: storage module that hosts the dataset :type storage: Storage :param dataset_handle: handle for the dataset where configs will be stored :type dataset_handle: str :param workflow: the workflow that managed job submission for the provided calc_id :type workflow: Workflow :returns: updated dataset_handle including the new configurations :rtype: str """ # the save configuration simulator method is agnostic to the input # template, so nph/npt will not make a difference return self.built_simulator_nph.save_configurations( calc_ids, storage, dataset_handle, workflow, )
[docs] def check_density( self, phase: str, density_npt: float, exp_den_solid: float, exp_den_liquid: float, den_tol: float, calc_id: int, ) -> None: """ check if density is within an expected range during npt simulations :param phase: phase detected in the simulation (e.g. solid, liquid) :type phase: str :param density_npt: density calculated from the npt simulations :type density_npt: float :param exp_den_solid: expected density of the material from the experiments. This does not need to be exact value, especially if the experimental value is not available :type exp_den_solid: float :param exp_den_liquid: expected density of the material from the experiments. This does not need to be exact value, especially if the experimental value is not available :type exp_den_liquid: float :param den_tol: density tolerance to allow calculated density to deviate from the experimental value :type den_tol: float :param calc_id: calculation id associated with a simulator job :type calc_id: int """ if phase == 'solid': if (density_npt < exp_den_solid + den_tol) and (density_npt > exp_den_solid - den_tol): self.logger.info( f'Solid density ({density_npt} g/cm^3) is in expected' f' range, continue melting point calculations ') else: self.failed_job_id = calc_id raise DensityOOBError( 'density for solid phase is out of expected range') elif phase == 'liquid': if (density_npt < exp_den_liquid + den_tol) and (density_npt > exp_den_liquid - den_tol): self.logger.info( f'Liquid density ({density_npt} g/cm^3) is in expected' f' range, continue melting point calculations ') else: self.failed_job_id = calc_id raise DensityOOBError( 'density for liquid phase is out of expected range') else: raise AnalysisError( 'No phase has been detected, please check msd analysis')
[docs] @staticmethod def sample_configs( beg: int, end: int, step: int, elements_list: list[str], traj_name: Optional[str] = 'dump.lammpstrj', calc_ids: Optional[list[int]] = None, workflow: Optional[Workflow] = None, in_paths: Optional[list[str]] = None, ) -> list[Atoms]: """ sample configurations from the NPT or NPH simulations :param beg: first frame of the trajectory to start sampling :type beg: int :param end: final frame of the trajectory for sampling :type end: int :param step: frequency of sampling configurations from beg to end :type step: int :param elements_list: list of elements in same order that they were passed to the simulator. Used to correctly assign chemical IDs to extracted ASE Atoms :type elements_list: list of str :param traj_name: name of the trajectory used for sampling |default| 'dump.lammpstrj' :type traj_name: str :param calc_ids: list of ``calc_ids`` associated with simulator jobs. The path is extracted from the :class:`~orchestrator.workflow.workflow_base.JobStatus`. |default| ``None`` :type calc_ids: list of int :param workflow: the workflow that managed job submission for the provided calc_id. Required if calc_ids are given. |default| ``None`` :type workflow: Workflow :param in_paths: list of paths that include trajectories to use for sampling. This option can be used in case ``calc_ids`` are not available. If used, workflow does not need to be provided. |default| ``None`` :type in_paths: list of str :returns: a list of ASE Atoms objects :rtype: list """ atoms_list = [] if calc_ids is None: for path in in_paths: traj = join(path, traj_name) atoms = ase.io.read(traj, index=f'{beg}:{end}:{step}') atoms_list.extend(atoms) else: for calc_id in calc_ids: path = workflow.get_job_path(calc_id) traj = join(path, traj_name) atoms = ase.io.read(traj, index=f'{beg}:{end}:{step}') atoms_list.extend(atoms) for config in atoms_list: atom_ids = config.get_atomic_numbers() atom_labels = [elements_list[i - 1] for i in atom_ids] config.set_chemical_symbols(atom_labels) return atoms_list