# Source code for abstraction.mt_loop

"""
Machine Teaching loop for Sensor2EventLog framework.
"""

from __future__ import annotations

from typing import Any, Dict, Optional, Tuple, List

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

from models.hmm_model import HMMModel


class MachineTeachingLoop:
    """
    Orchestrates feature extraction, diagnostics, and model training.

    The loop extracts features from a sensor frame, splits the data by
    ``batch_id`` into train and test portions, standardizes the features,
    trains the underlying model and finally predicts states for the full
    frame.

    The input frame is expected to carry a ``batch_id`` column and a
    ``state`` column; both are read directly by the helpers below.
    """

    def __init__(self, model_type: str, feature_extractor, diagnostic_analyzer, config):
        """
        Store collaborators and build the model and feature scaler.

        Args:
            model_type: Model family to use. Only ``"hmm"`` is supported.
            feature_extractor: Object providing ``compute_features`` and,
                optionally, ``analyze_rule_performance``.
            diagnostic_analyzer: Object providing ``compute_rule_metrics``.
            config: Configuration object forwarded to the model. An optional
                ``PROCESS_STATES`` attribute is used to order states.

        Raises:
            ValueError: If ``model_type`` is not ``"hmm"``.
        """
        # Fail fast before building any collaborators.
        if model_type != "hmm":
            raise ValueError(f"Unsupported model_type: {model_type}")

        self.model_type = model_type
        self.feature_extractor = feature_extractor
        self.diagnostic_analyzer = diagnostic_analyzer
        self.config = config
        self._review_summary: Optional[Dict[str, Any]] = None
        self.model = HMMModel(config=self.config)
        self._scaler = StandardScaler()

    def get_review_summary(self) -> Optional[Dict[str, Any]]:
        """Return the diagnostics stored by the last ``run`` (``None`` before any run)."""
        return self._review_summary

    def run(
        self,
        df: pd.DataFrame,
        feature_plan: Dict[str, list],
        mode: str = "unsupervised",
        n_unsup: Optional[int] = None,
        random_seed: int = 42,
    ) -> Dict[str, Any]:
        """
        Execute one full pass: features, split, scaling, training, prediction.

        Args:
            df: Input frame with ``batch_id`` and ``state`` columns.
            feature_plan: Plan forwarded to the feature extractor.
            mode: ``"supervised"`` (case-insensitive) selects supervised
                training; any other value falls through to unsupervised
                training.
            n_unsup: Forwarded to ``train_unsupervised`` only.
            random_seed: Currently not used by this method; kept so existing
                callers that pass it keep working.

        Returns:
            Dict with keys ``"features"``, ``"diagnostics"``, ``"model"``,
            ``"predictions"`` (for the full frame) and ``"state_mapping"``.

        Raises:
            ValueError: If the batch-level split leaves the train or test
                portion empty (fewer than two distinct ``batch_id`` values).
        """
        features, diagnostics = self._extract_features_and_diagnostics(df, feature_plan)

        X_train, X_test, df_train, df_test = self._split_train_test(df, features)
        if df_train.empty or df_test.empty:
            # Without this guard the failure surfaces later, inside the
            # scaler, with a message that does not mention batches.
            raise ValueError(
                "At least two distinct batch_id values are required to build "
                "non-empty train and test splits"
            )

        X_train_scaled = self._scaler.fit_transform(X_train)
        X_test_scaled = self._scaler.transform(X_test)

        X_train_np, lengths_train = self._pack_sequences(df_train, X_train_scaled)
        X_test_np, lengths_test = self._pack_sequences(df_test, X_test_scaled)

        state_list, state_to_idx, idx_to_state = self._build_state_maps(df)
        # States missing from ``state_to_idx`` become NaN here.
        y_train = df_train["state"].map(state_to_idx).values
        y_test = df_test["state"].map(state_to_idx).values

        training_args = (
            X_train_np,
            lengths_train,
            X_test_np,
            lengths_test,
            y_train,
            y_test,
            state_list,
            idx_to_state,
        )
        if mode.lower() == "supervised":
            y_pred_test, model, state_mapping = self.model.train_supervised(*training_args)
        else:
            y_pred_test, model, state_mapping = self.model.train_unsupervised(
                *training_args, n_unsup
            )

        # Predict full sequence for event log generation. Rows are selected
        # by label, exactly like the train/test splits, so the feature rows
        # are aligned with ``df`` and the scaler sees a plain array as it did
        # during fitting.
        X_full = features.loc[df.index].values
        X_full_scaled = self._scaler.transform(X_full)
        X_full_np, lengths_full = self._pack_sequences(df, X_full_scaled)
        y_pred_full = self.model.predict(X_full_np, lengths_full)

        self._review_summary = diagnostics

        return {
            "features": features,
            "diagnostics": diagnostics,
            "model": model,
            "predictions": y_pred_full,
            "state_mapping": state_mapping,
        }

    def _extract_features_and_diagnostics(
        self, df: pd.DataFrame, feature_plan: Dict[str, list]
    ) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
        """
        Compute features and, when available, rule diagnostics.

        If the extractor offers ``analyze_rule_performance`` its result is
        used directly (``"features"`` is required, ``"diagnostics"`` is
        optional). Otherwise features come from ``compute_features`` and
        diagnostics are computed only when ``event_*`` columns exist.
        """
        if hasattr(self.feature_extractor, "analyze_rule_performance"):
            result = self.feature_extractor.analyze_rule_performance(df, feature_plan)
            return result["features"], result.get("diagnostics")

        features = self.feature_extractor.compute_features(df, feature_plan)
        event_cols = [col for col in features.columns if col.startswith("event_")]
        if not event_cols:
            return features, None

        diagnostics = self.diagnostic_analyzer.compute_rule_metrics(df, features[event_cols])
        return features, diagnostics

    def _split_train_test(
        self, df: pd.DataFrame, features: pd.DataFrame, train_ratio: float = 0.6
    ) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame, pd.DataFrame]:
        """
        Split rows into train and test portions at batch granularity.

        Batches are taken in order of first appearance; the first
        ``max(1, int(train_ratio * n_batches))`` go to training and the rest
        to testing. Feature rows are selected by index label, so ``features``
        must share its index with ``df``.

        Returns:
            ``(X_train, X_test, df_train, df_test)``. The test portion is
            empty when there is only a single batch.
        """
        batch_ids = df["batch_id"].unique()
        n_train = max(1, int(train_ratio * len(batch_ids)))

        train_mask = df["batch_id"].isin(set(batch_ids[:n_train]))
        test_mask = df["batch_id"].isin(set(batch_ids[n_train:]))
        df_train = df[train_mask]
        df_test = df[test_mask]

        X_train = features.loc[df_train.index].values
        X_test = features.loc[df_test.index].values
        return X_train, X_test, df_train, df_test

    @staticmethod
    def _pack_sequences(
        df_subset: pd.DataFrame, X_subset: np.ndarray
    ) -> Tuple[np.ndarray, List[int]]:
        """
        Return the feature matrix together with per-batch sequence lengths.

        Lengths are reported in order of first appearance of each
        ``batch_id`` so that they line up with the row order of ``X_subset``.
        The default sorted group order would misalign lengths and rows
        whenever batches do not appear in sorted order.

        NOTE: rows of one batch are assumed to be contiguous in
        ``df_subset``; interleaved batches are not reordered here.
        """
        lengths = df_subset.groupby("batch_id", sort=False).size().tolist()
        return X_subset, lengths

    def _build_state_maps(
        self, df: pd.DataFrame
    ) -> Tuple[List[str], Dict[str, int], Dict[int, str]]:
        """
        Build the ordered state list and its index lookups.

        States listed in ``config.PROCESS_STATES`` under ``"production"`` and
        then ``"cip"`` define the order; only states that occur in ``df`` are
        kept. If that leaves nothing, the observed states are used in sorted
        order.

        NOTE: when the configuration matches at least one observed state,
        observed states missing from the configuration are not part of the
        returned mappings.

        Returns:
            ``(state_list, state_to_idx, idx_to_state)``.
        """
        observed = df["state"].unique().tolist()
        process_states = getattr(self.config, "PROCESS_STATES", None) or {}

        configured: List[str] = []
        for key in ("production", "cip"):
            if key in process_states:
                configured.extend(process_states[key])
        # A state listed under both keys must appear once only; otherwise the
        # list contains repeats and the index mapping ends up with gaps.
        configured = list(dict.fromkeys(configured))

        state_list = [state for state in configured if state in observed] or sorted(observed)
        state_to_idx = {state: idx for idx, state in enumerate(state_list)}
        idx_to_state = {idx: state for state, idx in state_to_idx.items()}
        return state_list, state_to_idx, idx_to_state