diff --git a/analytics/detectors/general_detector.py b/analytics/detectors/general_detector.py index 3a9bda9..2307ab6 100644 --- a/analytics/detectors/general_detector.py +++ b/analytics/detectors/general_detector.py @@ -1,3 +1,4 @@ +import utils from grafana_data_provider import GrafanaDataProvider from data_preprocessor import data_preprocessor import pandas as pd @@ -7,18 +8,12 @@ import config import os.path import json + NANOSECONDS_IN_MS = 1000000 logger = logging.getLogger('analytic_toolset') -def anomalies_to_timestamp(anomalies): - for anomaly in anomalies: - anomaly['start'] = int(anomaly['start'].timestamp() * 1000) - anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000) - return anomalies - - class GeneralDetector: def __init__(self, anomaly_name): @@ -46,37 +41,27 @@ class GeneralDetector: self.__load_model() - def anomalies_box(self, anomalies): - max_time = 0 - min_time = float("inf") - for anomaly in anomalies: - max_time = max(max_time, anomaly['finish']) - min_time = min(min_time, anomaly['start']) - min_time = pd.to_datetime(min_time, unit='ms') - max_time = pd.to_datetime(max_time, unit='ms') - return min_time, max_time - - async def learn(self, anomalies): + async def learn(self, segments): logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name) confidence = 0.02 dataframe = self.data_prov.get_dataframe() start_index, stop_index = 0, len(dataframe) - if len(anomalies) > 0: + if len(segments) > 0: confidence = 0.0 - min_time, max_time = self.anomalies_box(anomalies) + min_time, max_time = utils.segments_box(segments) dataframe = dataframe[dataframe['timestamp'] <= max_time] dataframe = dataframe[dataframe['timestamp'] >= min_time] train_augmented = self.preprocessor.get_augmented_data( dataframe.index[0], dataframe.index[-1], - anomalies + segments ) self.model = self.create_algorithm() await self.model.fit(train_augmented, confidence) - if len(anomalies) > 0: + if len(segments) > 0: last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) else: @@ -112,7 +97,7 @@ class GeneralDetector: last_row = self.data_prov.get_data_range(stop_index - 1, stop_index) last_dataframe_time = last_row.iloc[0]['timestamp'] - predicted_anomalies = anomalies_to_timestamp(predicted_anomalies) + predicted_anomalies = utils.anomalies_to_timestamp(predicted_anomalies) last_prediction_time = int(last_dataframe_time.timestamp() * 1000) logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name) diff --git a/analytics/detectors/jump_detector.py b/analytics/detectors/jump_detector.py index e87f730..e563cb9 100644 --- a/analytics/detectors/jump_detector.py +++ b/analytics/detectors/jump_detector.py @@ -1,3 +1,4 @@ +import utils import numpy as np import pickle import scipy.signal @@ -6,20 +7,6 @@ from scipy.signal import argrelextrema import math -def is_intersect(target_segment, segments): - for segment in segments: - start = max(segment['start'], target_segment[0]) - finish = min(segment['finish'], target_segment[1]) - if start <= finish: - return True - return False - -def exponential_smoothing(series, alpha): - result = [series[0]] - for n in range(1, len(series)): - result.append(alpha * series[n] + (1 - alpha) * result[n-1]) - return result - class Jumpdetector: def __init__(self): @@ -56,6 +43,7 @@ class Jumpdetector: async def fit(self, dataframe, segments): + #self.alpha_finder() data = dataframe['value'] confidences = [] convolve_list = [] @@ -131,7 +119,7 @@ class Jumpdetector: result.sort() if len(self.segments) > 0: - result = [segment for segment in result if not is_intersect(segment, self.segments)] + result = [segment for segment in result if not utils.is_intersect(segment, self.segments)] return result def __predict(self, data): @@ -140,7 +128,7 @@ class Jumpdetector: extrema_list = [] # добавить все пересечения экспоненты со сглаженным графиком - for i in exponential_smoothing(data + self.confidence, 0.02): + for i in utils.exponential_smoothing(data + self.confidence, 0.02): extrema_list.append(i) segments = [] diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 6107c83..5dd09be 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -1,4 +1,5 @@ import detectors +import utils from grafana_data_provider import GrafanaDataProvider @@ -14,15 +15,6 @@ import pandas as pd logger = logging.getLogger('analytic_toolset') -def segments_box(segments): - max_time = 0 - min_time = float("inf") - for segment in segments: - min_time = min(min_time, segment['start']) - max_time = max(max_time, segment['finish']) - min_time = pd.to_datetime(min_time, unit='ms') - max_time = pd.to_datetime(max_time, unit='ms') - return min_time, max_time def resolve_detector_by_pattern(pattern): if pattern == "peak": diff --git a/analytics/detectors/peaks_detector.py b/analytics/detectors/peaks_detector.py index 2a87cb6..2c4d497 100644 --- a/analytics/detectors/peaks_detector.py +++ b/analytics/detectors/peaks_detector.py @@ -1,37 +1,8 @@ +import utils from scipy import signal import numpy as np -def find_steps(array, threshold): - """ - Finds local maxima by segmenting array based on positions at which - the threshold value is crossed. Note that this thresholding is - applied after the absolute value of the array is taken. Thus, - the distinction between upward and downward steps is lost. However, - get_step_sizes can be used to determine directionality after the - fact. - Parameters - ---------- - array : numpy array - 1 dimensional array that represents time series of data points - threshold : int / float - Threshold value that defines a step - Returns - ------- - steps : list - List of indices of the detected steps - """ - steps = [] - array = np.abs(array) - above_points = np.where(array > threshold, 1, 0) - ap_dif = np.diff(above_points) - cross_ups = np.where(ap_dif == 1)[0] - cross_dns = np.where(ap_dif == -1)[0] - for upi, dni in zip(cross_ups,cross_dns): - steps.append(np.argmax(array[upi:dni]) + upi) - return steps - - class PeaksDetector: def __init__(self): pass @@ -80,7 +51,7 @@ class PeaksDetector: data = filtered data /= data.max() - result = find_steps(data, 0.1) + result = utils.find_steps(data, 0.1) return [(dataframe.index[x], dataframe.index[x + window_size]) for x in result] def save(self, model_filename): diff --git a/analytics/detectors/step_detector.py b/analytics/detectors/step_detector.py index d0fec1e..7a2ed44 100644 --- a/analytics/detectors/step_detector.py +++ b/analytics/detectors/step_detector.py @@ -2,25 +2,11 @@ import scipy.signal from scipy.fftpack import fft from scipy.signal import argrelextrema +import utils import numpy as np import pickle - -def is_intersect(target_segment, segments): - for segment in segments: - start = max(segment['start'], target_segment[0]) - finish = min(segment['finish'], target_segment[1]) - if start <= finish: - return True - return False - -def exponential_smoothing(series, alpha): - result = [series[0]] - for n in range(1, len(series)): - result.append(alpha * series[n] + (1 - alpha) * result[n-1]) - return result - class StepDetector: def __init__(self): @@ -58,20 +44,20 @@ class StepDetector: async def predict(self, dataframe): data = dataframe['value'] - result = self.__predict(data) + result = await self.__predict(data) result.sort() if len(self.segments) > 0: - result = [segment for segment in result if not is_intersect(segment, self.segments)] + result = [segment for segment in result if not utils.is_intersect(segment, self.segments)] return result - def __predict(self, data): + async def __predict(self, data): window_size = 24 all_max_flatten_data = data.rolling(window=window_size).mean() all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] extrema_list = [] - for i in exponential_smoothing(data - self.confidence, 0.03): + for i in utils.exponential_smoothing(data - self.confidence, 0.03): extrema_list.append(i) segments = [] diff --git a/analytics/utils/__init__.py b/analytics/utils/__init__.py new file mode 100644 index 0000000..1b3d7ed --- /dev/null +++ b/analytics/utils/__init__.py @@ -0,0 +1,61 @@ +import numpy as np + + +def is_intersect(target_segment, segments): + for segment in segments: + start = max(segment['start'], target_segment[0]) + finish = min(segment['finish'], target_segment[1]) + if start <= finish: + return True + return False + +def exponential_smoothing(series, alpha): + result = [series[0]] + for n in range(1, len(series)): + result.append(alpha * series[n] + (1 - alpha) * result[n - 1]) + return result + +def find_steps(array, threshold): + """ + Finds local maxima by segmenting array based on positions at which + the threshold value is crossed. Note that this thresholding is + applied after the absolute value of the array is taken. Thus, + the distinction between upward and downward steps is lost. However, + get_step_sizes can be used to determine directionality after the + fact. + Parameters + ---------- + array : numpy array + 1 dimensional array that represents time series of data points + threshold : int / float + Threshold value that defines a step + Returns + ------- + steps : list + List of indices of the detected steps + """ + steps = [] + array = np.abs(array) + above_points = np.where(array > threshold, 1, 0) + ap_dif = np.diff(above_points) + cross_ups = np.where(ap_dif == 1)[0] + cross_dns = np.where(ap_dif == -1)[0] + for upi, dni in zip(cross_ups,cross_dns): + steps.append(np.argmax(array[upi:dni]) + upi) + return steps + +def anomalies_to_timestamp(anomalies): + for anomaly in anomalies: + anomaly['start'] = int(anomaly['start'].timestamp() * 1000) + anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000) + return anomalies + +def segments_box(segments): + max_time = 0 + min_time = float("inf") + for segment in segments: + min_time = min(min_time, segment['start']) + max_time = max(max_time, segment['finish']) + min_time = pd.to_datetime(min_time, unit='ms') + max_time = pd.to_datetime(max_time, unit='ms') + return min_time, max_time