From 942c4ca11274da9dab584dee0013de5b6df05157 Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 31 Aug 2018 19:44:07 +0300 Subject: [PATCH] general predictor -> general model (#130) --- analytics/analytic_unit_manager.py | 6 +- analytics/detectors/__init__.py | 1 - .../detectors/general_detector/__init__.py | 1 - .../general_detector/general_detector.py | 80 --------------- .../general_detector/supervised_algorithm.py | 61 ------------ analytics/detectors/pattern_detector.py | 2 + analytics/models/__init__.py | 3 +- analytics/models/general_model.py | 97 +++++++++++++++++++ analytics/utils/__init__.py | 7 ++ 9 files changed, 108 insertions(+), 150 deletions(-) delete mode 100644 analytics/detectors/general_detector/__init__.py delete mode 100644 analytics/detectors/general_detector/general_detector.py delete mode 100644 analytics/detectors/general_detector/supervised_algorithm.py create mode 100644 analytics/models/general_model.py diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py index b1cf855..a4cdf87 100644 --- a/analytics/analytic_unit_manager.py +++ b/analytics/analytic_unit_manager.py @@ -12,11 +12,7 @@ analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() def get_detector_by_type(analytic_unit_type) -> detectors.Detector: - if analytic_unit_type == 'GENERAL': - detector = detectors.GeneralDetector() - else: - detector = detectors.PatternDetector(analytic_unit_type) - return detector + return detectors.PatternDetector(analytic_unit_type) def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: if analytic_unit_id in analytic_workers: diff --git a/analytics/detectors/__init__.py b/analytics/detectors/__init__.py index ff8e6ab..1357706 100644 --- a/analytics/detectors/__init__.py +++ b/analytics/detectors/__init__.py @@ -1,3 +1,2 @@ from detectors.detector import Detector from detectors.pattern_detector import PatternDetector -from detectors.general_detector import GeneralDetector diff --git a/analytics/detectors/general_detector/__init__.py b/analytics/detectors/general_detector/__init__.py deleted file mode 100644 index f5dd540..0000000 --- a/analytics/detectors/general_detector/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from detectors.general_detector.general_detector import GeneralDetector \ No newline at end of file diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py deleted file mode 100644 index 470557d..0000000 --- a/analytics/detectors/general_detector/general_detector.py +++ /dev/null @@ -1,80 +0,0 @@ -from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm -from detectors import Detector -from models import AnalyticUnitCache -import utils - -import pandas as pd -import logging -import config -import json -from typing import Optional - - -NANOSECONDS_IN_MS = 1000000 - -logger = logging.getLogger('GENERAL_DETECTOR') - - -class GeneralDetector(Detector): - - def __init__(self): - self.model = None - - async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: - - confidence = 0.02 - start_index, stop_index = 0, len(dataframe) - if len(segments) > 0: - confidence = 0.0 - min_time, max_time = utils.segments_box(segments) - dataframe = dataframe[dataframe['timestamp'] <= max_time] - dataframe = dataframe[dataframe['timestamp'] >= min_time] - - train_augmented = self.preprocessor.get_augmented_data( - dataframe.index[0], - dataframe.index[-1], - segments 
- ) - - self.model = SupervisedAlgorithm() - await self.model.fit(train_augmented, confidence) - if len(segments) > 0: - last_dataframe_time = dataframe.iloc[-1]['timestamp'] - last_prediction_time = int(last_dataframe_time.timestamp() * 1000) - else: - last_prediction_time = 0 - - logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name) - return cache - - async def predict(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: - logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name) - last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms') - - start_index = self.data_prov.get_upper_bound(last_prediction_time) - stop_index = self.data_prov.size() - last_prediction_time = int(last_prediction_time.value / NANOSECONDS_IN_MS) - - predicted_anomalies = [] - if start_index < stop_index: - max_chunk_size = 50000 - predicted = pd.Series() - for index in range(start_index, stop_index, max_chunk_size): - chunk_start = index - chunk_finish = min(index + max_chunk_size, stop_index) - predict_augmented = self.preprocessor.get_augmented_data(chunk_start, chunk_finish) - - assert(len(predict_augmented) == chunk_finish - chunk_start) - - predicted_current = await self.model.predict(predict_augmented) - predicted = pd.concat([predicted, predicted_current]) - predicted_anomalies = self.preprocessor.inverse_transform_anomalies(predicted) - - last_row = self.data_prov.get_data_range(stop_index - 1, stop_index) - - last_dataframe_time = last_row.iloc[0]['timestamp'] - predicted_anomalies = utils.anomalies_to_timestamp(predicted_anomalies) - last_prediction_time = int(last_dataframe_time.timestamp() * 1000) - - logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name) - return predicted_anomalies, last_prediction_time diff --git a/analytics/detectors/general_detector/supervised_algorithm.py b/analytics/detectors/general_detector/supervised_algorithm.py deleted file mode 100644 index 207ce42..0000000 --- a/analytics/detectors/general_detector/supervised_algorithm.py +++ /dev/null @@ -1,61 +0,0 @@ -import pickle -from tsfresh.transformers.feature_selector import FeatureSelector -from sklearn.preprocessing import MinMaxScaler -from sklearn.ensemble import IsolationForest -import pandas as pd - - -class SupervisedAlgorithm(object): - frame_size = 16 - good_features = [ - #"value__agg_linear_trend__f_agg_\"max\"__chunk_len_5__attr_\"intercept\"", - # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_12__w_20", - # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_13__w_5", - # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_10", - # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_20", - # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_8__w_20", - # "value__fft_coefficient__coeff_3__attr_\"abs\"", - "time_of_day_column_x", - "time_of_day_column_y", - "value__abs_energy", - # "value__absolute_sum_of_changes", - # "value__sum_of_reoccurring_data_points", - ] - clf = None - scaler = None - - def __init__(self): - self.features = [] - self.col_to_max, self.col_to_min, self.col_to_median = None, None, None - self.augmented_path = None - - async def fit(self, dataset, contamination=0.005): - dataset = dataset[self.good_features] - dataset = dataset[-100000:] - - self.scaler = MinMaxScaler(feature_range=(-1, 1)) - # self.clf = svm.OneClassSVM(nu=contamination, kernel="rbf", gamma=0.1) - self.clf = IsolationForest(contamination=contamination) - - self.scaler.fit(dataset) - - dataset = 
self.scaler.transform(dataset) - self.clf.fit(dataset) - - async def predict(self, dataframe): - dataset = dataframe[self.good_features] - dataset = self.scaler.transform(dataset) - prediction = self.clf.predict(dataset) - - # for i in range(len(dataset)): - # print(str(dataset[i]) + " " + str(prediction[i])) - - prediction = [x < 0.0 for x in prediction] - return pd.Series(prediction, index=dataframe.index) - - def __select_features(self, x, y): - # feature_selector = FeatureSelector() - feature_selector = FeatureSelector() - - feature_selector.fit(x, y) - return feature_selector.relevant_features diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 8632a5d..4bdd2b6 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -13,6 +13,8 @@ logger = logging.getLogger('PATTERN_DETECTOR') def resolve_model_by_pattern(pattern: str) -> models.Model: + if pattern == 'GENERAL': + return models.GeneralModel() if pattern == 'PEAK': return models.PeakModel() if pattern == 'REVERSE_PEAK': diff --git a/analytics/models/__init__.py b/analytics/models/__init__.py index 2c106eb..8607dd8 100644 --- a/analytics/models/__init__.py +++ b/analytics/models/__init__.py @@ -2,7 +2,6 @@ from models.model import Model, AnalyticUnitCache from models.drop_model import DropModel from models.peak_model import PeakModel from models.jump_model import JumpModel -from models.custom_model import CustomModel - from models.custom_model import CustomModel from models.reverse_peak_model import ReversePeakModel +from models.general_model import GeneralModel diff --git a/analytics/models/general_model.py b/analytics/models/general_model.py new file mode 100644 index 0000000..80a78ba --- /dev/null +++ b/analytics/models/general_model.py @@ -0,0 +1,97 @@ +from models import Model, AnalyticUnitCache + +import utils +import numpy as np +import pandas as pd +import scipy.signal +from scipy.fftpack import fft +from scipy.signal import argrelextrema +import math +from scipy.stats import gaussian_kde +from scipy.stats import norm +from typing import Optional + + +WINDOW_SIZE = 350 + + +class GeneralModel(Model): + + def __init__(self): + super() + self.segments = [] + self.ipats = [] + self.state = { + 'convolve_max': WINDOW_SIZE, + } + self.all_conv = [] + + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + if type(cache) is AnalyticUnitCache: + self.state = cache + self.segments = segments + + data = dataframe['value'] + convolve_list = [] + for segment in segments: + if segment['labeled']: + segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'])) + segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'])) + + segment_data = data[segment_from_index: segment_to_index + 1] + if len(segment_data) == 0: + continue + self.ipats.append(segment_from_index + int((segment_to_index - segment_from_index) / 2)) + segment_min = min(segment_data) + segment_data = segment_data - segment_min + segment_max = max(segment_data) + segment_data = segment_data / segment_max + + convolve = scipy.signal.fftconvolve(segment_data, segment_data) + convolve_list.append(max(convolve)) + + if len(convolve_list) > 0: + self.state['convolve_max'] = float(max(convolve_list)) + else: + self.state['convolve_max'] = WINDOW_SIZE / 3 + + return self.state + + def do_predict(self, dataframe: pd.DataFrame): + data = dataframe['value'] + pat_data = 
data[self.ipats[0] - WINDOW_SIZE: self.ipats[0] + WINDOW_SIZE] + x = min(pat_data) + pat_data = pat_data - x + y = max(pat_data) + pat_data = pat_data / y + + for i in range(WINDOW_SIZE * 2, len(data)): + watch_data = data[i - WINDOW_SIZE * 2: i] + w = min(watch_data) + watch_data = watch_data - w + r = max(watch_data) + if r < y: + watch_data = watch_data / y + else: + watch_data = watch_data / r + conv = scipy.signal.fftconvolve(pat_data, watch_data) + self.all_conv.append(max(conv)) + all_conv_peaks = utils.peak_finder(self.all_conv, WINDOW_SIZE * 2) + + filtered = self.__filter_prediction(all_conv_peaks, data) + return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered] + + def __filter_prediction(self, segments: list, data: list): + if len(segments) == 0 or len(self.ipats) == 0: + segments = [] + return segments + delete_list = [] + + for val in segments: + if self.all_conv[val] < self.state['convolve_max'] * 0.8: + delete_list.append(val) + + for item in delete_list: + segments.remove(item) + + return segments diff --git a/analytics/utils/__init__.py b/analytics/utils/__init__.py index caccc47..513c20a 100644 --- a/analytics/utils/__init__.py +++ b/analytics/utils/__init__.py @@ -207,3 +207,10 @@ def timestamp_to_index(dataframe, timestamp): for i in range(len(data)): if data[i] >= timestamp: return i + +def peak_finder(data, size): + all_max = [] + for i in range(size, len(data) - size): + if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]: + all_max.append(i) + return all_max
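
Net effect of this patch: the 'GENERAL' analytic unit type no longer has a dedicated GeneralDetector; it is routed through PatternDetector, which resolves the new GeneralModel via resolve_model_by_pattern('GENERAL'). A minimal usage sketch (not part of the patch), assuming the analytics package above is importable, that `dataframe` is a pandas DataFrame with 'timestamp' and 'value' columns, and that `segments` are labeled segments with 'from'/'to' timestamps — both variable names are placeholders:

    import detectors
    import models

    detector = detectors.PatternDetector('GENERAL')  # get_detector_by_type() now always builds a PatternDetector
    model = models.GeneralModel()                     # what resolve_model_by_pattern('GENERAL') returns

    # cache = model.fit(dataframe, segments, cache=None)  # learns 'convolve_max' from the labeled segments
    # found = model.do_predict(dataframe)                 # (from_timestamp, to_timestamp) pairs for matched windows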