Source code for orchestrator.storage.colabfit

from .storage_base import Storage
from ..utils.data_standard import (
    ENERGY_KEY,
    FORCES_KEY,
    STRESS_KEY,
    METADATA_KEY,
)
from ..utils.exceptions import (
    DuplicateDatasetNameError,
    UnidentifiedStorageError,
    DatasetDoesNotExistError,
    #     UnknownKeyError,
    UnsupportedComparisonError,
)
from ..utils.data_utils import inspect_configs
from colabfit.tools.database import DataManager
from colabfit.tools.configuration import AtomicConfiguration
from datetime import datetime
import json
import subprocess as sp
from os import system
import numpy as np
from ase import Atoms
from ase.data import chemical_symbols
from typing import Optional


class ColabfitStorage(Storage):
    """
    Manage data using Colabfit

    Colabfit documentation can be found at:
    https://colabfit.github.io/colabfit-tools/html/index.html.

    Connection settings are given to the constructor as keyword arguments:
    ``credential_file``, ``database_path``, ``database_name``,
    ``database_port``, ``database_user``, ``database_password`` and
    ``external_file``. All of them default to ``None``.

    * ``database_path`` is the host/URI of the database server (required).
    * ``database_name`` is the name of the database (required).
    * ``external_file`` is the explicit path to an lmdb file intended for
      very large configurations (optional).
    * ``credential_file`` is a path to a json file which may contain any of
      the ``database_*`` settings and ``external_file``. Values found in the
      file override the corresponding keyword arguments.

    A ``ValueError`` is raised by the constructor when ``database_path`` or
    ``database_name`` is still ``None`` after the credential file (if any)
    has been applied.

    NOTE(review): an earlier revision of this docstring referred to a
    mongodb server and a single ``storage_args`` dict; the constructor
    documents a PostgreSQL server and explicit keyword arguments.
    """
