from .storage_base import Storage
from ..utils.data_standard import (
ENERGY_KEY,
FORCES_KEY,
STRESS_KEY,
METADATA_KEY,
)
from ..utils.exceptions import (
DuplicateDatasetNameError,
UnidentifiedStorageError,
DatasetDoesNotExistError,
# UnknownKeyError,
UnsupportedComparisonError,
)
from ..utils.data_utils import inspect_configs
from colabfit.tools.database import DataManager
from colabfit.tools.configuration import AtomicConfiguration
from datetime import datetime
import json
import subprocess as sp
from os import system
import numpy as np
from ase import Atoms
from ase.data import chemical_symbols
from typing import Optional
[docs]
class ColabfitStorage(Storage):
"""
Manage data using Colabfit
Colabfit documentation can be found at:
https://colabfit.github.io/colabfit-tools/html/index.html.
Connection settings are given to the constructor as keyword arguments
(database_path, database_name, database_port, database_user,
database_password, external_file) and/or through a credential_file.
database_path is the host/URI of the PostgreSQL data server (required).
database_name is the name of the PostgreSQL database (required).
external_file is the explicit path to an lmdb file to handle
configurations larger than 20,000 atoms. This file will be generated by
Colabfit if it does not yet exist (optional).
credential_file is a path to a json file which contains the
database_path, database_name, and optionally the port, user, password and
external_file values. If a credential_file is provided, its contents
override any other arguments.
All of these parameters default to None; database_path and database_name
must be supplied either directly or through the credential_file.
"""
[docs]
def __init__(
    self,
    credential_file: Optional[str] = None,
    database_path: Optional[str] = None,
    database_name: Optional[str] = None,
    database_port: Optional[str] = None,
    database_user: Optional[str] = None,
    database_password: Optional[str] = None,
    external_file: Optional[str] = None,
    **kwargs,
):
    """
    Collect the connection settings and open the database client

    :param credential_file: Path to a JSON file with the path, name,
        port, user, password, and external_file keys. This is the
        preferred method for initializing a storage module. No other keys
        are needed if credential_file is set
    :type credential_file: str
    :param database_path: URI to the PostgreSQL data server
    :type database_path: str
    :param database_name: Name of the PostgreSQL database client
    :type database_name: str
    :param database_port: Port for the PostgreSQL server
    :type database_port: str
    :param database_user: Username for the PostgreSQL server
    :type database_user: str
    :param database_password: Password for the PostgreSQL server
    :type database_password: str
    :param external_file: Path to an LMDB file for large configurations
    :type external_file: str
    :raises ValueError: if no database path or database name is available
        after the credential file (if any) has been applied
    """
    super().__init__(**kwargs)
    self.STORAGE_ID_KEY = 'co-id'
    self.credential_file = credential_file
    self.database_path = database_path
    self.database_name = database_name
    self.database_port = database_port
    self.database_user = database_user
    self.database_password = database_password
    self.external_file = external_file

    # Values found in the credential file take precedence; settings the
    # file does not mention keep the value passed as an argument.
    if self.credential_file is not None:
        with open(self.credential_file, 'r') as fin:
            file_content = json.load(fin)
        for setting in (
                'database_path',
                'database_name',
                'external_file',
                'database_port',
                'database_user',
                'database_password',
        ):
            current = getattr(self, setting)
            setattr(self, setting, file_content.get(setting, current))

    if self.database_path is None:
        raise ValueError(
            'Database path must be specified for ColabfitStorage')
    if self.database_name is None:
        raise ValueError(
            'Database name must be specified for ColabfitStorage')

    # record of the resolved settings used to build this instance
    self.storage_init_args = {
        setting: getattr(self, setting)
        for setting in (
            'database_path',
            'database_name',
            'database_port',
            'database_user',
            'database_password',
            'external_file',
        )
    }

    # TODO: Add support for external file
    self.database_client = DataManager(
        dbname=self.database_name,
        user=self.database_user,
        password=self.database_password,
        host=self.database_path,
        port=self.database_port,
    )

    # TODO: Currently only works when credential file is passed
    self.query_string = (f'colabfit query -c {self.credential_file} ')

    # the current OS user becomes the default dataset author
    whoami = sp.run(
        'whoami',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    user = whoami.stdout.strip()
    self.default_author = f'{user} via ColabfitStorage'

    self.property_map = None  # should be set before adding data
    self.default_parameters = {}
[docs]
def check_if_dataset_name_unique(self, dataset_name: str) -> bool:
    """
    check if the provided dataset_name is unique in the database

    Runs ``self.query_string`` with a ``-t`` text search for the name and
    counts how many ``'name':`` entries in the whitespace-split output
    equal ``dataset_name`` exactly.

    :param dataset_name: name to check (human readable)
    :type dataset_name: str
    :returns: true if the dataset is not present in the database, false if
        it does exist
    :rtype: boolean
    """
    # function-scope import so this method does not depend on any change
    # to the module level imports
    from shlex import quote

    # quote the name so that spaces or shell metacharacters in it are
    # passed to the query tool as one literal argument
    query_out = sp.run(
        f'{self.query_string} -t {quote(dataset_name)}',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    tokens = query_out.stdout.split()
    # NOTE(review): an empty stdout is treated as "not found" here; a
    # failing query command is therefore indistinguishable from a miss.

    # the token following each "'name':" marker is the quoted dataset name;
    # pairing tokens avoids indexing past the end on truncated output
    count = sum(
        1 for marker, value in zip(tokens, tokens[1:])
        if marker == "'name':" and value.strip(",'") == dataset_name)

    if count == 0:
        self.logger.info(f'Dataset {dataset_name} not found in storage')
        return True
    self.logger.info(f'Dataset {dataset_name} found in storage with '
                     f'({count} instances found)')
    return False
[docs]
def add_data(
    self,
    dataset_handle: str,
    data: list[Atoms],
    dataset_metadata: Optional[dict] = None,
    updated_description: Optional[str] = None,
    updated_authors: Optional[list[str]] = None,
) -> str:
    """
    Add new configurations (and associated properties) to the db.

    This method is used to add to an existing dataset with new
    configurations. 'update_data' can serve the same role (along with
    others) but requires all data (new and existing) to be passed in as an
    argument. Assumes property format (property_map) is the same as the
    original dataset. The caller's ``data`` list is not modified.

    :param dataset_handle: name or ID of dataset
    :param data: list of ASE.Atoms objects containing the configurations
        and associated properties to add to the database. Note that
        configuration-specific metadata should be stored under the
        `atoms.info[METADATA_KEY]` field.
    :param dataset_metadata: A dictionary of metadata specific to the
        dataset as a whole. A non-empty 'parameters' entry is required
        when the dataset's property map contains energy, atomic-forces or
        cauchy-stress; it consists of 'universal' and 'code' nested
        dictionaries.
    :param updated_description: If not None, will also update the dataset
        description
    :param updated_authors: If not None, will also update the dataset
        authors
    :returns: handle for the dataset which includes the new additions
    :raises ValueError: if parameters are required but not provided
    """
    # check if dataset_handle is a short name or ID, if name, get most
    # recent ID
    dataset_id = self._get_id_from_name(dataset_handle)
    # get existing data
    existing_data = self.get_data(
        dataset_id,
        rename_properties=True,  # assume new data has same map as old
        return_dataset_info=False)
    existing_property_map = self.get_dataset_property_map(dataset_id)

    len_new_data = len(data)
    # build a fresh list (new configs first, then existing ones) instead
    # of extending the list owned by the caller
    combined_data = list(data)
    combined_data.extend(existing_data)

    parameters = {}
    keys = ['energy', 'atomic-forces', 'cauchy-stress']
    if any(key in existing_property_map.keys() for key in keys):
        # dataset_metadata defaults to None; treat that as "no metadata"
        # so the caller gets the ValueError below, not an AttributeError
        metadata = {} if dataset_metadata is None else dataset_metadata
        parameters = metadata.get('parameters', None)
        if not parameters:
            raise ValueError(
                "Must provided 'parameters' in the 'dataset_metadata' "
                "variable.")

    new_handle = self.database_client.update_dataset_pg_no_spark(
        combined_data,
        dataset_id,
        parameters=parameters,
        prop_map=existing_property_map,
        strict=True,
        description=updated_description,
        authors=updated_authors,
    )
    self.logger.info(f'Added {len_new_data} configs to {dataset_id}, new'
                     f' ID: {new_handle}')
    return new_handle
[docs]
def new_dataset(
    self,
    dataset_name: str,
    data: list[Atoms],
    dataset_metadata: Optional[dict] = None,
    strict: bool = True,
) -> str:
    """
    Create a new dataset with the provided data and metadata

    The new dataset will have a human readable name specified by
    dataset_name and will ingest the data and metadata provided.

    :param dataset_name: name of the dataset to be created
    :type dataset_name: str
    :param data: list of ASE.Atoms objects containing the configurations
        and associated properties to add to the database. Note that
        configuration-specific metadata should be stored under the
        `atoms.info[METADATA_KEY]` field.
    :type data: list
    :param dataset_metadata: A dictionary of metadata specific to the
        dataset as a whole. Current options are authors (str), description
        (str), and parameters (dict) which consists of two nested
        dictionaries named 'universal' and 'code' for the universal input
        parameter names and the code specific dictionaries.
    :type dataset_metadata: dict
    :param strict: If strict, ingested data must all contain the properties
        specified in the property map. |default| ``True``
    :type strict: bool
    :returns: unique handle for the dataset
    :rtype: str
    :raises DuplicateDatasetNameError: if dataset_name is already in use
    :raises Exception: if no property map has been set on this instance
    :raises ValueError: if data is empty, or if the property map contains
        energy/atomic-forces/cauchy-stress but the 'code' and 'universal'
        parameters are not both present
    """
    # check if dataset_handle (i.e. name) already exists
    if not self.check_if_dataset_name_unique(dataset_name):
        existing_ds = self._get_id_from_name(dataset_name)
        self.logger.info(f'{dataset_name} already exists in the database'
                         f' as {existing_ds}, cannot create new dataset '
                         'with this name!')
        raise DuplicateDatasetNameError(
            f'{dataset_name} exists as {existing_ds}')

    # The property map must be validated before the sanity check below,
    # because that check iterates over the map and would otherwise fail
    # with an unrelated TypeError when the map is still None.
    if self.property_map is None:
        raise Exception('Property map must not None.\n'
                        'Please call set_default_property_map or\n'
                        'set_property_map first')
    if len(data) == 0:
        raise ValueError(
            'Cannot create a dataset without any configurations')
    self.check_example_config(data[0])  # sanity check

    current_date = datetime.today().strftime('%Y-%m-%d')
    if not isinstance(dataset_metadata, dict):
        self.logger.info('dataset_metadata is not a dict, setting to {}')
        dataset_metadata = {}
    description = dataset_metadata.get(
        'description',
        f'Inserted by Orchestrator on {current_date}',
    )
    authors = dataset_metadata.get('authors', self.default_author)

    # If any of the following keys are in property_map then we require
    # that there are code and universal parameters.
    parameters = {}
    keys = ['energy', 'atomic-forces', 'cauchy-stress']
    if any(key in self.property_map.keys() for key in keys):
        parameters = dataset_metadata.get('parameters',
                                          self.default_parameters)
        pkeys = parameters.keys()
        if 'code' not in pkeys or 'universal' not in pkeys:
            raise ValueError(
                'Must specify both the universal and code specific input '
                'parameters.')

    try:
        new_dataset_handle = (
            self.database_client.insert_data_and_create_datset(
                data,
                name=dataset_name,
                authors=authors,
                description=description,
                prop_map=self.property_map,
                parameters=parameters,
                strict=strict,
                # TODO: Support fork=True,
            ))
        # NOTE(review): forking is not implemented yet, so this placeholder
        # always logs "None" as the parent dataset.
        old_ds = None  # insert code here
        self.logger.info(f'Forking dataset from {old_ds}')
    except UnboundLocalError:
        # fallback: retry the insert without the strict flag
        new_dataset_handle = (
            self.database_client.insert_data_and_create_datset(
                data,
                name=dataset_name,
                authors=authors,
                description=description,
                prop_map=self.property_map,
                parameters=parameters,
            ))
    self.logger.info(f'Created dataset {dataset_name} with {len(data)} '
                     f' configs, ID: {new_dataset_handle}')
    return new_dataset_handle
def _get_id_from_name(self, dataset_name: str) -> str:
    """
    Finds the most recent dataset ID to be associated with a database name

    Take a dataset name and return the ID. If an ID is given (anything
    starting with ``DS_``), it is returned itself, allowing this method to
    also yield the "correct" ID to use. Otherwise the name is looked up by
    running ``self.query_string`` and parsing its whitespace-split output.

    :param dataset_name: name of the dataset (human readable)
    :type dataset_name: str
    :returns: colabfit-id of the most recent dataset with name dataset_name
    :rtype: str
    :raises UnidentifiedStorageError: if the query produced no output
    :raises DatasetDoesNotExistError: if the output contains no dataset IDs
    """
    if dataset_name.startswith('DS_'):
        return dataset_name

    # function-scope import so this method does not depend on any change
    # to the module level imports
    from shlex import quote

    # quote the name so that spaces or shell metacharacters in it are
    # passed to the query tool as one literal argument
    query_out = sp.run(
        f'{self.query_string} -t {quote(dataset_name)}',
        capture_output=True,
        shell=True,
        encoding='UTF-8',
    )
    query = query_out.stdout.split()
    if len(query) == 0:
        # no stdout
        self.logger.info(
            f'Problem with colabfit query: {query_out.stderr}')
        raise UnidentifiedStorageError
    # presumably the second token of the output is the number of hits;
    # only report it when the output is long enough to contain it
    if len(query) > 1:
        self.logger.info(
            f'Found {query[1]} instance(s) of this dataset name!')

    # the token after each "{'colabfit-id':" marker is the quoted ID
    ids = [
        value.strip(",'") for marker, value in zip(query, query[1:])
        if marker == "{'colabfit-id':"
    ]
    if len(ids) == 0:
        # no database with this name
        self.logger.info(
            f'Could not find any databases with name: {dataset_name}')
        raise DatasetDoesNotExistError(
            f'No dataset with name "{dataset_name}"')

    # the version number is the last "_"-separated piece of an ID and the
    # base index is the second piece
    bases = {ds_id.split('_')[1] for ds_id in ids}
    if len(bases) > 1:
        self.logger.info('Warning: multiple datasets use the same name!')
    # max() keeps the first ID when several share the highest version
    latest_dataset = max(ids,
                         key=lambda ds_id: int(ds_id.split('_')[-1]))
    self.logger.info(f'{latest_dataset} is the latest version of '
                     f'{dataset_name} found in storage')
    return latest_dataset
[docs]
def get_data(
    self,
    dataset_handle: str,
    query_options: Optional[dict] = None,
    inspect: Optional[bool] = False,
    rename_properties: Optional[bool] = False,
    return_dataset_info: Optional[bool] = False,
) -> list[Atoms]:
    """
    Extract data from storage

    Return the dataset specified by dataset_handle as a list of ASE Atoms.
    Further options for parameterizing the extraction can be provided by
    the query_options dictionary (currently ignored apart from a log
    message).

    :param dataset_handle: ID of dataset
    :type dataset_handle: str
    :param query_options: dict of options for data extraction and return
        |default| ``None``
    :type query_options: dict
    :param inspect: whether to inspect data and print summary
    :type inspect: bool
    :param rename_properties: whether to rename properties based upon
        previous dataset's property map. Useful to keep consistent naming
        when adding data to dataset
    :type rename_properties: bool
    :param return_dataset_info: whether to return dataset info such
        as name, authors, etc in addition to data
    :type return_dataset_info: bool
    :returns: requested data as a list of ASE.Atoms objects and
        dataset info if return_dataset_info is True
    :rtype: list or list and dict
    :raises DatasetDoesNotExistError: if no data is returned for the handle
    :raises Warning: if rename_properties is set and a configuration lacks
        one of the stored property fields
    """
    if query_options is not None:
        self.logger.info('Query options are not currently supported')
    else:
        query_options = {}

    property_objects = self.database_client.get_dataset_data(
        dataset_handle)
    if not property_objects:
        # TODO: this should search the datasets collection instead
        raise DatasetDoesNotExistError(f"{dataset_handle} not found")
    configs = list(map(self._colabfit_po_to_ase, property_objects))

    if inspect:
        inspect_configs(configs)

    if rename_properties:
        # Map each stored name (built from the property name and sub-key)
        # back to the field name recorded in the dataset's property map.
        stored_map = self.get_dataset_property_map(dataset_handle)
        renames = {}
        for prop_name, definitions in stored_map.items():
            entries = ([definitions]
                       if isinstance(definitions, dict) else definitions)
            for entry in entries:
                for sub_key, spec in entry.items():
                    if "field" not in spec:
                        continue
                    if prop_name == '_metadata':
                        stored_name = 'metadata'
                    else:
                        stored_name = (f"{prop_name.replace('-', '_')}"
                                       f"_{sub_key.replace('-', '_')}")
                    renames[stored_name] = spec['field']

        for config in configs:
            for k, target in renames.items():
                if k == target:
                    continue  # already carries the wanted name
                if k in config.info:
                    config.info[target] = config.info.pop(k)
                elif k in config.arrays:
                    config.arrays[target] = config.arrays.pop(k)
                else:
                    # warning mess up tests so print
                    raise Warning(
                        (f"Configuration doesn't have {k} in its "
                         "info or arrays dict"))

    if not return_dataset_info:
        return configs

    ds = self.database_client.get_dataset_pg(dataset_handle)
    info_keys = ('id', 'name', 'authors', 'description', 'uploader',
                 'property_map')
    ds_info = {key: ds[0][key] for key in info_keys}
    return configs, ds_info
def _colabfit_po_to_ase(self, config: dict) -> Atoms:
    """
    Convert one ColabFit property-object row into an ASE Atoms object

    :param config: a dictionary returned from ColabFit containing
        information about the atomic configuration and computed properties
    :type config: dict
    :returns: an ASE.Atoms object containing all of the properties from the
        ColabFit data object
    :rtype: ASE.Atoms
    """
    atoms = Atoms(
        symbols=[chemical_symbols[int(z)] for z in config["atomic_numbers"]],
        cell=np.asarray(config['cell']),
        positions=np.asarray(config['positions']),
        pbc=config['pbc'],
    )

    # add metadata
    atoms.info['po-id'] = config['id']
    atoms.info['co-id'] = config['configuration_id']
    atoms.info['ds-id'] = config['dataset_id']
    if config['metadata'] is not None:
        atoms.info[METADATA_KEY] = json.loads(config['metadata'])

    # bookkeeping columns that are handled above or not copied at all
    skipped_columns = [
        'id',
        'hash',
        'dataset_id',
        'configuration_id',
        'last_modified',
        'multiplicity',
        'metadata',
    ]

    # walk through schema; each entry is indexed as (column name, type)
    schema = self.database_client.get_table_schema('property_objects')
    for column in schema:
        column_name = column[0]
        if column_name in skipped_columns:
            continue
        value = config[column_name]
        if value is None:
            continue
        is_array = (column[1] == 'ARRAY'
                    and column_name != 'cauchy_stress_stress')
        # an array with one row per atom goes to .arrays, everything else
        # (including the stress column) is stored in .info
        if is_array and len(atoms) == np.array(value).shape[0]:
            atoms.arrays[column_name] = np.array(value)
        else:
            atoms.info[column_name] = value
    return atoms
[docs]
def update_data(
    self,
    dataset_handle: str,
    data: list[Atoms],
    parameters: dict = None,
    property_map: Optional[dict] = None,
    use_orig_property_map: bool = True,
    new_properties: Optional[dict] = None,
    strict: bool = True,
    updated_description: Optional[str] = None,
    updated_authors: Optional[list[str]] = None,
) -> str:
    """
    Update an existing dataset - adding new properties to configurations

    This method operates on existing configurations and/or properties. Data
    is a list of ASE Atoms objects. NOTE: This should include all data
    that is to be associated with datasets. Call get_data if you
    want old data and potentially new data to be in dataset. The property
    map is automatically pulled from the original dataset. If this isn't
    wanted set use_orig_property_map=False and specify
    property_map which should include mappings for all data to add.
    dataset_handle specifies the dataset where these data should
    be updated and should be the dataset ID, (DS_XXXXXX).

    :param dataset_handle: ID of dataset
    :param data: list of ase.Atoms which include the new data to add
    :param parameters: The 'universal' and 'code' specific parameters from
        the simulations. These should be the same as the parameters
        in the database.
    :param property_map: explicit property map; must be None while
        use_orig_property_map is True
    :param use_orig_property_map: whether or not to use the dataset's
        original property map. Useful when get_data(rename_properties=True)
        has been used. If False, the property_map argument is used instead.
    :param new_properties: These properties will be added to the
        property_map via add_property_mapping (which also replaces
        self.property_map with the map used here)
    :param strict: If strict, ingested data must all contain the properties
        specified in the property map. |default| ``True``
    :param updated_description: If not None, will also update the dataset
        description
    :param updated_authors: If not None, will also update the dataset
        authors
    :returns: updated handle for the dataset
    """
    if use_orig_property_map:
        assert property_map is None, (
            'Only one of property_map and\n'
            'use_orig_property_map should not be None')
        property_map = self.get_dataset_property_map(dataset_handle)
    if property_map is None:
        raise Exception('Property map must not be None.\n'
                        'Please set use_orig_property_map=True\n'
                        'or explicitly set property_map in the arguments\n'
                        'depending on the situation.')

    if new_properties is not None:
        # add_property_mapping works on self.property_map, so install the
        # map first and pick up the updated map it returns
        self.property_map = property_map
        for prop_name, prop_mapping in new_properties.items():
            property_map = self.add_property_mapping(prop_name,
                                                     prop_mapping)

    parameters = {} if parameters is None else parameters

    # If any of the following keys are in property_map then we require
    # that there are code and universal parameters.
    map_keys = property_map.keys()
    needs_parameters = any(
        key in map_keys
        for key in ('energy', 'atomic-forces', 'cauchy-stress'))
    if needs_parameters:
        pkeys = parameters.keys()
        if not ('code' in pkeys and 'universal' in pkeys):
            raise ValueError(
                'Must specify both the universal and code specific input '
                'parameters.')

    return self.database_client.update_dataset_pg_no_spark(
        data,
        dataset_handle,
        parameters,
        property_map,
        strict=strict,
        description=updated_description,
        authors=updated_authors,
    )
[docs]
def list_data(
    self,
    dataset_handle: Optional[str] = None,
    text: Optional[str] = None,
    properties: Optional[str] = None,
    elements: Optional[str] = None,
    elements_exact: Optional[bool] = False,
):
    """
    Utility function to query the database

    Prints an overview of the database contents if no dataset_handle is
    provided, otherwise provides information about the specific dataset
    contents. Currently only dataset_handles which reference the dataset
    name (not the colabfit ID) will work for showing the selective query
    result. The query itself is run through the ``colabfit`` command line
    tool; if that tool is not found only a log message is written.

    :param dataset_handle: name of the dataset |default| ``None``
    :type dataset_handle: str
    :param text: text to search for within the dataset. This can be
        authors, descriptions, uploader. |default| ``None``
    :type text: str
    :param properties: name of properties to search for. Multiple should
        be included as "energy atomic-forces" |default| ``None``
    :type properties: str
    :param elements: elements to search for. Multiple should be included as
        "C H". Will return datasets containing these plus other elements.
        See elements_exact |default| ``None``
    :type elements: str
    :param elements_exact: whether to restrict element search to return
        datasets containing only specified elements |default| ``False``
    :type elements_exact: bool
    :raises Exception: if both dataset_handle and text are given
    """
    # exit status 0 means the colabfit executable is on the PATH
    which_status = system('which colabfit 1> /dev/null 2> /dev/null')

    if dataset_handle is not None and text is not None:
        raise Exception(
            "Only one of dataset_handle and text should be used.")

    # assemble the optional command line filters
    filters = []
    if text is not None:
        filters.append(f"-t '{text}' ")
    if properties is not None:
        filters.append(f"-p '{properties}' ")
    if elements is not None:
        element_flag = '-ee' if elements_exact else '-e'
        filters.append(f"{element_flag} '{elements}'")
    query = "".join(filters)

    if which_status != 0:
        self.logger.info('Error: cfkit-cli must be installed to list data')
        return

    if dataset_handle is None:
        system(f'{self.query_string} {query}')
    else:
        system(f'{self.query_string} -t "{dataset_handle}" {query}')
[docs]
def delete_dataset(
    self,
    dataset_handle: str,
    delete_children: Optional[bool] = True,
):
    """
    Remove the dataset specified by dataset_handle from the database

    :param dataset_handle: ID of dataset
    :type dataset_handle: str
    :param delete_children: if true will also delete all POs
        and COs (not associated with another DS)
    :type delete_children: bool
    """
    client = self.database_client
    client.delete_dataset(dataset_handle, delete_children)
    message = f'Deleted dataset {dataset_handle} from storage'
    self.logger.info(message)
[docs]
def delete_items(self, item_ids_list: list[str]):
    """
    Remove the COs and/or POs specified by item_ids_list from the database

    :param item_ids_list: IDs of the items to delete
    :type item_ids_list: list
    """
    client = self.database_client
    client.delete_items(item_ids_list)
    n_deleted = len(item_ids_list)
    self.logger.info(f'Deleted {n_deleted} POs/Cos from storage')
# TODO: Support in next update
[docs]
def dataset_intersection_and_differences(self, dataset1, dataset2, mode):
    """
    returns the intersection or differences between two datasets

    behavior is controlled by the mode variable, which can be set to
    'intersection' or 'difference'. The corresponding results will be
    returned. If 'difference' is chosen, the result holds everything
    that is IN dataset1 but NOT IN dataset2.

    :param dataset1: name of the first dataset to compare
    :type dataset1: str
    :param dataset2: name of the second dataset to compare
    :type dataset2: str
    :param mode: switch for if the intersection or difference is returned
    :type mode: str
    :returns: the entry of the database comparison that matches mode
        (presumably the shared/differing data objects - TODO confirm)
    :rtype: list
    :raises UnsupportedComparisonError: if mode is neither 'intersection'
        nor 'difference'
    """
    first_id = self._get_id_from_name(dataset1)
    second_id = self._get_id_from_name(dataset2)
    comparison = self.database_client.compare_datasets(first_id, second_id)

    # the mode is only validated after the comparison has been run
    if mode not in ('intersection', 'difference'):
        raise UnsupportedComparisonError(f'{mode} not supported!')
    key = 'DS1 & DS2' if mode == 'intersection' else 'DS1 - DS2'

    # get Dataset from DOs
    return comparison[key]
[docs]
def define_new_properties(self, property_list: list[dict]):
    """
    Define new properties to add to the database

    New properties only need to be defined once for the database. A single
    definition that is not wrapped in a list is accepted as well.

    :param property_list: List of dictionaries containing properties
        to be stored in a client
    :type property_list: list
    :raises TypeError: if an entry is not a dict; entries before it have
        already been inserted at that point
    """
    definitions = (property_list
                   if isinstance(property_list, list) else [property_list])
    insert = self.database_client.insert_property_definition
    for prop in definitions:
        if isinstance(prop, dict):
            insert(prop)
            continue
        raise TypeError(
            'Each entry of property_list should be a dict\n'
            f'but {prop} is a {type(prop)}')
[docs]
def set_property_map(
    self,
    keys: Optional[dict] = None,
    file_example: Optional[str] = None,
) -> dict:
    """
    Set the mapping between input properties and colabfit representation

    Definition of a set of basic properties to be stored in a Colabfit
    database. This will be used to map input data to the articulated
    properties which are stored in the Colabfit database. The property_map
    is used when inserting data into the database. Calling this method
    replaces ``self.property_map`` with a map built from the specified
    keys/examples.

    :param keys: dictionary defining the mapping between ingested
        properties and their internal database representation. Keys can
        include 'energy_field', 'force_field', and 'stress_field', with the
        values corresponding to how that property is demarcated in the
        input. Additional keys can be included but must include their
        full mapping. |default| ``None``
    :type keys: dict
    :param file_example: path to a file with a header representing the
        property tags, from which possible energy, force, and stress
        mappings (defined by the options in this method) are extracted
        |default| ``None``
    :type file_example: str
    :returns: dictionary with all properties used in a dataset
    :rtype: dict
    """
    # TODO: Should we support this. Not sure exactly what it does
    # spellings searched for in a file header, per basic field
    header_options = {
        'energy_field': [
            'energy',
            'Energy',
            'ENERGY',
            'energies',
            'Energies',
            'ENERGIES',
        ],
        'force_field': [
            'force',
            'Force',
            'FORCE',
            'forces',
            'Forces',
            'FORCES',
        ],
        'stress_field': [
            'stress',
            'Stress',
            'STRESS',
            'stresses',
            'Stresses',
            'STRESSES',
        ],
    }

    # removed defaults: a field stays None unless keys or the file set it
    fields = dict.fromkeys(header_options)
    if keys is not None:
        for field_name in fields:
            fields[field_name] = keys.get(field_name, None)

    if file_example is not None:
        with open(file_example, 'r') as fin:
            fin.readline()  # first line is # atoms
            file_header = fin.readline()  # second line is header
        for field_name, options in header_options.items():
            fields[field_name] = self._check_header_for_fields(
                fields[field_name],
                options,
                file_header,
            )

    property_map = {}
    if fields['energy_field'] is not None:
        energy_entry = {
            'energy': {
                'field': fields['energy_field'],
                'units': 'eV'
            },
            'per-atom': {
                'value': False,
                'units': None
            },
        }
        property_map['energy'] = [energy_entry]
    if fields['force_field'] is not None:
        forces_entry = {
            'forces': {
                'field': fields['force_field'],
                'units': 'eV/A'
            },
        }
        property_map['atomic-forces'] = [forces_entry]
    if fields['stress_field'] is not None:
        stress_entry = {
            'stress': {
                'field': fields['stress_field'],
                'units': 'eV/A^3'
            },
            "volume-normalized": {
                "value": False,
                "units": None
            },
        }
        property_map['cauchy-stress'] = [stress_entry]
    self.property_map = property_map

    # TODO: Support same behavior if file is used instead
    # add any additional property mappings that have been provided
    extra_items = [] if keys is None else keys.items()
    for name, mapping in extra_items:
        if name in header_options:
            continue
        self.logger.info(f'Adding {name} to property map.')
        self.add_property_mapping(name, mapping)
    return self.property_map
[docs]
def set_default_property_map(self) -> dict:
    """
    Set the default mapping between input properties
    and colabfit representation. Includes energy,
    atomic-forces, and cauchy-stress, plus a metadata mapping.

    :returns: the property map now stored on this instance
    :rtype: dict
    """
    default_keys = {
        'energy_field': ENERGY_KEY,
        'force_field': FORCES_KEY,
        'stress_field': STRESS_KEY,
        # additional (non-basic) entry carrying its full mapping
        METADATA_KEY: {
            'metadata': {
                'field': METADATA_KEY
            }
        },
    }
    self.set_property_map(default_keys)
    return self.property_map
def _check_header_for_fields(
    self,
    default_field_value: str,
    field_options: list[str],
    header_line: str,
) -> str:
    """
    helper function to match possible field values with those from a file

    :param default_field_value: field value to return if none of the
        options can be found
    :type default_field_value: str
    :param field_options: list of possible options to search for in the
        header_line, tried in order
    :type field_options: list
    :param header_line: line from a file which should contain the potential
        fields to search for.
    :type header_line: str
    :returns: the first of the provided options which appears in the
        supplied header_line, otherwise default_field_value
    :rtype: str
    """
    matches = [option for option in field_options if option in header_line]
    if not matches:
        self.logger.info(
            (f'Did not find field for "{default_field_value}" in '
             f'header, setting map to {default_field_value}'))
        return default_field_value

    first_match = matches[0]
    self.logger.info((f'Found field for "{default_field_value}" in'
                      f' header, setting map to {first_match}'))
    return first_match
[docs]
def check_example_config(self, example_config: Atoms):
    """
    Log a hint for every mapped field that the example config lacks

    Sanity check; does not affect code behavior. Only the first entry of
    each property's mapping list is inspected and the ``_metadata``
    property is skipped.

    :param example_config: configuration whose ``info`` and ``arrays``
        dictionaries are searched for the mapped field names
    :type example_config: Atoms
    """
    for property_name in self.property_map:  # potential-energy
        if property_name in ["_metadata"]:
            continue
        first_entry = self.property_map[property_name][0]
        for key in first_entry:  # [energy]
            if 'field' not in first_entry[key]:
                continue
            sk = first_entry[key]['field']
            found = (sk in example_config.info
                     or sk in example_config.arrays)
            if not found:
                self.logger.info(
                    f'Key "{sk}" not found on example config. '
                    'Did you update property_map before '
                    'trying to save data?')
[docs]
def add_property_mapping(
    self,
    new_property_name: str,
    new_map: dict,
    overwrite: Optional[bool] = False,
) -> dict:
    """
    add a new property entry into the internal property map

    Example usage::

        storage.add_property_mapping(
            'new_property_name',
            {
                'key_1': {'field': 'key_1_for_ASE', 'units': None},
                'key_2': {'field': 'key_2_for_ASE', 'units': None},
            }
        )

    :param new_property_name: name of property mapping being added.
        Underscores are converted to dashes, except for ``'_metadata'``
    :type new_property_name: str
    :param new_map: the colabfit-style property mapping. A dictionary
        specifying the ``'field'`` which will be used to load the data off
        of an ASE atoms object (from the ``.info`` or ``.arrays``
        dictionaries), and the units. Note that colabfit expects `new_map`
        to actually be a list; this function will wrap ``new_map`` in a
        list if it is not already one.
    :type new_map: dict or list
    :param overwrite: True allows existing maps with the same name to be
        overwritten. Default is False, in which case an existing mapping
        is kept and only a log message is written.
    :type overwrite: bool
    :returns: updated property_map
    :rtype: dict
    :raises RuntimeError: if new_map is neither a dict nor a list
    """
    # kim-property expects "-" not _ but bypass if _metadata
    if new_property_name != '_metadata':
        new_property_name = new_property_name.replace('_', '-')

    if isinstance(new_map, list):
        wrapped_map = new_map
    elif isinstance(new_map, dict):
        wrapped_map = [new_map]
    else:
        raise RuntimeError(f'Invalid data type {type(new_map)} for new '
                           f'mapping "{new_property_name}"')

    if (new_property_name in self.property_map) and not overwrite:
        # honor the documented contract: without overwrite=True an existing
        # mapping must be left untouched
        self.logger.info(
            f'A mapping for the property "{new_property_name}" already '
            'exists. Use ``overwrite=True`` to overwrite existing maps')
        return self.property_map

    self.property_map[new_property_name] = wrapped_map
    return self.property_map
[docs]
def get_dataset_property_map(self, dataset_id: str) -> dict:
    """
    Look up the property_map that was used to ingest a dataset

    The request is forwarded unchanged to the database client.

    :param dataset_id: ID of dataset
    :type dataset_id: str
    :returns: dictionary with all properties used in a dataset
    :rtype: dict
    """
    client = self.database_client
    return client.get_dataset_property_map(dataset_id)
[docs]
def get_dataset_name_from_id(self, dataset_id: str) -> str:
    """
    Look up the human readable name of a dataset from its ID

    The request is forwarded unchanged to the database client.

    :param dataset_id: ID of dataset
    :type dataset_id: str
    :returns: name of the dataset
    :rtype: str
    """
    client = self.database_client
    return client.get_dataset_name_from_id(dataset_id)
[docs]
def get_property_definitions(self) -> list:
    """
    Fetch the property definitions from the database client

    :returns: all properties currently in database
    :rtype: list
    """
    client = self.database_client
    return client.get_property_definitions()
[docs]
def update_property_definition(self, prop_def: str, new_keys: dict):
    """
    Updates an existing property definition with new keys

    Only keys that are not currently a part of the definition should be
    added in new_keys. For every new key a column is added to the
    ``property_objects`` table, populated with the provided default value,
    and the stored definition is then replaced by the extended one.
    Form of new_keys should be similar to::

        {'energy': {
            'type': 'float',
            'has-unit': True,
            'extent': [],
            'required': True,
            'description': 'The potential energy of the system.',
            'default-value': None
        }}

    The default default-value is NULL. The dictionaries inside new_keys
    are not modified.

    :param prop_def: name of definition to update (underscores are
        converted to dashes before the lookup)
    :type prop_def: str
    :param new_keys: dict containing new keys to add with default values to
        populate existing entries
    :type new_keys: dict
    :raises Exception: if no definition with the given name exists
    """
    # get property definition; if several definitions share the name the
    # last one wins
    property_dict = None
    original_dict = None
    definitions = self.get_property_definitions()
    prop_def = prop_def.replace('_', '-')
    for d in definitions:
        if d.get('property-name') == prop_def:
            property_dict = d
            original_dict = d.copy()
    if property_dict is None:
        raise Exception('Property with provided name\n'
                        f'{prop_def} not found.')

    # add keys to definition
    for k, v in new_keys.items():
        # work on a copy so the caller's dict keeps its 'default-value'
        key_spec = dict(v)
        default_value = key_spec.pop('default-value', "NULL")
        property_dict[k] = key_spec
        # update PO tables
        column_name = property_dict['property-name'].replace('-', '_') \
            + f'_{k}'.replace('-', '_')
        if key_spec['type'] == 'float':
            data_type = "DOUBLE PRECISION"
        elif key_spec['type'] == 'int':
            data_type = "INT"
        elif key_spec['type'] == 'bool':
            data_type = "BOOL"
        else:
            data_type = "VARCHAR (10000)"
        # one array level per entry of the extent
        data_type += '[]' * len(key_spec['extent'])
        self.database_client.insert_new_column('property_objects',
                                               column_name,
                                               data_type,
                                               default=default_value)

    # update PD in DB
    print(f'Original definition: {original_dict}')
    print(f'Updated definition: {property_dict}')
    # The JSON text is embedded in single-quoted SQL literals, so any
    # single quote inside it (e.g. in a description) must be doubled.
    updated_json = json.dumps(property_dict).replace("'", "''")
    original_json = json.dumps(original_dict).replace("'", "''")
    sql = ('UPDATE property_definitions\n'
           f"SET definition = '{updated_json}'\n"
           f"WHERE definition = '{original_json}';\n")
    self.database_client.general_query(sql)
[docs]
def setup_tables(self) -> None:
    """
    Builds all necessary PostgreSQL tables.

    For use with newly created databases.
    Won't affect existing databases if called.
    Also registers the energy, forces, and stress property definitions
    that ship with colabfit.
    """
    self.database_client.create_pg_tables()

    # imported here, after the tables exist, as in the rest of this method
    from colabfit.tools.property_definitions import (energy_pd,
                                                     atomic_forces_pd,
                                                     cauchy_stress_pd)
    base_definitions = [energy_pd, atomic_forces_pd, cauchy_stress_pd]
    self.define_new_properties(base_definitions)
[docs]
@staticmethod
def sort_configurations(configs: list[Atoms]) -> list[Atoms]:
    """
    Given a list of Atoms will return a sorted version
    based upon what the CO-id would be.

    Useful for sorting configs to be in the same order
    as returned configurations from get_data. The input list itself is
    left in its original order.

    :param configs: list of configurations
    :type configs: list(Atoms)
    :returns: sorted configs
    :rtype: list(Atoms)
    """
    atomic_configs = [AtomicConfiguration.from_ase(atoms) for atoms in configs]

    def co_id(position):
        # sort key: the CO-id string built from the configuration hash
        return f'CO_{atomic_configs[position]._hash}'

    order = list(range(len(atomic_configs)))
    order.sort(key=co_id)
    return [configs[position] for position in order]