diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index 60ae17c..63c19d6 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -81,6 +81,6 @@ class AnalyticUnitWorker(object): if pattern_type == 'general': model = detectors.GeneralDetector(analytic_unit_id) else: - model = detectors.PatternDetectionModel(analytic_unit_id, pattern_type) + model = detectors.PatternDetector(analytic_unit_id, pattern_type) self.models_cache[analytic_unit_id] = model return self.models_cache[analytic_unit_id] diff --git a/analytics/detectors/__init__.py b/analytics/detectors/__init__.py index ed4522f..0bdd4bb 100644 --- a/analytics/detectors/__init__.py +++ b/analytics/detectors/__init__.py @@ -1,5 +1,5 @@ from detectors.general_detector import GeneralDetector -from detectors.pattern_detection_model import PatternDetectionModel +from detectors.pattern_detector import PatternDetector from detectors.peaks_detector import PeaksDetector from detectors.step_detector import StepDetector from detectors.jump_detector import Jumpdetector diff --git a/analytics/detectors/general_detector.py b/analytics/detectors/general_detector.py index 84bf3e4..3a9bda9 100644 --- a/analytics/detectors/general_detector.py +++ b/analytics/detectors/general_detector.py @@ -75,7 +75,7 @@ class GeneralDetector: ) self.model = self.create_algorithm() - self.model.fit(train_augmented, confidence) + await self.model.fit(train_augmented, confidence) if len(anomalies) > 0: last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) diff --git a/analytics/detectors/jump_detector.py b/analytics/detectors/jump_detector.py index 0dbcd52..e87f730 100644 --- a/analytics/detectors/jump_detector.py +++ b/analytics/detectors/jump_detector.py @@ -5,6 +5,7 @@ from scipy.fftpack import fft from scipy.signal import argrelextrema import math + def is_intersect(target_segment, segments): for segment in segments: start = max(segment['start'], target_segment[0]) @@ -21,8 +22,7 @@ def exponential_smoothing(series, alpha): class Jumpdetector: - def __init__(self, pattern): - self.pattern = pattern + def __init__(self): self.segments = [] self.confidence = 1.5 self.convolve_max = 120 @@ -30,11 +30,11 @@ class Jumpdetector: def intersection_segment(self, data, median): cen_ind = [] for i in range(1, len(data)-1): - if data[i-1] < median and data[i+1] > median: + if data[i - 1] < median and data[i + 1] > median: cen_ind.append(i) del_ind = [] for i in range(1,len(cen_ind)): - if cen_ind[i] == cen_ind[i-1]+1: + if cen_ind[i] == cen_ind[i - 1] + 1: del_ind.append(i - 1) del_ind = del_ind[::-1] for i in del_ind: @@ -47,71 +47,75 @@ class Jumpdetector: F = 1 * height / (1 + math.exp(-i * alpha)) distribution.append(F) return distribution - def alpha_finder(self, data, ): - # поиск альфы для логистической сигмоиды - - - def fit(self, dataframe, segments): - data = dataframe['value'] - confidences = [] - convolve_list = [] - for segment in segments: - if segment['labeled']: - segment_data = data[segment['start'] : segment['finish'] + 1] - segment_min = min(segment_data) - segment_max = max(segment_data) - confidences.append(0.20 * (segment_max - segment_min)) - flat_segment = segment_data.rolling(window=4).mean() #сглаживаем сегмент - kde_segment = flat_data.dropna().plot.kde() # distribution density - ax = flat_data.dropna().plot.kde() - ax_list = ax.get_lines()[0].get_xydata() - mids = argrelextrema(np.array(ax_list), np.less)[0] - maxs = argrelextrema(np.array(ax_list), np.greater)[0] - min_peak = maxs[0] - max_peak = maxs[1] - min_line = ax_list[min_peak, 0] - max_line = ax_list[max_peak, 0] - sigm_heidht = max_line - min_line - pat_sigm = logistic_sigmoid(-120, 120, 1, sigm_heidht) - for i in range(0, len(pat_sigm)): - pat_sigm[i] = pat_sigm[i] + min_line - cen_ind = self.intersection_segment(flat_segment, mids[0]) - c = [] - for i in range(len(cen_ind)): - x = cen_ind[i] - cx = scipy.signal.fftconvolve(pat_sigm, flat_data[x-120:x+120]) - c.append(cx[240]) - - # в идеале нужно посмотреть гистограмму сегмента и выбрать среднее значение, - # далее от него брать + -120 - segment_summ = 0 - for val in flat_segment: - segment_summ += val - segment_mid = segment_summ / len(flat_segment) #посчитать нормально среднее значение/медиану - for ind in range(1, len(flat_segment) - 1): - if flat_segment[ind + 1] > segment_mid and flat_segment[ind - 1] < segment_mid: - flat_mid_index = ind # найти пересечение средней и графика, получить его индекс - segment_mid_index = flat_mid_index - 5 - labeled_drop = data[segment_mid_index - 120 : segment_mid_index + 120] - labeled_min = min(labeled_drop) - for value in labeled_drop: # обрезаем - value = value - labeled_min - labeled_max = max(labeled_drop) - for value in labeled_drop: # нормируем - value = value / labeled_max - convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop) - convolve_list.append(max(convolve)) # сворачиваем паттерн - # плюс надо впихнуть сюда логистическую сигмоиду и поиск альфы - - if len(confidences) > 0: - self.confidence = min(confidences) - else: - self.confidence = 1.5 - - if len(convolve_list) > 0: - self.convolve_max = max(convolve_list) - else: - self.convolve_max = 120 # макс метрика свертки равна отступу(120), вау! + + def alpha_finder(self, data): + """ + поиск альфы для логистической сигмоиды + """ + pass + + + async def fit(self, dataframe, segments): + data = dataframe['value'] + confidences = [] + convolve_list = [] + for segment in segments: + if segment['labeled']: + segment_data = data[segment['start'] : segment['finish'] + 1] + segment_min = min(segment_data) + segment_max = max(segment_data) + confidences.append(0.20 * (segment_max - segment_min)) + flat_segment = segment_data.rolling(window=4).mean() #сглаживаем сегмент + kde_segment = flat_data.dropna().plot.kde() # distribution density + ax = flat_data.dropna().plot.kde() + ax_list = ax.get_lines()[0].get_xydata() + mids = argrelextrema(np.array(ax_list), np.less)[0] + maxs = argrelextrema(np.array(ax_list), np.greater)[0] + min_peak = maxs[0] + max_peak = maxs[1] + min_line = ax_list[min_peak, 0] + max_line = ax_list[max_peak, 0] + sigm_heidht = max_line - min_line + pat_sigm = logistic_sigmoid(-120, 120, 1, sigm_heidht) + for i in range(0, len(pat_sigm)): + pat_sigm[i] = pat_sigm[i] + min_line + cen_ind = self.intersection_segment(flat_segment, mids[0]) + c = [] + for i in range(len(cen_ind)): + x = cen_ind[i] + cx = scipy.signal.fftconvolve(pat_sigm, flat_data[x-120:x+120]) + c.append(cx[240]) + + # в идеале нужно посмотреть гистограмму сегмента и выбрать среднее значение, + # далее от него брать + -120 + segment_summ = 0 + for val in flat_segment: + segment_summ += val + segment_mid = segment_summ / len(flat_segment) #посчитать нормально среднее значение/медиану + for ind in range(1, len(flat_segment) - 1): + if flat_segment[ind + 1] > segment_mid and flat_segment[ind - 1] < segment_mid: + flat_mid_index = ind # найти пересечение средней и графика, получить его индекс + segment_mid_index = flat_mid_index - 5 + labeled_drop = data[segment_mid_index - 120 : segment_mid_index + 120] + labeled_min = min(labeled_drop) + for value in labeled_drop: # обрезаем + value = value - labeled_min + labeled_max = max(labeled_drop) + for value in labeled_drop: # нормируем + value = value / labeled_max + convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop) + convolve_list.append(max(convolve)) # сворачиваем паттерн + # плюс надо впихнуть сюда логистическую сигмоиду и поиск альфы + + if len(confidences) > 0: + self.confidence = min(confidences) + else: + self.confidence = 1.5 + + if len(convolve_list) > 0: + self.convolve_max = max(convolve_list) + else: + self.convolve_max = 120 # макс метрика свертки равна отступу(120), вау! def logistic_sigmoid(x1, x2, alpha, height): distribution = [] @@ -131,20 +135,20 @@ class Jumpdetector: return result def __predict(self, data): - window_size = 24 - all_max_flatten_data = data.rolling(window=window_size).mean() - extrema_list = [] - # добавить все пересечения экспоненты со сглаженным графиком - - for i in exponential_smoothing(data + self.confidence, 0.02): - extrema_list.append(i) + window_size = 24 + all_max_flatten_data = data.rolling(window=window_size).mean() + extrema_list = [] + # добавить все пересечения экспоненты со сглаженным графиком + + for i in exponential_smoothing(data + self.confidence, 0.02): + extrema_list.append(i) - segments = [] - for i in all_mins: - if all_max_flatten_data[i] > extrema_list[i]: - segments.append(i - window_size) + segments = [] + for i in all_mins: + if all_max_flatten_data[i] > extrema_list[i]: + segments.append(i - window_size) - return [(x - 1, x + 1) for x in self.__filter_prediction(segments, all_max_flatten_data)] + return [(x - 1, x + 1) for x in self.__filter_prediction(segments, all_max_flatten_data)] def __filter_prediction(self, segments, all_max_flatten_data): delete_list = [] @@ -179,4 +183,4 @@ class Jumpdetector: with open(model_filename, 'rb') as file: (self.confidence, self.convolve_max) = pickle.load(file) except: - pass \ No newline at end of file + pass diff --git a/analytics/detectors/pattern_detection_model.py b/analytics/detectors/pattern_detector.py similarity index 85% rename from analytics/detectors/pattern_detection_model.py rename to analytics/detectors/pattern_detector.py index 9bbfce5..6107c83 100644 --- a/analytics/detectors/pattern_detection_model.py +++ b/analytics/detectors/pattern_detector.py @@ -1,5 +1,4 @@ -from detectors.step_detector import StepDetector -from detectors.peaks_detector import PeaksDetector +import detectors from grafana_data_provider import GrafanaDataProvider @@ -25,8 +24,17 @@ def segments_box(segments): max_time = pd.to_datetime(max_time, unit='ms') return min_time, max_time +def resolve_detector_by_pattern(pattern): + if pattern == "peak": + return detectors.PeaksDetector() + if pattern == "drop": + return detectors.StepDetector() + if pattern == "jump": + return detectors.Jumpdetector() + raise ValueError('Unknown pattern "%s"' % pattern) -class PatternDetectionModel: + +class PatternDetector: def __init__(self, analytic_unit_id, pattern_type): self.analytic_unit_id = analytic_unit_id @@ -53,14 +61,14 @@ class PatternDetectionModel: self.__load_model(pattern_type) async def learn(self, segments): - self.model = self.__create_model(self.pattern_type) + self.model = resolve_detector_by_pattern(self.pattern_type) window_size = 200 dataframe = self.data_prov.get_dataframe() segments = self.data_prov.transform_anomalies(segments) # TODO: pass only part of dataframe that has segments - self.model.fit(dataframe, segments) + await self.model.fit(dataframe, segments) self.__save_model() return 0 @@ -88,7 +96,7 @@ class PatternDetectionModel: 'finish': max(ts1, ts2) }) - last_dataframe_time = dataframe.iloc[- 1]['timestamp'] + last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) return segments, last_prediction_time # return predicted_anomalies, last_prediction_time @@ -96,13 +104,6 @@ class PatternDetectionModel: def synchronize_data(self): self.data_prov.synchronize() - def __create_model(self, pattern): - if pattern == "peak": - return PeaksDetector() - if pattern == "jump" or pattern == "drop": - return StepDetector(pattern) - raise ValueError('Unknown pattern "%s"' % pattern) - def __load_anomaly_config(self): with open(os.path.join(config.ANALYTIC_UNITS_FOLDER, self.analytic_unit_id + ".json"), 'r') as config_file: self.anomaly_config = json.load(config_file) @@ -116,5 +117,5 @@ class PatternDetectionModel: logger.info("Load model '%s'" % self.analytic_unit_id) model_filename = os.path.join(config.MODELS_FOLDER, self.pattern_type + ".m") if os.path.exists(model_filename): - self.model = self.__create_model(pattern) + self.model = resolve_detector_by_pattern(pattern) self.model.load(model_filename) diff --git a/analytics/detectors/peaks_detector.py b/analytics/detectors/peaks_detector.py index c9a1610..2a87cb6 100644 --- a/analytics/detectors/peaks_detector.py +++ b/analytics/detectors/peaks_detector.py @@ -1,14 +1,42 @@ -import detectors.step_detect - from scipy import signal import numpy as np +def find_steps(array, threshold): + """ + Finds local maxima by segmenting array based on positions at which + the threshold value is crossed. Note that this thresholding is + applied after the absolute value of the array is taken. Thus, + the distinction between upward and downward steps is lost. However, + get_step_sizes can be used to determine directionality after the + fact. + Parameters + ---------- + array : numpy array + 1 dimensional array that represents time series of data points + threshold : int / float + Threshold value that defines a step + Returns + ------- + steps : list + List of indices of the detected steps + """ + steps = [] + array = np.abs(array) + above_points = np.where(array > threshold, 1, 0) + ap_dif = np.diff(above_points) + cross_ups = np.where(ap_dif == 1)[0] + cross_dns = np.where(ap_dif == -1)[0] + for upi, dni in zip(cross_ups,cross_dns): + steps.append(np.argmax(array[upi:dni]) + upi) + return steps + + class PeaksDetector: def __init__(self): pass - def fit(self, dataset, contamination=0.005): + async def fit(self, dataset, contamination=0.005): pass async def predict(self, dataframe): @@ -52,7 +80,7 @@ class PeaksDetector: data = filtered data /= data.max() - result = step_detect.find_steps(data, 0.1) + result = find_steps(data, 0.1) return [(dataframe.index[x], dataframe.index[x + window_size]) for x in result] def save(self, model_filename): diff --git a/analytics/detectors/step_detect.py b/analytics/detectors/step_detect.py deleted file mode 100644 index 4ff627d..0000000 --- a/analytics/detectors/step_detect.py +++ /dev/null @@ -1,231 +0,0 @@ - -""" -Thomas Kahn -thomas.b.kahn@gmail.com -""" -from __future__ import absolute_import -from math import sqrt -import multiprocessing as mp -import numpy as np -from six.moves import range -from six.moves import zip - - -def t_scan(L, window = 1e3, num_workers = -1): - """ - Computes t statistic for i to i+window points versus i-window to i - points for each point i in input array. Uses multiple processes to - do this calculation asynchronously. Array is decomposed into window - number of frames, each consisting of points spaced at window - intervals. This optimizes the calculation, as the drone function - need only compute the mean and variance for each set once. - Parameters - ---------- - L : numpy array - 1 dimensional array that represents time series of datapoints - window : int / float - Number of points that comprise the windows of data that are - compared - num_workers : int - Number of worker processes for multithreaded t_stat computation - Defult value uses num_cpu - 1 workers - Returns - ------- - t_stat : numpy array - Array which holds t statistic values for each point. The first - and last (window) points are replaced with zero, since the t - statistic calculation cannot be performed in that case. - """ - size = L.size - window = int(window) - frames = list(range(window)) - n_cols = (size // window) - 1 - - t_stat = np.zeros((window, n_cols)) - - if num_workers == 1: - results = [_t_scan_drone(L, n_cols, frame, window) for frame in frames] - else: - if num_workers == -1: - num_workers = mp.cpu_count() - 1 - pool = mp.Pool(processes = num_workers) - results = [pool.apply_async(_t_scan_drone, args=(L, n_cols, frame, window)) for frame in frames] - results = [r.get() for r in results] - pool.close() - - for index, row in results: - t_stat[index] = row - - t_stat = np.concatenate(( - np.zeros(window), - t_stat.transpose().ravel(order='C'), - np.zeros(size % window) - )) - - return t_stat - - -def _t_scan_drone(L, n_cols, frame, window=1e3): - """ - Drone function for t_scan. Not Intended to be called manually. - Computes t_scan for the designated frame, and returns result as - array along with an integer tag for proper placement in the - aggregate array - """ - size = L.size - window = int(window) - root_n = sqrt(window) - - output = np.zeros(n_cols) - b = L[frame:window+frame] - b_mean = b.mean() - b_var = b.var() - for i in range(window+frame, size-window, window): - a = L[i:i+window] - a_mean = a.mean() - a_var = a.var() - output[i // window - 1] = root_n * (a_mean - b_mean) / sqrt(a_var + b_var) - b_mean, b_var = a_mean, a_var - - return frame, output - - -def mz_fwt(x, n=2): - """ - Computes the multiscale product of the Mallat-Zhong discrete forward - wavelet transform up to and including scale n for the input data x. - If n is even, the spikes in the signal will be positive. If n is odd - the spikes will match the polarity of the step (positive for steps - up, negative for steps down). - This function is essentially a direct translation of the MATLAB code - provided by Sadler and Swami in section A.4 of the following: - http://www.dtic.mil/dtic/tr/fulltext/u2/a351960.pdf - Parameters - ---------- - x : numpy array - 1 dimensional array that represents time series of data points - n : int - Highest scale to multiply to - Returns - ------- - prod : numpy array - The multiscale product for x - """ - N_pnts = x.size - lambda_j = [1.5, 1.12, 1.03, 1.01][0:n] - if n > 4: - lambda_j += [1.0]*(n-4) - - H = np.array([0.125, 0.375, 0.375, 0.125]) - G = np.array([2.0, -2.0]) - - Gn = [2] - Hn = [3] - for j in range(1,n): - q = 2**(j-1) - Gn.append(q+1) - Hn.append(3*q+1) - - S = np.concatenate((x[::-1], x)) - S = np.concatenate((S, x[::-1])) - prod = np.ones(N_pnts) - for j in range(n): - n_zeros = 2**j - 1 - Gz = _insert_zeros(G, n_zeros) - Hz = _insert_zeros(H, n_zeros) - current = (1.0/lambda_j[j])*np.convolve(S,Gz) - current = current[N_pnts+Gn[j]:2*N_pnts+Gn[j]] - prod *= current - if j == n-1: - break - S_new = np.convolve(S, Hz) - S_new = S_new[N_pnts+Hn[j]:2*N_pnts+Hn[j]] - S = np.concatenate((S_new[::-1], S_new)) - S = np.concatenate((S, S_new[::-1])) - return prod - - -def _insert_zeros(x, n): - """ - Helper function for mz_fwt. Splits input array and adds n zeros - between values. - """ - newlen = (n+1)*x.size - out = np.zeros(newlen) - indices = list(range(0, newlen-n, n+1)) - out[indices] = x - return out - - -def find_steps(array, threshold): - """ - Finds local maxima by segmenting array based on positions at which - the threshold value is crossed. Note that this thresholding is - applied after the absolute value of the array is taken. Thus, - the distinction between upward and downward steps is lost. However, - get_step_sizes can be used to determine directionality after the - fact. - Parameters - ---------- - array : numpy array - 1 dimensional array that represents time series of data points - threshold : int / float - Threshold value that defines a step - Returns - ------- - steps : list - List of indices of the detected steps - """ - steps = [] - array = np.abs(array) - above_points = np.where(array > threshold, 1, 0) - ap_dif = np.diff(above_points) - cross_ups = np.where(ap_dif == 1)[0] - cross_dns = np.where(ap_dif == -1)[0] - for upi, dni in zip(cross_ups,cross_dns): - steps.append(np.argmax(array[upi:dni]) + upi) - return steps - - -def get_step_sizes(array, indices, window=1000): - """ - Calculates step size for each index within the supplied list. Step - size is determined by averaging over a range of points (specified - by the window parameter) before and after the index of step - occurrence. The directionality of the step is reflected by the sign - of the step size (i.e. a positive value indicates an upward step, - and a negative value indicates a downward step). The combined - standard deviation of both measurements (as a measure of uncertainty - in step calculation) is also provided. - Parameters - ---------- - array : numpy array - 1 dimensional array that represents time series of data points - indices : list - List of indices of the detected steps (as provided by - find_steps, for example) - window : int, optional - Number of points to average over to determine baseline levels - before and after step. - Returns - ------- - step_sizes : list - List of the calculated sizes of each step - step_error : list - """ - step_sizes = [] - step_error = [] - indices = sorted(indices) - last = len(indices) - 1 - for i, index in enumerate(indices): - if i == 0: - q = min(window, indices[i+1]-index) - elif i == last: - q = min(window, index - indices[i-1]) - else: - q = min(window, index-indices[i-1], indices[i+1]-index) - a = array[index:index+q] - b = array[index-q:index] - step_sizes.append(a.mean() - b.mean()) - step_error.append(sqrt(a.var()+b.var())) - return step_sizes, step_error \ No newline at end of file diff --git a/analytics/detectors/step_detector.py b/analytics/detectors/step_detector.py index a92234a..d0fec1e 100644 --- a/analytics/detectors/step_detector.py +++ b/analytics/detectors/step_detector.py @@ -23,13 +23,12 @@ def exponential_smoothing(series, alpha): class StepDetector: - def __init__(self, pattern): - self.pattern = pattern + def __init__(self): self.segments = [] self.confidence = 1.5 self.convolve_max = 570000 - def fit(self, dataframe, segments): + async def fit(self, dataframe, segments): data = dataframe['value'] confidences = [] convolve_list = [] diff --git a/analytics/supervised_algorithm.py b/analytics/supervised_algorithm.py index dbc7340..8217b80 100644 --- a/analytics/supervised_algorithm.py +++ b/analytics/supervised_algorithm.py @@ -31,7 +31,7 @@ class supervised_algorithm(object): self.col_to_max, self.col_to_min, self.col_to_median = None, None, None self.augmented_path = None - def fit(self, dataset, contamination=0.005): + async def fit(self, dataset, contamination=0.005): dataset = dataset[self.good_features] dataset = dataset[-100000:]