From c2c3925979932c22a303fe0cc277cf727d0ee883 Mon Sep 17 00:00:00 2001 From: rozetko Date: Thu, 30 Aug 2018 21:55:37 +0300 Subject: [PATCH] Make all models work && add reverse peak model (#124) - Subtract min value from dataset before passing to model - Rename StepModel -> DropModel - Use cache to save state in all models - Return `Segment { 'from': , 'to': }` instead of `Segment { 'from': , 'to': }` in all models - Integrate new peaks model (from https://github.com/hastic/hastic-server/pull/123) - Integrate new reverse-peaks model (from https://github.com/hastic/hastic-server/pull/123) - Refactor: make `predict` method in `Model` not abstract and remove it from all children - Refactor: add abstract `do_predict` method to models --- analytics/analytic_unit_manager.py | 21 +++- analytics/analytic_unit_worker.py | 6 +- analytics/detectors/detector.py | 6 +- .../general_detector/general_detector.py | 8 +- analytics/detectors/pattern_detector.py | 24 ++-- analytics/models/__init__.py | 9 +- analytics/models/custom_model.py | 5 +- .../models/{step_model.py => drop_model.py} | 40 +++---- analytics/models/jump_model.py | 53 ++++---- analytics/models/model.py | 22 +++- analytics/models/peak_model.py | 113 ++++++++++++++++++ analytics/models/peaks_model.py | 59 --------- analytics/models/reverse_peak_model.py | 112 +++++++++++++++++ analytics/utils/__init__.py | 9 +- .../src/controllers/analytics_controller.ts | 73 ++++++----- .../src/models/analytic_unit_cache_model.ts | 34 ++---- 16 files changed, 396 insertions(+), 198 deletions(-) rename analytics/models/{step_model.py => drop_model.py} (80%) create mode 100644 analytics/models/peak_model.py delete mode 100644 analytics/models/peaks_model.py create mode 100644 analytics/models/reverse_peak_model.py diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py index a4b9516..b1cf855 100644 --- a/analytics/analytic_unit_manager.py +++ b/analytics/analytic_unit_manager.py @@ -33,12 +33,11 @@ async def handle_analytic_task(task): worker = ensure_worker(task['analyticUnitId'], payload['pattern']) - data = pd.DataFrame(payload['data'], columns=['timestamp', 'value']) - data['timestamp'] = pd.to_datetime(data['timestamp']) + data = prepare_data(payload['data']) result_payload = {} - if task['type'] == "LEARN": + if task['type'] == 'LEARN': result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) - elif task['type'] == "PREDICT": + elif task['type'] == 'PREDICT': result_payload = await worker.do_predict(data, payload['cache']) else: raise ValueError('Unknown task type "%s"' % task['type']) @@ -52,8 +51,20 @@ async def handle_analytic_task(task): logger.error("handle_analytic_task exception: '%s'" % error_text) # TODO: move result to a class which renders to json for messaging to analytics return { - 'status': "FAILED", + 'status': 'FAILED', 'error': str(e) } +def prepare_data(data: list): + """ + Takes list + - converts it into pd.DataFrame, + - converts 'timestamp' column to pd.Datetime, + - subtracts min value from dataset + """ + data = pd.DataFrame(data, columns=['timestamp', 'value']) + data['timestamp'] = pd.to_datetime(data['timestamp']) + data['value'] = data['value'] - min(data['value']) + + return data diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index 34b0eb6..5584d5b 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -2,6 +2,8 @@ import config import detectors import logging import pandas as pd +from typing import 
Optional +from models import AnalyticUnitCache logger = logging.getLogger('AnalyticUnitWorker') @@ -13,8 +15,8 @@ class AnalyticUnitWorker: self.analytic_unit_id = analytic_unit_id self.detector = detector - async def do_learn(self, segments: list, data: pd.DataFrame, cache: dict) -> dict: + async def do_learn(self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: return await self.detector.train(data, segments, cache) - async def do_predict(self, data: pd.DataFrame, cache: dict) -> dict: + async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: return await self.detector.predict(data, cache) diff --git a/analytics/detectors/detector.py b/analytics/detectors/detector.py index b5bed34..6e58653 100644 --- a/analytics/detectors/detector.py +++ b/analytics/detectors/detector.py @@ -1,13 +1,15 @@ +from models import AnalyticUnitCache from abc import ABC, abstractmethod from pandas import DataFrame +from typing import Optional class Detector(ABC): @abstractmethod - async def train(self, dataframe: DataFrame, segments: list, cache: dict) -> dict: + async def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: pass @abstractmethod - async def predict(self, dataframe: DataFrame, cache: dict) -> dict: + async def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: pass diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py index 275ef1a..470557d 100644 --- a/analytics/detectors/general_detector/general_detector.py +++ b/analytics/detectors/general_detector/general_detector.py @@ -1,11 +1,13 @@ from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm from detectors import Detector +from models import AnalyticUnitCache import utils import pandas as pd import logging import config import json +from typing import Optional NANOSECONDS_IN_MS = 1000000 @@ -18,7 +20,7 @@ class GeneralDetector(Detector): def __init__(self): self.model = None - async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict): + async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: confidence = 0.02 start_index, stop_index = 0, len(dataframe) @@ -43,9 +45,9 @@ class GeneralDetector(Detector): last_prediction_time = 0 logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name) - return last_prediction_time + return cache - async def predict(self, dataframe: pd.DataFrame, cache: dict): + async def predict(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name) last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms') diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index f8d4506..8632a5d 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -4,6 +4,7 @@ import logging import config import pandas as pd +from typing import Optional from detectors import Detector @@ -13,9 +14,11 @@ logger = logging.getLogger('PATTERN_DETECTOR') def resolve_model_by_pattern(pattern: str) -> models.Model: if pattern == 'PEAK': - return models.PeaksModel() + return models.PeakModel() + if pattern == 'REVERSE_PEAK': + return models.ReversePeakModel() if pattern == 'DROP': - return models.StepModel() + 
return models.DropModel() if pattern == 'JUMP': return models.JumpModel() if pattern == 'CUSTOM': @@ -30,23 +33,24 @@ class PatternDetector(Detector): self.model = resolve_model_by_pattern(self.pattern_type) window_size = 100 - async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict): + async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: # TODO: pass only part of dataframe that has segments - self.model.fit(dataframe, segments, cache) - # TODO: save model after fit + new_cache = self.model.fit(dataframe, segments, cache) return { - 'cache': cache + 'cache': new_cache } - async def predict(self, dataframe: pd.DataFrame, cache: dict): - predicted = await self.model.predict(dataframe, cache) + async def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict: + # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) + predicted = self.model.predict(dataframe, cache) - segments = [{ 'from': segment[0], 'to': segment[1] } for segment in predicted] + segments = [{ 'from': segment[0], 'to': segment[1] } for segment in predicted['segments']] + newCache = predicted['cache'] last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = last_dataframe_time.value return { - 'cache': cache, + 'cache': newCache, 'segments': segments, 'lastPredictionTime': last_prediction_time } diff --git a/analytics/models/__init__.py b/analytics/models/__init__.py index b6ffc10..2c106eb 100644 --- a/analytics/models/__init__.py +++ b/analytics/models/__init__.py @@ -1,5 +1,8 @@ -from models.model import Model -from models.step_model import StepModel -from models.peaks_model import PeaksModel +from models.model import Model, AnalyticUnitCache +from models.drop_model import DropModel +from models.peak_model import PeakModel from models.jump_model import JumpModel from models.custom_model import CustomModel + +from models.custom_model import CustomModel +from models.reverse_peak_model import ReversePeakModel diff --git a/analytics/models/custom_model.py b/analytics/models/custom_model.py index db9fbcf..b0c78e7 100644 --- a/analytics/models/custom_model.py +++ b/analytics/models/custom_model.py @@ -1,6 +1,7 @@ from models import Model import utils import pandas as pd +from typing import Optional # Paste your model here: class CustomModel(Model): @@ -11,8 +12,8 @@ class CustomModel(Model): # It will be saved in filesystem and loaded after server restart self.state = {} - def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[dict]) -> dict: pass - def predict(self, dataframe, cache: dict): + def predict(self, dataframe, cache: Optional[dict]): return [] diff --git a/analytics/models/step_model.py b/analytics/models/drop_model.py similarity index 80% rename from analytics/models/step_model.py rename to analytics/models/drop_model.py index 767bd36..8c582a8 100644 --- a/analytics/models/step_model.py +++ b/analytics/models/drop_model.py @@ -1,4 +1,4 @@ -from models import Model +from models import Model, AnalyticUnitCache import scipy.signal from scipy.fftpack import fft @@ -8,10 +8,11 @@ from scipy.stats import gaussian_kde import utils import numpy as np import pandas as pd +from typing import Optional WINDOW_SIZE = 400 -class StepModel(Model): +class DropModel(Model): def __init__(self): super() self.segments = [] @@ -23,13 +24,12 @@ class 
StepModel(Model): 'DROP_LENGTH': 1, } - def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + if type(cache) is AnalyticUnitCache: + self.state = cache self.segments = segments - d_min = min(dataframe['value']) - for i in range(0,len(dataframe['value'])): - dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min - data = dataframe['value'] + data = dataframe['value'] confidences = [] convolve_list = [] drop_height_list = [] @@ -76,46 +76,34 @@ class StepModel(Model): convolve_list.append(max(convolve)) if len(confidences) > 0: - self.state['confidence'] = min(confidences) + self.state['confidence'] = float(min(confidences)) else: self.state['confidence'] = 1.5 if len(convolve_list) > 0: - self.state['convolve_max'] = max(convolve_list) + self.state['convolve_max'] = float(max(convolve_list)) else: self.state['convolve_max'] = WINDOW_SIZE if len(drop_height_list) > 0: - self.state['DROP_HEIGHT'] = min(drop_height_list) + self.state['DROP_HEIGHT'] = int(min(drop_height_list)) else: self.state['DROP_HEIGHT'] = 1 if len(drop_length_list) > 0: - self.state['DROP_LENGTH'] = max(drop_length_list) + self.state['DROP_LENGTH'] = int(max(drop_length_list)) else: self.state['DROP_LENGTH'] = 1 - async def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: - d_min = min(dataframe['value']) - for i in range(0,len(dataframe['value'])): - dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min - - result = await self.__predict(dataframe) - - if len(self.segments) > 0: - return [segment for segment in result if not utils.is_intersect(segment, self.segments)] + return self.state - async def __predict(self, dataframe): - #window_size = 24 - #all_max_flatten_data = data.rolling(window=window_size).mean() - #all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] - #print(self.state['DROP_HEIGHT'],self.state['DROP_LENGTH']) + def do_predict(self, dataframe: pd.DataFrame): data = dataframe['value'] possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1) filtered = self.__filter_prediction(possible_drops, data) return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered] - def __filter_prediction(self, segments, data): + def __filter_prediction(self, segments: list, data: list): delete_list = [] variance_error = int(0.004 * len(data)) if variance_error > 50: diff --git a/analytics/models/jump_model.py b/analytics/models/jump_model.py index 6391a5b..bc0f5d2 100644 --- a/analytics/models/jump_model.py +++ b/analytics/models/jump_model.py @@ -1,4 +1,4 @@ -from models import Model +from models import Model, AnalyticUnitCache import utils import numpy as np @@ -9,6 +9,7 @@ from scipy.signal import argrelextrema import math from scipy.stats import gaussian_kde from scipy.stats import norm +from typing import Optional WINDOW_SIZE = 400 @@ -26,8 +27,11 @@ class JumpModel(Model): 'JUMP_LENGTH': 1, } - def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + if type(cache) is AnalyticUnitCache: + self.state = cache self.segments = segments + data = dataframe['value'] confidences = [] convolve_list = [] @@ -35,13 +39,17 @@ class JumpModel(Model): jump_length_list = [] for segment in segments: if segment['labeled']: - segment_data = 
data.loc[segment['from'] : segment['to'] + 1].reset_index(drop=True) + segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'])) + segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'])) + + segment_data = data.loc[segment_from_index : segment_to_index + 1].reset_index(drop=True) segment_min = min(segment_data) segment_max = max(segment_data) confidences.append(0.20 * (segment_max - segment_min)) flat_segment = segment_data.rolling(window=5).mean() - pdf = gaussian_kde(flat_segment.dropna()) - x = np.linspace(flat_segment.dropna().min() - 1, flat_segment.dropna().max() + 1, len(flat_segment.dropna())) + flat_segment_dropna = flat_segment.dropna() + pdf = gaussian_kde(flat_segment_dropna) + x = np.linspace(flat_segment_dropna.min() - 1, flat_segment_dropna.max() + 1, len(flat_segment_dropna)) y = pdf(x) ax_list = [] for i in range(len(x)): @@ -56,12 +64,12 @@ class JumpModel(Model): segment_max_line = ax_list[max_peak_index, 0] jump_height = 0.9 * (segment_max_line - segment_min_line) jump_height_list.append(jump_height) - jump_lenght = utils.find_jump_length(segment_data, segment_min_line, segment_max_line) - jump_length_list.append(jump_lenght) + jump_length = utils.find_jump_length(segment_data, segment_min_line, segment_max_line) + jump_length_list.append(jump_length) cen_ind = utils.intersection_segment(flat_segment, segment_median) #finds all interseprions with median #cen_ind = utils.find_ind_median(segment_median, flat_segment) jump_center = cen_ind[0] - segment_cent_index = jump_center - 5 + segment['from'] + segment_cent_index = jump_center - 5 + segment_from_index self.ijumps.append(segment_cent_index) labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE] labeled_min = min(labeled_drop) @@ -71,41 +79,33 @@ class JumpModel(Model): convolve_list.append(max(convolve)) if len(confidences) > 0: - self.state['confidence'] = min(confidences) + self.state['confidence'] = float(min(confidences)) else: self.state['confidence'] = 1.5 if len(convolve_list) > 0: - self.state['convolve_max'] = max(convolve_list) + self.state['convolve_max'] = float(max(convolve_list)) else: self.state['convolve_max'] = WINDOW_SIZE if len(jump_height_list) > 0: - self.state['JUMP_HEIGHT'] = min(jump_height_list) + self.state['JUMP_HEIGHT'] = int(min(jump_height_list)) else: self.state['JUMP_HEIGHT'] = 1 if len(jump_length_list) > 0: - self.state['JUMP_LENGTH'] = max(jump_length_list) + self.state['JUMP_LENGTH'] = int(max(jump_length_list)) else: - self.state['JUMP_LENGTH'] = 1 - - def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: - data = dataframe['value'] + self.state['JUMP_LENGTH'] = 1 - result = self.__predict(data) - result.sort() - if len(self.segments) > 0: - result = [segment for segment in result if not utils.is_intersect(segment, self.segments)] - return result + return self.state - def __predict(self, data): - #window_size = 24 - #all_max_flatten_data = data.rolling(window=window_size).mean() - #all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] + def do_predict(self, dataframe: pd.DataFrame): + data = dataframe['value'] possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1) + filtered = self.__filter_prediction(possible_jumps, data) - return [(x - 1, x + 1) for x in self.__filter_prediction(possible_jumps, data)] + return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered] def 
__filter_prediction(self, segments, data): delete_list = [] @@ -138,5 +138,4 @@ class JumpModel(Model): for ijump in self.ijumps: segments.append(ijump) - return segments diff --git a/analytics/models/model.py b/analytics/models/model.py index 35dba6f..b37a4f6 100644 --- a/analytics/models/model.py +++ b/analytics/models/model.py @@ -1,13 +1,31 @@ +import utils + from abc import ABC, abstractmethod from pandas import DataFrame +from typing import Optional +AnalyticUnitCache = dict class Model(ABC): @abstractmethod - def fit(self, dataframe: DataFrame, segments: list, cache: dict) -> dict: + def fit(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: pass @abstractmethod - def predict(self, dataframe: DataFrame, cache: dict) -> dict: + def do_predict(self, dataframe: DataFrame): pass + + def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: + if type(cache) is AnalyticUnitCache: + self.state = cache + + result = self.do_predict(dataframe) + result.sort() + + if len(self.segments) > 0: + result = [segment for segment in result if not utils.is_intersect(segment, self.segments)] + return { + 'segments': result, + 'cache': self.state + } diff --git a/analytics/models/peak_model.py b/analytics/models/peak_model.py new file mode 100644 index 0000000..ca170c7 --- /dev/null +++ b/analytics/models/peak_model.py @@ -0,0 +1,113 @@ +from models import Model, AnalyticUnitCache + +import scipy.signal +from scipy.fftpack import fft +from scipy.signal import argrelextrema + +import utils +import numpy as np +import pandas as pd +from typing import Optional + +WINDOW_SIZE = 240 + + +class PeakModel(Model): + + def __init__(self): + super() + self.segments = [] + self.ipeaks = [] + self.state = { + 'confidence': 1.5, + 'convolve_max': 570000 + } + + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + if type(cache) is AnalyticUnitCache: + self.state = cache + + self.segments = segments + data = dataframe['value'] + + confidences = [] + convolve_list = [] + for segment in segments: + if segment['labeled']: + segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'])) + segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'])) + + segment_data = data[segment_from_index: segment_to_index + 1] + segment_min = min(segment_data) + segment_max = max(segment_data) + confidences.append(0.2 * (segment_max - segment_min)) + flat_segment = segment_data.rolling(window=5).mean() + flat_segment = flat_segment.dropna() + segment_max_index = flat_segment.idxmax() # + segment['start'] + self.ipeaks.append(segment_max_index) + labeled_drop = data[segment_max_index - WINDOW_SIZE: segment_max_index + WINDOW_SIZE] + labeled_min = min(labeled_drop) + for value in labeled_drop: + value = value - labeled_min + convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop) + convolve_list.append(max(convolve)) + + if len(confidences) > 0: + self.state['confidence'] = float(min(confidences)) + else: + self.state['confidence'] = 1.5 + + if len(convolve_list) > 0: + self.state['convolve_max'] = float(max(convolve_list)) + else: + self.state['convolve_max'] = 570000 + + return self.state + + def do_predict(self, dataframe: pd.DataFrame): + data = dataframe['value'] + window_size = 24 + all_max_flatten_data = data.rolling(window=window_size).mean() + all_maxs = argrelextrema(np.array(all_max_flatten_data), np.greater)[0] + + extrema_list = 
[] + for i in utils.exponential_smoothing(data + self.state['confidence'], 0.02): + extrema_list.append(i) + + segments = [] + for i in all_maxs: + if all_max_flatten_data[i] > extrema_list[i]: + segments.append(i+12) + + filtered = self.__filter_prediction(segments, data) + return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered] + + def __filter_prediction(self, segments: list, all_max_flatten_data: list): + delete_list = [] + variance_error = int(0.004 * len(all_max_flatten_data)) + if variance_error > 100: + variance_error = 100 + for i in range(1, len(segments)): + if segments[i] < segments[i - 1] + variance_error: + delete_list.append(segments[i]) + for item in delete_list: + segments.remove(item) + + delete_list = [] + if len(segments) == 0 or len(self.ipeaks) == 0: + segments = [] + return segments + + pattern_data = all_max_flatten_data[self.ipeaks[0] - WINDOW_SIZE: self.ipeaks[0] + WINDOW_SIZE] + for segment in segments: + if segment > WINDOW_SIZE: + convol_data = all_max_flatten_data[segment - WINDOW_SIZE: segment + WINDOW_SIZE] + conv = scipy.signal.fftconvolve(pattern_data, convol_data) + if max(conv) > self.state['convolve_max'] * 1.2 or max(conv) < self.state['convolve_max'] * 0.8: + delete_list.append(segment) + else: + delete_list.append(segment) + for item in delete_list: + segments.remove(item) + + return segments diff --git a/analytics/models/peaks_model.py b/analytics/models/peaks_model.py deleted file mode 100644 index 0c2b7c5..0000000 --- a/analytics/models/peaks_model.py +++ /dev/null @@ -1,59 +0,0 @@ -from models import Model - -import utils -from scipy import signal -import numpy as np -import pandas as pd - - -class PeaksModel(Model): - - def __init__(self): - super() - - def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: - pass - - def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: - array = dataframe['value'].as_matrix() - window_size = 20 - # window = np.ones(101) - # mean_filtered = signal.fftconvolve( - # np.concatenate([np.zeros(window_size), array, np.zeros(window_size)]), - # window, - # mode='valid' - # ) - # filtered = np.divide(array, mean_filtered / 101) - - window = signal.general_gaussian(2 * window_size + 1, p=0.5, sig=5) - #print(window) - filtered = signal.fftconvolve(array, window, mode='valid') - - # filtered = np.concatenate([ - # np.zeros(window_size), - # filtered, - # np.zeros(window_size) - # ]) - filtered = filtered / np.sum(window) - array = array[window_size:-window_size] - filtered = np.subtract(array, filtered) - - # filtered = np.convolve(array, step, mode='valid') - # print(len(array)) - # print(len(filtered)) - - # step = np.hstack((np.ones(window_size), 0, -1*np.ones(window_size))) - # - # conv = np.convolve(array, step, mode='valid') - # - # conv = np.concatenate([ - # np.zeros(window_size), - # conv, - # np.zeros(window_size)]) - - #data = step_detect.t_scan(array, window=window_size) - data = filtered - data /= data.max() - - result = utils.find_steps(data, 0.1) - return [(dataframe.index[x], dataframe.index[x + window_size]) for x in result] diff --git a/analytics/models/reverse_peak_model.py b/analytics/models/reverse_peak_model.py new file mode 100644 index 0000000..f50cddc --- /dev/null +++ b/analytics/models/reverse_peak_model.py @@ -0,0 +1,112 @@ +from models import Model, AnalyticUnitCache + +import scipy.signal +from scipy.fftpack import fft +from scipy.signal import argrelextrema + +import utils +import numpy as np +import pandas as pd 
+from typing import Optional + +WINDOW_SIZE = 240 + +class ReversePeakModel(Model): + + def __init__(self): + super() + self.segments = [] + self.ipeaks = [] + self.state = { + 'confidence': 1.5, + 'convolve_max': 570000 + } + + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + if type(cache) is AnalyticUnitCache: + self.state = cache + + self.segments = segments + data = dataframe['value'] + + confidences = [] + convolve_list = [] + for segment in segments: + if segment['labeled']: + segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'])) + segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'])) + + segment_data = data[segment_from_index: segment_to_index + 1] + segment_min = min(segment_data) + segment_max = max(segment_data) + confidences.append(0.2 * (segment_max - segment_min)) + flat_segment = segment_data.rolling(window=5).mean() + flat_segment = flat_segment.dropna() + segment_min_index = flat_segment.idxmin() #+ segment['start'] + self.ipeaks.append(segment_min_index) + labeled_drop = data[segment_min_index - WINDOW_SIZE : segment_min_index + WINDOW_SIZE] + labeled_min = min(labeled_drop) + for value in labeled_drop: + value = value - labeled_min + convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop) + convolve_list.append(max(convolve)) + + if len(confidences) > 0: + self.state['confidence'] = min(confidences) + else: + self.state['confidence'] = 1.5 + + if len(convolve_list) > 0: + self.state['convolve_max'] = max(convolve_list) + else: + self.state['convolve_max'] = 570000 + + return self.state + + def do_predict(self, dataframe: pd.DataFrame): + data = dataframe['value'] + window_size = 24 + all_max_flatten_data = data.rolling(window=window_size).mean() + all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] + + extrema_list = [] + for i in utils.exponential_smoothing(data - self.state['confidence'], 0.02): + extrema_list.append(i) + + segments = [] + for i in all_mins: + if all_max_flatten_data[i] < extrema_list[i]: + segments.append(i + 12) + + filtered = self.__filter_prediction(segments, data) + return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered] + + def __filter_prediction(self, segments: list, all_max_flatten_data: list): + delete_list = [] + variance_error = int(0.004 * len(all_max_flatten_data)) + if variance_error > 100: + variance_error = 100 + for i in range(1, len(segments)): + if segments[i] < segments[i - 1] + variance_error: + delete_list.append(segments[i]) + for item in delete_list: + segments.remove(item) + + delete_list = [] + if len(segments) == 0 or len(self.ipeaks) == 0 : + segments = [] + return segments + + pattern_data = all_max_flatten_data[self.ipeaks[0] - WINDOW_SIZE : self.ipeaks[0] + WINDOW_SIZE] + for segment in segments: + if segment > WINDOW_SIZE: + convol_data = all_max_flatten_data[segment - WINDOW_SIZE : segment + WINDOW_SIZE] + conv = scipy.signal.fftconvolve(pattern_data, convol_data) + if max(conv) > self.state['convolve_max'] * 1.2 or max(conv) < self.state['convolve_max'] * 0.8: + delete_list.append(segment) + else: + delete_list.append(segment) + for item in delete_list: + segments.remove(item) + + return segments diff --git a/analytics/utils/__init__.py b/analytics/utils/__init__.py index 80eb7d3..caccc47 100644 --- a/analytics/utils/__init__.py +++ b/analytics/utils/__init__.py @@ -148,7 +148,8 @@ def find_jump_length(segment_data, 
min_line, max_line): if (idl[0] - idx[-1] + 1) > 0: return idl[0] - idx[-1] + 1 else: - return print("retard alert!") + print("retard alert!") + return 0 def find_jump(data, height, lenght): j_list = [] @@ -192,10 +193,10 @@ def drop_intersection(segment_data, median_line): idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0 return idx -def find_drop(data, height, lenght): +def find_drop(data, height, length): d_list = [] - for i in range(len(data)-lenght-1): - for x in range(1, lenght): + for i in range(len(data)-length-1): + for x in range(1, length): if(data[i+x] < data[i] - height): d_list.append(i+36) return(d_list) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 55eb9e1..0899c4e 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -112,6 +112,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { let oldCache = await AnalyticUnitCache.findById(id); if(oldCache !== null) { oldCache = oldCache.data; + } else { + await AnalyticUnitCache.create(id); } let task = new AnalyticsTask( id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data, cache: oldCache } @@ -121,7 +123,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { throw new Error(result.error) } - AnalyticUnitCache.setData(id, result.payload.cache); + await AnalyticUnitCache.setData(id, result.payload.cache); } catch (err) { let message = err.message || JSON.stringify(err); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message); @@ -129,34 +131,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { } -function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, taskResult: any): { - lastPredictionTime: number, - segments: Segment.Segment[], - cache: any -} { - let payload = taskResult.payload; - if(payload === undefined) { - throw new Error(`Missing payload in result: ${taskResult}`); - } - if(payload.segments === undefined || !Array.isArray(payload.segments)) { - throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(payload)}`); - } - if(payload.lastPredictionTime === undefined || isNaN(+payload.lastPredictionTime)) { - throw new Error( - `Missing lastPredictionTime is result or it is corrupted: ${JSON.stringify(payload)}` - ); - } - - let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false)); - - return { - lastPredictionTime: payload.lastPredictionTime, - segments: segments, - cache: {} - }; - -} - export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { let previousLastPredictionTime: number = undefined; @@ -166,20 +140,26 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { let pattern = unit.type; let segments = await Segment.findMany(id, { labeled: true }); - if (segments.length < 2) { + if(segments.length < 2) { throw new Error('Need at least 2 labeled segments'); } let { from, to } = getQueryRangeForLearningBySegments(segments); let data = await queryByMetric(unit.metric, unit.panelUrl, from, to); - if (data.length === 0) { + if(data.length === 0) { throw new Error('Empty data to predict on'); } + let oldCache = await AnalyticUnitCache.findById(id); + if(oldCache !== null) { + oldCache = oldCache.data; + } else { + await AnalyticUnitCache.create(id); + } let task = new AnalyticsTask( id, 
AnalyticsTaskType.PREDICT, - { pattern, lastPredictionTime: unit.lastPredictionTime, data, cache: {} } + { pattern, lastPredictionTime: unit.lastPredictionTime, data, cache: oldCache } ); let result = await runTask(task); if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { @@ -200,6 +180,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { } Segment.insertSegments(payload.segments); + AnalyticUnitCache.setData(id, payload.cache); AnalyticUnit.setPredictionTime(id, payload.lastPredictionTime); AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); } catch(err) { @@ -211,6 +192,34 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { } } +function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, taskResult: any): { + lastPredictionTime: number, + segments: Segment.Segment[], + cache: any +} { + let payload = taskResult.payload; + if (payload === undefined) { + throw new Error(`Missing payload in result: ${taskResult}`); + } + if (payload.segments === undefined || !Array.isArray(payload.segments)) { + throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(payload)}`); + } + if (payload.lastPredictionTime === undefined || isNaN(+payload.lastPredictionTime)) { + throw new Error( + `Missing lastPredictionTime is result or it is corrupted: ${JSON.stringify(payload)}` + ); + } + + let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false)); + + return { + lastPredictionTime: payload.lastPredictionTime, + segments: segments, + cache: payload.cache + }; + +} + export function isAnalyticReady(): boolean { return analyticsService.ready; } diff --git a/server/src/models/analytic_unit_cache_model.ts b/server/src/models/analytic_unit_cache_model.ts index 5c53478..3957bd8 100644 --- a/server/src/models/analytic_unit_cache_model.ts +++ b/server/src/models/analytic_unit_cache_model.ts @@ -5,40 +5,32 @@ import { Collection, makeDBQ } from '../services/data_service'; let db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES); -export type AnalyticUnitCacheId = string; - export class AnalyticUnitCache { public constructor( - public analyticUnitId: AnalyticUnitId, - public data?: any, - public id?: AnalyticUnitCacheId, + public id: AnalyticUnitId, + public data?: any ) { - if(analyticUnitId === undefined) { - throw new Error(`Missing field "analyticUnitId"`); + if(id === undefined) { + throw new Error(`Missing field "id"`); } } public toObject() { return { - _id: this.id, - analyticUnitId: this.analyticUnitId, - data: this.data + data: this.data || null, + _id: this.id }; } static fromObject(obj: any): AnalyticUnitCache { - if(obj.method === undefined) { - throw new Error('No method in obj:' + obj); - } return new AnalyticUnitCache( - obj.method, + obj._id, obj.data, - obj._id ); } } -export async function findById(id: AnalyticUnitCacheId): Promise { +export async function findById(id: AnalyticUnitId): Promise { let obj = await db.findOne(id); if(obj === null) { return null; @@ -46,15 +38,15 @@ export async function findById(id: AnalyticUnitCacheId): Promise { - let obj = unit.toObject(); - return db.insertOne(obj); +export async function create(id: AnalyticUnitId): Promise { + let cache = new AnalyticUnitCache(id); + return db.insertOne(cache.toObject()); } -export async function setData(id: AnalyticUnitCacheId, data: any) { +export async function setData(id: AnalyticUnitId, data: any) { return db.updateOne(id, { data }); } -export async function remove(id: 
AnalyticUnitCacheId): Promise<void> { +export async function remove(id: AnalyticUnitId): Promise<void> { await db.removeOne(id); }
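
Reviewer note, not part of the patch: a minimal, self-contained sketch of the fit/predict cache contract this PR introduces in analytics/models/model.py. The base Model below mirrors the patched class (abstract fit/do_predict, concrete predict that restores state from the cache); ThresholdModel and the toy DataFrame are hypothetical, illustration-only stand-ins for the real PeakModel/DropModel/JumpModel/ReversePeakModel and the Grafana data the worker receives.

from abc import ABC, abstractmethod

import pandas as pd
from pandas import DataFrame

AnalyticUnitCache = dict  # same alias the patch defines in models/model.py


class Model(ABC):
    # mirrors the patched base class: fit/do_predict are abstract,
    # predict is concrete and round-trips the cache
    @abstractmethod
    def fit(self, dataframe: DataFrame, segments: list, cache) -> AnalyticUnitCache:
        pass

    @abstractmethod
    def do_predict(self, dataframe: DataFrame):
        pass

    def predict(self, dataframe: DataFrame, cache) -> dict:
        if type(cache) is AnalyticUnitCache:
            self.state = cache
        result = self.do_predict(dataframe)
        result.sort()
        return { 'segments': result, 'cache': self.state }


class ThresholdModel(Model):
    # toy subclass for illustration only: flags points above a learned threshold
    def __init__(self):
        self.segments = []
        self.state = { 'threshold': 0.0 }

    def fit(self, dataframe, segments, cache):
        if type(cache) is AnalyticUnitCache:
            self.state = cache
        self.state['threshold'] = float(dataframe['value'].mean())
        return self.state

    def do_predict(self, dataframe):
        hits = dataframe.index[dataframe['value'] > self.state['threshold']]
        # return (from, to) timestamp pairs, as the patched models now do
        return [(dataframe['timestamp'][i].value, dataframe['timestamp'][i].value) for i in hits]


if __name__ == '__main__':
    df = pd.DataFrame({
        'timestamp': pd.to_datetime([1, 2, 3, 4], unit='s'),
        'value': [0.0, 0.0, 5.0, 0.0]
    })
    cache = ThresholdModel().fit(df, segments=[], cache=None)   # LEARN task: fit returns the cache
    result = ThresholdModel().predict(df, cache)                # PREDICT task: state restored from cache
    print(result['segments'], result['cache'])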