# Alarm detectors added to bibmon/_alarms.py; in the diff they follow
# detecOutlier(), whose body lies outside this excerpt.
import numpy as np

# Stand-in for a zero variance so that variance ratios stay finite.
_ZERO_VARIANCE_FLOOR = 1e-10


def _as_series(data):
    """
    Return ``data`` as a flat float array.

    Casting to float keeps ``np.diff`` from wrapping around on unsigned
    integer input; flattening lets column vectors be passed directly.
    """
    return np.asarray(data, dtype=float).ravel()


def _window_counts(flags, width):
    """
    Count the True entries in every full sliding window of ``width`` samples.

    Returns an empty array when ``width`` is not positive or is larger than
    the number of samples.
    """
    flags = np.asarray(flags, dtype=bool)
    if width < 1 or flags.size < width:
        return np.zeros(0, dtype=int)
    # A zero-prefixed running total turns each window count into a single
    # subtraction, and stays exact because it is integer arithmetic.
    running = np.concatenate(([0], np.cumsum(flags)))
    return running[width:] - running[:-width]


def _has_run(flags, length):
    """Tell whether ``flags`` holds at least ``length`` consecutive True values."""
    return bool(np.any(_window_counts(flags, length) == length))


def _same_side_excursion(data, n_sigma, required, width):
    """
    Shared test behind Nelson rules 5, 6 and 8.

    True when some window of ``width`` consecutive samples lies entirely on
    one side of the mean (samples equal to the mean are accepted on either
    side) and at least ``required`` of them are more than ``n_sigma``
    standard deviations away from the mean on that side.
    """
    if data.size < width:
        return False
    center = np.mean(data)
    spread = np.std(data)
    high = ((_window_counts(data > center + n_sigma * spread, width) >= required)
            & (_window_counts(data >= center, width) == width))
    low = ((_window_counts(data < center - n_sigma * spread, width) >= required)
           & (_window_counts(data <= center, width) == width))
    return bool(np.any(high) or np.any(low))


def detect_drift_bias(data, window=10, threshold=2.0):
    """
    Detects drift or bias in a time series using a sliding window approach.

    Parameters
    ----------
    data : array-like
        Input time series data.
    window : int
        Size of the window to check for drift/bias. The first ``window // 2``
        samples form the first half, the remaining ones the second half.
    threshold : float
        Minimum absolute difference between the mean of the first and
        second half of the window to trigger the alarm.

    Returns
    -------
    alarm : int
        1 if drift/bias is detected, 0 otherwise. Always 0 when ``window``
        is smaller than 2 or the series is shorter than ``window``.
    """
    data = _as_series(data)
    # With fewer than two samples per window there is no first half to average.
    if window < 2 or data.size < window:
        return 0
    half = window // 2
    for start in range(data.size - window + 1):
        early = data[start:start + half]
        late = data[start + half:start + window]
        if np.abs(np.mean(late) - np.mean(early)) > threshold:
            return 1
    return 0


def detect_nelson_rule1(data):
    """
    Detects Nelson Rule 1: one point more than 3 standard deviations from the mean.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if at least one point is above (mean + 3*std) or below
        (mean - 3*std), 0 otherwise (including for empty input).
    """
    data = _as_series(data)
    if data.size == 0:
        return 0
    center = np.mean(data)
    reach = 3 * np.std(data)
    beyond = (data > center + reach) | (data < center - reach)
    return int(np.any(beyond))


def detect_nelson_rule2(data):
    """
    Detects Nelson Rule 2: nine consecutive points on the same side of the mean.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if nine or more consecutive points are strictly above or strictly
        below the mean, 0 otherwise.
    """
    run_length = 9
    data = _as_series(data)
    if data.size < run_length:
        return 0
    center = np.mean(data)
    found = _has_run(data > center, run_length) or _has_run(data < center, run_length)
    return int(found)


