Source code for core.pipeline

"""
Core pipeline orchestrator for Sensor2EventLog framework
"""

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from typing import Optional, Dict, Any, Union

from abstraction.mt_loop import MachineTeachingLoop
from contextualization.event_log import EventLog
from features.feature_library import ModularFeatureLibrary
from evaluation.rule_analyzer import RuleDiagnosticAnalyzer
import config


[docs] class Sensor2EventLogPipeline: """ Main pipeline class for transforming sensor data to event logs. This class orchestrates the entire Machine Teaching process: 1. Feature extraction 2. Model training (HMM with supervised/unsupervised modes) 3. Diagnostic analysis 4. Event log generation Example: >>> pipeline = Sensor2EventLogPipeline(config) >>> result = pipeline.run( ... data_path="sensor_data.csv", ... feature_plan=feature_plan, ... mode="unsupervised" ... ) >>> event_log = result['event_log'] >>> event_log.to_xes("output.xes") """
[docs] def __init__(self, config_module=None): """ Initialize the pipeline with configuration. Parameters: ----------- config_module : module, optional Configuration module with all parameters. If None, uses default config """ self.config = config_module or config # Initialize components self.feature_library = ModularFeatureLibrary( window_sizes=self.config.FEATURE_CONFIG["window_sizes"], stability_eps=self.config.FEATURE_CONFIG["stability_eps"], peak_threshold=self.config.FEATURE_CONFIG["peak_threshold"] ) self.diagnostic_analyzer = RuleDiagnosticAnalyzer( coverage_threshold=self.config.DIAGNOSTIC_CONFIG["coverage_threshold"], precision_threshold=self.config.DIAGNOSTIC_CONFIG["precision_threshold"], explainability_threshold=self.config.DIAGNOSTIC_CONFIG["explainability_threshold"] ) self.mt_loop = MachineTeachingLoop( model_type="hmm", feature_extractor=self.feature_library, diagnostic_analyzer=self.diagnostic_analyzer, config=self.config ) self._scaler = StandardScaler() self._fitted = False
[docs] def run(self, data_path: str, feature_plan: Dict[str, list], mode: str = "unsupervised", use_cip: bool = False, n_unsup: Optional[int] = None, random_seed: int = 42, min_duration_seconds: float = 2.0, return_intermediate: bool = False) -> Dict[str, Any]: """ Run the complete pipeline. Parameters: ----------- data_path : str Path to CSV data file feature_plan : dict Feature extraction plan with families and signals mode : str "supervised" or "unsupervised" use_cip : bool Whether to include CIP states n_unsup : int Number of states for unsupervised mode random_seed : int Random seed for reproducibility min_duration_seconds : float Minimum duration for filtering brief states return_intermediate : bool If True, returns intermediate results (features, diagnostics) Returns: -------- dict with keys: - event_log: EventLog object - model: trained HMM model - predictions: predicted state sequences - features (if return_intermediate): extracted features - diagnostics (if return_intermediate): diagnostic results """ # Load and prepare data df = self._load_and_prepare_data(data_path, use_cip) # Run Machine Teaching loop mt_result = self.mt_loop.run( df=df, feature_plan=feature_plan, mode=mode, n_unsup=n_unsup, random_seed=random_seed ) features = mt_result['features'] model = mt_result['model'] predictions = mt_result['predictions'] diagnostics = mt_result['diagnostics'] state_mapping = mt_result['state_mapping'] # Generate event log event_log = self._generate_event_log( df, predictions, state_mapping, min_duration_seconds ) result = { 'event_log': event_log, 'model': model, 'predictions': predictions } if return_intermediate: result['features'] = features result['diagnostics'] = diagnostics return result
def _load_and_prepare_data(self, data_path: str, use_cip: bool) -> pd.DataFrame: """Load and prepare data for analysis.""" df = pd.read_csv(data_path) # Determine state list state_list = self.config.PROCESS_STATES["production"].copy() if use_cip: state_list += self.config.PROCESS_STATES["cip"] # Filter to relevant states and sort df = df[df["state"].isin(state_list)].copy() df.sort_values(["batch_id", "timestamp"], inplace=True) return df def _generate_event_log(self, df: pd.DataFrame, predictions: np.ndarray, state_mapping: Dict, min_duration_seconds: float) -> 'EventLog': """Generate event log from predictions.""" from contextualization.event_log import create_interval_event_log_normalized # Normalize timestamps from utils.hmm_utils import normalize_timestamps df_normalized = normalize_timestamps(df) # Create event log event_df = create_interval_event_log_normalized( df_normalized, predictions, state_mapping ) # Filter brief states from utils.hmm_utils import filter_brief_states filtered_df = filter_brief_states(event_df, min_duration_seconds) # Wrap in EventLog object event_log = EventLog(filtered_df) # Save if paths are configured if hasattr(self.config, 'PATHS'): event_log.to_csv(self.config.PATHS.get("event_log", "event_log.csv")) event_log.to_csv(self.config.PATHS.get("filtered_log", "filtered_log.csv"), filtered=True) return event_log