Source code for gnrs.workflow
"""
This module provides the workflow orchestration for Genarris.
This source code is licensed under the BSD-3-Clause license found in the
LICENSE file in the root directory of this source tree.
"""
from __future__ import annotations
__author__ = ["Yi Yang", "Rithwik Tom"]
__email__ = "yiy5@andrew.cmu.edu"
__group__ = "https://www.noamarom.com/"
import os
import time
import logging
from mpi4py import MPI
from gnrs.core import folders
from gnrs.core.logging import GenarrisLogger
from gnrs.core.registry import resolve_task
import gnrs.output as gout
from gnrs.parallel import init_parallel
from gnrs.parser import UserSettingsParser, UserSettingsSanityChecker
from gnrs.parallel.test import test_bcast
from gnrs.restart import restart_init, is_task_completed, load_restart, write_restart
from gnrs.gnrsutil.core import check_if_exp_found
import argparse
[docs]
class Genarris:
"""Defines the flow of control in Genarris for crystal structure generation and optimization."""
[docs]
def __init__(self, args: argparse.Namespace) -> None:
"""
Initialize Genarris.
"""
self.config = {}
self.gnrs_info = {}
self.seed = args.seed
self.restart = args.restart
self._mpi_init()
self._log_init()
self._output_init()
self._parallel_init(seed=self.seed)
self._gnrs_info_init()
self._config_init(args)
restart_init(self.comm, self.config, self.gnrs_info)
if not self.restart:
self._folders_init()
else:
self.attempt_restart()
self.comm.barrier()
self.logger.info("Genarris initialized successfully")
[docs]
def run(self) -> None:
"""
Execute Genarris with the configured tasks.
"""
self.logger.info("Starting Genarris Tasks")
tasks = self.config.get("workflow", {}).get("tasks", [])
self._run_tasks(tasks)
def _log_init(self) -> None:
"""
Initialize logger with MPI communicator.
"""
self.Genlogger = GenarrisLogger(self.comm)
self.logger = logging.getLogger("genarris")
def _mpi_init(self) -> None:
"""
Initialize MPI.
"""
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()
self.is_master = self.rank == 0
def _parallel_init(self, seed: int = 42) -> None:
"""
Initialize parallel processing environment.
"""
init_parallel(self.comm, seed=seed)
def _output_init(self) -> None:
"""
Initialize output system and display welcome message.
"""
gout.init_output(self.comm)
gout.welcome_message()
def _config_init(self, args: argparse.Namespace) -> None:
"""
Parse configuration settings from input file.
"""
self.config_path = os.path.abspath(args.config)
self.gnrs_info["config_path"] = self.config_path
# Parse Config file
gout.print_title("Parsing User Config File")
gout.emit(f"Reading {self.config_path}.")
# parse config file
if self.is_master:
parser = UserSettingsParser(self.config_path)
config = parser.load_config()
# Update log level
new_level = config["master"]["log_level"]
self.Genlogger.reset_loglevel(new_level)
UserSettingsSanityChecker(config)
self.config.update(config)
# Broadcast settings to all processes
self.config = self.comm.bcast(self.config, root=0)
gout.print_configs(self.config)
def _gnrs_info_init(self) -> None:
"""
Initialize Genarris information with paths and execution metadata.
"""
self.logger.info("Setting runtime values")
# Set working directories
self.work_dir = os.getcwd()
self.gnrs_info["work_dir"] = self.work_dir
self.gnrs_info["struct_dir"] = os.path.join(self.work_dir, "structures")
self.gnrs_info["tmp_dir"] = os.path.join(self.work_dir, "tmp")
# Initialize data containers
self.gnrs_info["energy_list"] = []
self.gnrs_info["genarris_start_time"] = time.time()
self.gnrs_info["size"] = self.size
[docs]
def attempt_restart(self) -> None:
"""
Load restart data if restart flag is set
"""
load_restart()
gout.print_title("Restarting Genarris")
gout.print_configs(self.config)
gout.double_separator()
def _folders_init(self) -> None:
"""
Initialize folder structure for execution.
Creates tmp and structures directories and copies molecule data
when not in restart mode.
"""
folders.init_folders(self.is_master)
if not self.restart:
self.logger.info("Setting up folders: structures and tmp")
folders.setup_main_folders(self.gnrs_info)
folders.copy_molecule(self.config, self.gnrs_info)
def _run_tasks(self, tasks: list) -> None:
"""
Run specific tasks in config file
Args:
tasks: List of task names to execute
"""
self.logger.info(f"Running configured tasks: {tasks}")
gout.emit(f"Executing {len(tasks)} configured tasks")
for task in tasks:
try:
cls, extra_args = resolve_task(task)
except ValueError:
self.logger.error(f"Unknown task: {task}.")
gout.emit(f"Error: Unknown task: {task}. Skipping.")
continue
if not is_task_completed(task):
gout.emit(f"Running task: {task}")
cls(self.comm, self.config, self.gnrs_info, *extra_args).run()
write_restart()
test_bcast()
check_if_exp_found(self.config, self.gnrs_info)
else:
self.logger.info(f"{task} task was completed before restart")
gout.skip_task(task)