def detect_nelson_rule3(data):
    """
    Detects Nelson Rule 3: six consecutive points all increasing or all decreasing.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if six or more consecutive points are strictly increasing or
        strictly decreasing, 0 otherwise.
    """
    # Six monotonic points are the same thing as five same-signed steps.
    steps = np.diff(_as_series(data))
    return int(_has_run(steps > 0, 5) or _has_run(steps < 0, 5))


def detect_nelson_rule4(data):
    """
    Detects fourteen consecutive points alternating about the mean.

    A point counts as "above" when it is strictly greater than the mean;
    every other point (including one equal to the mean) counts as "below".

    NOTE(review): Nelson's rule 4 is usually stated as fourteen points
    alternating in direction (up, down, up, ...). Alternation about the mean
    is a narrower condition - confirm which one is intended.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if fourteen or more consecutive points alternate above and below
        the mean, 0 otherwise.
    """
    run_length = 14
    data = _as_series(data)
    if data.size < run_length:
        return 0
    above = data > np.mean(data)
    # Fourteen alternating points are thirteen consecutive side changes.
    side_changes = above[1:] != above[:-1]
    return int(_has_run(side_changes, run_length - 1))


def detect_nelson_rule5(data):
    """
    Detects Nelson Rule 5: two out of three consecutive points more than
    2 standard deviations from the mean, all on the same side.

    NOTE(review): the usual statement of the rule only asks the two outlying
    points to share a side; here all three points of the window must be on
    that side of the mean - confirm this is intended.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if, in some window of three consecutive points, at least two are
        above (mean + 2*std) and all are >= mean, or at least two are below
        (mean - 2*std) and all are <= mean; 0 otherwise.
    """
    found = _same_side_excursion(_as_series(data), n_sigma=2, required=2, width=3)
    return int(found)


def detect_nelson_rule6(data):
    """
    Detects Nelson Rule 6: four out of five consecutive points more than
    1 standard deviation from the mean, all on the same side.

    NOTE(review): as in rule 5, all five points of the window must be on the
    same side of the mean, which is stricter than the usual statement of the
    rule - confirm this is intended.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if, in some window of five consecutive points, at least four are
        above (mean + 1*std) and all are >= mean, or at least four are below
        (mean - 1*std) and all are <= mean; 0 otherwise.
    """
    found = _same_side_excursion(_as_series(data), n_sigma=1, required=4, width=5)
    return int(found)


def detect_nelson_rule7(data):
    """
    Detects Nelson Rule 7: fifteen consecutive points within 1 standard
    deviation of the mean, in both directions.

    NOTE(review): besides being within one standard deviation, the window
    must contain at least one point strictly above and one strictly below
    the mean - confirm this extra condition is intended.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if fifteen consecutive points are all within 1 standard deviation
        of the mean with points on both sides of it, 0 otherwise.
    """
    width = 15
    data = _as_series(data)
    if data.size < width:
        return 0
    center = np.mean(data)
    spread = np.std(data)
    all_close = _window_counts(np.abs(data - center) < spread, width) == width
    some_above = _window_counts(data > center, width) > 0
    some_below = _window_counts(data < center, width) > 0
    return int(np.any(all_close & some_above & some_below))


def detect_nelson_rule8(data):
    """
    Detects eight consecutive points outside 1 standard deviation of the
    mean, all on the same side.

    NOTE(review): Nelson's rule 8 is usually stated for points on both sides
    of the mean (none within one standard deviation); this version only
    fires when all eight points are on one side - confirm this is intended.

    Parameters
    ----------
    data : array-like
        Input time series data.

    Returns
    -------
    alarm : int
        1 if eight consecutive points are all above (mean + 1*std) or all
        below (mean - 1*std), 0 otherwise.
    """
    found = _same_side_excursion(_as_series(data), n_sigma=1, required=8, width=8)
    return int(found)


