Source code for evaluation.rule_analyzer

"""
Rule diagnostic analyzer for evaluating rule performance metrics
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Union
import re


class RuleDiagnosticAnalyzer:
    """
    Analyzes rule performance using coverage, precision, and explainability metrics.

    Parameters
    ----------
    coverage_threshold : float
        Minimum coverage for a rule/state pair to count as "good coverage"
        (stored as ``c_low``).
    precision_threshold : float
        Minimum precision for a rule/state pair to count as "good precision"
        (stored as ``p_low``).
    explainability_threshold : float
        States whose best rule coverage is below this value are flagged as
        unexplainable (stored as ``epsilon_unex``).
    """

    def __init__(self, coverage_threshold: float = 0.6,
                 precision_threshold: float = 0.7,
                 explainability_threshold: float = 0.3):
        self.c_low = coverage_threshold
        self.p_low = precision_threshold
        self.epsilon_unex = explainability_threshold

    def compute_rule_metrics(self, df: pd.DataFrame, event_features: pd.DataFrame,
                             state_column: str = 'state') -> Dict:
        """
        Compute coverage, precision, and explainability metrics for all event features.

        For every column of ``event_features`` whose name starts with
        ``'event_'`` and every distinct state in ``df[state_column]``:

        * coverage  = activations within the state / number of rows in the state
        * precision = activations within the state / total activations of the rule
        * effectiveness = sqrt(coverage * precision), or 0 if either is 0

        A state's explainability is the best coverage achieved by any rule.

        Parameters
        ----------
        df : DataFrame with state labels
        event_features : DataFrame containing event rule features (binary columns)
        state_column : Column name containing state labels

        Returns
        -------
        Dict with keys ``'rule_metrics'`` (rule -> state -> metrics),
        ``'state_metrics'`` (state -> metrics), ``'recommendations'``
        (list of dicts sorted by priority) and ``'unexplainable_states'``.

        Raises
        ------
        KeyError
            If ``state_column`` is missing from ``df``, or if the two frames
            do not share an index and a row label of ``df`` is absent from
            ``event_features``.
        """
        state_labels = df[state_column]

        # When both frames share the exact same index, rows can be selected
        # positionally. This stays correct even with duplicate index labels,
        # where a label-based lookup would pull in rows of other states.
        aligned = event_features.index.equals(df.index)

        # Build each state's row selection once instead of once per rule.
        state_rows = {}
        for state in state_labels.unique():
            mask_values = (state_labels == state).fillna(False).to_numpy(dtype=bool)
            n_rows = int(mask_values.sum())
            if n_rows == 0:
                # e.g. NaN labels never compare equal to themselves
                continue
            state_rows[state] = (mask_values, df.index[mask_values], n_rows)

        # Compute metrics for each rule-state combination
        rule_metrics = {}
        for rule_col in event_features.columns:
            # Non-string column labels cannot be event rules; skip them
            # instead of failing on ``.startswith``.
            if not (isinstance(rule_col, str) and rule_col.startswith('event_')):
                continue

            rule_series = event_features[rule_col]
            # Total rule activations (independent of the state, so hoisted)
            total_rule_activations = rule_series.sum()

            per_state = {}
            for state, (mask_values, row_labels, n_rows) in state_rows.items():
                # Rule activations for this state
                if aligned:
                    state_activations = rule_series[mask_values].sum()
                else:
                    state_activations = rule_series.loc[row_labels].sum()

                coverage = state_activations / n_rows
                precision = (state_activations / total_rule_activations
                             if total_rule_activations > 0 else 0)

                per_state[state] = {
                    'coverage': coverage,
                    'precision': precision,
                    'effectiveness': (np.sqrt(coverage * precision)
                                      if coverage > 0 and precision > 0 else 0),
                }
            rule_metrics[rule_col] = per_state

        # Compute state-level metrics
        state_metrics = {}
        unexplainable_states = []
        total_rows = len(df)
        for state, (_, _, n_rows) in state_rows.items():
            # Find best coverage across all rules for this state; ties keep
            # the first rule, and a rule must have coverage > 0 to qualify.
            best_coverage = 0
            best_rule = None
            for rule_col, per_state in rule_metrics.items():
                if state in per_state:
                    coverage = per_state[state]['coverage']
                    if coverage > best_coverage:
                        best_coverage = coverage
                        best_rule = rule_col

            explainability = best_coverage
            is_unexplainable = explainability < self.epsilon_unex
            state_metrics[state] = {
                'explainability': explainability,
                'gap': 1 - explainability,
                'best_rule': best_rule,
                'best_coverage': best_coverage,
                'state_frequency': n_rows / total_rows,
                'unexplainable': is_unexplainable,
            }
            if is_unexplainable:
                unexplainable_states.append(state)

        return {
            'rule_metrics': rule_metrics,
            'state_metrics': state_metrics,
            'recommendations': self._generate_recommendations(rule_metrics, state_metrics),
            'unexplainable_states': unexplainable_states,
        }

    def _generate_recommendations(self, rule_metrics: Dict, state_metrics: Dict) -> List[Dict]:
        """
        Generate actionable recommendations based on diagnostic metrics.

        Each rule/state pair is classified against ``c_low`` / ``p_low``:
        pairs meeting both thresholds produce nothing, the other three
        combinations produce an OVERLY_SENSITIVE_RULE, OVERLY_SPECIFIC_RULE or
        INEFFECTIVE_RULE entry. Every state flagged ``'unexplainable'`` adds an
        UNEXPLAINABLE_STATE entry (CRITICAL when its frequency exceeds 0.1).

        Returns
        -------
        List of recommendation dicts, stably sorted CRITICAL, HIGH, MEDIUM, LOW.
        """
        recommendations = []

        # Analyze each rule-state combination
        for rule_col, state_dict in rule_metrics.items():
            for state, metrics in state_dict.items():
                coverage = metrics['coverage']
                precision = metrics['precision']
                good_coverage = coverage >= self.c_low
                good_precision = precision >= self.p_low

                if good_coverage and good_precision:
                    # Optimal rule - no action needed
                    continue

                if good_coverage:
                    # Overly sensitive rule
                    recommendations.append({
                        'type': 'OVERLY_SENSITIVE_RULE',
                        'rule': rule_col,
                        'state': state,
                        'coverage': coverage,
                        'precision': precision,
                        'action': (
                            f"Rule '{rule_col}' for state '{state}' has good coverage "
                            f"({coverage:.1%}) but low precision ({precision:.1%}). "
                            f"Add temporal stability constraints or interaction "
                            f"features to reduce false positives."
                        ),
                        'priority': 'HIGH',
                        'suggested_families': ['stability', 'interaction'],
                    })
                elif good_precision:
                    # Overly specific rule
                    recommendations.append({
                        'type': 'OVERLY_SPECIFIC_RULE',
                        'rule': rule_col,
                        'state': state,
                        'coverage': coverage,
                        'precision': precision,
                        'action': (
                            f"Rule '{rule_col}' for state '{state}' has high precision "
                            f"({precision:.1%}) but low coverage ({coverage:.1%}). "
                            f"Relax thresholds or remove restrictive conditions."
                        ),
                        'priority': 'MEDIUM',
                        'suggested_families': ['temporal', 'statistical'],
                    })
                else:
                    # Ineffective rule
                    recommendations.append({
                        'type': 'INEFFECTIVE_RULE',
                        'rule': rule_col,
                        'state': state,
                        'coverage': coverage,
                        'precision': precision,
                        'action': (
                            f"Rule '{rule_col}' for state '{state}' performs poorly "
                            f"(coverage: {coverage:.1%}, precision: {precision:.1%}). "
                            f"Consider complete redesign with alternative sensor "
                            f"combinations."
                        ),
                        'priority': 'HIGH',
                        'suggested_families': ['all'],
                    })

        # Analyze unexplainable states
        for state, metrics in state_metrics.items():
            if metrics['unexplainable']:
                recommendations.append({
                    'type': 'UNEXPLAINABLE_STATE',
                    'state': state,
                    'explainability': metrics['explainability'],
                    'frequency': metrics['state_frequency'],
                    'action': (
                        f"State '{state}' is largely unexplained (explainability: "
                        f"{metrics['explainability']:.1%}). Consider state "
                        f"decomposition, feature space expansion, or probabilistic "
                        f"approaches."
                    ),
                    'priority': 'CRITICAL' if metrics['state_frequency'] > 0.1 else 'HIGH',
                    'suggested_approaches': ['state_decomposition', 'feature_expansion',
                                             'probabilistic_modeling'],
                })

        # Sort by priority (list.sort is stable, so insertion order is kept
        # within each priority level)
        priority_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        recommendations.sort(key=lambda rec: priority_order[rec['priority']])

        return recommendations

    def print_diagnostic_report(self, diagnostic_results: Dict):
        """
        Print comprehensive diagnostic report.

        Parameters
        ----------
        diagnostic_results : Dict in the shape returned by
            ``compute_rule_metrics`` (keys ``'rule_metrics'``,
            ``'state_metrics'``, ``'recommendations'``,
            ``'unexplainable_states'``).

        Returns
        -------
        None; the report is written to standard output.
        """
        print("=" * 80)
        print("RULE DIAGNOSTIC REPORT")
        print("=" * 80)

        # Rule performance summary
        print("\n1. RULE PERFORMANCE SUMMARY:")
        print("-" * 40)
        rule_metrics = diagnostic_results['rule_metrics']
        for rule_col, state_dict in rule_metrics.items():
            print(f"\nRule: {rule_col}")
            for state, metrics in state_dict.items():
                # str() keeps the width spec valid for non-string state labels
                print(f" State: {str(state):15} | Coverage: {metrics['coverage']:6.1%} | "
                      f"Precision: {metrics['precision']:6.1%} | "
                      f"Effectiveness: {metrics['effectiveness']:6.1%}")

        # State explainability
        print("\n2. STATE EXPLAINABILITY ANALYSIS:")
        print("-" * 40)
        state_metrics = diagnostic_results['state_metrics']
        for state, metrics in state_metrics.items():
            unexplainable_flag = " ⚠ UNEXPLAINABLE" if metrics['unexplainable'] else ""
            print(f"State: {str(state):15} | Explainability: {metrics['explainability']:6.1%} | "
                  f"Best Rule: {metrics['best_rule'] or 'None'}{unexplainable_flag}")

        # Recommendations
        print("\n3. ACTIONABLE RECOMMENDATIONS:")
        print("-" * 40)
        for i, rec in enumerate(diagnostic_results['recommendations'], 1):
            print(f"\n{i}. [{rec['priority']}] {rec['type']}")
            print(f" {rec['action']}")
            if 'suggested_families' in rec:
                print(f" Suggested feature families: {', '.join(rec['suggested_families'])}")
            if 'suggested_approaches' in rec:
                print(f" Suggested approaches: {', '.join(rec['suggested_approaches'])}")

        # Summary statistics
        print("\n4. SUMMARY STATISTICS:")
        print("-" * 40)
        total_states = len(state_metrics)
        unexplainable_states = len(diagnostic_results['unexplainable_states'])
        if total_states:
            unexplainable_share = unexplainable_states / total_states
            avg_explainability = np.mean([m['explainability'] for m in state_metrics.values()])
        else:
            # Nothing was analysed: report zeros rather than dividing by zero
            unexplainable_share = 0.0
            avg_explainability = 0.0

        print(f"Total states analyzed: {total_states}")
        print(f"Unexplainable states: {unexplainable_states} ({unexplainable_share:.1%})")
        print(f"Average explainability: {avg_explainability:.1%}")
        print(f"Recommendations generated: {len(diagnostic_results['recommendations'])}")