import os
from os import PathLike
from os.path import abspath
from json import loads
from subprocess import check_output
import kimkit
import kim_edn
from typing import Union, Optional
from ..storage import Storage
from ..potential import Potential
from ..workflow import Workflow
from ..utils.exceptions import StrayFilesError, TestNotFoundError, \
KIMRunFlattenError
from ..utils.isinstance import isinstance_no_import
from . import TargetProperty
import tarfile
import numpy as np
from ..utils.restart import restarter
[docs]
class KIMRun(TargetProperty):
"""
Class for calculating properties using KIM Tests
"""
CODE_TO_DIR = {
'MO': 'models',
'MD': 'model-drivers',
'TE': 'tests',
'TD': 'test-drivers',
'SM': 'simulator-models',
}
[docs]
def __init__(
self,
container_manager: str = "podman",
image_path:
PathLike = "ghcr.io/openkim/developer-platform:v1.7.8-minimal",
**kwargs,
):
"""
Class for calculating properties using KIM Tests
:param container_manager: Whether to use singularity or podman.
:type container_manager: str
:param image_path:
For podman, this should point to a KIM Developer Platform (KDP)
image (https://github.com/openkim/developer-platform).
For all possible ways to specify this, see
https://docs.podman.io/en/latest/markdown/podman-run.1.html#image
By default, this points to a pinned version of the minimal KDP in
the GitHub Container Registry (grcr.io), which will automatically
be downloaded at runtime if needed.
For Singularity, this should be a path to a local Singularity
image file, built like this from the Docker image:
.. code-block:: bash
singularity build foo.sif \\
docker://ghcr.io/openkim/developer-platform:v1.7.8-minimal
:type image_path: PathLike
"""
if container_manager not in ("podman", "singularity"):
raise RuntimeError("Only 'singularity' or 'podman' supported")
self.container_manager = container_manager
self.image_path = image_path
self.outstanding_calc = None
super().__init__(**kwargs)
[docs]
def checkpoint_property(self):
"""
checkpoint the property module into the checkpoint file
save necessary internal variables into a dict with key checkpoint_name
and write to the (json) checkpoint file for restart capabilities
"""
save_dict = {
self.checkpoint_name: {
'outstanding_calc': self.outstanding_calc
}
}
restarter.write_checkpoint_file(self.checkpoint_file, save_dict)
[docs]
def restart_property(self):
"""
restart the property module from the checkpoint file
check if the checkpoint_file has an entry matching the checkpoint_name
and set internal variables accordingly if so
"""
restart_dict = restarter.read_checkpoint_file(
self.checkpoint_file,
self.checkpoint_name,
)
self.outstanding_calc = restart_dict.get('outstanding_calc')
[docs]
def calculate_property(self,
get_test_result_args: Union[list[dict], dict],
flatten: bool = False,
iter_num: int = 0,
potential: Optional[Union[str, Potential]] = None,
workflow: Optional[Workflow] = None,
storage: Optional[Storage] = None,
**kwargs) -> dict:
"""
Run `potential` against a list of KIM tests
:param get_test_result_args:
Inputs to a `get_test_result` KIM Query as a dictionary of keyword
arguments. The `model` argument should be omitted, as it will be
taken from the `potential` input.
See https://openkim.org/doc/usage/kim-query/#get_test_result for
info. This determines the output of this module. If a list of
dictionaries is passed, multiple query results will be returned.
:type get_test_result_args: Union[list[dict],dict]
:param flatten:
whether to flatten the (doubly+) nested list returned
:type flatten: bool
:param iter_num:
iteration number
:type iter_num: int
:param potential:
interatomic potential to use, specified as a Potential object
(must be able to write a KIM API-compliant model using
Potential._save_potential_to_kimkit()) or a string naming
a potential already installed in KIMKit.
:type potential: str or Potential
:param workflow:
the workflow for managing job submission
:type workflow: Workflow
:returns:
dictionary with None for the property_std, calculation id as the
calc_ids, and a list of query results for the
property_value. By default, property_value will be a doubly or
more nested list.
First index is over each dictionary in 'get_test_result_args'.
Second index is over the number of times the queried Test returned
the queried property (e.g. multiple surface energies)
Third index is over the keys requested within the property. The
values may be arrays of arbitrary dimension themselves.
If 'flatten' is set to true, this is all flattened into a
1-D numpy array of floats.
:rtype: dict
"""
if workflow is None:
workflow = self.default_wf
if not isinstance(get_test_result_args, list):
get_test_result_args = [get_test_result_args]
potential_is_temporary = False
# get potential from path, or name or module
if isinstance_no_import(potential, 'Potential'):
try:
potential_name = potential.save_potential_files()
potential_is_temporary = True
except kimkit.src.config.KimCodeAlreadyInUseError:
if not hasattr(potential, 'kim_id'):
raise RuntimeError(
"kimkit.src.config.KimCodeAlreadyInUseError "
"encountered, but the Potential object has no kim_id "
"attribute, don't know how to get it from KIMKit!")
potential_name = potential.kim_id
else:
potential_name = potential
if not kimkit.kimcodes.iskimid(potential_name):
raise kimkit.config.InvalidKIMCode(
'Potential name (passed as string, taken from '
'Potential.kim_id, or returned by '
'Potential._save_potential_to_kimkit()) is not a valid KIM '
'ID, which is required by KIMRun.')
test_list = []
for get_test_result_args_instance in get_test_result_args:
# This will automatically raise InvalidKIMCode if 'test'
# is not valid
test_kimid = get_test_result_args_instance['test'][0]
test_db_entry = kimkit.src.mongodb.find_item_by_kimcode(test_kimid)
if test_db_entry is None:
raise TestNotFoundError(
'Test ' + test_kimid + ' was not found in KIMkit.'
+ 'See TargetProperty documentation for '
+ 'instructions on importing Tests.')
test_list.append(test_db_entry['kimcode'])
sim_params = {
'test_list': test_list,
'image_path': self.image_path,
'potential': potential_name,
}
# The directory containing the simulations is named as follows:
# If `get_test_result_args` contained only one element,
# meaning only one Test will be run (dependencies may be
# run, but their results will not be queried for), then
# the first part of the directory name is the Test's short KIM ID.
# If multiple Test Results were requested, the first part
# states the number of tests that were requested. The second part
# is always the Short KIM ID of the potential as exported or loaded
# from KIMKit. For example:
# 2_tests_SM_631352869360_000
# TE_973027833948_004_MO_000000059789_000
if len(test_list) == 1:
test_description = kimkit.kimcodes.get_short_id(test_list[0])
else:
test_description = str(len(test_list)) + '_tests'
sim_path = test_description + '_' + \
kimkit.kimcodes.get_short_id(potential_name)
if self.outstanding_calc is None:
# run calculation
calc_id = self.conduct_sim(sim_params, workflow, sim_path)
self.outstanding_calc = calc_id
self.checkpoint_property()
else:
calc_id = self.outstanding_calc
# wait for completion
workflow.block_until_completed(calc_id)
work_path = workflow.get_job_path(calc_id)
# First, errors. Just a list of tests that errored
self.logger.info(
'LISTING OF ERRORS DIRECTORY: '
+ ', '.join(os.listdir(os.path.join(work_path, 'errors'))))
query_script = \
"from excerpts.mongodb import db\n" + \
"from excerpts.query_local.queryapi import get_test_result\n" + \
"import json\n" + \
"results=[]\n" + \
f"get_test_result_args = {get_test_result_args}\n" + \
"for get_test_result_args_instance in get_test_result_args:\n" + \
" results.append(get_test_result(db=db,model=[" + \
f"'{potential_name}'],**get_test_result_args_instance))\n" + \
"print(json.dumps(results))"
query_script_path = os.path.join(work_path, 'query.py')
with open(query_script_path, 'w') as f:
f.write(query_script)
query_command = \
f'{self._container_manager_preamble(work_path)} ' + \
f'--env PYTHONPATH=/pipeline/ {self.image_path} ' + \
f'python {query_script_path}'
query_stringified_output = check_output(query_command,
encoding='utf-8',
shell=True)
if not flatten:
property_value = loads(query_stringified_output)
else:
query_string_list_output = \
query_stringified_output.replace('[', ' ').\
replace(']', ' ').\
replace(',', ' ').\
split()
try:
property_value = np.asarray([
float(output_entry)
for output_entry in query_string_list_output
])
except ValueError:
raise KIMRunFlattenError(
"KIMRun failed to convert ouput data to floats for "
"flattening. It is likely that setting flatten=False "
"will produce a valid output.")
# return results
results_dict = {
"property_value": property_value,
"property_std": None,
"calc_ids": (calc_id, ),
"success": True,
}
if potential_is_temporary:
kimkit.models.delete(potential_name)
self.outstanding_calc = None
self.checkpoint_property()
return results_dict
[docs]
def conduct_sim(
self,
sim_params: dict,
workflow: Workflow,
sim_path: PathLike,
) -> int:
"""
Perform simulations for the target property calculations
:param sim_params: parameters that define the containerized
run, including ``test_list``, ``image_path`` and
``potential_name``
:type sim_params: dict
:param workflow: the workflow for managing job submission
:type workflow: Workflow
:param sim_path: path name to specify these calculations
:type sim_path: PathLike
:returns: calculation ID
:rtype: int
"""
model_name = sim_params.get('potential')
test_list = sim_params.get('test_list')
image_path = sim_params.get('image_path')
# The reason all the file creation has to happen inside conduct_sim
# is because I need to find the path using the command below
work_path = workflow.make_path(self.__class__.__name__, sim_path)
# convert path to absolute
work_path = abspath(work_path)
kdp_env_file_path = os.path.join(work_path, 'kimrun-kdp-env')
kim_api_config_path = os.path.join(work_path, '.kim-api/config')
with open(self._env_file_path(work_path), 'w') as f:
f.write(f"PIPELINE_ENVIRONMENT_FILE={kdp_env_file_path}\n"
+ f"KIM_API_CONFIGURATION_FILE={kim_api_config_path}")
with open(kdp_env_file_path, 'w') as f:
f.write("#!/bin/bash\n"
+ "#==============================================\n"
+ "# this file has a specific format since it is\n"
+ "# also read by python. only FOO=bar and\n"
+ "# the use of $BAZ can be substituted. comments\n"
+ "# start in column 0\n"
+ "#==============================================\n\n"
+ "PIPELINE_LOCAL_DEV=True\n"
+ f"LOCAL_REPOSITORY_PATH={work_path}\n"
+ f"LOCAL_DATABASE_PATH={work_path}/db")
kdp_directories = list(self.CODE_TO_DIR.values()) + \
['errors', '.kim-api', 'test-results']
# create KDP subdirectories
for dirname in kdp_directories:
dirpath = os.path.join(work_path, dirname)
os.mkdir(dirpath)
with open(kim_api_config_path, 'w') as f:
f.write(f"model-drivers-dir = {work_path}/md\n"
+ f"portable-models-dir = {work_path}/pm\n"
+ f"simulator-models-dir = {work_path}/sm\n")
with open(kim_api_config_path) as f:
for line in f:
if len(line) > 255:
raise RuntimeError(
'Path to the work directory is too long.\n'
'KIM API config file max line length = 255,\n'
'your path must be less than approx. 225 chars.')
test_run_order = self._get_items_from_kimkit([model_name] + test_list,
work_path)
self.logger.info(
f'RUNNING MODEL {model_name} WITH THE FOLLOWING TESTS: '
+ ', '.join([test for test in test_run_order]))
command = (
f"{self._container_manager_preamble(work_path)} {image_path} "
"bash -c 'unset SLURM_NODELIST && ")
command_lines = []
for test in test_run_order:
command_lines.append(f'pipeline-run-pair {test} {model_name}')
command += ' && '.join(command_lines)
command += "'"
calc_id = workflow.submit_job(command, work_path, {
'custom_preamble': '',
'nodes': 1,
'tasks': 1,
})
return calc_id
def _get_items_from_kimkit(
self,
items: Union[list[str], str],
work_dir_path: PathLike,
_test_run_order: Optional[list[str]] = None,
) -> list[str]:
"""
Get a list of items from kimkit and them into their appropriate
directory. If needed, recursively call for dependencies.
:param items: List of items to get from KIMKit
:type items: Union[list[str],str]
:param work_dir_path: path to directory containing tests,
test-drivers, etc. Equivalent to KDP Docker home directory
:type work_dir_path: PathLike
:param _test_run_order: helper argument for recursion,
to avoid running duplicate tests
:type _test_run_order: list[str]
:returns: List of Tests in an appropriate run order
(e.g. items always go after their dependencies)
:rtype list[str]:
"""
if _test_run_order is None:
_test_run_order = []
for filename in os.listdir(work_dir_path):
if filename.endswith('.txz'):
raise StrayFilesError("Work dir is not free of .txz files, "
"I can't work like this.")
if not isinstance(items, list):
items = [items]
test_run_order = []
for item in items:
items_processed = []
if item in _test_run_order or item in test_run_order:
self.logger.info('Test ' + item + ' already requested to run '
+ '(probably as a dependency), skipping...')
continue
kimkit.models.export(item, work_dir_path)
# this may get us multiple .txz archives
for filename in os.listdir(work_dir_path):
if filename.endswith('.txz'):
tarpath = os.path.join(work_dir_path, filename)
itemname, _ = os.path.splitext(filename)
destination_dir = \
os.path.join(work_dir_path,
self.CODE_TO_DIR[itemname.split('_')[-3]])
tar = tarfile.open(tarpath)
tar.extractall(destination_dir)
os.remove(tarpath)
items_processed.append(itemname)
self.logger.info("Placed these items into container work dir, "
+ "checking for dependencies... "
+ ", ".join(items_processed))
# OK, we've processed the item and its driver,
# now check if it's a test, in which case it might
# have dependencies which we also need to install
for itemname in items_processed:
if itemname.split('_')[-3] in 'TE':
deps_path = \
os.path.join(work_dir_path, 'tests',
itemname, 'dependencies.edn')
if os.path.exists(deps_path):
dep_shortcodes = kim_edn.load(deps_path)
for dep_shortcode in dep_shortcodes:
dep_db_entry = \
kimkit.src.mongodb.find_item_by_kimcode(
dep_shortcode
)
if dep_db_entry is None:
raise TestNotFoundError(
'Test ' + itemname + ' has dependency '
+ dep_shortcode
+ ', which could not be found in KIMkit.'
+ 'See TargetProperty documentation for '
+ 'instructions on importing Tests.')
else:
dep_kimcode = dep_db_entry['kimcode']
test_run_order += self._get_items_from_kimkit(
dep_kimcode, work_dir_path, test_run_order)
# if it is a test, append its
# name to test_run_order, but only after all
# dependencies have been gone through
test_run_order.append(itemname)
return test_run_order
def _env_file_path(self, work_path: PathLike) -> PathLike:
"""
get environment file path given the work path
:param work_path: work directory path
:type work_path: PathLike
:returns: Environment file path (to pass to ``--env-file``
in Singularity)
:rtype: PathLike
"""
return os.path.join(work_path, 'kimrun-env')
def _container_manager_preamble(self, work_path: PathLike) -> str:
"""
get universal command preamble for container manager
:param work_path: work directory path
:type work_path: PathLike
:returns: container manager preamble
:rtype: str
"""
# Mount path to itself inside podman, for maximum code sharing
# between podman and singularity
if self.container_manager == "podman":
return (
"unset XDG_RUNTIME_DIR && "
"podman run --rm -u=root --env-file "
f"{self._env_file_path(work_path)} -v {work_path}:{work_path}")
elif self.container_manager == "singularity":
return ("singularity exec --env-file "
f"{self._env_file_path(work_path)} -B {work_path}")
[docs]
def calculate_with_error(self,
n_calc,
modified_params=None,
potential=None,
workflow=None):
pass
[docs]
def save_configurations(self, calc_ids, dataset_handle, workflow, storage):
pass