def detect_variance_change(data, window_size=20, threshold=1.5):
    """
    Detects sudden changes in variance of a time series.

    Two adjacent windows of ``window_size`` samples slide over the series
    and the larger of the two variance ratios is compared with ``threshold``.

    Parameters
    ----------
    data : array-like
        Input time series data.
    window_size : int
        Size of the sliding window to calculate variance.
    threshold : float
        Minimum ratio of variance change to trigger alarm.

    Returns
    -------
    alarm : int
        1 if sudden variance change is detected, 0 otherwise. Always 0 when
        ``window_size`` is not positive or the series holds fewer than
        ``2 * window_size`` samples.
    """
    data = _as_series(data)
    if window_size < 1 or data.size < 2 * window_size:
        return 0
    for start in range(data.size - 2 * window_size + 1):
        split = start + window_size
        var_before = np.var(data[start:split])
        var_after = np.var(data[split:split + window_size])
        # Avoid division by zero for perfectly flat windows.
        if var_before == 0:
            var_before = _ZERO_VARIANCE_FLOOR
        if var_after == 0:
            var_after = _ZERO_VARIANCE_FLOOR
        if max(var_before / var_after, var_after / var_before) > threshold:
            return 1
    return 0


def detect_outlier_frequency_change(data, window_size=20, threshold=0.1):
    """
    Detects changes in outlier frequency of a time series.

    A point is an outlier when it is more than 2 standard deviations (of the
    whole series) away from the mean of the whole series. The outlier
    fractions of two adjacent windows are then compared.

    Parameters
    ----------
    data : array-like
        Input time series data.
    window_size : int
        Size of the sliding window to calculate outlier frequency.
    threshold : float
        Minimum difference in outlier frequency to trigger alarm.

    Returns
    -------
    alarm : int
        1 if outlier frequency change is detected, 0 otherwise. Always 0
        when ``window_size`` is not positive or the series holds fewer than
        ``2 * window_size`` samples.
    """
    data = _as_series(data)
    # A non-positive window would mean dividing the counts by zero below.
    if window_size < 1 or data.size < 2 * window_size:
        return 0
    is_outlier = np.abs(data - np.mean(data)) > 2 * np.std(data)
    frequency = _window_counts(is_outlier, window_size) / window_size
    # Entry i belongs to the window starting at i, so shifting by
    # window_size pairs every window with the one right after it.
    change = np.abs(frequency[:-window_size] - frequency[window_size:])
    return int(np.any(change > threshold))
# Additions to test/test_tools.py. In the diff they follow
# test_complete_analysis(), whose body lies outside this excerpt.
import bibmon
import pandas as pd
import pytest
import numpy as np
from datetime import datetime
from bibmon import comparative_table
from sklearn.metrics import r2_score, mean_absolute_error

# Fixed seed so the synthetic data, and therefore any failure, is reproducible.
_SEED = 0


def _random_frame(rng, rows):
    """Build a two-feature frame of standard-normal samples with ``rows`` rows."""
    return pd.DataFrame({
        'feature1': rng.standard_normal(rows),
        'feature2': rng.standard_normal(rows),
    })


def _run_comparative_table(model, data, include_y=True, **options):
    """
    Call comparative_table with the arguments shared by every test below.

    ``data`` is the tuple produced by the ``sample_data`` fixture. The Y
    splits are only forwarded when ``include_y`` is true, and ``options``
    carries the keywords that differ from one test case to the next.
    """
    X_train, X_validation, X_test, Y_train, Y_validation, Y_test = data
    kwargs = dict(
        models=[model],
        X_train=X_train,
        X_validation=X_validation,
        X_test=X_test,
        plot_SPE=False,
        plot_predictions=False,
    )
    if include_y:
        kwargs.update(Y_train=Y_train, Y_validation=Y_validation, Y_test=Y_test)
    kwargs.update(options)
    return comparative_table(**kwargs)


# Fixtures for test data
@pytest.fixture
def sample_data():
    """
    Generate synthetic data for training, validation and testing.

    Returns (X_train, X_validation, X_test, Y_train, Y_validation, Y_test)
    with 100, 50 and 30 rows respectively, all on a default integer index.
    """
    rng = np.random.default_rng(_SEED)
    sizes = (100, 50, 30)
    X_train, X_validation, X_test = [_random_frame(rng, rows) for rows in sizes]
    Y_train, Y_validation, Y_test = [
        pd.Series(rng.standard_normal(rows)) for rows in sizes
    ]
    return X_train, X_validation, X_test, Y_train, Y_validation, Y_test


