Source code for orchestrator.trainer.kliff.kliff

import pathlib
import shutil
from glob import glob
from ase import Atoms
import yaml
from kliff.legacy.loss import Loss
from ..trainer_base import Trainer
from ...utils.data_standard import (
    ENERGY_WEIGHT_KEY,
    FORCES_WEIGHTS_KEY,
)
from typing import Optional, Union, Dict, Any, List
import numpy as np
from ...potential.potential_base import Potential
from ...workflow.workflow_base import Workflow

from typing import TYPE_CHECKING
if TYPE_CHECKING:  # avoid circular imports
    from orchestrator.storage.storage_base import Storage
    from pathlib import Path


class KLIFFTrainer(Trainer):
    """
    Train and deploy a potential using KLIFF

    The trainer class is responsible for handling the loading/assignment of
    training data, as well as the actual process of training a potential.
    One should use specific subclasses of KLIFFTrainer instead of this base
    class.

    :param training_split: Fraction of the dataset to be allocated for
        training (e.g., 0.8 for 80%). Defaults to 0.8.
    :type training_split: float
    :param loss_method: The type of loss function to be used during training
        (e.g., "mse" for mean squared error).
    :type loss_method: str
    :param max_evals: Maximum number of evaluations (e.g., iterations or
        function calls) for the optimizer. Defaults to 1000.
    :type max_evals: int
    :param optimization_method: The optimization algorithm to employ for
        training the potential (e.g., "L-BFGS-B", "Adam")
    :type optimization_method: str
    :param scratch: Path to a directory for storing temporary or scratch
        files during training. If None, it defaults to './scratch_kliff'.
    :type scratch: str, optional
    :param kwargs: Arbitrary keyword arguments forwarded to the parent
        :class:`Trainer` constructor.
    :type kwargs: dict
    """

    def __init__(
        self,
        training_split: float = 0.8,
        loss_method: str = "mse",
        max_evals: int = 1000,
        optimization_method: str = "L-BFGS-B",
        scratch: Optional[str] = None,
        **kwargs,
    ):
        """
        Store the training settings and delegate the rest to the parent

        :param training_split: Fraction of the dataset to be allocated for
            training (e.g., 0.8 for 80%). Defaults to 0.8.
        :type training_split: float
        :param loss_method: The type of loss function to be used during
            training (e.g., "mse" for mean squared error).
        :type loss_method: str
        :param max_evals: Maximum number of evaluations (e.g., iterations or
            function calls) for the optimizer. Defaults to 1000.
        :type max_evals: int
        :param optimization_method: The optimization algorithm to employ for
            training the potential (e.g., "L-BFGS-B", "Adam")
        :type optimization_method: str
        :param scratch: Path to a directory for storing temporary or scratch
            files during training. If None, './scratch_kliff' is used.
        :type scratch: str, optional
        :param kwargs: Arbitrary keyword arguments forwarded unchanged to
            the parent class constructor.
        :type kwargs: dict
        """
        self.loss_method = loss_method
        self.max_evals = max_evals
        self.optimization_method = optimization_method
        self.training_split = training_split
        # Always define self.scratch; previously a caller-supplied path was
        # silently dropped because only the None case assigned it.
        self.scratch = './scratch_kliff' if scratch is None else scratch

        # arguments to reinitialize an instance of the trainer
        # NOTE(review): training_split is not recorded here, so an instance
        # rebuilt from this dict falls back to the default split - confirm
        # whether that is intended before adding the key.
        self.trainer_init_args = {
            'loss_method': loss_method,
            'max_evals': max_evals,
            'optimization_method': optimization_method,
            'scratch': scratch,
        }

        super().__init__(**kwargs)

    def _get_training_data(
        self,
        dataset_handle: str,
        storage: "Storage",
    ) -> List[Atoms]:
        """
        Get the training data configurations

        Retrieve the dataset specified by dataset_handle from the passed
        storage module. This dataset can be augmented or otherwise modified
        (i.e. adding weights) as necessary for training.

        :param dataset_handle: the identifier of the dataset to extract from
            the storage module
        :type dataset_handle: str
        :param storage: storage instance where the training data is saved
        :type storage: Storage
        :returns: training data, exactly as returned by
            ``storage.get_data(dataset_handle)``
        :rtype: list of ase.Atoms
        :raises Exception: any error raised by the storage module is logged
            and re-raised unchanged
        """
        self.logger.info('Reading training data from storage')
        # TODO: direct initialization of ds from colabfit
        try:
            training_set = storage.get_data(dataset_handle)
        except Exception as err:
            # The storage backend is opaque here, so every failure is
            # reported through the class logger and then propagated.
            self.logger.info('Storage module is not properly set. Cannot '
                             'get training data from Storage.')
            self.logger.info(f'Caught this error {err}')
            raise
        return training_set

    def _move_to_save_dir(
        self,
        source: str,
        destination: str,
        hint: str,
    ) -> None:
        """
        Best-effort move of a training by-product into the save directory

        :param source: file or folder to move
        :type source: str
        :param destination: directory (or path) to move ``source`` to
        :type destination: str
        :param hint: text appended to the log message when the move fails
        :type hint: str
        """
        try:
            shutil.move(source, destination)
        except OSError as err:
            # shutil.Error derives from OSError, so this covers both a
            # missing source and an already existing destination entry.
            self.logger.info(f'Failed to move {source}: {err} {hint}')

    def _save_model(
        self,
        path_type: Union[str, "Path"],
        potential: Potential,
        potential_name: str = 'kim_potential',
        loss: Optional[Loss] = None,
        create_path: bool = True,
        workflow: Optional[Workflow] = None,
    ) -> str:
        """
        Deploy a KIM model. Writes KIM potential to Current Working Directory

        Write the model (and loss) data to disk from memory using the
        KIM/KliFF infrastructure. Written pkl files can be directly loaded,
        while the installed version of the potential can be run using KIM
        libraries.

        :param path_type: specifier for the workflow path, to differentiate
            training runs and where the model will be saved
        :type path_type: str
        :param potential: potential to be saved; one of
            :class:`~orchestrator.potential.dnn.KliffBPPotential`,
            :class:`~orchestrator.potential.kim.KIMPotential`
        :type potential: KliffBPPotential or KIMPotential or a kliff model
        :param potential_name: name to save the potential as |default|
            'kim_potential'
        :type potential_name: str
        :param loss: loss object to save, optional. Used if potential is a
            pytorch model. |default| ``None``
        :type loss: Loss (kliff.loss)
        :param create_path: if the function needs to create a new path, or
            if path_type should be used as the full path |default| ``True``
        :type create_path: boolean
        :param workflow: the workflow for managing path definition, if none
            are supplied, will use the default workflow defined in this
            class |default| ``None``
        :type workflow: Workflow
        :returns: path where the model is saved (inclusive)
        :rtype: str
        """
        if workflow is None:
            workflow = self.default_wf
        if create_path:
            save_path = workflow.make_path(self.__class__.__name__, path_type)
        else:
            save_path = path_type
        self.logger.info(f'Saving model state in {save_path}')

        _ = potential.save_potential_files(
            work_dir=f'{save_path}/final_model',
            import_to_kimkit=False,
            write_to_tmp_dir=False,
        )

        # Only torch-backed models carry an optimizer state. Missing
        # attributes (non torch KIMModel) simply skip the step, while errors
        # raised inside the save call itself are no longer swallowed.
        model = getattr(potential, 'model', None)
        if loss is not None and getattr(model, 'is_torch', False):
            save_state = getattr(loss, 'save_optimizer_state', None)
            if callable(save_state):
                save_state(f'{save_path}/optimizer_state.pkl')

        potential.install_potential_in_kim_api(
            save_path=save_path,
            potential_name=potential.kim_id,
            install_locality='CWD',
        )

        # clean up any default files if present.
        # Usually after the training you might have kliff.log, *.pkl, and
        # kliff_saved_model folder.
        fully_qualified_save_path = f'{save_path}/{potential_name}'
        # Make sure the destination is a directory; otherwise shutil.move
        # would rename the first moved item to this path instead of placing
        # it inside.
        try:
            pathlib.Path(fully_qualified_save_path).mkdir(parents=True,
                                                          exist_ok=True)
        except OSError as err:
            self.logger.info(
                f'Could not create {fully_qualified_save_path}: {err}')

        # kliff log file
        self._move_to_save_dir('kliff.log', fully_qualified_save_path,
                               'May be this file does not exist?')
        # any saved fingerprints; each file is handled on its own so one
        # failure does not leave the remaining files behind
        for pkl_file in glob('finger*.pkl'):
            self._move_to_save_dir(pkl_file, fully_qualified_save_path,
                                   'May be this file does not exist?')
        # kliff_saved_model folder
        self._move_to_save_dir('kliff_saved_model',
                               fully_qualified_save_path,
                               'May be this folder does not exist?')

        return fully_qualified_save_path

    def train(
        self,
        path_type: str,
        potential: Potential,
        storage: "Storage",
        dataset_list: list,
        workflow: Optional[Workflow] = None,
        eweight: float = 1.0,
        fweight: float = 1.0,
        vweight: float = 1.0,
        per_atom_weights: bool = False,
        upload_to_kimkit=True,
    ) -> tuple[Potential, Loss]:
        """
        Train the potential based on the specific trainer details

        KLIFFTrainer should not be used for training, it is a parent class
        to specific implementations

        :param path_type: specifier for the workflow path, to differentiate
            training runs
        :type path_type: str
        :param potential: potential to be trained. The actual model itself
            is set as an attribute of the Potential object
        :type potential: Potential
        :param storage: an instance of the storage class
        :type storage: Storage
        :param dataset_list: the list of dataset_handles (e.g. colabfit-IDs)
            within the storage object to use as the dataset.
        :type dataset_list: list
        :param workflow: the workflow for managing path definition and job
            submission, if none are supplied, will use the default workflow
            defined in this class |default| ``None``
        :type workflow: Workflow
        :param eweight: weight of energy data in the loss function
        :type eweight: float
        :param fweight: weight of the force data in the loss function
        :type fweight: float
        :param vweight: weight of the stress data in the loss function
        :type vweight: float
        :param per_atom_weights: True to read from dataset,
            |default| ``False``
        :type per_atom_weights: bool
        :param upload_to_kimkit: True to upload to kimkit repository
        :type upload_to_kimkit: bool
        :returns: trained model, loss object
        :rtype: implementation dependent
        :raises NotImplementedError: always; subclasses provide the
            implementation
        """
        raise NotImplementedError(f'{self.__class__.__name__} does not have '
                                  'train implemented. Use a subclass!')

    def submit_train(
        self,
        path_type: str,
        potential: Potential,
        storage_args: dict,
        storage: "Storage",
        dataset_list: list,
        workflow: Optional[Workflow] = None,
        eweight: float = 1.0,
        fweight: float = 1.0,
        vweight: float = 1.0,
        per_atom_weights: bool = False,
        upload_to_kimkit=True,
    ) -> int:
        """
        Asynchronously train the potential based on the trainer details

        This is a main method of the trainer class, and uses the parameters
        supplied at instantiation to perform the potential training by
        minimizing a loss function. While :meth:`train` works
        synchronously, this method submits training to a job scheduler.

        :param path_type: specifier for the workflow path, to differentiate
            training runs
        :type path_type: str
        :param potential: potential to be trained. The actual model itself
            is set as an attribute of the Potential object
        :type potential: Potential
        :param storage_args: arguments associated with the storage module
            (presumably what a submitted job needs to re-create it; confirm
            against the subclass implementations)
        :type storage_args: dict
        :param storage: an instance of the storage class
        :type storage: Storage
        :param dataset_list: the list of dataset_handles (e.g. colabfit-IDs)
            within the storage object to use as the dataset.
        :type dataset_list: list
        :param workflow: the workflow for managing path definition and job
            submission, if none are supplied, will use the default workflow
            defined in this class |default| ``None``
        :type workflow: Workflow
        :param eweight: weight of energy data in the loss function
        :type eweight: float
        :param fweight: weight of the force data in the loss function
        :type fweight: float
        :param vweight: weight of the stress data in the loss function
        :type vweight: float
        :param per_atom_weights: Per atom weights for the loss function. If
            a boolean value is provided, the weights are assumed to be
            present in the provided dataset. |default| ``False``
        :type per_atom_weights: bool
        :param upload_to_kimkit: True to upload to kimkit repository
        :type upload_to_kimkit: bool
        :returns: calculation ID of the submitted job
        :rtype: int
        :raises NotImplementedError: always; subclasses provide the
            implementation
        """
        raise NotImplementedError(f'{self.__class__.__name__} does not have '
                                  'submit_train implemented. Use a subclass!')

    @staticmethod
    def _create_training_manifest(
        workspace_name: str,
        seed: int,
        dataset_type: str,
        dataset_path: str,
        model_name: str,
        batch_size: int,
        epochs: int,
        optimizer: str,
        loss: str = 'mse',
        weights: Optional[Dict[str, Any]] = None,
        train_size: Optional[Union[int, np.ndarray]] = None,
        val_size: Optional[Union[int, np.ndarray]] = None,
        export: Optional[Dict[str, Any]] = None,
        transform_params: Union[bool, List[str]] = True,
        shuffle: bool = True,
        lr: Optional[float] = None,
    ) -> Dict[str, Any]:
        """
        Create a training manifest skeleton for KLIFF

        :param workspace_name: name of the workspace
        :type workspace_name: str
        :param seed: random seed for reproducibility
        :type seed: int
        :param dataset_type: type of dataset (e.g. 'ase', 'path')
        :type dataset_type: str
        :param dataset_path: path to the dataset
        :type dataset_path: str
        :param model_name: name of the model
        :type model_name: str
        :param batch_size: batch size for training
        :type batch_size: int
        :param epochs: number of training epochs
        :type epochs: int
        :param optimizer: optimizer to use for training
        :type optimizer: str
        :param loss: loss function to use for training
        :type loss: str
        :param weights: weights for the loss function
        :type weights: dict
        :param train_size: size of the training dataset
        :type train_size: int or np.ndarray
        :param val_size: size of the validation dataset
        :type val_size: int or np.ndarray
        :param export: export options for the model; the "export" section
            is only added when this is not None
        :type export: dict
        :param transform_params: parameters for data transformation; only a
            list is written to the manifest, a boolean leaves the
            "transforms" section empty
        :type transform_params: bool or list
        :param shuffle: whether to shuffle the dataset
        :type shuffle: bool
        :param lr: learning rate for the optimizer; only written when given
        :type lr: float
        :returns: training manifest dictionary
        :rtype: dict
        """
        manifest = {
            "workspace": {
                "name": workspace_name,
                "seed": seed,
                "resume": False,
            },
            "dataset": {
                "type": dataset_type,
                "path": dataset_path,
                "shuffle": shuffle,
            },
            "model": {
                "name": model_name,
            },
            "training": {
                "loss": {
                    "function": loss,
                    "weights": weights,
                },
                "batch_size": batch_size,
                "epochs": epochs,
                "optimizer": {
                    "name": optimizer,
                },
                "training_dataset": {
                    "train_size": train_size,
                },
                "validation_dataset": {
                    "val_size": val_size,
                },
                "verbose": True,
            },
            "transforms": {},
        }

        # Optional sections are only present when explicitly requested.
        if isinstance(transform_params, list):
            manifest["transforms"]["parameter"] = transform_params
        if lr is not None:
            manifest["training"]["optimizer"]["lr"] = lr
        if export is not None:
            manifest["export"] = export

        return manifest

    @staticmethod
    def _generate_per_atom_weights_yaml(
        dataset: list[Atoms],
        path: Optional[Union[str, pathlib.Path]] = None,
    ) -> str:
        """
        Generate a YAML file for per-atom weights

        One entry is written per configuration, holding its energy weight
        (default 1.0) and its force weights (default: 1.0 for every atom),
        both read from ``Atoms.info``.

        :param dataset: list of ASE Atoms objects with weights embedded
        :type dataset: list[Atoms]
        :param path: directory to save the YAML file in; the file will be
            saved as path/weights.yaml, default is the current working
            directory
        :type path: str or pathlib.Path
        :returns: path to the generated YAML file
        :rtype: str
        """
        target_dir = pathlib.Path.cwd() if path is None else pathlib.Path(path)
        weights_file = target_dir / 'weights.yaml'
        weights_file.parent.mkdir(parents=True, exist_ok=True)

        weights = []
        for atoms in dataset:
            energy_weight = atoms.info.get(ENERGY_WEIGHT_KEY, 1.0)
            forces_weight = atoms.info.get(FORCES_WEIGHTS_KEY, None)
            if forces_weight is None:
                # len(atoms) is the supported way to get the atom count;
                # Atoms.get_number_of_atoms() is deprecated in ASE.
                forces_weight = np.ones(len(atoms))
            # np.asarray(...).tolist() yields plain Python numbers for
            # arrays, lists and NumPy scalars alike, which is what
            # yaml.safe_dump can represent.
            weights.append({
                "energy": np.asarray(energy_weight).tolist(),
                "forces": np.asarray(forces_weight).tolist(),
            })

        # Build everything first so a failure above does not leave an
        # empty or partial file behind.
        with open(weights_file, 'w', encoding='utf-8') as f:
            yaml.safe_dump(weights, f)

        return str(weights_file)