diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index 4c75c84..23e565b 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -5,7 +5,7 @@ import logging
 import config
 
 import pandas as pd
-from typing import Optional, Generator
+from typing import Optional, Generator, List
 
 from detectors import Detector
 from analytic_types import DataBucket
@@ -45,10 +45,12 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         self.bucket = DataBucket()
 
-    def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
+    def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[models.ModelCache]) -> models.ModelState:
         # TODO: pass only part of dataframe that has segments
-        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id, cache)
-        if new_cache == None or len(new_cache) == 0:
+        self.model.state = self.model.get_state(cache)
+        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id)
+        new_cache = new_cache.to_json()
+        if len(new_cache) == 0:
             logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id))
         return {
             'cache': new_cache
@@ -58,30 +60,30 @@ class PatternDetector(Detector):
         logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
 
-        if cache is None or cache == {}:
+        if cache is None:
             msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
             logger.error(msg)
             raise ValueError(msg)
-
-        window_size = cache.get('WINDOW_SIZE')
+        self.model.state = self.model.get_state(cache)
+        window_size = self.model.state.window_size
 
         if window_size is None:
-            message = '{} got cache without WINDOW_SIZE for detection'.format(self.analytic_unit_id)
+            message = '{} got cache without window_size for detection'.format(self.analytic_unit_id)
             logger.error(message)
             raise ValueError(message)
 
         if len(dataframe) < window_size * 2:
-            message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than WINDOW_SIZE: {window_size}'
+            message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than window_size: {window_size}'
             logger.error(message)
             raise ValueError(message)
 
-        detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
+        detected = self.model.detect(dataframe, self.analytic_unit_id)
 
         segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
-        newCache = detected['cache']
+        new_cache = detected['cache'].to_json()
 
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
         return {
-            'cache': newCache,
+            'cache': new_cache,
             'segments': segments,
             'lastDetectionTime': last_detection_time
         }
@@ -89,7 +91,7 @@ class PatternDetector(Detector):
     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))
 
-        if cache is None or cache == {}:
+        if cache is None:
             logging.debug(f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}, skip')
             return None
 
@@ -100,7 +102,8 @@ class PatternDetector(Detector):
         self.bucket.receive_data(data_without_nan)
 
-        window_size = cache['WINDOW_SIZE']
+        # TODO: use ModelState
+        window_size = cache['windowSize']
 
         bucket_len = len(self.bucket.data)
         if bucket_len < window_size * 2:
@@ -124,4 +127,4 @@ class PatternDetector(Detector):
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         if cache is None:
             return self.DEFAULT_WINDOW_SIZE
-        return cache.get('WINDOW_SIZE', self.DEFAULT_WINDOW_SIZE)
+        return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE)
diff --git a/analytics/analytics/models/__init__.py b/analytics/analytics/models/__init__.py
index 29a38ce..e3cdcc8 100644
--- a/analytics/analytics/models/__init__.py
+++ b/analytics/analytics/models/__init__.py
@@ -1,7 +1,7 @@
 from models.model import Model, ModelCache, ModelState
-from models.drop_model import DropModel
-from models.peak_model import PeakModel
-from models.jump_model import JumpModel
+from models.drop_model import DropModel, DropModelState
+from models.peak_model import PeakModel, PeakModelState
+from models.jump_model import JumpModel, JumpModelState
 from models.custom_model import CustomModel
-from models.trough_model import TroughModel
-from models.general_model import GeneralModel
+from models.trough_model import TroughModel, TroughModelState
+from models.general_model import GeneralModel, GeneralModelState
diff --git a/analytics/analytics/models/drop_model.py b/analytics/analytics/models/drop_model.py
index b5363a4..2720690 100644
--- a/analytics/analytics/models/drop_model.py
+++ b/analytics/analytics/models/drop_model.py
@@ -4,12 +4,12 @@ import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
 from scipy.stats import gaussian_kde
-from typing import Optional
+from typing import Optional, List, Tuple
 
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
-
+from analytic_types import AnalyticUnitId
 
 @utils.meta.JSONClass
 class DropModelState(ModelState):
@@ -28,21 +28,6 @@ class DropModelState(ModelState):
 
 class DropModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 200,
-            'convolve_min': 200,
-            'DROP_HEIGHT': 1,
-            'DROP_LENGTH': 1,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-        }
 
     def get_model_type(self) -> (str, bool):
         model = 'drop'
@@ -55,18 +40,18 @@
         segment_center_index = utils.find_pattern_center(segment, start, 'drop')
         return segment_center_index
 
-    def get_cache(self, cache: Optional[dict] = None) -> DropModelState:
+    def get_state(self, cache: Optional[dict] = None) -> DropModelState:
         return DropModelState.from_json(cache)
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
 
         height_list = learning_info['patterns_value']
         del_conv_list = []
@@ -76,32 +61,32 @@ class DropModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted_drop = utils.get_interval(data, segment_cent_index, window_size)
             deleted_drop = utils.subtract_min_without_nan(deleted_drop)
-            del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.state['pattern_model'])
+            del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.state.pattern_model)
             if len(del_conv_drop): del_conv_list.append(max(del_conv_drop))
 
-        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
-        self.state['DROP_HEIGHT'] = int(min(learning_info['pattern_height'], default = 1))
-        self.state['DROP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
+        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list)
+        self.state.drop_height = int(min(learning_info['pattern_height'], default = 1))
+        self.state.drop_length = int(max(learning_info['pattern_width'], default = 1))
 
-    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)
+        possible_drops = utils.find_drop(data, self.state.drop_height, self.state.drop_length + 1)
         result = self.__filter_detection(possible_drops, data)
         return [(val - 1, val + 1) for val in result]
 
     def __filter_detection(self, segments: list, data: list):
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'min')
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
+        pattern_data = self.state.pattern_model
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE'] and segment < (len(data) - self.state['WINDOW_SIZE']):
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size and segment < (len(data) - self.state.window_size):
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if len(convol_data) == 0 or percent_of_nans > 0.5:
                     delete_list.append(segment)
@@ -111,10 +96,10 @@
                 convol_data = utils.nan_to_zero(convol_data, nan_list)
                 pattern_data = utils.nan_to_zero(pattern_data, nan_list)
                 conv = scipy.signal.fftconvolve(convol_data, pattern_data)
-                upper_bound = self.state['convolve_max'] * 1.2
-                lower_bound = self.state['convolve_min'] * 0.8
-                delete_up_bound = self.state['conv_del_max'] * 1.02
-                delete_low_bound = self.state['conv_del_min'] * 0.98
+                upper_bound = self.state.convolve_max * 1.2
+                lower_bound = self.state.convolve_min * 0.8
+                delete_up_bound = self.state.conv_del_max * 1.02
+                delete_low_bound = self.state.conv_del_min * 0.98
                 try:
                     if max(conv) > upper_bound or max(conv) < lower_bound:
                         delete_list.append(segment)
diff --git a/analytics/analytics/models/general_model.py b/analytics/analytics/models/general_model.py
index d4fcb8d..421be58 100644
--- a/analytics/analytics/models/general_model.py
+++ b/analytics/analytics/models/general_model.py
@@ -14,8 +14,9 @@ from scipy.stats import gaussian_kde
 from scipy.stats import norm
 import logging
-from typing import Optional
+from typing import Optional, List, Tuple
 import math
+from analytic_types import AnalyticUnitId
 
 PEARSON_FACTOR = 0.7
@@ -28,17 +29,6 @@ class GeneralModelState(ModelState):
 
 class GeneralModel(Model):
-    def __init__(self):
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'convolve_max': 240,
-            'convolve_min': 200,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
-        }
-
     def get_model_type(self) -> (str, bool):
         model = 'general'
         type_model = True
@@ -50,54 +40,50 @@
         center_ind = start + math.ceil((end - start) / 2)
         return center_ind
 
-    def get_cache(self, cache: Optional[dict] = None) -> GeneralModelState:
+    def get_state(self, cache: Optional[dict] = None) -> GeneralModelState:
         return GeneralModelState.from_json(cache)
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: AnalyticUnitId) -> None:
-        logging.debug('Start method do_fit for analytic unit: {}'.format(id))
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, self.state['WINDOW_SIZE'])
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, self.state['WINDOW_SIZE'])
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, self.state.window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, self.state.window_size)
 
         del_conv_list = []
         delete_pattern_timestamp = []
         for segment in deleted_segments:
             del_mid_index = segment.center_index
             delete_pattern_timestamp.append(segment.pattern_timestamp)
-            deleted_pat = utils.get_interval(data, del_mid_index, self.state['WINDOW_SIZE'])
+            deleted_pat = utils.get_interval(data, del_mid_index, self.state.window_size)
             deleted_pat = utils.subtract_min_without_nan(deleted_pat)
-            del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.state['pattern_model'])
+            del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.state.pattern_model)
             if len(del_conv_pat): del_conv_list.append(max(del_conv_pat))
 
-        self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3)
-        self.state['conv_del_min'], self.state['conv_del_max'] = utils.get_min_max(del_conv_list, self.state['WINDOW_SIZE'])
-        logging.debug('Method do_fit completed correctly for analytic unit: {}'.format(id))
+        self.state.convolve_min, self.state.convolve_max = utils.get_min_max(convolve_list, self.state.window_size / 3)
+        self.state.conv_del_min, self.state.conv_del_max = utils.get_min_max(del_conv_list, self.state.window_size)
 
-    def do_detect(self, dataframe: pd.DataFrame, id: AnalyticUnitId) -> List[int]:
-        logging.debug('Start method do_detect for analytic unit: {}'.format(id))
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        pat_data = self.state.get('pattern_model', [])
+        pat_data = self.state.pattern_model
         if pat_data.count(0) == len(pat_data):
             raise ValueError('Labeled patterns must not be empty')
 
-        window_size = self.state.get('WINDOW_SIZE', 0)
+        window_size = self.state.window_size
         all_corr = utils.get_correlation_gen(data, window_size, pat_data)
         all_corr_peaks = utils.find_peaks(all_corr, window_size * 2)
         filtered = self.__filter_detection(all_corr_peaks, data)
         filtered = list(filtered)
-        logging.debug('Method do_detect completed correctly for analytic unit: {}'.format(id))
         return [(item, item + window_size * 2) for item in filtered]
 
     def __filter_detection(self, segments: Generator[int, None, None], data: pd.Series) -> Generator[int, None, None]:
-        if not self.state.get('pattern_center'):
+        if not self.state.pattern_center:
             return []
-        window_size = self.state.get('WINDOW_SIZE', 0)
-        pattern_model = self.state.get('pattern_model', [])
+        window_size = self.state.window_size
+        pattern_model = self.state.pattern_model
         for ind, val in segments:
             watch_data = data[ind - window_size: ind + window_size + 1]
             watch_data = utils.subtract_min_without_nan(watch_data)
@@ -106,8 +92,8 @@ class GeneralModel(Model):
                 watch_conv = max(convolve_segment)
             else:
                 continue
-            if watch_conv < self.state['convolve_min'] * 0.8 or val < PEARSON_FACTOR:
+            if watch_conv < self.state.convolve_min * 0.8 or val < PEARSON_FACTOR:
                 continue
-            if watch_conv < self.state['conv_del_max'] * 1.02 and watch_conv > self.state['conv_del_min'] * 0.98:
+            if watch_conv < self.state.conv_del_max * 1.02 and watch_conv > self.state.conv_del_min * 0.98:
                 continue
             yield ind
diff --git a/analytics/analytics/models/jump_model.py b/analytics/analytics/models/jump_model.py
index b250839..77035e2 100644
--- a/analytics/analytics/models/jump_model.py
+++ b/analytics/analytics/models/jump_model.py
@@ -6,10 +6,11 @@ import numpy as np
 import pandas as pd
 import scipy.signal
 from scipy.fftpack import fft
-from typing import Optional
+from typing import Optional, List, Tuple
 import math
 from scipy.signal import argrelextrema
 from scipy.stats import gaussian_kde
+from analytic_types import AnalyticUnitId
 
 
 @utils.meta.JSONClass
@@ -28,22 +29,6 @@ class JumpModelState(ModelState):
 
 class JumpModel(Model):
-
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 230,
-            'convolve_min': 230,
-            'JUMP_HEIGHT': 1,
-            'JUMP_LENGTH': 1,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-        }
 
     def get_model_type(self) -> (str, bool):
         model = 'jump'
@@ -56,18 +41,18 @@
         segment_center_index = utils.find_pattern_center(segment, start, 'jump')
         return segment_center_index
 
-    def get_cache(self, cache: Optional[dict] = None) -> JumpModelState:
+    def get_state(self, cache: Optional[dict] = None) -> JumpModelState:
         return JumpModelState.from_json(cache)
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
 
         height_list = learning_info['patterns_value']
         del_conv_list = []
@@ -77,37 +62,37 @@ class JumpModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted_jump = utils.get_interval(data, segment_cent_index, window_size)
             deleted_jump = utils.subtract_min_without_nan(deleted_jump)
-            del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.state['pattern_model'])
+            del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.state.pattern_model)
             if len(del_conv_jump): del_conv_list.append(max(del_conv_jump))
 
-        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
-        self.state['JUMP_HEIGHT'] = float(min(learning_info['pattern_height'], default = 1))
-        self.state['JUMP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
+        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list)
+        self.state.jump_height = float(min(learning_info['pattern_height'], default = 1))
+        self.state.jump_length = int(max(learning_info['pattern_width'], default = 1))
 
-    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1)
+        possible_jumps = utils.find_jump(data, self.state.jump_height, self.state.jump_length + 1)
         result = self.__filter_detection(possible_jumps, data)
         return [(val - 1, val + 1) for val in result]
 
     def __filter_detection(self, segments, data):
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'max')
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
-        upper_bound = self.state['convolve_max'] * 1.2
-        lower_bound = self.state['convolve_min'] * 0.8
-        delete_up_bound = self.state['conv_del_max'] * 1.02
-        delete_low_bound = self.state['conv_del_min'] * 0.98
+        pattern_data = self.state.pattern_model
+        upper_bound = self.state.convolve_max * 1.2
+        lower_bound = self.state.convolve_min * 0.8
+        delete_up_bound = self.state.conv_del_max * 1.02
+        delete_low_bound = self.state.conv_del_min * 0.98
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE'] and segment < (len(data) - self.state['WINDOW_SIZE']):
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size and segment < (len(data) - self.state.window_size):
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if len(convol_data) == 0 or percent_of_nans > 0.5:
                     delete_list.append(segment)
diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py
index 7ad338c..abb10ef 100644
--- a/analytics/analytics/models/model.py
+++ b/analytics/analytics/models/model.py
@@ -2,7 +2,7 @@ import utils
 
 from abc import ABC, abstractmethod
 from attrdict import AttrDict
-from typing import Optional, List
+from typing import Optional, List, Tuple
 import pandas as pd
 import math
 import logging
@@ -73,11 +73,11 @@ class Model(ABC):
     DEL_CONV_ERROR = 0.02
 
     @abstractmethod
-    def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache], learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
        pass
 
     @abstractmethod
-    def do_detect(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         pass
 
     @abstractmethod
@@ -89,14 +89,12 @@
         pass
 
     @abstractmethod
-    def get_cache(self, cache: Optional[dict] = None) -> ModelState:
+    def get_state(self, cache: Optional[dict] = None) -> ModelState:
         pass
 
-    def fit(self, dataframe: pd.DataFrame, segments: list, id: AnalyticUnitId, cache: Optional[ModelCache]) -> ModelCache:
+    def fit(self, dataframe: pd.DataFrame, segments: List[dict], id: AnalyticUnitId) -> ModelState:
         logging.debug('Start method fit for analytic unit {}'.format(id))
         data = dataframe['value']
-        if cache != None and len(cache) > 0:
-            self.state = cache
         max_length = 0
         labeled = []
         deleted = []
@@ -114,48 +112,37 @@
 
         assert len(labeled) > 0, f'labeled list empty, skip fitting for {id}'
 
-        if self.state.get('WINDOW_SIZE') == 0:
-            self.state['WINDOW_SIZE'] = math.ceil(max_length / 2) if max_length else 0
+        if self.state.window_size == 0:
+            self.state.window_size = math.ceil(max_length / 2) if max_length else 0
         model, model_type = self.get_model_type()
         learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type)
-        self.do_fit(dataframe, labeled, deleted, learning_info, id)
+        self.do_fit(dataframe, labeled, deleted, learning_info)
         logging.debug('fit complete successful with self.state: {} for analytic unit: {}'.format(self.state, id))
         return self.state
 
-    def detect(self, dataframe: pd.DataFrame, id: str, cache: Optional[ModelCache]) -> dict:
-        #If cache is None or empty dict - default parameters will be used instead
-        if cache != None and len(cache) > 0:
-            self.state = cache
-        else:
-            logging.debug('Get empty cache in detect')
-        if not self.state:
-            logging.warning('self.state is empty - skip do_detect')
-            return {
-                'segments': [],
-                'cache': {},
-            }
-        result = self.do_detect(dataframe, id)
+    def detect(self, dataframe: pd.DataFrame, id: AnalyticUnitId) -> dict:
+        logging.debug('Start method detect for analytic unit {}'.format(id))
+        result = self.do_detect(dataframe)
         segments = [(
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x[0]]),
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x[1]]),
         ) for x in result]
         if not self.state:
             logging.warning('Return empty self.state after detect')
+        logging.debug('Method detect complete successful for analytic unit {}'.format(id))
         return {
             'segments': segments,
             'cache': self.state,
         }
 
-    def _update_fiting_result(self, state: dict, confidences: list, convolve_list: list, del_conv_list: list, height_list: list) -> None:
-        if type(state) is dict:
-            state['confidence'] = float(min(confidences, default = 1.5))
-            state['convolve_min'], state['convolve_max'] = utils.get_min_max(convolve_list, state['WINDOW_SIZE'])
-            state['conv_del_min'], state['conv_del_max'] = utils.get_min_max(del_conv_list, 0)
-            state['height_min'], state['height_max'] = utils.get_min_max(height_list, 0)
-        else:
-            raise ValueError('got non-dict as state for update fiting result: {}'.format(state))
+    def _update_fiting_result(self, state: ModelState, confidences: list, convolve_list: list, del_conv_list: list, height_list: Optional[list] = None) -> None:
+        state.confidence = float(min(confidences, default = 1.5))
+        state.convolve_min, state.convolve_max = utils.get_min_max(convolve_list, state.window_size)
+        state.conv_del_min, state.conv_del_max = utils.get_min_max(del_conv_list, 0)
+        if height_list is not None:
+            state.height_min, state.height_max = utils.get_min_max(height_list, 0)
 
-    def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: list, deleted: list, model: str, model_type: bool) -> dict:
+    def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: List[dict], deleted: List[dict], model: str, model_type: bool) -> dict:
         logging.debug('Start parsing segments')
         learning_info = {
             'confidence': [],
@@ -173,11 +160,11 @@ class Model(ABC):
             segment_center = segment.center_index
             learning_info['segment_center_list'].append(segment_center)
             learning_info['pattern_timestamp'].append(segment.pattern_timestamp)
-            aligned_segment = utils.get_interval(data, segment_center, self.state['WINDOW_SIZE'])
+            aligned_segment = utils.get_interval(data, segment_center, self.state.window_size)
             aligned_segment = utils.subtract_min_without_nan(aligned_segment)
             if len(aligned_segment) == 0:
                 logging.warning('cant add segment to learning because segment is empty where segments center is: {}, window_size: {}, and len_data: {}'.format(
-                    segment_center, self.state['WINDOW_SIZE'], len(data)))
+                    segment_center, self.state.window_size, len(data)))
                 continue
             learning_info['patterns_list'].append(aligned_segment)
             if model == 'peak' or model == 'trough':
@@ -187,7 +174,7 @@ class Model(ABC):
                 pattern_height, pattern_length = utils.find_parameters(segment.data, segment.start, model)
                 learning_info['pattern_height'].append(pattern_height)
                 learning_info['pattern_width'].append(pattern_length)
-                learning_info['patterns_value'].append(aligned_segment.values[self.state['WINDOW_SIZE']])
+                learning_info['patterns_value'].append(aligned_segment.values[self.state.window_size])
         logging.debug('Parsing segments ended correctly with learning_info: {}'.format(learning_info))
         return learning_info
diff --git a/analytics/analytics/models/peak_model.py b/analytics/analytics/models/peak_model.py
index b6dfa8d..53ac3f1 100644
--- a/analytics/analytics/models/peak_model.py
+++ b/analytics/analytics/models/peak_model.py
@@ -3,11 +3,12 @@ from models import Model, ModelState
 import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
-from typing import Optional, List
+from typing import Optional, List, Tuple
 
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
+from analytic_types import AnalyticUnitId
 
 SMOOTHING_COEFF = 2400
 EXP_SMOOTHING_FACTOR = 0.01
@@ -31,22 +32,6 @@ class PeakModelState(ModelState):
 
 class PeakModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 0,
-            'convolve_min': 0,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
-            'height_max': 0,
-            'height_min': 0,
-        }
-
     def get_model_type(self) -> (str, bool):
         model = 'peak'
         type_model = True
@@ -57,18 +42,18 @@ class PeakModel(Model):
         segment = data[start: end]
         return segment.idxmax()
 
-    def get_cache(self, cache: Optional[dict] = None) -> PeakModelState:
+    def get_state(self, cache: Optional[dict] = None) -> PeakModelState:
         return PeakModelState.from_json(cache)
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
 
         height_list = learning_info['patterns_value']
         del_conv_list = []
@@ -80,20 +65,20 @@ class PeakModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted = utils.get_interval(data, del_max_index, window_size)
             deleted = utils.subtract_min_without_nan(deleted)
-            del_conv = scipy.signal.fftconvolve(deleted, self.state['pattern_model'])
+            del_conv = scipy.signal.fftconvolve(deleted, self.state.pattern_model)
             if len(del_conv): del_conv_list.append(max(del_conv))
             delete_pattern_height.append(utils.find_confidence(deleted)[1])
 
         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
 
-    def do_detect(self, dataframe: pd.DataFrame, id: str):
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_maxs = argrelextrema(np.array(data), np.greater)[0]
 
         extrema_list = []
-        for i in utils.exponential_smoothing(data + self.state['confidence'], EXP_SMOOTHING_FACTOR):
+        for i in utils.exponential_smoothing(data + self.state.confidence, EXP_SMOOTHING_FACTOR):
             extrema_list.append(i)
 
         segments = []
@@ -101,27 +86,27 @@ class PeakModel(Model):
             if data[i] > extrema_list[i]:
                 segments.append(i)
         result = self.__filter_detection(segments, data)
-        result = utils.get_borders_of_peaks(result, data, self.state.get('WINDOW_SIZE'), self.state.get('confidence'))
+        result = utils.get_borders_of_peaks(result, data, self.state.window_size, self.state.confidence)
         return result
 
     def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'max')
 
-        if len(segments) == 0 or len(self.state.get('pattern_model', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_model) == 0:
             return []
-        pattern_data = self.state['pattern_model']
-        up_height = self.state['height_max'] * (1 + self.HEIGHT_ERROR)
-        low_height = self.state['height_min'] * (1 - self.HEIGHT_ERROR)
-        up_conv = self.state['convolve_max'] * (1 + 1.5 * self.CONV_ERROR)
-        low_conv = self.state['convolve_min'] * (1 - self.CONV_ERROR)
-        up_del_conv = self.state['conv_del_max'] * (1 + self.DEL_CONV_ERROR)
-        low_del_conv = self.state['conv_del_min'] * (1 - self.DEL_CONV_ERROR)
+        pattern_data = self.state.pattern_model
+        up_height = self.state.height_max * (1 + self.HEIGHT_ERROR)
+        low_height = self.state.height_min * (1 - self.HEIGHT_ERROR)
+        up_conv = self.state.convolve_max * (1 + 1.5 * self.CONV_ERROR)
+        low_conv = self.state.convolve_min * (1 - self.CONV_ERROR)
+        up_del_conv = self.state.conv_del_max * (1 + self.DEL_CONV_ERROR)
+        low_del_conv = self.state.conv_del_min * (1 - self.DEL_CONV_ERROR)
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE']:
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size:
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 convol_data = utils.subtract_min_without_nan(convol_data)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if percent_of_nans > 0.5:
@@ -132,7 +117,7 @@ class PeakModel(Model):
                 convol_data = utils.nan_to_zero(convol_data, nan_list)
                 pattern_data = utils.nan_to_zero(pattern_data, nan_list)
                 conv = scipy.signal.fftconvolve(convol_data, pattern_data)
-                pattern_height = convol_data.values[self.state['WINDOW_SIZE']]
+                pattern_height = convol_data.values[self.state.window_size]
                 if pattern_height > up_height or pattern_height < low_height:
                     delete_list.append(segment)
                     continue
diff --git a/analytics/analytics/models/trough_model.py b/analytics/analytics/models/trough_model.py
index 206627a..5b0fc3c 100644
--- a/analytics/analytics/models/trough_model.py
+++ b/analytics/analytics/models/trough_model.py
@@ -3,11 +3,12 @@ from models import Model, ModelState
 import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
-from typing import Optional
+from typing import Optional, List, Tuple
 
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
+from analytic_types import AnalyticUnitId
 
 SMOOTHING_COEFF = 2400
 EXP_SMOOTHING_FACTOR = 0.01
@@ -30,22 +31,6 @@ class TroughModelState(ModelState):
 
 class TroughModel(Model):
-
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 570000,
-            'convolve_min': 530000,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-            'height_max': 0,
-            'height_min': 0,
-        }
 
     def get_model_type(self) -> (str, bool):
         model = 'trough'
@@ -57,18 +42,18 @@ class TroughModel(Model):
         segment = data[start: end]
         return segment.idxmin()
 
-    def get_cache(self, cache: Optional[dict] = None) -> TroughModelState:
+    def get_state(self, cache: Optional[dict] = None) -> TroughModelState:
         return TroughModelState.from_json(cache)
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
 
         height_list = learning_info['patterns_value']
         del_conv_list = []
@@ -80,20 +65,20 @@ class TroughModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted = utils.get_interval(data, del_min_index, window_size)
             deleted = utils.subtract_min_without_nan(deleted)
-            del_conv = scipy.signal.fftconvolve(deleted, self.state['pattern_model'])
+            del_conv = scipy.signal.fftconvolve(deleted, self.state.pattern_model)
             if len(del_conv): del_conv_list.append(max(del_conv))
             delete_pattern_height.append(utils.find_confidence(deleted)[1])
 
         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
 
-    def do_detect(self, dataframe: pd.DataFrame, id: str):
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_mins = argrelextrema(np.array(data), np.less)[0]
 
         extrema_list = []
-        for i in utils.exponential_smoothing(data - self.state['confidence'], EXP_SMOOTHING_FACTOR):
+        for i in utils.exponential_smoothing(data - self.state.confidence, EXP_SMOOTHING_FACTOR):
             extrema_list.append(i)
 
         segments = []
@@ -101,27 +86,27 @@ class TroughModel(Model):
             if data[i] < extrema_list[i]:
                 segments.append(i)
         result = self.__filter_detection(segments, data)
-        result = utils.get_borders_of_peaks(result, data, self.state.get('WINDOW_SIZE'), self.state.get('confidence'), inverse = True)
+        result = utils.get_borders_of_peaks(result, data, self.state.window_size, self.state.confidence, inverse = True)
         return result
 
     def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'min')
 
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
-        up_height = self.state['height_max'] * (1 + self.HEIGHT_ERROR)
-        low_height = self.state['height_min'] * (1 - self.HEIGHT_ERROR)
-        up_conv = self.state['convolve_max'] * (1 + 1.5 * self.CONV_ERROR)
-        low_conv = self.state['convolve_min'] * (1 - self.CONV_ERROR)
-        up_del_conv = self.state['conv_del_max'] * (1 + self.DEL_CONV_ERROR)
-        low_del_conv = self.state['conv_del_min'] * (1 - self.DEL_CONV_ERROR)
+        pattern_data = self.state.pattern_model
+        up_height = self.state.height_max * (1 + self.HEIGHT_ERROR)
+        low_height = self.state.height_min * (1 - self.HEIGHT_ERROR)
+        up_conv = self.state.convolve_max * (1 + 1.5 * self.CONV_ERROR)
+        low_conv = self.state.convolve_min * (1 - self.CONV_ERROR)
+        up_del_conv = self.state.conv_del_max * (1 + self.DEL_CONV_ERROR)
+        low_del_conv = self.state.conv_del_min * (1 - self.DEL_CONV_ERROR)
        for segment in segments:
-            if segment > self.state['WINDOW_SIZE']:
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size:
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 convol_data = utils.subtract_min_without_nan(convol_data)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if percent_of_nans > 0.5:
diff --git a/analytics/tests/test_dataset.py b/analytics/tests/test_dataset.py
index 00249e2..f880ab7 100644
--- a/analytics/tests/test_dataset.py
+++ b/analytics/tests/test_dataset.py
@@ -23,9 +23,9 @@ class TestDataset(unittest.TestCase):
 
         for model in model_instances:
             model_name = model.__class__.__name__
-
+            model.state = model.get_state(None)
             with self.assertRaises(AssertionError):
-                model.fit(dataframe, segments, 'test', dict())
+                model.fit(dataframe, segments, 'test')
 
     def test_peak_antisegments(self):
         data_val = [1.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0, 7.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
@@ -36,7 +36,8 @@
         try:
             model = models.PeakModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -49,7 +50,8 @@
         try:
             model = models.JumpModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -62,7 +64,8 @@
         try:
             model = models.TroughModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -75,7 +78,8 @@
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -88,7 +92,8 @@
         try:
             model = models.GeneralModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -101,7 +106,8 @@
         try:
             model = models.JumpModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.state = model.get_state(None)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -113,8 +119,9 @@
         try:
             model = models.DropModel()
+            model.state = model.get_state(None)
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -125,8 +132,9 @@
         try:
             model = models.JumpModel()
+            model.state = model.get_state(None)
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', dict())
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -166,7 +174,8 @@
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.fit(dataframe, segments, 'test', dict())
+                model.state = model.get_state(None)
+                model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -175,78 +184,84 @@
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}]
         model = models.GeneralModel()
-        model.fit(dataframe, segments,'test', dict())
+        model.state = model.get_state(None)
+        model.fit(dataframe, segments,'test')
         result = len(data_val) + 1
         for _ in range(2):
-            model.do_detect(dataframe,'test')
-            max_pattern_index = max(model.do_detect(dataframe, 'test'))
+            model.do_detect(dataframe)
+            max_pattern_index = max(model.do_detect(dataframe))
             self.assertLessEqual(max_pattern_index[0], result)
 
     def test_peak_model_for_cache(self):
         cache = {
-            'pattern_center': [1, 6],
-            'model_peak': [1, 4, 0],
+            'patternCenter': [1, 6],
+            'patternModel': [1, 4, 0],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'windowSize': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
+            'heightMax': 4,
+            'heightMin': 4,
         }
         data_val = [2.0, 5.0, 1.0, 1.0, 1.0, 2.0, 5.0, 1.0, 1.0, 2.0, 3.0, 7.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.PeakModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)
 
     def test_trough_model_for_cache(self):
         cache = {
-            'pattern_center': [2, 6],
-            'pattern_model': [5, 0.5, 4],
+            'patternCenter': [2, 6],
+            'patternModel': [5, 0.5, 4],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [5.0, 5.0, 1.0, 4.0, 5.0, 5.0, 0.0, 4.0, 5.0, 5.0, 6.0, 1.0, 5.0, 5.0, 5.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.TroughModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)
 
     def test_jump_model_for_cache(self):
         cache = {
-            'pattern_center': [2, 6],
-            'pattern_model': [5, 0.5, 4],
+            'patternCenter': [2, 6],
+            'patternModel': [5, 0.5, 4],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [1.0, 1.0, 1.0, 4.0, 4.0, 0.0, 0.0, 5.0, 5.0, 0.0, 0.0, 4.0, 4.0, 4.0, 4.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}]
         model = models.JumpModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)
 
     def test_models_for_pattern_model_cache(self):
         cache = {
-            'pattern_center': [4, 12],
-            'pattern_model': [],
+            'patternCenter': [4, 12],
+            'patternModel': [],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 2,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 2,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [5.0, 5.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 0, 0, 0, 0, 0, 0, 6.0, 6.0, 6.0, 1.0, 1.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
@@ -254,7 +269,8 @@
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', cache)
+            model.state = model.get_state(cache)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -267,11 +283,13 @@
             2.0, 8.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
         data = create_dataframe(problem_data)
         cache = {
-            'pattern_center': [5, 50],
-            'pattern_model': [],
-            'WINDOW_SIZE': 2,
-            'convolve_min': 0,
-            'convolve_max': 0,
+            'patternCenter': [5, 50],
+            'patternModel': [],
+            'windowSize': 2,
+            'convolveMin': 0,
+            'convolveMax': 0,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         max_ws = 20
         iteration = 1
@@ -279,14 +297,15 @@
             for _ in range(iteration):
                 pattern_model = create_random_model(ws)
                 convolve = scipy.signal.fftconvolve(pattern_model, pattern_model)
-                cache['WINDOW_SIZE'] = ws
-                cache['pattern_model'] = pattern_model
-                cache['convolve_min'] = max(convolve)
-                cache['convolve_max'] = max(convolve)
+                cache['windowSize'] = ws
+                cache['patternModel'] = pattern_model
+                cache['convolveMin'] = max(convolve)
+                cache['convolveMax'] = max(convolve)
                 try:
                     model = models.GeneralModel()
+                    model.state = model.get_state(cache)
                     model_name = model.__class__.__name__
-                    model.detect(data, 'test', cache)
+                    model.detect(data, 'test')
                 except ValueError:
                     self.fail('Model {} raised unexpectedly with av_model {} and window size {}'.format(model_name, pattern_model, ws))
@@ -294,37 +313,37 @@
         data = create_random_model(random.randint(1, 100))
         data = create_dataframe(data)
         model_instances = [
-            models.GeneralModel(),
             models.PeakModel(),
             models.TroughModel()
         ]
         cache = {
-            'pattern_center': [5, 50],
-            'pattern_model': [],
-            'WINDOW_SIZE': 2,
-            'convolve_min': 0,
-            'convolve_max': 0,
+            'patternCenter': [5, 50],
+            'patternModel': [],
+            'windowSize': 2,
+            'convolveMin': 0,
+            'convolveMax': 0,
             'confidence': 0,
-            'height_max': 0,
-            'height_min': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'heightMax': 0,
+            'heightMin': 0,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         ws = random.randint(1, int(len(data['value']/2)))
         pattern_model = create_random_model(ws)
         convolve = scipy.signal.fftconvolve(pattern_model, pattern_model)
         confidence = 0.2 * (data['value'].max() - data['value'].min())
-        cache['WINDOW_SIZE'] = ws
-        cache['pattern_model'] = pattern_model
-        cache['convolve_min'] = max(convolve)
-        cache['convolve_max'] = max(convolve)
+        cache['windowSize'] = ws
+        cache['patternModel'] = pattern_model
+        cache['convolveMin'] = max(convolve)
+        cache['convolveMax'] = max(convolve)
         cache['confidence'] = confidence
-        cache['height_max'] = data['value'].max()
-        cache['height_min'] = confidence
+        cache['heightMax'] = data['value'].max()
+        cache['heightMin'] = confidence
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.detect(data, 'test', cache)
+                model.state = model.get_state(cache)
+                model.detect(data, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly with dataset {} and cache {}'.format(model_name, data['value'], cache))
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index 0f42643..05d98e2 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -9,7 +9,7 @@ class TestPatternDetector(unittest.TestCase):
 
         data = [[0,1], [1,2]]
         dataframe = pd.DataFrame(data, columns=['timestamp', 'values'])
-        cache = {'WINDOW_SIZE': 10}
+        cache = {'windowSize': 10}
 
         detector = pattern_detector.PatternDetector('GENERAL', 'test_id')
diff --git a/analytics/tests/test_manager.py b/analytics/tests/test_manager.py
index 44be22a..813f5c2 100644
--- a/analytics/tests/test_manager.py
+++ b/analytics/tests/test_manager.py
@@ -1,5 +1,6 @@
 from models import PeakModel, DropModel, TroughModel, JumpModel, GeneralModel
-
+from models import PeakModelState, DropModelState, TroughModelState, JumpModelState, GeneralModelState
+import utils.meta
 import aiounittest
 from analytic_unit_manager import AnalyticUnitManager
 from collections import namedtuple
@@ -97,19 +98,3 @@ class TestDataset(aiounittest.AsyncTestCase):
         without_manager = await self._test_detect(test_data)
 
         self.assertEqual(with_manager, without_manager)
-
-    async def test_cache(self):
-        cache_attrs = {
-            'PEAK': PeakModel().state.keys(),
-            'JUMP': JumpModel().state.keys(),
-            'DROP': DropModel().state.keys(),
-            'TROUGH': TroughModel().state.keys(),
-            'GENERAL': GeneralModel().state.keys()
-        }
-
-        for pattern, attrs in cache_attrs.items():
-            test_data = TestData(get_random_id(), pattern, *self._get_test_dataset(pattern))
-            learn_task = self._get_learn_task(test_data)
-            cache = await self._learn(learn_task)
-
-            for a in attrs:
-                self.assertTrue(a in cache.keys(), msg='{} not in cache keys: {}'.format(a, cache.keys()))
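
The patch changes the model/cache contract: `fit` and `detect` no longer take a `cache` dict; callers hydrate `model.state` once via `get_state`, and serialization back to a camelCase cache dict happens explicitly via `to_json`. A condensed sketch of the new call sequence, using the names from `PatternDetector.train`/`detect` above (error handling elided):

    # Condensed from PatternDetector.train/detect in this patch.
    model.state = model.get_state(cache)              # camelCase dict -> typed ModelState
    new_state = model.fit(dataframe, segments, analytic_unit_id)
    cache_payload = new_state.to_json()               # typed state -> dict for persistence

    model.state = model.get_state(cache_payload)      # detection hydrates state the same way
    detection = model.detect(dataframe, analytic_unit_id)
    new_cache = detection['cache'].to_json()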
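The tests exercise camelCase cache keys (`windowSize`, `patternCenter`, `convDelMax`) against snake_case attributes (`window_size`, `pattern_center`, `conv_del_max`), so `@utils.meta.JSONClass` is expected to supply the `from_json`/`to_json` mapping between the two. The decorator itself is not part of this diff; the following is a minimal, self-contained sketch of the assumed round-trip, with a hypothetical `snake_to_camel` helper and a two-field stand-in state:

    from typing import List, Optional

    def snake_to_camel(name: str) -> str:
        # Hypothetical helper: 'window_size' -> 'windowSize'
        head, *tail = name.split('_')
        return head + ''.join(part.title() for part in tail)

    class SketchState:
        # Stand-in for a ModelState subclass; real states also carry
        # pattern_model, convolve/conv_del bounds, confidence, etc.
        def __init__(self, window_size: int = 0, pattern_center: Optional[List[int]] = None) -> None:
            self.window_size = window_size
            self.pattern_center = pattern_center if pattern_center is not None else []

        @classmethod
        def from_json(cls, json_object: Optional[dict] = None) -> 'SketchState':
            # None (or {}) hydrates a state full of defaults, which is why the
            # detectors above can drop their `cache == {}` special cases.
            json_object = json_object or {}
            fields = {'windowSize': 'window_size', 'patternCenter': 'pattern_center'}
            return cls(**{fields[k]: v for k, v in json_object.items() if k in fields})

        def to_json(self) -> dict:
            return {snake_to_camel(k): v for k, v in self.__dict__.items()}

    state = SketchState.from_json({'windowSize': 10, 'patternCenter': [1, 6]})
    assert state.window_size == 10
    assert state.to_json() == {'windowSize': 10, 'patternCenter': [1, 6]}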