@pytest.fixture
def model_with_y():
    """Mock model with Y variable (regression)."""
    rng = np.random.default_rng(_SEED)

    class MockModel:
        def __init__(self):
            self.has_Y = True
            self.name = "Model with Y"
            self.lim_conf = 0.99
            self.Y_train_orig = None
            self.X_train_orig = None
            self.Y_train_pred_orig = None
            self.X_train_pred_orig = None
            self.train_time = 0.0
            self.test_time = 0.0
            self.Y_test_orig = None
            self.Y_test_pred_orig = None
            self.X_test_orig = None
            self.X_test_pred_orig = None
            self.alarms = {}

        def predict(self, X, Y=None, *args, **kwargs):
            # The "prediction" is noise aligned with the input index.
            pred = pd.Series(rng.standard_normal(len(X)), index=X.index)
            if Y is not None:
                self.Y_test_orig = Y
                self.Y_test_pred_orig = pred
            self.X_test_orig = X
            self.X_test_pred_orig = pred
            self.test_time = 0.1
            return pred

        def fit(self, X_train, Y_train, f_pp=None, a_pp=None, f_pp_test=None,
                a_pp_test=None, lim_conf=0.99, redefine_limit=False):
            self.lim_conf = lim_conf
            self.Y_train_orig = Y_train
            self.X_train_orig = X_train
            self.Y_train_pred_orig = pd.Series(
                rng.standard_normal(len(Y_train)), index=Y_train.index)
            self.X_train_pred_orig = pd.Series(
                rng.standard_normal(len(X_train)), index=X_train.index)
            self.train_time = 0.1
            return self

    return MockModel()


@pytest.fixture
def model_without_y():
    """Mock model without Y variable (reconstruction)."""
    rng = np.random.default_rng(_SEED)

    class MockModel:
        def __init__(self):
            self.has_Y = False
            self.name = "Model without Y"
            self.lim_conf = 0.99
            self.X_train_orig = None
            self.X_train_pred_orig = None
            self.train_time = 0.0
            self.test_time = 0.0
            self.X_test_orig = None
            self.X_test_pred_orig = None
            self.alarms = {}

        def predict(self, X, Y=None, *args, **kwargs):
            # The "reconstruction" is noise shaped and labelled like the input.
            pred = pd.DataFrame(rng.standard_normal(X.shape),
                                index=X.index, columns=X.columns)
            self.X_test_orig = X
            self.X_test_pred_orig = pred
            self.test_time = 0.1
            return pred

        def fit(self, X_train, Y_train, f_pp=None, a_pp=None, f_pp_test=None,
                a_pp_test=None, lim_conf=0.99, redefine_limit=False):
            self.lim_conf = lim_conf
            self.X_train_orig = X_train
            self.X_train_pred_orig = pd.DataFrame(
                rng.standard_normal(X_train.shape),
                index=X_train.index, columns=X_train.columns)
            self.train_time = 0.1
            return self

    return MockModel()


# MC/DC Test Cases
def test_comparative_table_with_y_and_metrics(sample_data, model_with_y):
    """Test Case 1: C1=True, C2=True, C3=False - Model with Y and metrics."""
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=[r2_score, mean_absolute_error],
        X_pred_to_plot=None,
    )
    assert len(result) >= 1
    assert any('mean_absolute_error' in str(df) for df in result)


def test_comparative_table_without_y_with_xpred(sample_data, model_without_y):
    """Test Case 2: C1=True, C2=False, C3=True - Model without Y, with X_pred_to_plot."""
    result = _run_comparative_table(
        model_without_y, sample_data, include_y=False,
        metrics=[r2_score, mean_absolute_error],
        X_pred_to_plot='feature1',
    )
    assert len(result) >= 1
    assert any('mean_absolute_error' in str(df) for df in result)


