Source code for orchestrator.utils.input_output

import os
import re
import glob
import numpy as np
from datetime import datetime
from ase.io import read, write
from typing import Optional

from orchestrator.utils.data_standard import (ENERGY_KEY, FORCES_KEY,
                                              STRESS_KEY, METADATA_KEY)
from orchestrator.oracle.aiida.espresso import AiidaEspressoOracle
from orchestrator.oracle.aiida.vasp import AiidaVaspOracle
from orchestrator.storage import Storage
from orchestrator.utils.exceptions import DatasetDoesNotExistError


[docs] def ase_glob_read(root_dir, file_ext='.xyz', file_format='extxyz'): """ Reads all ASE atoms objects in `root_dir` with the matching` file_ext. """ if file_ext[0] != '.': file_ext = '.' + file_ext images = [] for f in sorted(glob.glob(os.path.join(root_dir, f'*{file_ext}'))): images += safe_read(f, format=file_format) return images
def try_loading_ase_keys(images): """ Try to populate energy/forces/stress fields, in case they weren't loaded due to changes in ASE >= 3.23 """ if not isinstance(images, list): images = [images] for atoms in images: try: atoms.info[ENERGY_KEY] = atoms.get_potential_energy() except Exception: pass try: atoms.arrays[FORCES_KEY] = atoms.get_forces() except Exception: pass try: atoms.info[STRESS_KEY] = atoms.get_stress() except Exception: pass return images def safe_read(path, **kwargs): """ This is a wrapper to ASE.read that attempts to load energy/forces/stress from the SinglePointCalculator. """ return try_loading_ase_keys(read(path, **kwargs)) def safe_write(path, images, **kwargs): """ This is a wrapper to ASE.write that **removes the SinglePointCalculator** from all atoms objects, if the calculator is attached. This is to avoid issues caused by ase>=3.23 which uses a dummy SinglePointCalculator to store energy/forces/stress keys. """ if not isinstance(images, list): images = [images] # Note: some Oracles may have valid calculators that should NOT be removed # e.g. "KIMModelCalculator" from ase.calculators.singlepoint import SinglePointCalculator for atoms in images: if isinstance(atoms.calc, SinglePointCalculator): atoms.calc = None write(path, images, **kwargs) def sort_configs_and_tag_atoms(list_of_atoms, id_key='co-id'): """ Sorts the configurations by their ID, and assigns unique tags to each atom. Intended to be used for error trajectory logging. The tags will be stored under the atoms.arrays['atom_id'] field. :param list_of_atoms: the atoms to be sorted :type list_of_atoms: list :param id_key: the key used for sorting the atoms. Must exist in atoms.info dict. Default is 'co_id'. :type id_key: str """ sorted_atoms = sorted(list_of_atoms, key=lambda atoms: atoms.info[id_key]) counter = 0 for atoms in sorted_atoms: n = len(atoms) atoms.arrays['atom_id'] = np.arange(counter, counter + n) counter += n return sorted_atoms def read_in_external_calculations( folder_paths: list[str], code: str, input_file: str, outfile: str, storage: Storage, dataset_name: Optional[str] = None, dataset_handle: Optional[str] = None, ): """ Helper fucntion for ingesting data generated outside Orchestrator :param folder_paths: List containing the path to each folder with individual calculations. :param code: Which code was used to calculate. Currently, VASP and QE are supported. Default: VASP. :param input_file: Name of the file to which calculation settings are stored. For VASP, this is the INCAR and for QE it is typically *.in was stored. :param outfile: Name of the file to which calculation information is stored. For VASP, this is the OUTCAR and for QE it is where the output was stored (typically *.out). :param storage: Storage module where the dataset should be saved :param dataset_name: name of the new dataset in storage to upload to :param dataset_handle: handle of an existing dataset to append the data to. Will be used in place of dataset_name if provided. """ # Check if paths are correct. incorrect = [] for path in folder_paths: if not os.path.isdir(path): incorrect.append(path) if incorrect: raise ValueError( f'The following paths were incorrect: {incorrect}. Correct ' 'them and try again.') configs = [] for path in folder_paths: match code: case 'VASP': atoms = safe_read(f'{path}/{outfile}', format='vasp-out') # Will expect the incar to be in the same location. parameters = {} with open(f'{path}/{input_file}', 'r') as infile: for line in infile: split = line.strip("\n").split("=") parameters[split[0]] = split[1] universal = AiidaVaspOracle.translate_universal_parameters( parameters) case 'QE': atoms = safe_read(outfile, format='espresso-out') parameters = {} with open(input_file, 'r') as f: text = f.read() namelists = re.findall(r'&(\w+)(.*?)/', text, re.DOTALL) for name, body in namelists: params = dict(re.findall(r'(\w+)\s*=\s*([^\n,]+)', body)) parameters[name] = { k.strip(): v.strip() for k, v in params.items() } universal = AiidaEspressoOracle.translate_universal_parameters( parameters) dataset_metadata = { 'parameters': { 'code': parameters, 'universal': universal } } atoms.info[METADATA_KEY] = dataset_metadata configs.append(atoms) current_date = datetime.today().strftime('%Y-%m-%d') if dataset_name is None: # no names or IDs provided, make new dataset name dataset_name = storage.generate_dataset_name( f'{code}_parsed_dataset', f'{current_date}', check_uniqueness=True, ) elif dataset_name: # only dataset name provided try: # since extracted, we do not need to validate form dataset_handle = storage._get_id_from_name(dataset_name) except DatasetDoesNotExistError: pass else: # dataset handle provided, ensure it is of correct form if dataset_handle[:3] != 'DS_': raise ValueError( 'dataset handles should be in format DS_************_#') if dataset_handle: # handle is a colabfit ID, dataset exists new_handle = storage.add_data(dataset_handle, configs, dataset_metadata) else: # handle is a name, create new dataset new_handle = storage.new_dataset(dataset_name, configs, dataset_metadata) return new_handle