from os import system, path
import numpy as np
from typing import Optional, Union
from ..storage.storage_base import Storage
from ..potential.potential_base import Potential
from ..workflow.workflow_base import Workflow
from ase import Atoms
from kliff.dataset.dataset import DatasetError # temporary
from .trainer_base import Trainer
from fitsnap3lib.scrapers.ase_funcs import get_apre
from ..utils.data_standard import (
ENERGY_KEY,
FORCES_KEY,
STRESS_KEY,
FORCES_WEIGHTS_KEY,
)
[docs]
class FitSnapTrainer(Trainer):
"""
Train and deploy a potential using FitSnap
The trainer class is responsible for handling the loading/assignment of
training data, as well as the actual process of training a potential.
This trainer is intended to be used with Snap model trained with ASE
training data.
"""
[docs]
def __init__(self, **kwargs):
"""
Train and deploy a general parametric model potential using FitSnap
"""
super().__init__(**kwargs)
# arguments to reinitialize an instance of the trainer
self.trainer_init_args = kwargs
[docs]
def checkpoint_trainer(self):
"""
checkpoint the trainer module into the checkpoint file
save necessary internal variables into a dict with key checkpoint_name
and write to the (json) checkpoint file for restart capabilities
"""
pass
[docs]
def restart_trainer(self):
"""
restart the trainer module from the checkpoint file
check if the checkpoint_file has an entry matching the checkpoint_name
and set internal variables accordingly if so
"""
pass
def _get_training_data(
self,
dataset_handle: str,
storage: Storage,
) -> list[Atoms]:
"""
Get the training data configurations
Retrieve the dataset specified by dataset_handle from the passed
storage module.
:param dataset_handle: the identifier of the dataset to extract from
the storage module
:type dataset_handle: str
:param storage: storage instance where the training data is saved
:type storage: Storage
:returns: training data of configurations
:rtype: ASE Dataset
"""
self.logger.info('Reading training data from storage')
try:
training_set = storage.get_data(dataset_handle)
except DatasetError:
print(('Storage module is not properly set. Cannot '
'get training data from Storage.'))
exit()
for c in training_set:
try:
c.info[ENERGY_KEY] = c.get_potential_energy()
except Exception:
pass
try:
c.info[STRESS_KEY] = c.get_stress()
except Exception:
pass
try:
c.set_array(FORCES_KEY, c.get_forces())
except Exception:
pass
return training_set
def _collect_weights(
self,
atoms: Atoms,
) -> np.ndarray:
"""
Function to collect per-atom weight data from ASE atoms objects.
:param atoms: ASE atoms object for a single configuration of atoms.
:type atoms: Atoms
:returns: a weight np array for a single configuration.
:rtype: np.ndarray
"""
return atoms.info[FORCES_WEIGHTS_KEY]
def _convert_to_3x3_stress_tensor(self,
stress_vector: np.ndarray) -> np.ndarray:
"""
Helper function to convert the (6,) stress vector to 3x3 expected by
FitSNAP
:param stress_vector: 6 stress components (Voigt notation)
:type stress_vector: np.ndarray
:returns: transformed matrix in full 3x3 format
:rtype: np.ndarray
"""
return np.array([
[stress_vector[0], stress_vector[5], stress_vector[4]],
[stress_vector[5], stress_vector[1], stress_vector[3]],
[stress_vector[4], stress_vector[3], stress_vector[2]],
])
def _collate_fitsnap_data(
self,
atoms: Atoms,
eweight: float,
fweight: float,
vweight: float,
) -> dict:
"""
Function to organize fitting data for FitSNAP from ASE atoms objects.
Args:
atoms: ASE atoms object for a single configuration of atoms.
Returns data dictionary in FitSNAP format for a single configuration.
"""
# Transform ASE cell to be appropriate for LAMMPS.
apre = get_apre(cell=atoms.cell)
r = np.dot(np.linalg.inv(atoms.cell), apre)
positions = np.matmul(atoms.get_positions(), r)
cell = apre.T
# Make a data dictionary for this config.
data = {}
data['Group'] = None
data['File'] = None
data['Stress'] = np.array(atoms.info[STRESS_KEY])
if data['Stress'].shape[0] == 6:
data['Stress'] = self._convert_to_3x3_stress_tensor(data['Stress'])
elif data['Stress'].shape != (3, 3):
raise ValueError('Stress tensor not supplied as 6, or 3x3 formats')
data['Positions'] = positions
data['Energy'] = atoms.info[ENERGY_KEY]
data['AtomTypes'] = atoms.get_chemical_symbols()
data['NumAtoms'] = len(atoms)
data['Forces'] = atoms.arrays[FORCES_KEY]
data['QMLattice'] = cell
data['test_bool'] = 0
data['Lattice'] = cell
data['Rotation'] = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
data['Translation'] = np.zeros((len(atoms), 3))
# Inject the weights.
data['eweight'] = eweight
data['fweight'] = fweight
data['vweight'] = vweight
return data
def _write_training_script(
self,
save_path: str,
dataset_list: list,
potential: Potential,
storage: Storage,
eweight: float = 1.0,
fweight: float = 1.0,
vweight: float = 1.0,
per_atom_weights: Union[bool, np.ndarray, str] = False,
upload_to_kimkit=True,
) -> str:
"""
Write a script to run the trainer outside of memory
This is a helper function for generating a script, training_script.py,
which can be executed via a workflow or offline. It additionally saves
needed additional files with it, such as a weights.txt data file.
:param save_path: path where the training script will be written
:type save_path: str
:param dataset_list: list of dataset handles which should be used for
the training procedure
:type dataset_list: list of str
:param potential: Potential instance to be trained, expect its
pre-trained state to be written to save_path/potential_to_train.pkl
:type potential: Potential
:param storage: an instance of the storage class, which contains the
datasets in dataset_list
:type storage: Storage
:returns: name of the script that is generated (training_script.py)
:param eweight: weight of energy data in the loss function
:type eweight: float
:param fweight: weight of the force data in the loss function
:type fweight: float
:param vweight: weight of the stress data in the loss function
:type vweight: float
:param per_atom_weights: True to read from dataset, or numpy array,
or a str for a numpy.loadtxt compatible filepath
|default| ``False``
:type per_atom_weights: either boolean or np.ndarray
:param upload_to_kimkit: True to upload to kimkit repository
:type upload_to_kimkit: bool
:returns: the name of the execution script
:rtype: str
"""
full_save_path = path.abspath(save_path)
import_lines = ('from orchestrator.utils.setup_input import '
'init_and_validate_module_type\n')
trainer_dict = {
'trainer_type': self.factory_token,
'trainer_args': self.trainer_init_args
}
init_trainer = ('trainer = init_and_validate_module_type("trainer", '
f'{trainer_dict}, single_input_dict=True)')
storage_dict = {
'storage_type': storage.factory_token,
'storage_args': storage.storage_init_args
}
init_storage = ('storage = init_and_validate_module_type("storage", '
f'{storage_dict}, single_input_dict=True)')
potential_dict = {
'potential_type': potential.factory_token,
'potential_args': potential.trainer_args
}
root_input = potential_dict['potential_args']['settings_path']
abs_input = path.abspath(root_input)
potential_dict['potential_args']['settings_path'] = abs_input
init_potential = ('potential = init_and_validate_module_type('
f'"potential", {potential_dict}, '
'single_input_dict=True)\n')
load_potential = "potential.build_potential()"
# per_atom_fit can be boolean, np.ndarray (or list), or str
# if list, convert to a np.ndarray first
if type(per_atom_weights) is list:
per_atom_weights = np.array(per_atom_weights)
# if bool, just pass along. The info is in storage object
if type(per_atom_weights) is bool:
per_atom_weights_for_script = per_atom_weights
# if str, collect file; must be a viable np.loadtxt() file path
# viability not checked during this step
elif type(per_atom_weights) is str:
standard_weights_path = path.abspath(
f'{full_save_path}/weights.txt')
# if located elsewhere, collect to current directory
current_abs_path = path.abspath(per_atom_weights)
if current_abs_path != standard_weights_path:
system(f'cp {current_abs_path} {standard_weights_path}')
per_atom_weights_for_script = "weights.txt"
# if np.array/list, save to the training directory and pass str
elif type(per_atom_weights) is np.ndarray:
np.savetxt(f'{full_save_path}/weights.txt', per_atom_weights)
per_atom_weights_for_script = "weights.txt"
else:
raise TypeError('per_atom_weights not a supported type!')
# Currently uses the workflow from trainer, not submit_train's input
construct_and_train = (
f'snap, errors = trainer.train(path_type="{full_save_path}",'
f'potential=potential,'
f'storage=storage,'
f'dataset_list={dataset_list},'
f'eweight={eweight},'
f'fweight={fweight},'
f'vweight={vweight},'
'write_training_script=False,')
if type(per_atom_weights_for_script) is str:
construct_and_train += (
f'per_atom_weights="{per_atom_weights_for_script}",')
else:
construct_and_train += (
f'per_atom_weights={per_atom_weights_for_script},')
construct_and_train += (f'upload_to_kimkit={upload_to_kimkit})')
script = '\n'.join([
import_lines,
init_trainer,
init_storage,
init_potential,
load_potential,
construct_and_train,
])
with open(f'{full_save_path}/training_script.py', 'w') as fout:
fout.write(script)
return 'training_script.py'
[docs]
def train(
self,
path_type: str,
potential: Potential,
storage: Storage,
dataset_list: list,
workflow: Optional[Workflow] = None,
eweight: float = 1.0,
fweight: float = 1.0,
vweight: float = 1.0,
per_atom_weights: Union[bool, np.ndarray, str] = False,
write_training_script: bool = True,
upload_to_kimkit=True,
) -> list:
"""
Train a Snap potential using FitSnap
This is the main method of the trainer class, and uses the parameters
supplied in the FitSnap settings file to perform the potential training
:param path_type: if write_training_script=True, specifier for the
workflow path, to differentiate training runs; else, the raw
path to save files
:type path_type: str
:param potential: :class:`~orchestrator.potential.fitsnap.
FitSnapPotential` class object containing fitsnap instance
:type potential: fitsnap instance
:param storage: an instance of the storage class
:type storage: Storage
:dataset_list: the list of dataset_handles (e.g. collabfit-IDs)
within the storage object to use as the dataset.
:type dataset_list: list
:param workflow: the workflow for managing path definition and job
submission, if none are supplied, will use the default workflow
defined in this class |default| ``None``
:type workflow: Workflow
:param eweight: weight of energy data in the loss function
:type eweight: float
:param fweight: weight of the force data in the loss function
:type fweight: float
:param vweight: weight of the stress data in the loss function
:type vweight: float
:param per_atom_weights: True to read from dataset, or numpy array,
or a str for a numpy.loadtxt compatible filepath
|default| ``False``
:type per_atom_weights: either boolean or np.ndarray
:param write_training_script: True to write a training script in the
workflow created directory |default| ``True``;
This is expected to always be left on if not being called by a
submit_train() workflow!
:type write_training_script: bool
:param upload_to_kimkit: True to upload to kimkit repository
:type upload_to_kimkit: bool
:returns: trained model, error metrics
:rtype: fitsnap instance, fitsnap error attribute
"""
# reset parameter_path for new training
potential.parameter_path = None
if dataset_list is None or storage is None:
raise ValueError('A storage object and list of dataset handles'
' are required!')
if not isinstance(dataset_list, list):
dataset_list = [dataset_list]
combined_dataset = []
for dataset_handle in dataset_list:
configs = self._get_training_data(dataset_handle, storage)
combined_dataset.extend(configs)
snap = potential.model
snap.data = [
self._collate_fitsnap_data(atoms, eweight, fweight, vweight)
for atoms in combined_dataset
]
self.logger.info(f"Found {len(snap.data)} configurations")
# for tracking files to upload to kimkit later
include_weights_file = False
# per_atom_fit can be boolean, np.array (or list), or str
# data (numpy array/list) be used directly
# filepath (str) will numpy.loadtxt() the data
# True will load data that exists in storage for the dataset
if type(per_atom_weights) is bool:
if per_atom_weights:
per_atom_fit = True
weights_lists = [
self._collect_weights(atoms) for atoms in combined_dataset
]
weights = np.array(
[elem for list in weights_lists for elem in list])
else:
per_atom_fit = False
elif type(per_atom_weights) is str:
per_atom_fit = True
weights = np.loadtxt(per_atom_weights)
if per_atom_weights == 'weights.txt':
include_weights_file = True
elif type(per_atom_weights) is list:
per_atom_fit = True
weights = np.array(per_atom_weights)
elif type(per_atom_weights) is np.ndarray:
per_atom_fit = True
weights = per_atom_weights
else:
raise TypeError('per_atom_weights not a supported type!')
# Calculate descriptors for all configurations
snap.process_configs()
# if weighted fitting activated by data input or True boolean,
# overwrite the weight matrix with a custom-defined one
if per_atom_fit:
row_types = snap.pt.fitsnap_dict['Row_Type']
manually_created_w_array = np.zeros(
len(snap.pt.shared_arrays['w'].array))
force_rows = [
True if row == 'Force' else False for row in row_types
]
assert (len(weights) * 3) == sum(force_rows), \
f"{len(weights)} weights given, need {sum(force_rows) / 3}"
energy_rows = [
True if row == 'Energy' else False for row in row_types
]
stress_rows = [
True if row == 'Stress' else False for row in row_types
]
manually_created_w_array[
energy_rows] = eweight # all identical currently
manually_created_w_array[force_rows] = fweight * \
np.array([val for val in weights.tolist() for i in range(3)])
manually_created_w_array[
stress_rows] = vweight # all identical currently
snap.pt.shared_arrays['w'].array = manually_created_w_array
# Perform the fit
snap.solver.perform_fit()
# Analyze error metrics
snap.solver.error_analysis()
# This should be superfluous
potential.model = snap
# write equivalent training script for documentation
# the only time this should NOT be the case is when train is being
# called from a training_script
if write_training_script:
# for normal training we need to make a path to save to
if workflow is None:
workflow = self.default_wf
save_path = workflow.make_path(self.__class__.__name__, path_type)
# Output the weights into a datafile if needed
# If True/False, can just assume storage holding weights is enough
if type(per_atom_weights) is bool:
per_atom_weights_for_script = per_atom_weights
# Otherwise, pre-emptively save the weights with a standard name,
# just pass the filename to _write_training_script.
elif type(per_atom_weights) in [str, list, np.ndarray]:
np.savetxt(f'{save_path}/weights.txt', weights)
per_atom_weights_for_script = f"{save_path}/weights.txt"
else:
raise TypeError(
'per_atom_weights: How did this not TypeError earlier?')
if per_atom_weights_for_script == 'weights.txt':
include_weights_file = True
# Output a training script equivalent to what was performed
_ = self._write_training_script(
save_path,
dataset_list,
potential,
storage,
eweight,
fweight,
vweight,
per_atom_weights_for_script,
)
# just use the path_type raw as the location to save the files
else:
save_path = path_type
# Finally output the model files
_ = self._save_model(
save_path,
potential,
potential_name='fitsnap_potential',
loss=snap.solver.errors,
create_path=False,
workflow=workflow,
)
# TODO: allow specifying auxiliary files to attach to upload?
if upload_to_kimkit:
training_files = [f'{save_path}/training_script.py']
if include_weights_file is True:
training_files.append(f'{save_path}/weights.txt')
_ = potential.save_potential_files(work_dir=save_path,
training_files=training_files,
import_to_kimkit=True,
write_to_tmp_dir=False)
return snap, snap.solver.errors
[docs]
def submit_train(
self,
path_type: str,
potential: Potential,
storage: Storage,
dataset_list: list,
workflow: Workflow,
job_details: dict,
eweight: float = 1.0,
fweight: float = 1.0,
vweight: float = 1.0,
per_atom_weights: Union[bool, np.ndarray, str] = False,
upload_to_kimkit=True,
) -> int:
"""
Asychronously train the potential based on the trainer details
This is a main method of the trainer class, and uses the parameters
supplied at instantiation to perform the potential training by
minimizing a loss function. While :meth:`train` works synchronously,
this method submits training to a job scheduler.
:param path_type: specifier for the workflow path, to differentiate
training runs
:type path_type: str
:param potential: potential to be trained. The actual model itself is
set as an attribute of the Potential object
:type potential: Potential
:param storage: an instance of the storage class
:type storage: Storage
:dataset_list: the list of dataset_handles (e.g. collabfit-IDs)
within the storage object to use as the dataset.
:type dataset_list: list
:param workflow: the workflow for managing path definition and job
submission, if none are supplied, will use the default workflow
defined in this class
:type workflow: Workflow
:param job_details: job parameters such as walltime or # of nodes
:type job_details: dict
:param eweight: weight of energy data in the loss function
:type eweight: float
:param fweight: weight of the force data in the loss function
:type fweight: float
:param vweight: weight of the stress data in the loss function
:type vweight: float
:param per_atom_weights: True to read from dataset, or numpy array,
or a str for a numpy.loadtxt compatible filepath
|default| ``False``
:type per_atom_weights: either boolean or np.ndarray
:param upload_to_kimkit: True to upload to kimkit repository
:type upload_to_kimkit: bool
:returns: calculation ID of the submitted job
:rtype: int
"""
# reset parameter_path for new training
potential.parameter_path = None
potential.trainer_args['parameter_path'] = None
if dataset_list is None or storage is None:
raise ValueError('A storage object and list of dataset handles'
' are required!')
if not isinstance(dataset_list, list):
dataset_list = [dataset_list]
save_path = workflow.make_path(self.__class__.__name__, f'{path_type}')
script_filename = self._write_training_script(
save_path,
dataset_list,
potential,
storage,
eweight,
fweight,
vweight,
per_atom_weights=per_atom_weights,
upload_to_kimkit=upload_to_kimkit)
job_details['custom_preamble'] = 'python'
calc_id = workflow.submit_job(
script_filename,
save_path,
job_details=job_details,
)
return calc_id
def _save_model(
self,
path_type: str,
potential: Potential,
potential_name: str = 'fitsnap_potential',
loss: Optional[None] = None,
create_path: bool = True,
workflow: Optional[Workflow] = None,
) -> str:
"""
Output FitSnap model files. Write error metric and LAMMPS input files
If the potential.parameter_path is not already set, this writes the
model coefficient and parameter (and error summary) files to disk
from memory using the FitSnap infrastructure, then sets
potential.parameter_path to the file location. If the parameter_path
is already set, copies the files to the new location and updates
the potential.parameter_path.
:param path_type: specifier for the workflow path, to differentiate
training runs and where the model will be saved
:type path_type: str
:param potential: potential to be saved
:type potential: Snap potential
:param potential_name: name to save the potential as
|default| 'fitsnap_potential'
:type potential_name: str
:param loss: FitSNAP error object; this can but probably should not
be supplied by the user
:type loss: FitSNAP error
:param create_path: if the function needs to create a new path, or if
path_type should be used as the full path |default| ``True``
:type create_path: boolean
:param workflow: the workflow for managing path definition, if none are
supplied, will use the default workflow defined in this class
|default| ``None``
:type workflow: Workflow
:returns: path where the model is saved (inclusive)
:rtype: str
"""
if workflow is None:
workflow = self.default_wf
if create_path:
save_path = workflow.make_path(self.__class__.__name__, path_type)
else:
save_path = path_type
if potential.parameter_path is not None:
# potential._write_potential_to_file(save_path)
_ = potential.save_potential_files(work_dir=save_path,
import_to_kimkit=False,
write_to_tmp_dir=False)
return potential.parameter_path
else: # first save after a local train()
self.logger.info(f'Saving model state in {save_path}')
snap = potential.model
vars(snap.config.sections['OUTFILE'])['potential_name'] = \
save_path + '/' + potential_name
vars(snap.config.sections['OUTFILE'])['metric_file'] = \
save_path + '/' + potential_name + '.md'
fit_coefficients = snap.solver.fit
if loss is None: # loss should probably not be supplied by user
errors = snap.solver.errors
else:
errors = loss
snap.output.output(fit_coefficients, errors)
# for compatibility with older versions of LAMMPS and KIM drivers
system('sed -i "s/switchinnerflag 0/# switchinnerflag 0 commented '
f'by orchestrator/" {save_path}/{potential_name}.snapparam')
# for compatibility with KIM model driver SNAP__MD_536750310735_000
# possibility of training a simulator model and reloading a
# portable model that can't handle these settings flags, but
# I think build_potential() should catch the settings issue
if potential.kim_item_type == "portable-model":
if potential.model_driver == "SNAP__MD_536750310735_000":
system(
'sed -i "s/wselfallflag 0/# wselfallflag 0 commented '
'by orchestrator/"'
f' {save_path}/{potential_name}.snapparam')
system('sed -i "s/chemflag 0/# chemflag 0 commented '
'by orchestrator/"'
f' {save_path}/{potential_name}.snapparam')
system('sed -i "s/bnormflag 0/# bnormflag 0 commented '
'by orchestrator/"'
f' {save_path}/{potential_name}.snapparam')
potential.parameter_path = f'{save_path}/{potential_name}'
potential.training_hash = snap.config.hash
self.logger.info(f'Output fitsnap files with Hash: '
f'{potential.training_hash} at location: '
f'{potential.parameter_path}')
return f'{save_path}/{potential_name}'
[docs]
def load_from_submitted_training(
self,
calc_id: int,
potential: Potential,
workflow: Workflow,
):
"""
reload a potential that was trained via a submitted job
:param calc_id: calculation ID of the submitted training job
:type calc_id: int
:param potential: :class:`~orchestrator.potential.dnn.KliffBPPotential`
class object that will be updated with the model saved to disk
after the training job.
:type potential: KliffBPPotential
:param workflow: the workflow for managing path definition and job
submission, if none are supplied, will use the default workflow
defined in this class |default| ``None``
:type workflow: Workflow
"""
workflow.block_until_completed(calc_id)
if potential.name is not None:
potential_name = potential.name
else:
potential_name = "fitsnap_potential"
parameter_path = workflow.get_job_path(calc_id) + '/' + potential_name
potential.parameter_path = parameter_path
self.logger.info(f'Loading potential from: {parameter_path}')