Source code for contextualization.event_log

"""
Event log object with PM4Py compatibility
"""

import pandas as pd
import numpy as np
from typing import Optional, List, Dict, Any, Union
from datetime import datetime


class Event:
    """
    Single event in an event log.

    Attributes:
    -----------
    case_id : str
        Identifier for the process case (always stored as a string)
    activity : str
        Name of the activity/state
    start_time : datetime
        Start timestamp of the event
    end_time : datetime
        End timestamp of the event
    duration : float
        Duration in seconds
    attributes : dict
        Any extra keyword arguments passed to the constructor
    """

    def __init__(self, case_id: str, activity: str,
                 start_time: datetime, end_time: datetime,
                 duration: Optional[float] = None, **kwargs):
        """
        Create an event.

        Parameters:
        -----------
        case_id : str
            Case identifier; converted with ``str()``
        activity : str
            Name of the activity/state
        start_time : datetime
            Start timestamp of the event
        end_time : datetime
            End timestamp of the event
        duration : float, optional
            Duration in seconds. When omitted (``None``) it is computed as
            ``end_time - start_time``.
        **kwargs
            Additional attributes, stored in ``self.attributes``
        """
        self.case_id = str(case_id)
        self.activity = activity
        self.start_time = start_time
        self.end_time = end_time
        # Test against None explicitly: a caller-supplied duration of 0 is a
        # legitimate value and must not be replaced by the computed one.
        if duration is None:
            duration = (end_time - start_time).total_seconds()
        self.duration = duration
        self.attributes = kwargs

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert event to dictionary.

        Returns:
        --------
        dict
            The keys 'case_id', 'activity', 'start_time', 'end_time' and
            'duration', followed by the extra attributes. An extra attribute
            with the same name as a standard key overrides it.
        """
        return {
            'case_id': self.case_id,
            'activity': self.activity,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.duration,
            **self.attributes
        }
class EventLog:
    """
    Event log container with PM4Py compatibility.

    This class provides a standardized interface for event logs that can be
    exported to various formats (CSV, XES) and used with process mining
    tools like PM4Py.

    Internally the log is a pandas DataFrame with the columns 'case_id',
    'activity', 'start_timestamp', 'end_timestamp' and 'duration_seconds'.

    Example:
        >>> log = EventLog(df)
        >>> log.to_csv("event_log.csv")
        >>> log.to_xes("event_log.xes")
        >>> pm4py_log = log.to_pm4py()  # Use with PM4Py
    """

    # Columns every event log DataFrame must provide.
    _REQUIRED_COLUMNS = ('case_id', 'activity', 'start_timestamp', 'end_timestamp')

    # Translation from the keys produced by ``to_dict()`` on event objects
    # to the DataFrame column names used throughout this class.
    _EVENT_KEY_TO_COLUMN = {
        'start_time': 'start_timestamp',
        'end_time': 'end_timestamp',
        'duration': 'duration_seconds',
    }

    def __init__(self, data: Union[pd.DataFrame, List["Event"]]):
        """
        Initialize event log from DataFrame or list of Events.

        Parameters:
        -----------
        data : pd.DataFrame or List[Event]
            Input event log data

        Raises:
        -------
        ValueError
            If ``data`` is neither a DataFrame nor a list, or a required
            column is missing.
        """
        if isinstance(data, pd.DataFrame):
            self._df = self._validate_dataframe(data)
        elif isinstance(data, list):
            self._df = self._from_events(data)
        else:
            raise ValueError("Data must be DataFrame or list of Events")
        # Result of the most recent to_pm4py() call.
        self._pm4py_log = None

    def _validate_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Validate and standardize DataFrame format.

        Works on a copy, so the caller's DataFrame is never modified.

        Raises:
        -------
        ValueError
            If one of the required columns is missing.
        """
        for col in self._REQUIRED_COLUMNS:
            if col not in df.columns:
                raise ValueError(f"DataFrame missing required column: {col}")

        # Copy before normalizing so the input frame is left untouched.
        df = df.copy()

        # Ensure timestamp columns are datetime
        for col in ('start_timestamp', 'end_timestamp'):
            if not pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = pd.to_datetime(df[col])

        # Add duration if missing (both columns are datetime at this point)
        if 'duration_seconds' not in df.columns:
            df['duration_seconds'] = (
                df['end_timestamp'] - df['start_timestamp']
            ).dt.total_seconds()

        return df

    def _from_events(self, events: List["Event"]) -> pd.DataFrame:
        """
        Convert list of Events to a validated DataFrame.

        Each element must provide ``to_dict()``. The resulting keys are
        renamed to the column names this class works with, so logs built
        from events support the same operations as logs built from a
        DataFrame. An empty list yields an empty log with the required
        columns.
        """
        if events:
            frame = pd.DataFrame([e.to_dict() for e in events])
            frame = frame.rename(columns=self._EVENT_KEY_TO_COLUMN)
        else:
            frame = pd.DataFrame(columns=list(self._REQUIRED_COLUMNS))
        return self._validate_dataframe(frame)

    def to_dataframe(self) -> pd.DataFrame:
        """Get event log as pandas DataFrame (a copy of the internal data)."""
        return self._df.copy()

    def to_csv(self, path: str, filtered: bool = False) -> None:
        """
        Export event log to CSV.

        Parameters:
        -----------
        path : str
            Output file path
        filtered : bool
            If True, saves only the rows whose 'filtered' column equals True
            (when that column exists); otherwise all rows are saved
        """
        df_to_save = self._df
        if filtered and 'filtered' in self._df.columns:
            # Elementwise equality rather than plain boolean indexing, so a
            # non-boolean 'filtered' column is handled as well.
            df_to_save = self._df[self._df['filtered'].eq(True)]

        df_to_save.to_csv(path, index=False)
        print(f"Event log saved to: {path}")

    def to_xes(self, path: str, case_id_key: str = 'case:concept:name',
               timestamp_key: str = 'time:timestamp') -> None:
        """
        Export event log to XES format using PM4Py.

        Parameters:
        -----------
        path : str
            Output file path
        case_id_key : str
            Column name to use as case identifier in XES
        timestamp_key : str
            Column name to use as timestamp in XES

        Raises:
        -------
        ImportError
            If PM4Py is not installed.
        """
        try:
            import pm4py
        except ImportError as err:
            raise ImportError(
                "PM4Py is required for XES export. Install with: pip install pm4py"
            ) from err

        # Convert to PM4Py format
        pm4py_log = self.to_pm4py(case_id_key, timestamp_key)

        # Export to XES
        pm4py.write_xes(pm4py_log, path)
        print(f"Event log exported to XES: {path}")

    def to_pm4py(self, case_id_key: str = 'case:concept:name',
                 timestamp_key: str = 'time:timestamp') -> 'pm4py.objects.log.obj.EventLog':
        """
        Convert to PM4Py EventLog object for further analysis.

        The result is also cached on the instance as ``_pm4py_log``.

        Parameters:
        -----------
        case_id_key : str
            Column name to use as case identifier
        timestamp_key : str
            Column name to use as timestamp

        Returns:
        --------
        pm4py.objects.log.obj.EventLog
            PM4Py event log object

        Raises:
        -------
        ImportError
            If PM4Py is not installed.
        """
        try:
            import pm4py
        except ImportError as err:
            raise ImportError(
                "PM4Py is required for this functionality. Install with: pip install pm4py"
            ) from err

        # Rename columns for PM4Py; rename() returns a new frame, so the
        # internal data is not modified below.
        df_for_pm4py = self._df.rename(columns={
            'case_id': case_id_key,
            'activity': 'concept:name',
            'start_timestamp': timestamp_key
        })

        # Keep the end timestamp as a plain string attribute
        if 'end_timestamp' in df_for_pm4py.columns:
            df_for_pm4py['end_timestamp'] = df_for_pm4py['end_timestamp'].astype(str)

        # Use PM4Py's documented two-step conversion: standardize the
        # DataFrame columns first, then convert to an EventLog object.
        formatted = pm4py.format_dataframe(
            df_for_pm4py,
            case_id=case_id_key,
            activity_key='concept:name',
            timestamp_key=timestamp_key
        )
        event_log = pm4py.convert_to_event_log(formatted)

        self._pm4py_log = event_log
        return event_log

    def filter_duration(self, min_seconds: float = 0,
                        max_seconds: float = float('inf')) -> 'EventLog':
        """
        Filter events by duration.

        Parameters:
        -----------
        min_seconds : float
            Minimum duration in seconds (inclusive)
        max_seconds : float
            Maximum duration in seconds (inclusive)

        Returns:
        --------
        EventLog
            New event log with the matching events and an extra 'filtered'
            column set to True
        """
        durations = self._df['duration_seconds']
        keep = (durations >= min_seconds) & (durations <= max_seconds)
        # assign() returns a new frame, leaving this log's data untouched.
        return EventLog(self._df[keep].assign(filtered=True))

    def get_cases(self) -> List[str]:
        """Get list of unique case IDs."""
        return self._df['case_id'].unique().tolist()

    def get_activities(self) -> List[str]:
        """Get list of unique activities."""
        return self._df['activity'].unique().tolist()

    def get_case(self, case_id: str) -> 'EventLog':
        """
        Get all events for a specific case.

        Both sides are compared as strings, so a log whose 'case_id' column
        holds non-string values (e.g. integers) can be queried with either
        ``1`` or ``"1"``.
        """
        matches = self._df['case_id'].astype(str) == str(case_id)
        return EventLog(self._df[matches])

    def get_statistics(self) -> Dict[str, Any]:
        """
        Compute basic statistics about the event log.

        Returns:
        --------
        dict with:
            - total_cases: number of cases
            - total_events: number of events
            - unique_activities: number of distinct activities
            - avg_case_duration: mean over cases of the summed event
              durations, in seconds
            - activity_frequencies: frequency of each activity
        """
        per_case_duration = self._df.groupby('case_id')['duration_seconds'].sum()
        return {
            'total_cases': self._df['case_id'].nunique(),
            'total_events': len(self._df),
            'unique_activities': self._df['activity'].nunique(),
            'avg_case_duration': per_case_duration.mean(),
            'activity_frequencies': self._df['activity'].value_counts().to_dict()
        }

    def __len__(self) -> int:
        """Return number of events."""
        return len(self._df)

    def __repr__(self) -> str:
        """String representation."""
        # Compute only the two counts shown instead of the full statistics.
        n_cases = self._df['case_id'].nunique()
        n_activities = self._df['activity'].nunique()
        return f"EventLog(cases={n_cases}, events={len(self._df)}, activities={n_activities})"

    def head(self, n: int = 5) -> pd.DataFrame:
        """Return first n events (as a copy)."""
        return self._df.head(n).copy()
