Source code for orchestrator.storage.local

from .storage_base import Storage
from ..utils.exceptions import DatasetDoesNotExistError
from ..utils.input_output import ase_glob_read, safe_write
from ..utils.data_standard import METADATA_KEY
import os
from os import system, walk, path, makedirs
from ase import Atoms
import json
from typing import Optional
from uuid import uuid4


[docs] class LocalStorage(Storage): """ Class to store data in the local disk The Storage class deals with all functionalities associated to data storage inside Orchestrator. Its functions include the initialization of the database, and data additions, updates, and queries. The Orchestrator uses ASE Atoms as the internal data representation. A given database (Storage instance) can include multiple datasets (collections of configurations and properties) and generally persists in time. :param storage_args: dictionary with initialization parameters, including database_name and database_path. database_path defaults to './local_storage_database' while database_name defaults to the last string component from the database_path. LocalStorage does not require any additional arguments :type storage_args: dict """
[docs] def __init__( self, database_path: Optional[str] = './local_storage_database', database_name: Optional[str] = None, **kwargs, ): """ Set variables and initialize the recorder :param database_path: Path to the local storage database |default| ``'./local_storage_database'`` :type database_path: str :param database_name: Name of the local database :type databse_name: str """ super().__init__(**kwargs) self.database_path = database_path # default database_name to last component of database_path if not set if database_name is None: self.database_name = self.database_path.strip('/').split('/')[-1] else: self.database_name = database_name self.storage_init_args = { "database_path": self.database_path, "database_name": self.database_name, } self.STORAGE_ID_KEY = 'storage_id' self.property_map = None makedirs(self.database_path, exist_ok=True) self.logger.info(f'Local database set as {self.database_path}') self._dataset_sizes = {} self._id_map = {} self._read_database_state()
def _read_database_state(self): """ Helper function to facilitate synchronizing a database Updates the internal _dataset_sizes dictionary with the current state of the database read from disk. This can be called on restarts or to ensure the internal representation is up to date before querying its state for other purposes. """ root_dir_parsed = False for root, dirs, files in walk(self.database_path): if not root_dir_parsed: root_dir_parsed = True self.logger.info(f'Found {len(dirs)} datasets in {root}') if len(files) > 0: self.logger.info(('It appears that your database has files' ' unassociated with any dataset!')) else: dataset = root.split('/')[-1] config_num = 0 configs = ase_glob_read(root) config_num = len(configs) for i, config in enumerate(configs): storage_id = config.info[METADATA_KEY]['storage_id'] config_id = i _ = self._add_to_id_map(dataset, storage_id, config_id) self._dataset_sizes[dataset] = config_num def _add_to_id_map( self, dataset: str, storage_id: str, config_id: Optional[int] = None, ) -> int: """ helper function to add to the storage --> config map, handles KeyError :param dataset: name of the dataset for the mapping :type dataset: str :param storage_id: UUID of the configuration :type storage_id: str :param config_id: configuration number of the config in this dataset if ``None``, get the config num from the current dataset_size |default| ``None`` :type config_id: int :returns: the config number in the dataset (useful if config_id = None) :rtype: int """ if config_id is None: config_id = self._dataset_sizes[dataset] self._dataset_sizes[dataset] += 1 try: self._id_map[dataset][storage_id] = config_id except KeyError: self._id_map[dataset] = {storage_id: config_id} return config_id
[docs] def check_if_dataset_name_unique(self, dataset_name: str) -> bool: """ check if the provided dataset_name is unique in the database :param dataset_name: name to check (human readable) :type dataset_name: str :returns: true if the database is not present in the database, false if it does exist :rtype: boolean """ if path.isdir(f'{self.database_path}/{dataset_name}'): return False else: return True
def _insert_data_to_database( self, dataset_handle: str, data: list[Atoms], dataset_metadata: Optional[dict] = None, ) -> str: """ Internal utility function for adding data to the database Called by add_data(), new_dataset(), and update_data() :param dataset_handle: name of dataset :type dataset_handle: str :param data: list of ASE.Atoms objects containing the configurations and associated properties to add to the database. Note that configuration-specific metadata should be stored under the `atoms.info[METADATA_KEY]` field. :type data: list :param dataset_metadata: A dictionary of metadata specific to the dataset as a whole. :type metadata: dict :returns: unique handle for the dataset :rtype: str """ self._read_database_state() for configuration in data: if METADATA_KEY not in configuration.info: # this configuration is new to the dataset storage_id = uuid4().hex configuration.info[METADATA_KEY] = {'storage_id': storage_id} config_num = self._add_to_id_map(dataset_handle, storage_id) elif 'storage_id' in configuration.info[METADATA_KEY]: # this configuration already has an ID storage_id = configuration.info[METADATA_KEY]['storage_id'] if storage_id in self._id_map[dataset_handle]: # this config is in the dataset already config_num = self._id_map[dataset_handle][storage_id] else: # this config is new to the dataset config_num = self._add_to_id_map(dataset_handle, storage_id) else: # this configuration has metadata but no ID (new to dataset) storage_id = uuid4().hex configuration.info[METADATA_KEY]['storage_id'] = storage_id config_num = self._add_to_id_map(dataset_handle, storage_id) new_data_path = (f'{self.database_path}/{dataset_handle}/' f'configuration_{config_num:05}.xyz') # overwrite current xyz file with the new configuration (and data) safe_write(new_data_path, configuration, format='extxyz') if dataset_metadata is not None: new_metadata_path = (f'{self.database_path}/{dataset_handle}/' f'metadata_global.json') with open(new_metadata_path, 'w') as fout: # TODO: hopefully this throws an error if not a dict? json.dump(dataset_metadata, fout, sort_keys=True, indent=4) return dataset_handle
[docs] def add_data( self, dataset_handle: str, data: list[Atoms], dataset_metadata: Optional[dict] = None, ) -> str: """ Add new configurations (and associated properties) to the database This method is used to add to an existing dataset with new configurations. The new configurations may or may not have other properties associated with them. :param dataset_handle: name of dataset :type dataset_handle: str :param data: list of ASE.Atoms objects containing the configurations and associated properties to add to the database. Note that configuration-specific metadata should be stored under the `atoms.info[METADATA_KEY]` field. :type data: list :param dataset_metadata: A dictionary of metadata specific to the dataset as a whole. :type metadata: dict :returns: handle for the dataset which includes the new additions :rtype: str """ if dataset_handle not in self._dataset_sizes: raise DatasetDoesNotExistError( (f'{dataset_handle} has not been created in ' f'{self.database_path}, use new_dataset()')) return self._insert_data_to_database(dataset_handle, data, dataset_metadata)
[docs] def new_dataset( self, dataset_handle: str, data: list[Atoms], dataset_metadata: Optional[dict] = None, ) -> str: """ Create a new dataset with the provided data and metadata The new dataset will have a human readable name specificed by dataset_handle and will ingest the data and metadata provided. :param dataset_handle: name of the dataset to be created :type dataset_handle: str :param data: list of ASE.Atoms objects containing the configurations and associated properties to add to the database. Note that configuration-specific metadata should be stored under the `atoms.info[METADATA_KEY]` field. :type data: list :param dataset_metadata: A dictionary of metadata specific to the dataset as a whole. :type metadata: dict :returns: name of the dataset :rtype: str """ if path.isdir(f'{self.database_path}/{dataset_handle}'): self.logger.info((f'It appears dataset "{dataset_handle}" already ' f'exists and contains ' f'{self._dataset_sizes[dataset_handle]} ' f'configurations. Adding to it.')) else: system( f'mkdir -p {self.database_path}/{dataset_handle} 2> /dev/null') self._dataset_sizes[dataset_handle] = 0 self._id_map[dataset_handle] = {} return self._insert_data_to_database(dataset_handle, data, dataset_metadata)
[docs] def update_data( self, dataset_handle: str, data: list[Atoms], new_data_key: str, ) -> str: """ Update an existing dataset - overwriting or adding new properties This method operates on existing configurations and/or properties. data are provided as a KliFF dataset of properties that should be added to either the configuration as a new property or overwriting existing properties within the database. :param dataset_handle: name or ID of dataset :type dataset_handle: str or int :param data: list of ASE.Atoms objects containing the configurations and associated properties to add to the database. Note that configuration-specific metadata should be stored under the `atoms.info[METADATA_KEY]` field. :type data: list :param dataset_metadata: A dictionary of metadata specific to the dataset as a whole. :type metadata: dict :returns: unique handle for the dataset :rtype: str """ configs = self.get_data(dataset_handle) for config in configs: storage_id = config.info[METADATA_KEY][self.STORAGE_ID_KEY] data_to_update = data[storage_id] if len(data_to_update) == len(config): config.set_array(new_data_key, data_to_update) else: config.info[METADATA_KEY][new_data_key] = data_to_update if dataset_handle.rsplit('_', 1)[-1][0] != 'v': # original dataset name, we'll append _v* index = 1 else: index = int(dataset_handle.rsplit('_', 1)[-1][1:]) + 1 base_name = dataset_handle.rsplit('_', 1)[0] new_handle = self.new_dataset(f'{base_name}_v{index}', configs) return new_handle
[docs] def get_data( self, dataset_handle: str, query_options: Optional[dict] = None, ) -> list[Atoms]: """ Extract data from storage Return the dataset specified by dataset_handle as a list of ASE Atoms. Further options for parameterizing the extraction can be provided by the query_options dictionary. :param dataset_handle: name of the dataset to extract :type dataset_handle: str :param query_options: dict of options for data extraction and return |default| ``None`` :type query_options: dict :returns: requested data as a list of ASE Atoms :rtype: list """ if query_options: self.logger.info('Querey options are not currently supported') configs = ase_glob_read( os.path.join(self.database_path, dataset_handle)) # Note: it's assumed that the metadata is already in the ASE file return configs
[docs] def delete_dataset(self, dataset_handle: str): """ Remove the dataset specified by dataset_handle from the database :param dataset_handle: name or ID of dataset :type dataset_handle: str """ system(f'rm -r {self.database_path}/{dataset_handle} 2> /dev/null') self.logger.info(f'Deleted dataset {dataset_handle} from storage')
[docs] def list_data(self, dataset_handle: Optional[str] = None): """ Utility function to query the database Prints an overview of the database contents if no dataset_handle is provided, otherwise provides information about the specific dataset contents. :param dataset_handle: name of dataset |default| ``None`` :type dataset_handle: str """ self._read_database_state() if dataset_handle is None: print(f'Database {self.database_name} has the following datasets:') for dataset in self._dataset_sizes: print(f'{dataset}: {self._dataset_sizes[dataset]}') else: dataset_size = self._dataset_sizes.get(dataset_handle) if dataset_size is None: print( f'{self.database_name} does not contain {dataset_handle}') else: print( f'{dataset_handle} contains {dataset_size} configurations')