Source code for gnrs.core.task

"""
Core task class for all tasks.

This source code is licensed under the BSD-3-Clause license found in the
LICENSE file in the root directory of this source tree.
"""
from __future__ import annotations

__author__ = ["Yi Yang", "Rithwik Tom"]
__email__ = "yiy5@andrew.cmu.edu"
__group__ = "https://www.noamarom.com/"

import os
import abc
import time

from mpi4py import MPI

import gnrs.output as gout
from gnrs.core import folders
from gnrs.parallel.io import read_parallel, write_parallel
from gnrs.parallel.structs import DistributedStructs


[docs] class TaskABC(abc.ABC): """ Abstract base class for all tasks. This class defines the common interface and workflow for tasks like: - Structure generation - Descriptor evaluation - Energy evaluation - Geometry optimization - Clustering and selection """
[docs] def __init__( self, comm: MPI.Comm, config: dict, gnrs_info: dict, instance_id: str | None = None, ) -> None: """ Initialize the task with MPI communicator and settings. Args: comm: MPI communicator config: Config dictionary gnrs_info: Genarris info dictionary instance_id: Unique ID for this task. """ self.comm = comm self.rank = comm.Get_rank() self.size = comm.Get_size() self.is_master = self.rank == 0 self.config = config self.gnrs_info = gnrs_info self._instance_id = instance_id
[docs] def run(self) -> None: """ Execute the complete task workflow. The workflow consists of: 1. Initialize task 2. Pack settings 3. Print settings 4. Create folders 5. Perform task 6. Collect results 7. Analyze results 8. Finalize task """ self.initialize() task_set = self.pack_settings() self.print_settings(task_set) self.create_folders() self.perform_task(task_set) self.collect_results() self.analyze() self.finalize()
def _merge_config(self, task_type: str, instance_id: str) -> dict: """ Merge base task-type config with per-instance overrides. Args: task_type: Task type name (e.g. "dedup"). instance_id: This run's unique instance ID (e.g. "dedup_2"). Returns: Merged settings dictionary. """ base = self.config.get(task_type, {}).copy() if instance_id != task_type: base.update(self.config.get(instance_id, {})) return base
[docs] @abc.abstractmethod def initialize(self, task_name: str, title: str) -> None: """ Initialize the task with required setup. Args: task_name: Name of the task title: Title to display for the task """ instance_id = self._instance_id or task_name self._active_instance_id = instance_id self.comm.barrier() gout.print_title(title) self.start_time = time.time() self.debug_mode = self.config["master"].get("debug_mode", False) self.struct_dir = os.path.join(self.gnrs_info["struct_dir"], instance_id) self.struct_path = os.path.join(self.struct_dir, "structures.json") self.calc_dir = os.path.join(self.gnrs_info["tmp_dir"], instance_id) self.gnrs_info[instance_id] = { "start_time": self.start_time, "status": "running", "struct_dir": self.struct_dir, "struct_path": self.struct_path, "calc_dir": self.calc_dir } # Get structs path from last run task last_struct_path = self.gnrs_info.get("last_struct_path") if last_struct_path is not None: # if not generation task self.structs = read_parallel(last_struct_path) ds = DistributedStructs(self.structs) n_structs = ds.get_num_structs() gout.emit(f"Starting {instance_id} task with {n_structs} Structures.")
[docs] @abc.abstractmethod def pack_settings(self) -> dict: """ Collect and pack settings needed for the task. Returns: Task settings dictionary """ pass
[docs] @abc.abstractmethod def print_settings(self, task_set: dict) -> None: """ Print task settings in a formatted table. Args: task_set: Task settings dictionary """ gout.print_dict_table(task_set, ["Option", "Value"])
[docs] @abc.abstractmethod def create_folders(self) -> None: """ Create the folder structure required for the task. """ folders.mkdir(self.struct_dir) folders.mkdir(self.calc_dir) self.comm.barrier() # Wait for folder creation
[docs] @abc.abstractmethod def perform_task(self, task_set: dict) -> None: """ Execute the main task. Args: task_set: Task settings dictionary """ pass
[docs] @abc.abstractmethod def collect_results(self) -> None: """ Collect and save the results of the task. """ write_parallel(self.struct_path, self.structs)
[docs] @abc.abstractmethod def analyze(self) -> None: """ Analyze the results of the task. """ pass
[docs] @abc.abstractmethod def finalize(self, task_name: str) -> None: """ Finalize the task and update runtime settings. Args: task_name: Name of the task """ instance_id = self._active_instance_id self.gnrs_info["last_struct_path"] = self.struct_path self.gnrs_info[instance_id]["results"] = self.struct_path self.gnrs_info[instance_id]["status"] = "completed" end_time = self.gnrs_info[instance_id]["end_time"] = time.time() elapsed = end_time - self.start_time gout.emit(f"Completed {instance_id} task in {elapsed:.2f} seconds.") gout.section_complete() os.chdir(self.gnrs_info["work_dir"]) self.comm.Barrier()