def test_comparative_table_without_y_without_xpred(sample_data, model_without_y):
    """Test Case 3: C1=True, C2=False, C3=False - Model without Y and without X_pred_to_plot."""
    result = _run_comparative_table(
        model_without_y, sample_data, include_y=False,
        metrics=[r2_score, mean_absolute_error],
        X_pred_to_plot=None,
        times=True,
    )
    assert len(result) == 1
    assert 'Train' in result[0].columns


def test_comparative_table_without_metrics(sample_data, model_with_y):
    """Test Case 4: C1=False - Without metrics, only time table."""
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=None,
        X_pred_to_plot=None,
    )
    assert len(result) == 1
    assert 'Train' in result[0].columns


def test_comparative_table_with_fault_period(sample_data, model_with_y):
    """Test Case 5: C4=True, C5=True - Fault with defined start and end."""
    # NOTE(review): sample_data uses a default integer index, so whether these
    # timestamp strings select a meaningful fault window depends on how
    # comparative_table slices the data - confirm.
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=[r2_score, mean_absolute_error],
        fault_start='2023-01-01 00:00:00',
        fault_end='2023-01-02 00:00:00',
    )
    assert len(result) >= 2
    assert any('FDR' in str(df) for df in result)
    assert any('FAR' in str(df) for df in result)


def test_comparative_table_with_fault_start_only(sample_data, model_with_y):
    """Test Case 6: C4=True, C5=False - Fault with only start defined."""
    # NOTE(review): same index caveat as in the fault-period test - confirm.
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=[r2_score, mean_absolute_error],
        fault_start='2023-01-01 00:00:00',
        fault_end=None,
    )
    assert len(result) >= 2
    assert any('FDR' in str(df) for df in result)
    assert any('FAR' in str(df) for df in result)


def test_comparative_table_with_mask(sample_data, model_with_y):
    """Test Case 7: C4=False, C6=False - With detection mask."""
    X_train, _, _, Y_train, _, _ = sample_data
    # The model is fitted here because the call below passes fit_model=False.
    model_with_y.fit(X_train, Y_train)
    # NOTE(review): the mask has 5 entries while X_test has 30 rows - confirm
    # that comparative_table accepts a mask shorter than the test set.
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=[r2_score, mean_absolute_error],
        mask=np.array([0, 1, 1, 0, 1]),
        fit_model=False,
    )
    assert len(result) >= 2
    assert any('FDR' in str(df) for df in result)
    assert any('FAR' in str(df) for df in result)


def test_comparative_table_without_fault_and_mask(sample_data, model_with_y):
    """Test Case 8: C4=False, C6=True - Without fault and mask, only prediction table."""
    result = _run_comparative_table(
        model_with_y, sample_data,
        metrics=[r2_score, mean_absolute_error],
        fault_start=None,
        fault_end=None,
        mask=None,
        times=False,
    )
    assert len(result) == 1
    assert 'Train' in result[0].columns
    assert 'Validation' in result[0].columns
    assert 'Test' in result[0].columns


def test_detect_drift_bias():
    """Test for drift/bias detection in a time series."""
    from bibmon import _alarms
    # Step from 1 to 10 halfway through the series.
    data = np.concatenate([np.ones(50), np.ones(50) * 10])
    alarm = _alarms.detect_drift_bias(data, window=10, threshold=2.0)
    assert alarm == 1


def test_detect_nelson_rule1():
    """Test for Nelson Rule 1: one point above 3 standard deviations from the mean."""
    from bibmon import _alarms
    import numpy as np
    # Flat series with a single spike of 10 in the middle.
    data = np.concatenate([np.zeros(20), np.array([10]), np.zeros(20)])
    alarm = _alarms.detect_nelson_rule1(data)
    assert alarm == 1


def test_detect_nelson_rule2():
    """Test for Nelson Rule 2: nine consecutive points on the same side of the mean."""
    from bibmon import _alarms
    import numpy as np
    # Nine consecutive fives between two flat stretches of zeros.
    data = np.concatenate([np.zeros(10), np.ones(9) * 5, np.zeros(10)])
    alarm = _alarms.detect_nelson_rule2(data)
    assert alarm == 1