def __init__(
    self,
    credential_file: Optional[str] = None,
    database_path: Optional[str] = None,
    database_name: Optional[str] = None,
    database_port: Optional[str] = None,
    database_user: Optional[str] = None,
    database_password: Optional[str] = None,
    external_file: Optional[str] = None,
    **kwargs,
):
    """
    Resolve the connection settings and open the database client

    :param credential_file: Path to a JSON file that may define the
        database_path, database_name, database_port, database_user,
        database_password and external_file keys. Any key present in the
        file overrides the matching keyword argument. This is the
        preferred method for initializing a storage module.
    :type credential_file: str
    :param database_path: URI to the PostgreSQL data server
    :type database_path: str
    :param database_name: Name of the PostgreSQL database client
    :type database_name: str
    :param database_port: Port for the PostgreSQL server
    :type database_port: str
    :param database_user: Username for the PostgreSQL server
    :type database_user: str
    :param database_password: Password for the PostgreSQL server
    :type database_password: str
    :param external_file: Path to an LMDB file for large configurations
    :type external_file: str
    :raises ValueError: if no database path or database name is available
        after the credential file has been applied
    """
    super().__init__(**kwargs)
    self.STORAGE_ID_KEY = 'co-id'

    # start from the explicit keyword arguments
    self.credential_file = credential_file
    self.database_path = database_path
    self.database_name = database_name
    self.database_port = database_port
    self.database_user = database_user
    self.database_password = database_password
    self.external_file = external_file

    # entries of the credential file take precedence over the arguments
    if self.credential_file is not None:
        with open(self.credential_file, 'r') as fin:
            file_content = json.load(fin)
        overridable = (
            'database_path',
            'database_name',
            'external_file',
            'database_port',
            'database_user',
            'database_password',
        )
        for setting in overridable:
            setattr(
                self,
                setting,
                file_content.get(setting, getattr(self, setting)),
            )

    if self.database_path is None:
        raise ValueError(
            'Database path must be specified for ColabfitStorage')
    if self.database_name is None:
        raise ValueError(
            'Database name must be specified for ColabfitStorage')

    # record of the resolved settings used to build this instance
    self.storage_init_args = {
        setting: getattr(self, setting)
        for setting in (
            "database_path",
            "database_name",
            "database_port",
            "database_user",
            "database_password",
            "external_file",
        )
    }

    # TODO: Add support for external file
    self.database_client = DataManager(
        dbname=self.database_name,
        user=self.database_user,
        password=self.database_password,
        host=self.database_path,
        port=self.database_port,
    )

    # TODO: Currently only works when credential file is passed
    self.query_string = (f'colabfit query -c {self.credential_file} ')

    # the invoking OS user becomes the default dataset author
    whoami = sp.run(
        'whoami',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    user = whoami.stdout.strip()
    self.default_author = f'{user} via ColabfitStorage'
    self.property_map = None  # should be set before adding data
    self.default_parameters = {}
def check_if_dataset_name_unique(self, dataset_name: str) -> bool:
    """
    check if the provided dataset_name is unique in the database

    The colabfit command line query (``self.query_string``) is run with a
    text search for the name, and its whitespace-split output is scanned
    for ``'name':`` labels whose following token equals ``dataset_name``.

    :param dataset_name: name to check (human readable)
    :type dataset_name: str
    :returns: true if the dataset is not present in the database, false if
        it does exist
    :rtype: boolean
    """
    import shlex  # local import: only needed to quote the shell argument

    # Quote the name so whitespace or shell metacharacters in it cannot
    # alter the command that is executed.
    query_out = sp.run(
        f'{self.query_string} -t {shlex.quote(dataset_name)}',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    tokens = query_out.stdout.split()

    # Pair every token with its successor; a trailing "'name':" label with
    # nothing after it is simply never paired (no IndexError).
    count = sum(
        1 for label, value in zip(tokens, tokens[1:])
        if label == "'name':" and value.strip(",'") == dataset_name)

    if count == 0:
        self.logger.info(f'Dataset {dataset_name} not found in storage')
        return True
    self.logger.info(f'Dataset {dataset_name} found in storage with '
                     f'({count} instances found)')
    return False
def add_data(
    self,
    dataset_handle: str,
    data: list[Atoms],
    dataset_metadata: Optional[dict] = None,
    updated_description: Optional[str] = None,
    updated_authors: Optional[list[str]] = None,
) -> str:
    """
    Add new configurations (and associated properties) to the db.

    This method is used to add to an existing dataset with new
    configurations. ``update_data`` can serve the same role (along with
    others) but requires all data (new and existing) to be passed in as an
    argument. Assumes property format (property_map) is the same as the
    original dataset.

    The existing configurations are fetched with ``get_data`` and sent to
    the database together with the new ones. The caller's ``data`` list is
    left unmodified.

    :param dataset_handle: name or ID of dataset
    :param data: list of ASE.Atoms objects containing the configurations
        and associated properties to add to the database. Note that
        configuration-specific metadata should be stored under the
        `atoms.info[METADATA_KEY]` field.
    :param dataset_metadata: A dictionary of metadata specific to the
        dataset as a whole. When the dataset's property map contains
        energy, atomic-forces or cauchy-stress, a non-empty 'parameters'
        entry is required.
    :param updated_description: If not None, will also update the dataset
        description
    :param updated_authors: If not None, will also update the dataset
        authors
    :returns: handle for the dataset which includes the new additions
    :raises ValueError: if 'parameters' is required but missing or empty
    """
    # check if dataset_handle is a short name or ID, if name, get most
    # recent ID
    dataset_id = self._get_id_from_name(dataset_handle)

    # get existing data
    existing_data = self.get_data(
        dataset_id,
        rename_properties=True,  # assume new data has same map as old
        return_dataset_info=False)
    existing_property_map = self.get_dataset_property_map(dataset_id)

    len_new_data = len(data)
    # Build a fresh list instead of extending ``data`` in place, so the
    # caller's list does not silently grow with the stored configurations.
    combined_data = [*data, *existing_data]

    parameters = {}
    keys = ['energy', 'atomic-forces', 'cauchy-stress']
    if any(key in existing_property_map for key in keys):
        # dataset_metadata defaults to None; treat that as "no metadata"
        # so the caller gets the ValueError below, not an AttributeError.
        parameters = (dataset_metadata or {}).get('parameters', None)
        if not parameters:
            raise ValueError(
                "Must provided 'parameters' in the 'dataset_metadata' "
                "variable.")

    new_handle = self.database_client.update_dataset_pg_no_spark(
        combined_data,
        dataset_id,
        parameters=parameters,
        prop_map=existing_property_map,
        strict=True,
        description=updated_description,
        authors=updated_authors,
    )
    self.logger.info(f'Added {len_new_data} configs to {dataset_id}, new'
                     f' ID: {new_handle}')
    return new_handle
def new_dataset(
    self,
    dataset_name: str,
    data: list[Atoms],
    dataset_metadata: Optional[dict] = None,
    strict: bool = True,
) -> str:
    """
    Create a new dataset with the provided data and metadata

    The new dataset will have a human readable name specified by
    dataset_name and will ingest the data and metadata provided.

    :param dataset_name: name of the dataset to be created
    :type dataset_name: str
    :param data: list of ASE.Atoms objects containing the configurations
        and associated properties to add to the database. Note that
        configuration-specific metadata should be stored under the
        `atoms.info[METADATA_KEY]` field.
    :type data: list
    :param dataset_metadata: A dictionary of metadata specific to the
        dataset as a whole. Current options are authors (str), description
        (str), and parameters (dict) which consists of two nested
        dictionaries named 'universal' and 'code' for the universal input
        parameter names and the code specific dictionaries. Anything that
        is not a dict is replaced by an empty dict.
    :type dataset_metadata: dict
    :param strict: If strict, ingested data must all contain the
        properties specified in the property map. |default| ``True``
    :type strict: bool
    :returns: unique handle for the dataset
    :rtype: str
    :raises DuplicateDatasetNameError: if dataset_name already exists
    :raises Exception: if no property map has been set
    :raises ValueError: if energy, atomic-forces or cauchy-stress are
        mapped but 'universal' and 'code' parameters are not both given
    """
    # check if dataset_handle (i.e. name) already exists
    if not self.check_if_dataset_name_unique(dataset_name):
        existing_ds = self._get_id_from_name(dataset_name)
        self.logger.info(f'{dataset_name} already exists in the database'
                         f' as {existing_ds}, cannot create new dataset '
                         'with this name!')
        raise DuplicateDatasetNameError(
            f'{dataset_name} exists as {existing_ds}')

    # The property map must be validated BEFORE the sanity check: the
    # sanity check iterates over it and would otherwise fail with an
    # unhelpful TypeError when it is still None.
    if self.property_map is None:
        raise Exception('''Property map must not None.
            Please call set_default_property_map or set_property_map first''')

    # sanity check on the first configuration (skipped for empty input,
    # where there is nothing to inspect)
    if data:
        self.check_example_config(data[0])

    current_date = datetime.today().strftime('%Y-%m-%d')
    if not isinstance(dataset_metadata, dict):
        self.logger.info('dataset_metadata is not a dict, setting to {}')
        dataset_metadata = {}
    description = dataset_metadata.get(
        'description',
        f'Inserted by Orchestrator on {current_date}',
    )
    authors = dataset_metadata.get('authors', self.default_author)

    # If any of the following keys are in property_map then we require
    # that there are code and universal parameters.
    parameters = {}
    keys = ['energy', 'atomic-forces', 'cauchy-stress']
    if any(key in self.property_map for key in keys):
        parameters = dataset_metadata.get('parameters',
                                          self.default_parameters)
        if 'code' not in parameters or 'universal' not in parameters:
            raise ValueError(
                'Must specify both the universal and code specific input '
                'parameters.')

    insert_kwargs = {
        'name': dataset_name,
        'authors': authors,
        'description': description,
        'prop_map': self.property_map,
        'parameters': parameters,
    }
    try:
        # TODO: Support fork=True
        new_dataset_handle = (
            self.database_client.insert_data_and_create_datset(
                data,
                strict=strict,
                **insert_kwargs,
            ))
    except UnboundLocalError:
        # NOTE(review): fallback kept from the original implementation,
        # which retried without ``strict`` on UnboundLocalError; confirm
        # whether the database client can still raise it.
        new_dataset_handle = (
            self.database_client.insert_data_and_create_datset(
                data,
                **insert_kwargs,
            ))
    self.logger.info(f'Created dataset {dataset_name} with {len(data)} '
                     f' configs, ID: {new_dataset_handle}')
    return new_dataset_handle
def _get_id_from_name(self, dataset_name: str) -> str:
    """
    Finds the most recent dataset ID to be associated with a database name

    Take a dataset name and return the ID. If an ID is given (a string
    starting with ``DS_``), it is returned itself, allowing this method to
    also yield the "correct" ID to use. Otherwise the colabfit command
    line query is run and the ID with the highest trailing version number
    in its output is returned.

    :param dataset_name: name of the dataset (human readable)
    :type dataset_name: str
    :returns: colabfit-id of the most recent dataset with name
        dataset_name
    :rtype: str
    :raises UnidentifiedStorageError: if the query produced no output
    :raises DatasetDoesNotExistError: if the output lists no dataset IDs
    """
    import shlex  # local import: only needed to quote the shell argument

    if dataset_name.startswith('DS_'):
        return dataset_name

    # Quote the name so whitespace or shell metacharacters in it cannot
    # alter the command that is executed.
    query_out = sp.run(
        f'{self.query_string} -t {shlex.quote(dataset_name)}',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    query = query_out.stdout.split()
    if not query:  # no stdout
        self.logger.info(
            f'Problem with colabfit query: {query_out.stderr}')
        raise UnidentifiedStorageError
    if len(query) > 1:
        # second token of the output is reported as the instance count
        self.logger.info(
            f'Found {query[1]} instance(s) of this dataset name!')

    # get the colabfit IDs from the output: the token following each
    # "{'colabfit-id':" label (a trailing label has no value and is
    # ignored)
    ids = [
        value.strip(",'") for label, value in zip(query, query[1:])
        if label == "{'colabfit-id':"
    ]
    if not ids:  # no database with this name
        self.logger.info(
            f'Could not find any databases with name: {dataset_name}')
        raise DatasetDoesNotExistError(
            f'No dataset with name "{dataset_name}"')

    # extract the version numbers (last underscore-separated field)
    versions = [int(ds_id.split('_')[-1]) for ds_id in ids]
    latest_dataset = ids[versions.index(max(versions))]
    # and the base indices (second underscore-separated field)
    bases = {ds_id.split('_')[1] for ds_id in ids}
    if len(bases) > 1:
        self.logger.info('Warning: multiple datasets use the same name!')
    self.logger.info(f'{latest_dataset} is the latest version of '
                     f'{dataset_name} found in storage')
    return latest_dataset
def get_data(
    self,
    dataset_handle: str,
    query_options: Optional[dict] = None,
    inspect: Optional[bool] = False,
    rename_properties: Optional[bool] = False,
    return_dataset_info: Optional[bool] = False,
) -> list[Atoms]:
    """
    Extract data from storage

    Return the dataset specified by dataset_handle as a list of ASE Atoms.
    ``query_options`` is accepted but not used; a message is logged when
    it is supplied.

    :param dataset_handle: ID of dataset
    :type dataset_handle: str
    :param query_options: dict of options for data extraction and return
        (currently unsupported) |default| ``None``
    :type query_options: dict
    :param inspect: whether to inspect data and print summary
    :type inspect: bool
    :param rename_properties: whether to rename properties based upon the
        dataset's stored property map. Useful to keep consistent naming
        when adding data to dataset
    :type rename_properties: bool
    :param return_dataset_info: whether to return dataset info such as
        name, authors, etc in addition to data
    :type return_dataset_info: bool
    :returns: requested data as a list of ASE.Atoms objects, or a
        ``(configs, ds_info)`` tuple if return_dataset_info is True
    :rtype: list or list and dict
    :raises DatasetDoesNotExistError: if no data is returned for the
        handle
    :raises Warning: if rename_properties is set and a configuration lacks
        one of the stored property names
    """
    if query_options is None:
        query_options = {}
    else:
        self.logger.info('Query options are not currently supported')

    property_objects = self.database_client.get_dataset_data(
        dataset_handle)
    if not property_objects:
        # TODO: this should search the datasets collection instead
        raise DatasetDoesNotExistError(f"{dataset_handle} not found")

    configs = [self._colabfit_po_to_ase(po) for po in property_objects]
    if inspect:
        inspect_configs(configs)

    # get associated property_map from old dataset and rename properties
    # according to it
    if rename_properties:
        stored_map = self.get_dataset_property_map(dataset_handle)

        # stored name (as found on the configs) -> original field name
        renames = {}
        for prop_name, entries in stored_map.items():
            if isinstance(entries, dict):
                entries = [entries]
            for entry in entries:
                for sub_key, spec in entry.items():
                    if "field" not in spec:
                        continue
                    if prop_name == '_metadata':
                        stored_name = 'metadata'
                    else:
                        stored_name = (f"{prop_name.replace('-', '_')}"
                                       f"_{sub_key.replace('-', '_')}")
                    renames[stored_name] = spec['field']

        for atoms in configs:
            for stored_name, field in renames.items():
                if stored_name == field:
                    continue
                if stored_name in atoms.info:
                    atoms.info[field] = atoms.info.pop(stored_name)
                elif stored_name in atoms.arrays:
                    atoms.arrays[field] = atoms.arrays.pop(stored_name)
                else:
                    # warning mess up tests so print
                    raise Warning(
                        (f"Configuration doesn't have {stored_name} in its "
                         "info or arrays dict"))

    if not return_dataset_info:
        return configs

    ds = self.database_client.get_dataset_pg(dataset_handle)
    info_keys = [
        'id', 'name', 'authors', 'description', 'uploader', 'property_map'
    ]
    ds_info = {key: ds[0][key] for key in info_keys}
    return configs, ds_info
def _colabfit_po_to_ase(self, config: dict) -> Atoms:
    """
    Convert one ColabFit property-object row into an ASE Atoms object

    :param config: a dictionary returned from ColabFit containing
        information about the atomic configuration and computed properties
    :type config: dict
    :returns: an ASE.Atoms object containing all of the properties from
        the ColabFit data object
    :rtype: ASE.Atoms
    """
    lattice = np.asarray(config['cell'])
    symbols = [chemical_symbols[int(z)] for z in config["atomic_numbers"]]
    positions = np.asarray(config['positions'])
    periodic = config['pbc']
    atoms = Atoms(symbols=symbols,
                  cell=lattice,
                  positions=positions,
                  pbc=periodic)

    # add metadata
    atoms.info['po-id'] = config['id']
    atoms.info['co-id'] = config['configuration_id']
    atoms.info['ds-id'] = config['dataset_id']
    if config['metadata'] is not None:
        atoms.info[METADATA_KEY] = json.loads(config['metadata'])

    # bookkeeping columns that are not copied over as properties
    skipped_columns = (
        'id',
        'hash',
        'dataset_id',
        'configuration_id',
        'last_modified',
        'multiplicity',
        'metadata',
    )

    # walk through schema
    schema = self.database_client.get_table_schema('property_objects')
    for column in schema:
        name = column[0]
        if name in skipped_columns:
            continue
        array_column = (column[1] == 'ARRAY'
                        and name != 'cauchy_stress_stress')
        value = config[name]
        if value is None:
            continue
        if not array_column:
            atoms.info[name] = value
            continue
        # check whether to put in info or arrays: arrays whose first
        # dimension equals the atom count are stored per-atom
        as_array = np.array(value)
        if len(atoms) == as_array.shape[0]:
            atoms.arrays[name] = as_array
        else:
            atoms.info[name] = value
    return atoms
def update_data(
    self,
    dataset_handle: str,
    data: list[Atoms],
    parameters: dict = None,
    property_map: Optional[dict] = None,
    use_orig_property_map: bool = True,
    new_properties: Optional[dict] = None,
    strict: bool = True,
    updated_description: Optional[str] = None,
    updated_authors: Optional[list[str]] = None,
) -> str:
    """
    Update an existing dataset - adding new properties to configurations

    This method operates on existing configurations and/or properties.
    Data is a list of ASE Atoms objects. NOTE: This should include all
    data that is to be associated with datasets. Call get_data if you want
    old data and potentially new data to be in dataset.

    The property map is automatically pulled from the original dataset. If
    this isn't wanted set use_orig_property_map=False and specify
    property_map which should include mappings for all data to add.
    dataset_handle specifies the dataset where these data should be
    updated and should be the dataset ID, (DS_XXXXXX).

    :param dataset_handle: ID of dataset
    :param data: list of ase.Atoms which include the new data to add
    :param parameters: The 'universal' and 'code' specific parameters from
        the simulations. These should be the same as the parameters in the
        database.
    :param property_map: explicit property map; must be None when
        use_orig_property_map is True and must be given when it is False
    :param use_orig_property_map: whether or not to use the dataset's
        original property map. Useful when
        get_data(rename_properties=True) has been used.
    :param new_properties: These properties will be added to the
        property_map via add_property_mapping (which also stores the map
        on ``self.property_map``)
    :param strict: If strict, ingested data must all contain the
        properties specified in the property map. |default| ``True``
    :param updated_description: If not None, will also update the dataset
        description
    :param updated_authors: If not None, will also update the dataset
        authors
    :returns: updated handle for the dataset
    :raises Exception: if no property map is available
    :raises ValueError: if energy, atomic-forces or cauchy-stress are
        mapped but 'universal' and 'code' parameters are not both given
    """
    if use_orig_property_map:
        assert property_map is None, (
            'Only one of property_map and use_orig_property_map should '
            'not be None')
        property_map = self.get_dataset_property_map(dataset_handle)
    if property_map is None:
        raise Exception(
            'Property map must not be None. Please set '
            'use_orig_property_map=True or explicitly set property_map '
            'in the arguments depending on the situation.')

    if new_properties is not None:
        # add_property_mapping works on self.property_map
        self.property_map = property_map
        for prop_name, prop_mapping in new_properties.items():
            property_map = self.add_property_mapping(prop_name,
                                                     prop_mapping)

    if parameters is None:
        parameters = {}

    # If any of the following keys are in property_map then we require
    # that there are code and universal parameters.
    needs_parameters = any(
        key in property_map.keys()
        for key in ('energy', 'atomic-forces', 'cauchy-stress'))
    if needs_parameters:
        supplied = parameters.keys()
        if not ('code' in supplied and 'universal' in supplied):
            raise ValueError(
                'Must specify both the universal and code specific input '
                'parameters.')

    return self.database_client.update_dataset_pg_no_spark(
        data,
        dataset_handle,
        parameters,
        property_map,
        strict=strict,
        description=updated_description,
        authors=updated_authors,
    )
def list_data(
    self,
    dataset_handle: Optional[str] = None,
    text: Optional[str] = None,
    properties: Optional[str] = None,
    elements: Optional[str] = None,
    elements_exact: Optional[bool] = False,
):
    """
    Utility function to query the database

    Runs the colabfit command line query and lets it print an overview of
    the database contents if no dataset_handle is provided, otherwise
    information about the specific dataset. Currently only
    dataset_handles which reference the dataset name (not the colabfit
    ID) will work for showing the selective query result.

    All user supplied values are shell-quoted before being placed on the
    command line.

    :param dataset_handle: name of the dataset |default| ``None``
    :type dataset_handle: str
    :param text: text to search for within the dataset. This can be
        authors, descriptions, uploader. |default| ``None``
    :type text: str
    :param properties: name of properties to search for. Multiple should
        be included as "energy atomic-forces" |default| ``None``
    :type properties: str
    :param elements: elements to search for. Multiple should be included
        as "C H". Will return datasets containing these plus other
        elements. See elements_exact |default| ``None``
    :type elements: str
    :param elements_exact: whether to restrict element search to return
        datasets containing only specified elements |default| ``False``
    :type elements_exact: bool
    :raises Exception: if both dataset_handle and text are given
    """
    import shlex  # local import: only needed to quote shell arguments

    # exit status 0 means the colabfit executable was found on PATH
    colabfit_query_installed = system(
        'which colabfit 1> /dev/null 2> /dev/null')

    if dataset_handle is not None and text is not None:
        raise Exception(
            "Only one of dataset_handle and text should be used.")

    # shlex.quote (rather than hand-written quotes) keeps values that
    # contain quote characters or shell metacharacters intact.
    options = []
    if dataset_handle is not None:
        options.append(f'-t {shlex.quote(dataset_handle)}')
    if text is not None:
        options.append(f'-t {shlex.quote(text)}')
    if properties is not None:
        options.append(f'-p {shlex.quote(properties)}')
    if elements is not None:
        flag = '-ee' if elements_exact else '-e'
        options.append(f'{flag} {shlex.quote(elements)}')

    if colabfit_query_installed != 0:
        self.logger.info('Error: cfkit-cli must be installed to list data')
        return
    system(f'{self.query_string} {" ".join(options)}')
def delete_dataset(
    self,
    dataset_handle: str,
    delete_children: Optional[bool] = True,
):
    """
    Remove the dataset specified by dataset_handle from the database

    :param dataset_handle: ID of dataset
    :type dataset_handle: str
    :param delete_children: if true will also delete all POs and COs (not
        associated with another DS)
    :type delete_children: bool
    """
    client = self.database_client
    client.delete_dataset(dataset_handle, delete_children)
    self.logger.info(f'Deleted dataset {dataset_handle} from storage')
def delete_items(self, item_ids_list: list[str]):
    """
    Remove the COs and/or POs specified by item_ids_list from the database

    :param item_ids_list: IDs of the items to delete
    :type item_ids_list: list
    """
    client = self.database_client
    client.delete_items(item_ids_list)
    n_deleted = len(item_ids_list)
    self.logger.info(f'Deleted {n_deleted} POs/Cos from storage')
# TODO: Support in next update
def dataset_intersection_and_differences(self, dataset1, dataset2, mode):
    """
    returns the intersection or differences between two datasets

    behavior is controlled by the mode variable, which can be set to
    'intersection' or 'difference'. The corresponding results will be
    returned. If 'difference' is chosen, the returned value covers the
    configurations IN dataset1 but NOT IN dataset2.

    Both names are resolved to IDs and compared by the database client
    before ``mode`` is validated.

    :param dataset1: name of the first dataset to compare
    :type dataset1: str
    :param dataset2: name of the second dataset to compare
    :type dataset2: str
    :param mode: switch for if the intersection or difference is returned
    :type mode: str
    :returns: the entry of the comparison result selected by mode
    :rtype: list
    :raises UnsupportedComparisonError: if mode is not one of the two
        supported values
    """
    first_id = self._get_id_from_name(dataset1)
    second_id = self._get_id_from_name(dataset2)
    comparison = self.database_client.compare_datasets(first_id, second_id)

    # mode name -> key of the comparison result
    supported_modes = (
        ('intersection', 'DS1 & DS2'),
        ('difference', 'DS1 - DS2'),
    )
    for mode_name, result_key in supported_modes:
        if mode == mode_name:
            # get Dataset from DOs
            return comparison[result_key]
    raise UnsupportedComparisonError(f'{mode} not supported!')
def define_new_properties(self, property_list: list[dict]):
    """
    Define new properties to add to the database

    New properties only need to be defined once for the database. A single
    definition that is not wrapped in a list is accepted as well.
    Definitions are inserted one at a time, so entries preceding an
    invalid one have already been inserted when the error is raised.

    :param property_list: List of dictionaries containing properties to be
        stored in a client
    :type property_list: list
    :raises TypeError: if an entry is not a dict
    """
    if isinstance(property_list, list):
        definitions = property_list
    else:
        definitions = [property_list]

    insert = self.database_client.insert_property_definition
    for prop in definitions:
        if isinstance(prop, dict):
            insert(prop)
            continue
        raise TypeError(
            f'''Each entry of property_list should be a dict but {prop} is a {type(prop)}''')
def set_property_map(
    self,
    keys: Optional[dict] = None,
    file_example: Optional[str] = None,
) -> dict:
    """
    Set the mapping between input properties and colabfit representation

    Builds the property map used when inserting data into the database and
    stores it on ``self.property_map``. Energy, forces and stress entries
    are created only when a field name for them is available.

    :param keys: dictionary defining the mapping between ingested
        properties and their internal database representation. Keys can
        include 'energy_field', 'force_field', and 'stress_field', with
        the values corresponding to how that property is demarcated in the
        input. Additional keys can be included but must include their full
        mapping; they are forwarded to ``add_property_mapping``.
        |default| ``None``
    :type keys: dict
    :param file_example: path to a file whose second line is a header
        representing the property tags, from which possible energy, force,
        and stress mappings (defined by the options in this method) are
        extracted |default| ``None``
    :type file_example: str
    :returns: dictionary with all properties used in a dataset
    :rtype: dict
    """
    # TODO: Should we support this. Not sure exactly what it does
    header_options = {
        'energy_field': [
            'energy',
            'Energy',
            'ENERGY',
            'energies',
            'Energies',
            'ENERGIES',
        ],
        'force_field': [
            'force',
            'Force',
            'FORCE',
            'forces',
            'Forces',
            'FORCES',
        ],
        'stress_field': [
            'stress',
            'Stress',
            'STRESS',
            'stresses',
            'Stresses',
            'STRESSES',
        ],
    }

    # removed defaults
    fields = dict.fromkeys(header_options)
    if keys is not None:
        for field_key in fields:
            fields[field_key] = keys.get(field_key, None)

    if file_example is not None:
        with open(file_example, 'r') as fin:
            fin.readline()  # first line is # atoms
            file_header = fin.readline()  # second line is header
        for field_key, options in header_options.items():
            fields[field_key] = self._check_header_for_fields(
                fields[field_key],
                options,
                file_header,
            )

    property_map = {}
    if fields['energy_field'] is not None:
        property_map['energy'] = [{
            'energy': {
                'field': fields['energy_field'],
                'units': 'eV'
            },
            'per-atom': {
                'value': False,
                'units': None
            },
        }]
    if fields['force_field'] is not None:
        property_map['atomic-forces'] = [{
            'forces': {
                'field': fields['force_field'],
                'units': 'eV/A'
            },
        }]
    if fields['stress_field'] is not None:
        property_map['cauchy-stress'] = [{
            'stress': {
                'field': fields['stress_field'],
                'units': 'eV/A^3'
            },
            "volume-normalized": {
                "value": False,
                "units": None
            },
        }]
    self.property_map = property_map

    # TODO: Support same behavior if file is used instead
    # add any additional property mappings that have been provided
    if keys is not None:
        extra_items = [(name, mapping) for name, mapping in keys.items()
                       if name not in header_options]
        for name, mapping in extra_items:
            self.logger.info(f'Adding {name} to property map.')
            self.add_property_mapping(name, mapping)
    return self.property_map
def set_default_property_map(self) -> dict:
    """
    Set the default mapping between input properties and colabfit
    representation.

    Includes energy, atomic-forces, and cauchy-stress (using the standard
    ENERGY_KEY, FORCES_KEY and STRESS_KEY field names) plus a metadata
    mapping keyed by METADATA_KEY.

    :returns: the property map now stored on ``self.property_map``
    :rtype: dict
    """
    default_keys = {
        'energy_field': ENERGY_KEY,
        'force_field': FORCES_KEY,
        'stress_field': STRESS_KEY,
        METADATA_KEY: {
            'metadata': {
                'field': METADATA_KEY
            }
        },
    }
    self.set_property_map(default_keys)
    return self.property_map
def _check_header_for_fields(
    self,
    default_field_value: str,
    field_options: list[str],
    header_line: str,
) -> str:
    """
    helper function to match possible field values with those from a file

    Options are tried in order and must appear in the header as a whole
    token, i.e. not directly preceded or followed by a letter, digit or
    underscore. Whole-token matching prevents a shorter option from
    shadowing the real tag (e.g. ``force`` being reported for a header
    that actually contains ``forces``).

    :param default_field_value: field value to return if none of the
        options can be found
    :type default_field_value: str
    :param field_options: list of possible options to search for in the
        header_line
    :type field_options: list
    :param header_line: line from a file which should contain the
        potential fields to search for.
    :type header_line: str
    :returns: the field of provided options which appears in the supplied
        header_line, or default_field_value if none does
    :rtype: str
    """
    import re  # local import: only needed for the token-boundary match

    for option in field_options:
        token = (r'(?<![A-Za-z0-9_])' + re.escape(option)
                 + r'(?![A-Za-z0-9_])')
        if re.search(token, header_line):
            self.logger.info((f'Found field for "{default_field_value}" in'
                              f' header, setting map to {option}'))
            return option
    self.logger.info((f'Did not find field for "{default_field_value}" in '
                      f'header, setting map to {default_field_value}'))
    return default_field_value
def check_example_config(self, example_config: Atoms):
    """
    Log a hint for every mapped field missing on an example configuration

    Sanity check; does not affect code behavior. For each property in
    ``self.property_map`` (except ``_metadata``) the first mapping entry
    is inspected, and a message is logged for each 'field' name found in
    neither ``example_config.info`` nor ``example_config.arrays``.

    :param example_config: configuration to check against the property map
    :type example_config: ASE.Atoms
    """
    for property_name, mappings in self.property_map.items():
        if property_name == "_metadata":
            continue
        first_mapping = mappings[0]
        for spec in first_mapping.values():
            if 'field' not in spec:
                continue
            sk = spec['field']
            present = (sk in example_config.info
                       or sk in example_config.arrays)
            if present:
                continue
            self.logger.info(
                f'Key "{sk}" not found on example config. '
                'Did you update property_map before '
                'trying to save data?')
def add_property_mapping(
    self,
    new_property_name: str,
    new_map: dict,
    overwrite: Optional[bool] = False,
) -> dict:
    """
    add a new property entry into the internal property map

    Example usage::

        storage.add_property_mapping(
            'new_property_name',
            {
                'key_1': {'field': 'key_1_for_ASE', 'units': None},
                'key_2': {'field': 'key_2_for_ASE', 'units': None},
            }
        )

    :param new_property_name: name of property mapping being added.
        Underscores are converted to dashes, except for the special name
        ``_metadata``.
    :type new_property_name: str
    :param new_map: the colabfit-style property mapping. A dictionary
        specifying the ``'field'`` which will be used to load the data off
        of an ASE atoms object (from the ``.info`` or ``.arrays``
        dictionaries), and the units. Note that colabfit expects `new_map`
        to actually be a list; this function will wrap ``new_map`` in a
        list if it is not already one.
    :type new_map: dict or list
    :param overwrite: True allows existing maps with the same name to be
        overwritten. With the default of False an existing map is kept
        unchanged and a message is logged.
    :type overwrite: bool
    :returns: updated property_map
    :rtype: dict
    :raises RuntimeError: if new_map is neither a dict nor a list
    """
    # kim-property expects "-" not _ but bypass if _metadata
    if new_property_name != '_metadata':
        new_property_name = new_property_name.replace('_', '-')

    # validate before touching any state
    if isinstance(new_map, list):
        mapping = new_map
    elif isinstance(new_map, dict):
        mapping = [new_map]
    else:
        raise RuntimeError(f'Invalid data type {type(new_map)} for new '
                           f'mapping "{new_property_name}"')

    if self.property_map is None:
        # nothing has been mapped yet; start from an empty map
        self.property_map = {}

    if (new_property_name in self.property_map) and not overwrite:
        # honour overwrite=False: keep the existing mapping untouched
        self.logger.info(
            f'A mapping for the property "{new_property_name}" already '
            'exists. Use ``overwrite=True`` to overwrite existing maps')
        return self.property_map

    self.property_map[new_property_name] = mapping
    return self.property_map
def get_dataset_property_map(self, dataset_id: str) -> dict:
    """
    Return the property_map that was used to ingest the given dataset.

    Thin wrapper around the database client's method of the same name.

    :param dataset_id: ID of dataset
    :type dataset_id: str
    :returns: dictionary with all properties used in a dataset
    :rtype: dict
    """
    client = self.database_client
    return client.get_dataset_property_map(dataset_id)
def get_dataset_name_from_id(self, dataset_id: str) -> str:
    """
    Return the name of the dataset with the given ID.

    Thin wrapper around the database client's method of the same name.

    :param dataset_id: ID of dataset
    :type dataset_id: str
    :returns: name of the dataset
    :rtype: str
    """
    client = self.database_client
    return client.get_dataset_name_from_id(dataset_id)
def get_property_definitions(self) -> list:
    """
    Fetch the property definitions from the database client

    :returns: all properties currently in database
    :rtype: list
    """
    client = self.database_client
    return client.get_property_definitions()
def update_property_definition(self, prop_def: str, new_keys: dict):
    """
    Updates an existing property definition with new keys

    Only keys that are not currently a part of the definition should be
    added in new_keys. For every new key a column is added to the
    ``property_objects`` table (populated with the provided default
    value), and the stored definition is then replaced by the extended
    one.

    Form of new_keys should be similar to::

        {'energy': {
            'type': 'float',
            'has-unit': True,
            'extent': [],
            'required': True,
            'description': 'The potential energy of the system.',
            'default-value': None
        }}

    The default default-value is NULL. The ``'default-value'`` entry is
    not stored in the definition; the dictionaries passed in ``new_keys``
    are left unmodified.

    :param prop_def: name of definition to update (underscores are
        converted to dashes)
    :type prop_def: str
    :param new_keys: dict containing new keys to add with default values
        to populate existing entries
    :type new_keys: dict
    :raises Exception: if no definition named prop_def exists
    """
    # get property definition
    property_dict = None
    definitions = self.get_property_definitions()
    prop_def = prop_def.replace('_', '-')
    for d in definitions:
        if d.get('property-name') == prop_def:
            property_dict = d
            original_dict = d.copy()
    if property_dict is None:
        raise Exception(f'''Property with provided name {prop_def}
                        not found.''')

    # definition type -> PostgreSQL column type; anything else is text
    sql_types = {
        'float': "DOUBLE PRECISION",
        'int': "INT",
        'bool': "BOOL",
    }

    # add keys to definition
    for k, v in new_keys.items():
        # work on a copy so the caller's dict keeps its 'default-value'
        key_definition = dict(v)
        default_value = key_definition.pop('default-value', "NULL")
        property_dict[k] = key_definition

        # update PO tables
        column_name = property_dict['property-name'].replace('-', '_') \
            + f'_{k}'.replace('-', '_')
        data_type = sql_types.get(key_definition['type'],
                                  "VARCHAR (10000)")
        # one array level per entry of the extent
        data_type += '[]' * len(key_definition['extent'])
        self.database_client.insert_new_column('property_objects',
                                               column_name,
                                               data_type,
                                               default=default_value)

    # update PD in DB
    print(f'Original definition: {original_dict}')
    print(f'Updated definition: {property_dict}')
    # Single quotes inside the JSON (e.g. in descriptions) are doubled so
    # they cannot terminate the SQL string literal.
    updated_json = json.dumps(property_dict).replace("'", "''")
    original_json = json.dumps(original_dict).replace("'", "''")
    sql = f'''UPDATE property_definitions
              SET definition = '{updated_json}'
              WHERE definition = '{original_json}';
           '''
    self.database_client.general_query(sql)
def setup_tables(self) -> None:
    """
    Builds all necessary PostgreSQL tables.

    For use with newly created databases. Won't affect existing databases
    if called. After the tables are created, the energy, atomic forces and
    cauchy stress property definitions shipped with colabfit are
    registered through ``define_new_properties``.
    """
    self.database_client.create_pg_tables()

    # imported here, after the tables exist, as in the original flow
    from colabfit.tools.property_definitions import (
        energy_pd,
        atomic_forces_pd,
        cauchy_stress_pd,
    )
    default_definitions = [energy_pd, atomic_forces_pd, cauchy_stress_pd]
    self.define_new_properties(default_definitions)
@staticmethod
def sort_configurations(configs: list[Atoms]) -> list[Atoms]:
    """
    Return the configurations sorted by what their CO-id would be.

    Each configuration is converted to an ``AtomicConfiguration`` and the
    string ``CO_<hash>`` built from it is used as the sort key. Useful for
    sorting configs to be in the same order as returned configurations
    from get_data.

    :param configs: list of configurations
    :type configs: list(Atoms)
    :returns: sorted configs (a new list; the input is not reordered)
    :rtype: list(Atoms)
    """
    sort_keys = [
        f'CO_{AtomicConfiguration.from_ase(atoms)._hash}'
        for atoms in configs
    ]
    # sort on the key only; ties keep their input order (stable sort)
    keyed = sorted(zip(sort_keys, configs), key=lambda pair: pair[0])
    return [atoms for _, atoms in keyed]
def get_dataset_input_parameters(
    self,
    dataset_id: str,
) -> dict:
    """
    Collect the code specific input parameters stored for a dataset id.

    The dataset family (second underscore-separated field of the ID) is
    looked up in the ``dataset_code_specific_parameters`` table and the
    ``code_specific_inputs`` entry of the first matching row is returned.
    To enforce conformity among the dataset, there is only one set of
    input parameters per dataset family.

    NOTE(review): earlier documentation announced a ``(universal, code)``
    pair built from two tables; only the code specific table is queried
    and a single dictionary is returned. Confirm whether the universal
    parameters should be added.

    :param dataset_id: The dataset identification within the database.
    :type dataset_id: str
    :returns: Dictionary stored under ``code_specific_inputs``.
    :rtype: dict
    :raises DatasetDoesNotExistError: if no dictionary of parameters is
        found for the dataset
    """
    family_id = dataset_id.split('_')[1]
    # double single quotes so the value cannot terminate the SQL literal
    quoted_family_id = family_id.replace("'", "''")
    code_sql = f"""SELECT code_specific_inputs
                   from dataset_code_specific_parameters
                   where dataset_id = '{quoted_family_id}';"""
    parameters = self.database_client.general_query(code_sql)
    if parameters:
        parameters = parameters[0]['code_specific_inputs']

    # Should never have a case where there are values in the code specific
    # inputs and none in the universal.
    if not isinstance(parameters, dict):
        raise DatasetDoesNotExistError(
            f'The {dataset_id} dataset does not appear to exist in the '
            'specified database. Ensure you are checking the correct '
            'database and have copied the dataset id correctly.')
    return parameters