def _interval_segment(case_id, activity, start, end, event_count):
    """Build one event-log row for a run of consecutive equal states."""
    duration = (pd.to_datetime(end) - pd.to_datetime(start)).total_seconds()
    return {
        'case_id': case_id,
        'activity': activity,
        'start_timestamp': start,
        'end_timestamp': end,
        'duration_seconds': duration,
        'event_count': event_count
    }


def create_interval_event_log_normalized(df, y_pred, state_mapping,
                                         case_id_col="batch_id",
                                         timestamp_col="timestamp"):
    """
    Create interval-based event log using normalized timestamps.

    This function is kept for backward compatibility.

    Within each case the rows are ordered by timestamp, and every run of
    consecutive rows sharing the same predicted state becomes one event
    spanning from the first to the last timestamp of the run.

    Parameters:
    -----------
    df : pd.DataFrame
        Input observations; must contain ``case_id_col`` and
        ``timestamp_col``. It is not modified.
    y_pred : iterable
        One predicted state key per row of ``df``, in row order
    state_mapping : dict
        Maps state keys to activity names; unmapped keys become
        ``"Unknown_<key>"``
    case_id_col : str
        Name of the column identifying the case
    timestamp_col : str
        Name of the timestamp column

    Returns:
    --------
    pd.DataFrame
        One row per segment with the columns 'case_id',
        'activity_sequence' (1-based position within the case), 'activity',
        'start_timestamp', 'end_timestamp', 'duration_seconds' and
        'event_count'. Empty input gives an empty frame with these columns.
    """
    segment_columns = ['case_id', 'activity', 'start_timestamp',
                       'end_timestamp', 'duration_seconds', 'event_count']

    df_with_pred = df.copy()
    df_with_pred['predicted_state'] = [state_mapping.get(i, f"Unknown_{i}") for i in y_pred]

    event_log_segments = []

    # sort=False keeps the cases in order of first appearance.
    for case_id, case_data in df_with_pred.groupby(case_id_col, sort=False):
        # Stable sort keeps the input order of rows with equal timestamps.
        case_data = case_data.sort_values(timestamp_col, kind="stable")

        current_state = None
        segment_start = None
        segment_end = None
        segment_size = 0

        # Walk over the values positionally so duplicate index labels in
        # the input cannot affect which timestamp ends a segment.
        for timestamp, state in zip(case_data[timestamp_col].tolist(),
                                    case_data['predicted_state'].tolist()):
            if current_state is not None and state == current_state:
                segment_end = timestamp
                segment_size += 1
                continue

            if current_state is not None:
                event_log_segments.append(_interval_segment(
                    case_id, current_state, segment_start, segment_end, segment_size))

            current_state = state
            segment_start = timestamp
            segment_end = timestamp
            segment_size = 1

        # Add the last segment
        if current_state is not None and segment_start is not None:
            event_log_segments.append(_interval_segment(
                case_id, current_state, segment_start, segment_end, segment_size))

    # Explicit columns so an empty result still has the expected schema.
    event_log = pd.DataFrame(event_log_segments, columns=segment_columns)
    event_log['activity_sequence'] = event_log.groupby('case_id').cumcount() + 1
    event_log = event_log[['case_id', 'activity_sequence', 'activity',
                           'start_timestamp', 'end_timestamp',
                           'duration_seconds', 'event_count']]

    return event_log