def test_detect_nelson_rule3():
    """Test for Nelson Rule 3: six consecutive points all increasing or all decreasing."""
    from bibmon import _alarms
    import numpy as np
    # Six strictly increasing values (1..6) between two flat stretches.
    data = np.concatenate([np.zeros(10), np.arange(1, 7), np.zeros(10)])
    alarm = _alarms.detect_nelson_rule3(data)
    assert alarm == 1


def test_detect_nelson_rule4():
    """Test for Nelson Rule 4: fourteen points in a row alternating up and down."""
    from bibmon import _alarms
    import numpy as np
    # Fourteen points alternating between 1 and -1, followed by ten zeros.
    data = np.array([1, -1] * 7 + [0] * 10)
    alarm = _alarms.detect_nelson_rule4(data)
    assert alarm == 1


def test_detect_nelson_rule5():
    """Test for Nelson Rule 5: two out of three consecutive points above 2 standard deviations from the mean, all on the same side."""
    from bibmon import _alarms
    import numpy as np
    # Three consecutive large values inside an otherwise flat series.
    data = np.concatenate([np.ones(30), np.array([30, 35, 40]), np.ones(30)])
    alarm = _alarms.detect_nelson_rule5(data)
    assert alarm == 1


def test_detect_nelson_rule6():
    """Test for Nelson Rule 6: four out of five consecutive points above 1 standard deviation from the mean, all on the same side."""
    from bibmon import _alarms
    import numpy as np
    # Five consecutive large values inside an otherwise flat series.
    data = np.concatenate([np.ones(30), np.array([10, 12, 14, 16, 18]), np.ones(30)])
    alarm = _alarms.detect_nelson_rule6(data)
    assert alarm == 1


def test_detect_nelson_rule7():
    """Test for Nelson Rule 7: fifteen consecutive points within 1 standard deviation of the mean, in both directions."""
    from bibmon import _alarms
    import numpy as np
    # Two flat stretches of ones around an oscillation of growing amplitude.
    oscillation = np.array([0.8, 1.2, 0.9, 1.1, 0.7, 1.3, 0.6, 1.4,
                            0.5, 1.5, 0.4, 1.6, 0.3, 1.7, 0.2])
    data = np.concatenate([np.ones(10), oscillation, np.ones(10)])
    alarm = _alarms.detect_nelson_rule7(data)
    assert alarm == 1


def test_detect_nelson_rule8():
    """Test for Nelson Rule 8: eight consecutive points outside 1 standard deviation of the mean, all on the same side."""
    from bibmon import _alarms
    import numpy as np
    # Eight consecutive large values (5..12) inside an otherwise flat series.
    data = np.concatenate([np.ones(30), np.array([5, 6, 7, 8, 9, 10, 11, 12]), np.ones(30)])
    alarm = _alarms.detect_nelson_rule8(data)
    assert alarm == 1


def test_detect_variance_change():
    """Test for sudden variance change detection."""
    from bibmon import _alarms
    import numpy as np
    # Deterministic series: a small ripple followed by a much larger one, so
    # the outcome does not depend on a random draw.
    quiet = np.tile([0.0, 0.1], 25)
    noisy = np.tile([0.0, 5.0], 25)
    data = np.concatenate([quiet, noisy])
    alarm = _alarms.detect_variance_change(data, window_size=20, threshold=1.5)
    assert alarm == 1


def test_detect_outlier_frequency_change():
    """Test for outlier frequency change detection."""
    from bibmon import _alarms
    import numpy as np
    # Deterministic series: a flat first half, then a spike of 10 on every
    # fifth point, so the outcome does not depend on a random draw.
    calm = np.zeros(50)
    spiky = np.tile([0.0, 0.0, 0.0, 0.0, 10.0], 10)
    data = np.concatenate([calm, spiky])
    alarm = _alarms.detect_outlier_frequency_change(data, window_size=20, threshold=0.1)
    assert alarm == 1