# Source code for gnrs.cluster.ap

"""
This module provides the AP clustering algorithm.

This source code is licensed under the BSD-3-Clause license found in the
LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

__author__ = ["Yi Yang", "Rithwik Tom"]
__email__ = "yiy5@andrew.cmu.edu"
__group__ = "https://www.noamarom.com/"

import os
import numpy as np
import logging

from mpi4py import MPI
from bisect import bisect_left
from sklearn.cluster import AffinityPropagation
from sklearn.metrics.pairwise import euclidean_distances

import gnrs.parallel as gp
import gnrs.output as gout
from gnrs.core.cluster import ClusterABC

logger = logging.getLogger("AP")


class APCluster(ClusterABC):
    """Affinity Propagation (AP) clustering.

    Implements the Affinity Propagation clustering algorithm with support
    for distributed computation using MPI: feature vectors and the pairwise
    similarity matrix are persisted to disk-backed memmaps, and a
    configurable subset of ranks (the "AP workers") runs the scikit-learn
    AP fits in parallel with different preference values.
    """
[docs] def __init__(self, comm: MPI.Comm, task_settings: dict) -> None: """ Initialize the APCluster object. Args: comm: MPI communicator task_settings: Task settings for AP clustering """ super().__init__(comm, task_settings) self.result = None self.cluster_method = "ap" self.feature_name = self.tsk_set["feature_name"] # AP parameters self.damping = self.tsk_set.get("damping", 0.5) self.max_iter = self.tsk_set.get("max_iter", 200) self.convergence_iter = self.tsk_set.get("convergence_iter", 15) self.preference_range = self.tsk_set.get("preference_range", "quantile") self.max_sampling_attempts = self.tsk_set.get("max_sampling_attempts", 10) # Clustering parameters self.n_clusters = self.tsk_set.get("n_clusters") self.clusters_tol = self.tsk_set.get("clusters_tol", 0.5) self.debug_mode = self.tsk_set.get("debug_mode", False) # Files self.ids_file = self.tsk_set.get("ids_file", "ids.dat") self.simmat_file = self.tsk_set.get("simmat_file", "similarity_matrix.dat") self.feature_file = self.tsk_set.get("feature_file", "acsf.dat") # Internal variables self.preference = None self.success_rank = None self.final_n_clusters = None # AP model self.ap_model = None self.max_ap_attempts = self.tsk_set.get("max_ap_attempts", 10) # Limit the number of ranks that run AP fitting max_ap_workers = self.tsk_set.get("max_ap_workers", 8) self.n_ap_workers = min(max_ap_workers, self.size) self.is_ap_worker = self.rank < self.n_ap_workers # Sub-communicator spanning only the AP worker ranks. Non-workers # receive MPI.COMM_NULL which must never be used for collectives. self.ap_comm = self.comm.Split( color=0 if self.is_ap_worker else MPI.UNDEFINED, key=self.rank, )
[docs] def initialize(self) -> None: """ Initialize the clustering by gathering crystal structures and computing similarity matrix. This method: 1. Gathers structures from all processes 2. Computes or loads feature vectors 3. Computes or loads similarity matrix 4. Sets up preference range for AP clustering 5. Initializes the AP model """ local_features = np.array( [xtal.info[self.feature_name][0, :] for xtal in self.structs.values()] ) local_ids = list(self.structs.keys()) all_features = self.comm.gather(local_features, root=0) all_ids = self.comm.gather(local_ids, root=0) if self.is_master: all_features = np.vstack([feat for sublist in all_features if sublist is not None for feat in sublist], dtype=np.float32) n_samples, n_features = all_features.shape if not os.path.exists(self.feature_file): gout.emit(f"Creating feature file: {self.feature_file}") features = np.memmap( self.feature_file, dtype="float32", mode="w+", shape=(n_samples, n_features), ) features[:] = all_features[:] features.flush() else: gout.emit(f"Loading feature file: {self.feature_file}") features = np.memmap( self.feature_file, dtype="float32", mode="r", shape=(n_samples, n_features), ) del all_features if not os.path.exists(self.simmat_file): gout.emit(f"Creating similarity matrix file: {self.simmat_file}") chunk_size = min(1000, n_samples) sim_mat_file = np.memmap( self.simmat_file, dtype="float32", mode="w+", shape=(n_samples, n_samples), ) for i in range(0, n_samples, chunk_size): end_i = min(i + chunk_size, n_samples) sim_mat_file[i:end_i, :] = -euclidean_distances( features[i:end_i], features, squared=True ) sim_mat_file.flush() self.sim_mat = sim_mat_file else: gout.emit(f"Loading similarity matrix file: {self.simmat_file}") self.sim_mat = np.memmap( self.simmat_file, dtype="float32", mode="r", shape=(n_samples, n_samples), ) del features ids_array = np.vstack([ _id for sublist in all_ids if sublist is not None for _id in sublist ]) if not os.path.exists(self.ids_file): 
gout.emit(f"Creating IDs file: {self.ids_file}") self.ids = np.memmap( self.ids_file, dtype="<U15", mode="w+", shape=ids_array.shape ) self.ids[:] = ids_array[:] self.ids.flush() else: gout.emit(f"Loading IDs file: {self.ids_file}") self.ids = np.memmap( self.ids_file, dtype="<U15", mode="r", shape=(n_samples,) ) del ids_array del all_ids else: self.ids = None self.sim_mat = None n_samples = None n_samples = self.comm.bcast(n_samples, root=0) self._n_samples = n_samples # Only AP workers need the similarity matrix for fitting. # Non-workers only need self.ids (via broadcast in predict) and # self.result (broadcast at the end of fit). if not self.is_master and self.is_ap_worker: self.sim_mat = np.memmap( self.simmat_file, dtype="float32", mode="r", shape=(n_samples, n_samples), ) self.ids = np.memmap( self.ids_file, dtype="<U15", mode="r", shape=(n_samples,) ) elif not self.is_master: self.ids = np.memmap( self.ids_file, dtype="<U15", mode="r", shape=(n_samples,) ) # Preference range: compute on rank 0 only, then broadcast to all. if self.is_master: if self.preference_range == "quantile": self.preference_range = [ float(np.quantile(self.sim_mat, 0.05)), float(np.quantile(self.sim_mat, 0.95)), ] elif self.preference_range == "mean-median": self.preference_range = sorted( [float(np.mean(self.sim_mat)), float(np.median(self.sim_mat))] ) else: self.preference_range = None self.preference_range = self.comm.bcast(self.preference_range, root=0) gout.emit(f"Using preference range: {self.preference_range}") if self.is_ap_worker: self.ap_model = AffinityPropagation( damping=self.damping, max_iter=self.max_iter, convergence_iter=self.convergence_iter, copy=True, affinity="precomputed", random_state=gp.base_seed, ) logger.info(f"Started AP clustering with {self.n_ap_workers} AP workers out of {self.size} ranks") gout.emit( f"Running AP clustering targeting {self.n_clusters} clusters " f"with {self.n_ap_workers} AP workers out of {self.size} ranks" )
    def fit(self) -> None:
        """Fit the clustering model to the data.

        Only AP-worker ranks run AP clustering. Each attempt samples one
        preference value per worker across the current preference range;
        rank 0 checks whether any worker's cluster count is acceptable and
        either stops or narrows the range for the next attempt.
        """
        n_attempts = 0
        # Work on a copy so self.preference_range is left untouched.
        pref_range = list(self.preference_range)
        converged_result = None
        result = None
        while n_attempts < self.max_sampling_attempts:
            gout.emit(
                f"Beginning attempt {n_attempts} with preference range "
                f"[{pref_range[0]:.4f}, {pref_range[-1]:.4f}] "
                f"on {self.n_ap_workers} AP workers"
            )
            if self.is_ap_worker:
                ap_rank = self.ap_comm.Get_rank()
                ap_size = self.ap_comm.Get_size()
                # Evenly-spaced interior sample of the preference range:
                # worker i gets fraction (i+1)/(ap_size+1) of the span.
                pref = round(
                    pref_range[-1]
                    + float(pref_range[0] - pref_range[-1])
                    * float(ap_rank + 1)
                    / float(ap_size + 1),
                    4
                )
                try:
                    result = self._affinity_propagation(pref, pref_range)
                    ap_payload = [result["n_clusters"], result["preference"]]
                except StopIteration:
                    # This worker's AP run never converged; report failure.
                    result = None
                    ap_payload = [None, None]
                n_clusters_pref = self.ap_comm.gather(ap_payload, root=0)
                if self.is_master:
                    # Filter out workers that failed (StopIteration) while
                    # preserving the AP-worker rank index for each entry.
                    valid_indices = [
                        i for i, p in enumerate(n_clusters_pref) if p[0] is not None
                    ]
                    valid = [n_clusters_pref[i] for i in valid_indices]
                    if valid:
                        converged_result = self.check_convergence(
                            valid, pref_range, n_attempts
                        )
                        # Map the index within `valid` back to the AP-worker rank.
                        if converged_result["rank"] is not None:
                            converged_result["rank"] = valid_indices[
                                converged_result["rank"]
                            ]
                    else:
                        # Every worker failed: retry with the same range.
                        converged_result = {
                            "converged": False,
                            "rank": None,
                            "pref": None,
                            "n_cluster": None,
                            "new_pref_range": pref_range,
                        }
                else:
                    converged_result = None
                del n_clusters_pref
                # Workers-only broadcast; non-workers hold COMM_NULL and
                # must not participate in ap_comm collectives.
                converged_result = self.ap_comm.bcast(converged_result, root=0)
            # Broadcast convergence status to all ranks
            converged_result = self.comm.bcast(converged_result, root=0)
            if converged_result["converged"]:
                # Only the winning worker keeps its local result; short-circuit
                # guards ap_rank, which exists only on worker ranks.
                if self.is_ap_worker and converged_result["rank"] == ap_rank:
                    self.result = result
                break
            else:
                pref_range = converged_result["new_pref_range"]
                gout.emit(
                    f"Attempt {n_attempts}: Preference range updated: "
                    f"[{pref_range[0]:.4f}, {pref_range[-1]:.4f}]"
                )
            n_attempts += 1
        if converged_result is not None and converged_result["converged"]:
            gout.emit(f"Affinity Propagation with fixed number of clusters succeeded!")
        else:
            gout.emit(
                f"Failed to cluster to {self.n_clusters} clusters "
                f"with tolerance {self.clusters_tol}"
            )
        # The winning rank is identified within the AP sub-communicator.
        # Broadcast the result from the winning AP worker to all ranks
        if converged_result is not None and converged_result["converged"]:
            winning_ap_rank = converged_result["rank"]
        else:
            winning_ap_rank = 0
        # Map the AP-local rank back to the global rank. The identity mapping
        # holds because Split() used color 0 for the lowest global ranks with
        # key=self.rank, so AP rank i is global rank i.
        winning_global_rank = winning_ap_rank
        self.result = self.comm.bcast(
            self.result if self.rank == winning_global_rank else None,
            root=winning_global_rank,
        )
        n_clusters_done = self.result["n_clusters"]
        gout.emit(f"Affinity Propagation completed with {n_clusters_done} clusters")
        logger.info(f"Completed AP fitting with {n_clusters_done} clusters")
[docs] def predict(self) -> int: """ Assign cluster labels to structures. Returns: int: Final number of clusters """ # Assign cluster labels to structures for ids, xtal in self.structs.items(): label = self.result["assigned_cluster"][ids] if ids in self.result["exemplar_ids"]: xtal.info[self.cluster_method] = str(label) + "_center" else: xtal.info[self.cluster_method] = str(label) logger.info("Completed predicting clusters") return self.result["n_clusters"]
[docs] def finalize(self) -> None: """ Finalize the clustering process. """ if self.ap_comm is not None and self.ap_comm != MPI.COMM_NULL: self.ap_comm.Free() self.ap_comm = None logger.info("Completed AP clustering") gout.emit("Completed AP clustering.\n") gout.emit("")
def _affinity_propagation(self, pref: float, pref_range: list) -> dict: """ Run Affinity Propagation clustering with current preference value. Returns: dict: Clustering results """ for iteration in range(self.max_ap_attempts): self.ap_model.preference = pref clustering = self.ap_model.fit(self.sim_mat) # Check if clustering converged if clustering.labels_[0] != -1: break # If not converged, try a random preference value factor = (iteration + 1) / self.max_ap_attempts mid_point = (pref_range[0] + pref_range[1]) / 2 if iteration % 2 == 0: new_pref = mid_point + ( (pref_range[1] - pref_range[0]) * 0.5 * factor ) else: new_pref = mid_point - ( (pref_range[1] - pref_range[0]) * 0.5 * factor ) pref = round(new_pref, 4) if self.debug_mode: print( f"preference: {pref} failed to converge. Trying a random preference: {new_pref}", flush=True ) if iteration == self.max_ap_attempts - 1: raise StopIteration() cluster_centers = clustering.cluster_centers_indices_ n_clusters = len(cluster_centers) labels = clustering.labels_ exemplar_indices = np.array(cluster_centers, dtype="int") exemplar_ids = [self.ids[idx] for idx in exemplar_indices] assigned_cluster = {str(_id): label for _id, label in zip(self.ids, labels)} return { "n_clusters": n_clusters, "assigned_cluster": assigned_cluster, "exemplar_ids": exemplar_ids, "preference": pref, }
[docs] def check_convergence( self, n_clusters_pref: list, pref_range: list, n_attempts: int, ) -> dict: """ Check if the clustering has converged. Args: n_clusters_pref: List of [num_clusters, preference] pairs from all processes pref_range: List of preference values from all processes n_attempts: Current attempt number Returns: Dictionary containing: - converged: Whether a suitable clustering was found - rank: Rank with suitable clustering - pref: preference used for clustering - n_cluster: Number of clusters found - new_pref_range: New preference range to try """ n_clusters, pref_list = map(list, zip(*n_clusters_pref)) sorted_clusters_with_indices = sorted( (clu, idx) for idx, clu in enumerate(n_clusters) ) sorted_n_clusters = [clu for clu, _ in sorted_clusters_with_indices] # Find the insertion point idx = bisect_left(sorted_n_clusters, self.n_clusters) candidates = [] if idx > 0: candidates.append(sorted_clusters_with_indices[idx - 1]) if idx < len(n_clusters): candidates.append(sorted_clusters_with_indices[idx]) # Find the closest cluster if not candidates: closest_clu, closest_idx = None, None else: closest_clu, closest_idx = min( candidates, key=lambda x: abs(x[0] - self.n_clusters) ) is_within_tolerance = ( self.clusters_tol is not None and closest_clu is not None and abs(closest_clu - self.n_clusters) <= self.clusters_tol ) # Return success if we found a good solution or reached max iterations if is_within_tolerance or n_attempts == self.max_sampling_attempts - 1: return { "converged": True, "rank": closest_idx, "pref": pref_list[closest_idx], "n_cluster": closest_clu, "new_pref_range": None, } if idx == 0: # Target clusters is lower than any value obtained - decrease preference range new_pref_range = [ pref_range[0] - abs(pref_range[0]) * 0.2, pref_list[sorted_clusters_with_indices[0][1]] ] elif idx == len(pref_list): # Target clusters is higher than any value obtained - increase preference range new_pref_range = [ 
pref_list[sorted_clusters_with_indices[-1][1]], pref_range[1] + abs(pref_range[1]) * 0.2 ] else: # Target clusters is between values obtained # Get the preference values on either side of where self.n_clusters would be inserted # Find the indices in the original list that correspond to the sorted positions new_pref_range = sorted([ pref_list[sorted_clusters_with_indices[idx - 1][1]], pref_list[sorted_clusters_with_indices[idx][1]] ]) return { "converged": False, "rank": None, "pref": None, "n_cluster": None, "new_pref_range": new_pref_range, }