
Use ModelState classes in models #582 (#586)

Alexandr Velikiy, 6 years ago (committed by rozetko)
commit 20d1a974bb
analytics/analytics/detectors/pattern_detector.py | 33
analytics/analytics/models/__init__.py            | 10
analytics/analytics/models/drop_model.py          | 65
analytics/analytics/models/general_model.py       | 56
analytics/analytics/models/jump_model.py          | 65
analytics/analytics/models/model.py               | 57
analytics/analytics/models/peak_model.py          | 67
analytics/analytics/models/trough_model.py        | 65
analytics/tests/test_dataset.py                   | 171
analytics/tests/test_detectors.py                 | 2
analytics/tests/test_manager.py                   | 19
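
Every model file below follows the same refactoring: the untyped `self.state` dict with keys like `'WINDOW_SIZE'` and `'convolve_max'` is replaced by a typed `ModelState` subclass decorated with `@utils.meta.JSONClass`, and `get_cache()` becomes `get_state()`. Judging by the updated tests, the decorator maps snake_case attributes to camelCase JSON keys. A minimal sketch of the pattern — field names come from the diff, but the constructor shape and defaults are illustrative guesses, not code from the commit:

    # Hypothetical sketch of the ModelState pattern introduced by this commit.
    # JSONClass is assumed to generate from_json()/to_json() converting between
    # snake_case attributes and camelCase keys (window_size <-> windowSize).
    import utils.meta
    from models import ModelState

    @utils.meta.JSONClass
    class DropModelState(ModelState):
        def __init__(self, drop_height: float = 1, drop_length: int = 1, **kwargs):
            super().__init__(**kwargs)
            self.drop_height = drop_height
            self.drop_length = drop_length

    state = DropModelState.from_json({'windowSize': 10, 'dropHeight': 2})
    assert state.window_size == 10
    cache = state.to_json()  # back to a camelCase dict suitable for storage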

analytics/analytics/detectors/pattern_detector.py | 33

@@ -5,7 +5,7 @@ import logging
 import config
 import pandas as pd
-from typing import Optional, Generator
+from typing import Optional, Generator, List
 from detectors import Detector
 from analytic_types import DataBucket
@@ -45,10 +45,12 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         self.bucket = DataBucket()

-    def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
+    def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[models.ModelCache]) -> models.ModelState:
         # TODO: pass only part of dataframe that has segments
-        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id, cache)
-        if new_cache == None or len(new_cache) == 0:
+        self.model.state = self.model.get_state(cache)
+        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id)
+        new_cache = new_cache.to_json()
+        if len(new_cache) == 0:
             logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id))
         return {
             'cache': new_cache
@@ -58,30 +60,30 @@ class PatternDetector(Detector):
         logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)

-        if cache is None or cache == {}:
+        if cache is None:
             msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
             logger.error(msg)
             raise ValueError(msg)

-        window_size = cache.get('WINDOW_SIZE')
+        self.model.state = self.model.get_state(cache)
+        window_size = self.model.state.window_size

         if window_size is None:
-            message = '{} got cache without WINDOW_SIZE for detection'.format(self.analytic_unit_id)
+            message = '{} got cache without window_size for detection'.format(self.analytic_unit_id)
             logger.error(message)
             raise ValueError(message)

         if len(dataframe) < window_size * 2:
-            message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than WINDOW_SIZE: {window_size}'
+            message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than window_size: {window_size}'
             logger.error(message)
             raise ValueError(message)

-        detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
+        detected = self.model.detect(dataframe, self.analytic_unit_id)
         segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
-        newCache = detected['cache']
+        new_cache = detected['cache'].to_json()

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
         return {
-            'cache': newCache,
+            'cache': new_cache,
             'segments': segments,
             'lastDetectionTime': last_detection_time
         }
@@ -89,7 +91,7 @@ class PatternDetector(Detector):
     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))

-        if cache is None or cache == {}:
+        if cache is None:
             logging.debug(f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}, skip')
             return None
@@ -100,7 +102,8 @@ class PatternDetector(Detector):
         self.bucket.receive_data(data_without_nan)

-        window_size = cache['WINDOW_SIZE']
+        # TODO: use ModelState
+        window_size = cache['windowSize']

         bucket_len = len(self.bucket.data)
         if bucket_len < window_size * 2:
@@ -124,4 +127,4 @@ class PatternDetector(Detector):
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         if cache is None: return self.DEFAULT_WINDOW_SIZE
-        return cache.get('WINDOW_SIZE', self.DEFAULT_WINDOW_SIZE)
+        return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE)
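
In short, the detector now owns serialization at its boundary: it inflates the stored dict into a `ModelState` before calling the model, and flattens the state back to JSON before returning it. A condensed reading aid (illustrative, not code from the commit):

    # Condensed view of the new cache round-trip in PatternDetector.
    def train(self, dataframe, segments, cache):
        self.model.state = self.model.get_state(cache)   # JSON dict -> ModelState
        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id)
        return { 'cache': new_cache.to_json() }          # ModelState -> JSON dict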

analytics/analytics/models/__init__.py | 10

@@ -1,7 +1,7 @@
 from models.model import Model, ModelCache, ModelState
-from models.drop_model import DropModel
-from models.peak_model import PeakModel
-from models.jump_model import JumpModel
+from models.drop_model import DropModel, DropModelState
+from models.peak_model import PeakModel, PeakModelState
+from models.jump_model import JumpModel, JumpModelState
 from models.custom_model import CustomModel
-from models.trough_model import TroughModel
-from models.general_model import GeneralModel
+from models.trough_model import TroughModel, TroughModelState
+from models.general_model import GeneralModel, GeneralModelState

analytics/analytics/models/drop_model.py | 65

@@ -4,12 +4,12 @@ import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
 from scipy.stats import gaussian_kde
-from typing import Optional
+from typing import Optional, List, Tuple
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
+from analytic_types import AnalyticUnitId

 @utils.meta.JSONClass
 class DropModelState(ModelState):
@@ -28,21 +28,6 @@ class DropModelState(ModelState):

 class DropModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 200,
-            'convolve_min': 200,
-            'DROP_HEIGHT': 1,
-            'DROP_LENGTH': 1,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-        }

     def get_model_type(self) -> (str, bool):
         model = 'drop'
@@ -55,18 +40,18 @@ class DropModel(Model):
         segment_center_index = utils.find_pattern_center(segment, start, 'drop')
         return segment_center_index

-    def get_cache(self, cache: Optional[dict] = None) -> DropModelState:
+    def get_state(self, cache: Optional[dict] = None) -> DropModelState:
         return DropModelState.from_json(cache)

-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
         height_list = learning_info['patterns_value']

         del_conv_list = []
@@ -76,32 +61,32 @@ class DropModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted_drop = utils.get_interval(data, segment_cent_index, window_size)
             deleted_drop = utils.subtract_min_without_nan(deleted_drop)
-            del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.state['pattern_model'])
+            del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.state.pattern_model)
             if len(del_conv_drop): del_conv_list.append(max(del_conv_drop))

-        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
-        self.state['DROP_HEIGHT'] = int(min(learning_info['pattern_height'], default = 1))
-        self.state['DROP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
+        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list)
+        self.state.drop_height = int(min(learning_info['pattern_height'], default = 1))
+        self.state.drop_length = int(max(learning_info['pattern_width'], default = 1))

-    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)
+        possible_drops = utils.find_drop(data, self.state.drop_height, self.state.drop_length + 1)
         result = self.__filter_detection(possible_drops, data)
         return [(val - 1, val + 1) for val in result]

     def __filter_detection(self, segments: list, data: list):
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'min')
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
+        pattern_data = self.state.pattern_model
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE'] and segment < (len(data) - self.state['WINDOW_SIZE']):
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size and segment < (len(data) - self.state.window_size):
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if len(convol_data) == 0 or percent_of_nans > 0.5:
                     delete_list.append(segment)
@@ -111,10 +96,10 @@ class DropModel(Model):
                 convol_data = utils.nan_to_zero(convol_data, nan_list)
                 pattern_data = utils.nan_to_zero(pattern_data, nan_list)
                 conv = scipy.signal.fftconvolve(convol_data, pattern_data)
-                upper_bound = self.state['convolve_max'] * 1.2
-                lower_bound = self.state['convolve_min'] * 0.8
-                delete_up_bound = self.state['conv_del_max'] * 1.02
-                delete_low_bound = self.state['conv_del_min'] * 0.98
+                upper_bound = self.state.convolve_max * 1.2
+                lower_bound = self.state.convolve_min * 0.8
+                delete_up_bound = self.state.conv_del_max * 1.02
+                delete_low_bound = self.state.conv_del_min * 0.98
                 try:
                     if max(conv) > upper_bound or max(conv) < lower_bound:
                         delete_list.append(segment)
analytics/analytics/models/general_model.py | 56

@@ -14,8 +14,9 @@ from scipy.stats import gaussian_kde
 from scipy.stats import norm
 import logging
-from typing import Optional
+from typing import Optional, List, Tuple
 import math
+from analytic_types import AnalyticUnitId

 PEARSON_FACTOR = 0.7
@@ -28,17 +29,6 @@ class GeneralModelState(ModelState):

 class GeneralModel(Model):
-    def __init__(self):
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'convolve_max': 240,
-            'convolve_min': 200,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
-        }

     def get_model_type(self) -> (str, bool):
         model = 'general'
         type_model = True
@@ -50,54 +40,50 @@ class GeneralModel(Model):
         center_ind = start + math.ceil((end - start) / 2)
         return center_ind

-    def get_cache(self, cache: Optional[dict] = None) -> GeneralModelState:
+    def get_state(self, cache: Optional[dict] = None) -> GeneralModelState:
         return GeneralModelState.from_json(cache)

-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: AnalyticUnitId) -> None:
-        logging.debug('Start method do_fit for analytic unit: {}'.format(id))
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, self.state['WINDOW_SIZE'])
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, self.state['WINDOW_SIZE'])
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, self.state.window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, self.state.window_size)

         del_conv_list = []
         delete_pattern_timestamp = []
         for segment in deleted_segments:
             del_mid_index = segment.center_index
             delete_pattern_timestamp.append(segment.pattern_timestamp)
-            deleted_pat = utils.get_interval(data, del_mid_index, self.state['WINDOW_SIZE'])
+            deleted_pat = utils.get_interval(data, del_mid_index, self.state.window_size)
             deleted_pat = utils.subtract_min_without_nan(deleted_pat)
-            del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.state['pattern_model'])
+            del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.state.pattern_model)
             if len(del_conv_pat): del_conv_list.append(max(del_conv_pat))

-        self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3)
-        self.state['conv_del_min'], self.state['conv_del_max'] = utils.get_min_max(del_conv_list, self.state['WINDOW_SIZE'])
-        logging.debug('Method do_fit completed correctly for analytic unit: {}'.format(id))
+        self.state.convolve_min, self.state.convolve_max = utils.get_min_max(convolve_list, self.state.window_size / 3)
+        self.state.conv_del_min, self.state.conv_del_max = utils.get_min_max(del_conv_list, self.state.window_size)

-    def do_detect(self, dataframe: pd.DataFrame, id: AnalyticUnitId) -> List[int]:
-        logging.debug('Start method do_detect for analytic unit: {}'.format(id))
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        pat_data = self.state.get('pattern_model', [])
+        pat_data = self.state.pattern_model
         if pat_data.count(0) == len(pat_data):
             raise ValueError('Labeled patterns must not be empty')

-        window_size = self.state.get('WINDOW_SIZE', 0)
+        window_size = self.state.window_size
         all_corr = utils.get_correlation_gen(data, window_size, pat_data)
         all_corr_peaks = utils.find_peaks(all_corr, window_size * 2)
         filtered = self.__filter_detection(all_corr_peaks, data)
         filtered = list(filtered)
-        logging.debug('Method do_detect completed correctly for analytic unit: {}'.format(id))
         return [(item, item + window_size * 2) for item in filtered]

     def __filter_detection(self, segments: Generator[int, None, None], data: pd.Series) -> Generator[int, None, None]:
-        if not self.state.get('pattern_center'):
+        if not self.state.pattern_center:
             return []
-        window_size = self.state.get('WINDOW_SIZE', 0)
-        pattern_model = self.state.get('pattern_model', [])
+        window_size = self.state.window_size
+        pattern_model = self.state.pattern_model
         for ind, val in segments:
             watch_data = data[ind - window_size: ind + window_size + 1]
             watch_data = utils.subtract_min_without_nan(watch_data)
@@ -106,8 +92,8 @@ class GeneralModel(Model):
                 watch_conv = max(convolve_segment)
             else:
                 continue
-            if watch_conv < self.state['convolve_min'] * 0.8 or val < PEARSON_FACTOR:
+            if watch_conv < self.state.convolve_min * 0.8 or val < PEARSON_FACTOR:
                 continue
-            if watch_conv < self.state['conv_del_max'] * 1.02 and watch_conv > self.state['conv_del_min'] * 0.98:
+            if watch_conv < self.state.conv_del_max * 1.02 and watch_conv > self.state.conv_del_min * 0.98:
                 continue
             yield ind

analytics/analytics/models/jump_model.py | 65

@@ -6,10 +6,11 @@ import numpy as np
 import pandas as pd
 import scipy.signal
 from scipy.fftpack import fft
-from typing import Optional
+from typing import Optional, List, Tuple
 import math
 from scipy.signal import argrelextrema
 from scipy.stats import gaussian_kde
+from analytic_types import AnalyticUnitId

 @utils.meta.JSONClass
@@ -28,22 +29,6 @@ class JumpModelState(ModelState):

 class JumpModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 230,
-            'convolve_min': 230,
-            'JUMP_HEIGHT': 1,
-            'JUMP_LENGTH': 1,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-        }

     def get_model_type(self) -> (str, bool):
         model = 'jump'
@@ -56,18 +41,18 @@ class JumpModel(Model):
         segment_center_index = utils.find_pattern_center(segment, start, 'jump')
         return segment_center_index

-    def get_cache(self, cache: Optional[dict] = None) -> JumpModelState:
+    def get_state(self, cache: Optional[dict] = None) -> JumpModelState:
         return JumpModelState.from_json(cache)

-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
         height_list = learning_info['patterns_value']

         del_conv_list = []
@@ -77,37 +62,37 @@ class JumpModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted_jump = utils.get_interval(data, segment_cent_index, window_size)
             deleted_jump = utils.subtract_min_without_nan(deleted_jump)
-            del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.state['pattern_model'])
+            del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.state.pattern_model)
             if len(del_conv_jump): del_conv_list.append(max(del_conv_jump))

-        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
-        self.state['JUMP_HEIGHT'] = float(min(learning_info['pattern_height'], default = 1))
-        self.state['JUMP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
+        self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list)
+        self.state.jump_height = float(min(learning_info['pattern_height'], default = 1))
+        self.state.jump_length = int(max(learning_info['pattern_width'], default = 1))

-    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1)
+        possible_jumps = utils.find_jump(data, self.state.jump_height, self.state.jump_length + 1)
         result = self.__filter_detection(possible_jumps, data)
         return [(val - 1, val + 1) for val in result]

     def __filter_detection(self, segments, data):
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'max')
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
-        upper_bound = self.state['convolve_max'] * 1.2
-        lower_bound = self.state['convolve_min'] * 0.8
-        delete_up_bound = self.state['conv_del_max'] * 1.02
-        delete_low_bound = self.state['conv_del_min'] * 0.98
+        pattern_data = self.state.pattern_model
+        upper_bound = self.state.convolve_max * 1.2
+        lower_bound = self.state.convolve_min * 0.8
+        delete_up_bound = self.state.conv_del_max * 1.02
+        delete_low_bound = self.state.conv_del_min * 0.98
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE'] and segment < (len(data) - self.state['WINDOW_SIZE']):
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size and segment < (len(data) - self.state.window_size):
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if len(convol_data) == 0 or percent_of_nans > 0.5:
                     delete_list.append(segment)

analytics/analytics/models/model.py | 57

@@ -2,7 +2,7 @@ import utils
 from abc import ABC, abstractmethod
 from attrdict import AttrDict
-from typing import Optional, List
+from typing import Optional, List, Tuple
 import pandas as pd
 import math
 import logging
@@ -73,11 +73,11 @@ class Model(ABC):
     DEL_CONV_ERROR = 0.02

     @abstractmethod
-    def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache], learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         pass

     @abstractmethod
-    def do_detect(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         pass

     @abstractmethod
@@ -89,14 +89,12 @@ class Model(ABC):
         pass

     @abstractmethod
-    def get_cache(self, cache: Optional[dict] = None) -> ModelState:
+    def get_state(self, cache: Optional[dict] = None) -> ModelState:
         pass

-    def fit(self, dataframe: pd.DataFrame, segments: list, id: AnalyticUnitId, cache: Optional[ModelCache]) -> ModelCache:
+    def fit(self, dataframe: pd.DataFrame, segments: List[dict], id: AnalyticUnitId) -> ModelState:
         logging.debug('Start method fit for analytic unit {}'.format(id))
         data = dataframe['value']
-        if cache != None and len(cache) > 0:
-            self.state = cache
         max_length = 0
         labeled = []
         deleted = []
@@ -114,48 +112,37 @@ class Model(ABC):
         assert len(labeled) > 0, f'labeled list empty, skip fitting for {id}'

-        if self.state.get('WINDOW_SIZE') == 0:
-            self.state['WINDOW_SIZE'] = math.ceil(max_length / 2) if max_length else 0
+        if self.state.window_size == 0:
+            self.state.window_size = math.ceil(max_length / 2) if max_length else 0
         model, model_type = self.get_model_type()
         learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type)
-        self.do_fit(dataframe, labeled, deleted, learning_info, id)
+        self.do_fit(dataframe, labeled, deleted, learning_info)
         logging.debug('fit complete successful with self.state: {} for analytic unit: {}'.format(self.state, id))
         return self.state

-    def detect(self, dataframe: pd.DataFrame, id: str, cache: Optional[ModelCache]) -> dict:
-        #If cache is None or empty dict - default parameters will be used instead
-        if cache != None and len(cache) > 0:
-            self.state = cache
-        else:
-            logging.debug('Get empty cache in detect')
-        if not self.state:
-            logging.warning('self.state is empty - skip do_detect')
-            return {
-                'segments': [],
-                'cache': {},
-            }
-        result = self.do_detect(dataframe, id)
+    def detect(self, dataframe: pd.DataFrame, id: AnalyticUnitId) -> dict:
+        logging.debug('Start method detect for analytic unit {}'.format(id))
+        result = self.do_detect(dataframe)
         segments = [(
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x[0]]),
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x[1]]),
         ) for x in result]
         if not self.state:
             logging.warning('Return empty self.state after detect')
+        logging.debug('Method detect complete successful for analytic unit {}'.format(id))
         return {
             'segments': segments,
             'cache': self.state,
         }

-    def _update_fiting_result(self, state: dict, confidences: list, convolve_list: list, del_conv_list: list, height_list: list) -> None:
-        if type(state) is dict:
-            state['confidence'] = float(min(confidences, default = 1.5))
-            state['convolve_min'], state['convolve_max'] = utils.get_min_max(convolve_list, state['WINDOW_SIZE'])
-            state['conv_del_min'], state['conv_del_max'] = utils.get_min_max(del_conv_list, 0)
-            state['height_min'], state['height_max'] = utils.get_min_max(height_list, 0)
-        else:
-            raise ValueError('got non-dict as state for update fiting result: {}'.format(state))
+    def _update_fiting_result(self, state: ModelState, confidences: list, convolve_list: list, del_conv_list: list, height_list: Optional[list] = None) -> None:
+        state.confidence = float(min(confidences, default = 1.5))
+        state.convolve_min, state.convolve_max = utils.get_min_max(convolve_list, state.window_size)
+        state.conv_del_min, state.conv_del_max = utils.get_min_max(del_conv_list, 0)
+        if height_list is not None:
+            state.height_min, state.height_max = utils.get_min_max(height_list, 0)

-    def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: list, deleted: list, model: str, model_type: bool) -> dict:
+    def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: List[dict], deleted: List[dict], model: str, model_type: bool) -> dict:
         logging.debug('Start parsing segments')
         learning_info = {
             'confidence': [],
@@ -173,11 +160,11 @@ class Model(ABC):
             segment_center = segment.center_index
             learning_info['segment_center_list'].append(segment_center)
             learning_info['pattern_timestamp'].append(segment.pattern_timestamp)
-            aligned_segment = utils.get_interval(data, segment_center, self.state['WINDOW_SIZE'])
+            aligned_segment = utils.get_interval(data, segment_center, self.state.window_size)
             aligned_segment = utils.subtract_min_without_nan(aligned_segment)
             if len(aligned_segment) == 0:
                 logging.warning('cant add segment to learning because segment is empty where segments center is: {}, window_size: {}, and len_data: {}'.format(
-                    segment_center, self.state['WINDOW_SIZE'], len(data)))
+                    segment_center, self.state.window_size, len(data)))
                 continue
             learning_info['patterns_list'].append(aligned_segment)
             if model == 'peak' or model == 'trough':
@@ -187,7 +174,7 @@ class Model(ABC):
                 pattern_height, pattern_length = utils.find_parameters(segment.data, segment.start, model)
                 learning_info['pattern_height'].append(pattern_height)
                 learning_info['pattern_width'].append(pattern_length)
-                learning_info['patterns_value'].append(aligned_segment.values[self.state['WINDOW_SIZE']])
+                learning_info['patterns_value'].append(aligned_segment.values[self.state.window_size])
         logging.debug('Parsing segments ended correctly with learning_info: {}'.format(learning_info))
         return learning_info
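
Two consequences of the new base-class API are easy to miss in the hunks above: `fit` and `detect` no longer take a `cache` argument, so callers must assign `model.state` first (the updated tests below do exactly this), and `height_list` in `_update_fiting_result` is now optional, which is why `DropModel` and `JumpModel` call it with four arguments while `PeakModel` and `TroughModel` still pass five. A sketch of the calling convention, assuming a `cache_json` dict loaded from storage:

    # Illustrative driver for the new Model API (variable names are assumptions).
    model = models.PeakModel()
    model.state = model.get_state(cache_json)     # get_state() replaces get_cache()
    new_state = model.fit(dataframe, segments, analytic_unit_id)   # no cache param
    result = model.detect(dataframe, analytic_unit_id)             # reads self.state
    cache_json = result['cache'].to_json()        # persist the updated state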

analytics/analytics/models/peak_model.py | 67

@@ -3,11 +3,12 @@ from models import Model, ModelState
 import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
-from typing import Optional, List
+from typing import Optional, List, Tuple
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
+from analytic_types import AnalyticUnitId

 SMOOTHING_COEFF = 2400
 EXP_SMOOTHING_FACTOR = 0.01
@@ -31,22 +32,6 @@ class PeakModelState(ModelState):

 class PeakModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 0,
-            'convolve_min': 0,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
-            'height_max': 0,
-            'height_min': 0,
-        }

     def get_model_type(self) -> (str, bool):
         model = 'peak'
         type_model = True
@@ -57,18 +42,18 @@ class PeakModel(Model):
         segment = data[start: end]
         return segment.idxmax()

-    def get_cache(self, cache: Optional[dict] = None) -> PeakModelState:
+    def get_state(self, cache: Optional[dict] = None) -> PeakModelState:
         return PeakModelState.from_json(cache)

-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
         height_list = learning_info['patterns_value']

         del_conv_list = []
@@ -80,20 +65,20 @@ class PeakModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted = utils.get_interval(data, del_max_index, window_size)
             deleted = utils.subtract_min_without_nan(deleted)
-            del_conv = scipy.signal.fftconvolve(deleted, self.state['pattern_model'])
+            del_conv = scipy.signal.fftconvolve(deleted, self.state.pattern_model)
             if len(del_conv): del_conv_list.append(max(del_conv))
             delete_pattern_height.append(utils.find_confidence(deleted)[1])

         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)

-    def do_detect(self, dataframe: pd.DataFrame, id: str):
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_maxs = argrelextrema(np.array(data), np.greater)[0]

         extrema_list = []
-        for i in utils.exponential_smoothing(data + self.state['confidence'], EXP_SMOOTHING_FACTOR):
+        for i in utils.exponential_smoothing(data + self.state.confidence, EXP_SMOOTHING_FACTOR):
             extrema_list.append(i)

         segments = []
@@ -101,27 +86,27 @@ class PeakModel(Model):
             if data[i] > extrema_list[i]:
                 segments.append(i)
         result = self.__filter_detection(segments, data)
-        result = utils.get_borders_of_peaks(result, data, self.state.get('WINDOW_SIZE'), self.state.get('confidence'))
+        result = utils.get_borders_of_peaks(result, data, self.state.window_size, self.state.confidence)
         return result

     def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'max')
-        if len(segments) == 0 or len(self.state.get('pattern_model', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_model) == 0:
             return []
-        pattern_data = self.state['pattern_model']
-        up_height = self.state['height_max'] * (1 + self.HEIGHT_ERROR)
-        low_height = self.state['height_min'] * (1 - self.HEIGHT_ERROR)
-        up_conv = self.state['convolve_max'] * (1 + 1.5 * self.CONV_ERROR)
-        low_conv = self.state['convolve_min'] * (1 - self.CONV_ERROR)
-        up_del_conv = self.state['conv_del_max'] * (1 + self.DEL_CONV_ERROR)
-        low_del_conv = self.state['conv_del_min'] * (1 - self.DEL_CONV_ERROR)
+        pattern_data = self.state.pattern_model
+        up_height = self.state.height_max * (1 + self.HEIGHT_ERROR)
+        low_height = self.state.height_min * (1 - self.HEIGHT_ERROR)
+        up_conv = self.state.convolve_max * (1 + 1.5 * self.CONV_ERROR)
+        low_conv = self.state.convolve_min * (1 - self.CONV_ERROR)
+        up_del_conv = self.state.conv_del_max * (1 + self.DEL_CONV_ERROR)
+        low_del_conv = self.state.conv_del_min * (1 - self.DEL_CONV_ERROR)
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE']:
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size:
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 convol_data = utils.subtract_min_without_nan(convol_data)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if percent_of_nans > 0.5:
@@ -132,7 +117,7 @@ class PeakModel(Model):
                 convol_data = utils.nan_to_zero(convol_data, nan_list)
                 pattern_data = utils.nan_to_zero(pattern_data, nan_list)
                 conv = scipy.signal.fftconvolve(convol_data, pattern_data)
-                pattern_height = convol_data.values[self.state['WINDOW_SIZE']]
+                pattern_height = convol_data.values[self.state.window_size]
                 if pattern_height > up_height or pattern_height < low_height:
                     delete_list.append(segment)
                     continue

analytics/analytics/models/trough_model.py | 65

@@ -3,11 +3,12 @@ from models import Model, ModelState
 import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
-from typing import Optional
+from typing import Optional, List, Tuple
 import utils
 import utils.meta
 import numpy as np
 import pandas as pd
+from analytic_types import AnalyticUnitId

 SMOOTHING_COEFF = 2400
 EXP_SMOOTHING_FACTOR = 0.01
@@ -30,22 +31,6 @@ class TroughModelState(ModelState):

 class TroughModel(Model):
-    def __init__(self):
-        super()
-        self.segments = []
-        self.state = {
-            'pattern_center': [],
-            'pattern_model': [],
-            'confidence': 1.5,
-            'convolve_max': 570000,
-            'convolve_min': 530000,
-            'WINDOW_SIZE': 0,
-            'conv_del_min': 54000,
-            'conv_del_max': 55000,
-            'height_max': 0,
-            'height_min': 0,
-        }

     def get_model_type(self) -> (str, bool):
         model = 'trough'
@@ -57,18 +42,18 @@ class TroughModel(Model):
         segment = data[start: end]
         return segment.idxmin()

-    def get_cache(self, cache: Optional[dict] = None) -> TroughModelState:
+    def get_state(self, cache: Optional[dict] = None) -> TroughModelState:
         return TroughModelState.from_json(cache)

-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: List[dict], deleted_segments: List[dict], learning_info: dict) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        window_size = self.state['WINDOW_SIZE']
-        last_pattern_center = self.state.get('pattern_center', [])
-        self.state['pattern_center'] = list(set(last_pattern_center + learning_info['segment_center_list']))
-        self.state['pattern_model'] = utils.get_av_model(learning_info['patterns_list'])
-        convolve_list = utils.get_convolve(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
-        correlation_list = utils.get_correlation(self.state['pattern_center'], self.state['pattern_model'], data, window_size)
+        window_size = self.state.window_size
+        last_pattern_center = self.state.pattern_center
+        self.state.pattern_center = list(set(last_pattern_center + learning_info['segment_center_list']))
+        self.state.pattern_model = utils.get_av_model(learning_info['patterns_list'])
+        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
+        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
         height_list = learning_info['patterns_value']

         del_conv_list = []
@@ -80,20 +65,20 @@ class TroughModel(Model):
             delete_pattern_timestamp.append(segment.pattern_timestamp)
             deleted = utils.get_interval(data, del_min_index, window_size)
             deleted = utils.subtract_min_without_nan(deleted)
-            del_conv = scipy.signal.fftconvolve(deleted, self.state['pattern_model'])
+            del_conv = scipy.signal.fftconvolve(deleted, self.state.pattern_model)
             if len(del_conv): del_conv_list.append(max(del_conv))
             delete_pattern_height.append(utils.find_confidence(deleted)[1])

         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)

-    def do_detect(self, dataframe: pd.DataFrame, id: str):
+    def do_detect(self, dataframe: pd.DataFrame) -> List[Tuple[int, int]]:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_mins = argrelextrema(np.array(data), np.less)[0]

         extrema_list = []
-        for i in utils.exponential_smoothing(data - self.state['confidence'], EXP_SMOOTHING_FACTOR):
+        for i in utils.exponential_smoothing(data - self.state.confidence, EXP_SMOOTHING_FACTOR):
             extrema_list.append(i)

         segments = []
@@ -101,27 +86,27 @@ class TroughModel(Model):
             if data[i] < extrema_list[i]:
                 segments.append(i)
         result = self.__filter_detection(segments, data)
-        result = utils.get_borders_of_peaks(result, data, self.state.get('WINDOW_SIZE'), self.state.get('confidence'), inverse = True)
+        result = utils.get_borders_of_peaks(result, data, self.state.window_size, self.state.confidence, inverse = True)
        return result

     def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
-        variance_error = self.state['WINDOW_SIZE']
+        variance_error = self.state.window_size
         close_patterns = utils.close_filtering(segments, variance_error)
         segments = utils.best_pattern(close_patterns, data, 'min')
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+        if len(segments) == 0 or len(self.state.pattern_center) == 0:
             segments = []
             return segments
-        pattern_data = self.state['pattern_model']
-        up_height = self.state['height_max'] * (1 + self.HEIGHT_ERROR)
-        low_height = self.state['height_min'] * (1 - self.HEIGHT_ERROR)
-        up_conv = self.state['convolve_max'] * (1 + 1.5 * self.CONV_ERROR)
-        low_conv = self.state['convolve_min'] * (1 - self.CONV_ERROR)
-        up_del_conv = self.state['conv_del_max'] * (1 + self.DEL_CONV_ERROR)
-        low_del_conv = self.state['conv_del_min'] * (1 - self.DEL_CONV_ERROR)
+        pattern_data = self.state.pattern_model
+        up_height = self.state.height_max * (1 + self.HEIGHT_ERROR)
+        low_height = self.state.height_min * (1 - self.HEIGHT_ERROR)
+        up_conv = self.state.convolve_max * (1 + 1.5 * self.CONV_ERROR)
+        low_conv = self.state.convolve_min * (1 - self.CONV_ERROR)
+        up_del_conv = self.state.conv_del_max * (1 + self.DEL_CONV_ERROR)
+        low_del_conv = self.state.conv_del_min * (1 - self.DEL_CONV_ERROR)
         for segment in segments:
-            if segment > self.state['WINDOW_SIZE']:
-                convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE'])
+            if segment > self.state.window_size:
+                convol_data = utils.get_interval(data, segment, self.state.window_size)
                 convol_data = utils.subtract_min_without_nan(convol_data)
                 percent_of_nans = convol_data.isnull().sum() / len(convol_data)
                 if percent_of_nans > 0.5:

171
analytics/tests/test_dataset.py

@ -23,9 +23,9 @@ class TestDataset(unittest.TestCase):
for model in model_instances: for model in model_instances:
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.state = model.get_state(None)
with self.assertRaises(AssertionError): with self.assertRaises(AssertionError):
model.fit(dataframe, segments, 'test', dict()) model.fit(dataframe, segments, 'test')
def test_peak_antisegments(self): def test_peak_antisegments(self):
data_val = [1.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0, 7.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] data_val = [1.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0, 7.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
@ -36,7 +36,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.PeakModel() model = models.PeakModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -49,7 +50,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -62,7 +64,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.TroughModel() model = models.TroughModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -75,7 +78,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.DropModel() model = models.DropModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -88,7 +92,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.GeneralModel() model = models.GeneralModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -101,7 +106,8 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -113,8 +119,9 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.DropModel() model = models.DropModel()
model.state = model.get_state(None)
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -125,8 +132,9 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model.state = model.get_state(None)
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -166,7 +174,8 @@ class TestDataset(unittest.TestCase):
try: try:
for model in model_instances: for model in model_instances:
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, 'test', dict()) model.state = model.get_state(None)
model.fit(dataframe, segments, 'test')
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -175,78 +184,84 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}]
         model = models.GeneralModel()
-        model.fit(dataframe, segments,'test', dict())
+        model.state = model.get_state(None)
+        model.fit(dataframe, segments,'test')
         result = len(data_val) + 1
         for _ in range(2):
-            model.do_detect(dataframe,'test')
-            max_pattern_index = max(model.do_detect(dataframe, 'test'))
+            model.do_detect(dataframe)
+            max_pattern_index = max(model.do_detect(dataframe))
             self.assertLessEqual(max_pattern_index[0], result)

     def test_peak_model_for_cache(self):
         cache = {
-            'pattern_center': [1, 6],
-            'model_peak': [1, 4, 0],
+            'patternCenter': [1, 6],
+            'patternModel': [1, 4, 0],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'windowSize': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
+            'heightMax': 4,
+            'heightMin': 4,
         }
         data_val = [2.0, 5.0, 1.0, 1.0, 1.0, 2.0, 5.0, 1.0, 1.0, 2.0, 3.0, 7.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.PeakModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)

     def test_trough_model_for_cache(self):
         cache = {
-            'pattern_center': [2, 6],
-            'pattern_model': [5, 0.5, 4],
+            'patternCenter': [2, 6],
+            'patternModel': [5, 0.5, 4],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [5.0, 5.0, 1.0, 4.0, 5.0, 5.0, 0.0, 4.0, 5.0, 5.0, 6.0, 1.0, 5.0, 5.0, 5.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.TroughModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)

     def test_jump_model_for_cache(self):
         cache = {
-            'pattern_center': [2, 6],
-            'pattern_model': [5, 0.5, 4],
+            'patternCenter': [2, 6],
+            'patternModel': [5, 0.5, 4],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 1,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 1,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [1.0, 1.0, 1.0, 4.0, 4.0, 0.0, 0.0, 5.0, 5.0, 0.0, 0.0, 4.0, 4.0, 4.0, 4.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}]
         model = models.JumpModel()
-        result = model.fit(dataframe, segments, 'test', cache)
-        self.assertEqual(len(result['pattern_center']), 3)
+        model.state = model.get_state(cache)
+        result = model.fit(dataframe, segments, 'test')
+        self.assertEqual(len(result.pattern_center), 3)

     def test_models_for_pattern_model_cache(self):
         cache = {
-            'pattern_center': [4, 12],
-            'pattern_model': [],
+            'patternCenter': [4, 12],
+            'patternModel': [],
             'confidence': 2,
-            'convolve_max': 8,
-            'convolve_min': 7,
-            'WINDOW_SIZE': 2,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'convolveMax': 8,
+            'convolveMin': 7,
+            'window_size': 2,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         data_val = [5.0, 5.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 0, 0, 0, 0, 0, 0, 6.0, 6.0, 6.0, 1.0, 1.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
@@ -254,7 +269,8 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, 'test', cache)
+            model.state = model.get_state(cache)
+            model.fit(dataframe, segments, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
@@ -267,11 +283,13 @@ class TestDataset(unittest.TestCase):
                         2.0, 8.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
         data = create_dataframe(problem_data)
         cache = {
-            'pattern_center': [5, 50],
-            'pattern_model': [],
-            'WINDOW_SIZE': 2,
-            'convolve_min': 0,
-            'convolve_max': 0,
+            'patternCenter': [5, 50],
+            'patternModel': [],
+            'windowSize': 2,
+            'convolveMin': 0,
+            'convolveMax': 0,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         max_ws = 20
         iteration = 1

@@ -279,14 +297,15 @@ class TestDataset(unittest.TestCase):
             for _ in range(iteration):
                 pattern_model = create_random_model(ws)
                 convolve = scipy.signal.fftconvolve(pattern_model, pattern_model)
-                cache['WINDOW_SIZE'] = ws
-                cache['pattern_model'] = pattern_model
-                cache['convolve_min'] = max(convolve)
-                cache['convolve_max'] = max(convolve)
+                cache['windowSize'] = ws
+                cache['patternModel'] = pattern_model
+                cache['convolveMin'] = max(convolve)
+                cache['convolveMax'] = max(convolve)
                 try:
                     model = models.GeneralModel()
+                    model.state = model.get_state(cache)
                     model_name = model.__class__.__name__
-                    model.detect(data, 'test', cache)
+                    model.detect(data, 'test')
                 except ValueError:
                     self.fail('Model {} raised unexpectedly with av_model {} and window size {}'.format(model_name, pattern_model, ws))
@@ -294,37 +313,37 @@ class TestDataset(unittest.TestCase):
         data = create_random_model(random.randint(1, 100))
         data = create_dataframe(data)
         model_instances = [
-            models.GeneralModel(),
             models.PeakModel(),
             models.TroughModel()
         ]
         cache = {
-            'pattern_center': [5, 50],
-            'pattern_model': [],
-            'WINDOW_SIZE': 2,
-            'convolve_min': 0,
-            'convolve_max': 0,
+            'patternCenter': [5, 50],
+            'patternModel': [],
+            'windowSize': 2,
+            'convolveMin': 0,
+            'convolveMax': 0,
             'confidence': 0,
-            'height_max': 0,
-            'height_min': 0,
-            'conv_del_min': 0,
-            'conv_del_max': 0,
+            'heightMax': 0,
+            'heightMin': 0,
+            'convDelMin': 0,
+            'convDelMax': 0,
         }
         ws = random.randint(1, int(len(data['value']/2)))
         pattern_model = create_random_model(ws)
         convolve = scipy.signal.fftconvolve(pattern_model, pattern_model)
         confidence = 0.2 * (data['value'].max() - data['value'].min())
-        cache['WINDOW_SIZE'] = ws
-        cache['pattern_model'] = pattern_model
-        cache['convolve_min'] = max(convolve)
-        cache['convolve_max'] = max(convolve)
+        cache['windowSize'] = ws
+        cache['patternModel'] = pattern_model
+        cache['convolveMin'] = max(convolve)
+        cache['convolveMax'] = max(convolve)
         cache['confidence'] = confidence
-        cache['height_max'] = data['value'].max()
-        cache['height_min'] = confidence
+        cache['heightMax'] = data['value'].max()
+        cache['heightMin'] = confidence
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.detect(data, 'test', cache)
+                model.state = model.get_state(cache)
+                model.detect(data, 'test')
         except ValueError:
             self.fail('Model {} raised unexpectedly with dataset {} and cache {}'.format(model_name, data['value'], cache))
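Note: every test_dataset.py hunk above makes the same mechanical change. The cache dict is no longer passed into fit()/detect(); it is deserialized once via get_state(), and fit() now returns a state object whose fields are read as attributes instead of dict keys. A minimal sketch of the new contract, assuming it runs inside test_dataset.py (so the models package and the create_dataframe helper are in scope); the values are copied from test_peak_model_for_cache:

import models

cache = {                       # camelCase keys introduced by this commit
    'patternCenter': [1, 6],
    'patternModel': [1, 4, 0],
    'confidence': 2,
    'convolveMax': 8,
    'convolveMin': 7,
    'windowSize': 1,
    'convDelMin': 0,
    'convDelMax': 0,
    'heightMax': 4,
    'heightMin': 4,
}
data_val = [2.0, 5.0, 1.0, 1.0, 1.0, 2.0, 5.0, 1.0, 1.0, 2.0, 3.0, 7.0, 1.0, 1.0, 1.0]
dataframe = create_dataframe(data_val)   # test helper defined earlier in this file
segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8',
             'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]

model = models.PeakModel()
model.state = model.get_state(cache)             # dict -> ModelState
state = model.fit(dataframe, segments, 'test')   # fit returns the updated state
assert len(state.pattern_center) == 3            # attribute access, not state['...']
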
2
analytics/tests/test_detectors.py
@@ -9,7 +9,7 @@ class TestPatternDetector(unittest.TestCase):
         data = [[0,1], [1,2]]
         dataframe = pd.DataFrame(data, columns=['timestamp', 'values'])
-        cache = {'WINDOW_SIZE': 10}
+        cache = {'windowSize': 10}
         detector = pattern_detector.PatternDetector('GENERAL', 'test_id')
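Both test files now feed the detector camelCase cache keys ('windowSize' rather than 'WINDOW_SIZE'), matching the JSON sent over the wire, while the state objects keep snake_case attributes (pattern_center above). The mapping is presumably handled by the utils.meta helpers imported in test_manager.py below; that module is not changed in this diff, but a converter along these lines would be sufficient (a sketch, not the repo's implementation):

import re

def camel_to_snake(name: str) -> str:
    # 'windowSize' -> 'window_size'; 'convDelMin' -> 'conv_del_min'
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def snake_to_camel(name: str) -> str:
    # 'pattern_center' -> 'patternCenter'
    head, *tail = name.split('_')
    return head + ''.join(part.capitalize() for part in tail)

def fill_state_from_cache(state_obj, cache: dict) -> None:
    # hypothetical deserializer: copy each camelCase cache key onto
    # the matching snake_case attribute of a state object
    for key, value in cache.items():
        setattr(state_obj, camel_to_snake(key), value)
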
19
analytics/tests/test_manager.py
@@ -1,5 +1,6 @@
 from models import PeakModel, DropModel, TroughModel, JumpModel, GeneralModel
+from models import PeakModelState, DropModelState, TroughModelState, JumpModelState, GeneralModelState
+import utils.meta
 import aiounittest
 from analytic_unit_manager import AnalyticUnitManager
 from collections import namedtuple

@@ -97,19 +98,3 @@ class TestDataset(aiounittest.AsyncTestCase):
         without_manager = await self._test_detect(test_data)
         self.assertEqual(with_manager, without_manager)

-    async def test_cache(self):
-        cache_attrs = {
-            'PEAK': PeakModel().state.keys(),
-            'JUMP': JumpModel().state.keys(),
-            'DROP': DropModel().state.keys(),
-            'TROUGH': TroughModel().state.keys(),
-            'GENERAL': GeneralModel().state.keys()
-        }
-        for pattern, attrs in cache_attrs.items():
-            test_data = TestData(get_random_id(), pattern, *self._get_test_dataset(pattern))
-            learn_task = self._get_learn_task(test_data)
-            cache = await self._learn(learn_task)
-            for a in attrs:
-                self.assertTrue(a in cache.keys(), msg='{} not in cache keys: {}'.format(a, cache.keys()))
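
The test_cache removed above depended on Model().state being a plain dict with .keys(); with ModelState classes that assumption no longer holds, so the test is dropped rather than ported. A hypothetical port, assuming each *ModelState can be default-constructed (an assumption, not shown in this diff) and serializes itself with to_json() as the fitted state does in PatternDetector.train:

from models import PeakModelState, DropModelState, TroughModelState, \
    JumpModelState, GeneralModelState

async def test_cache(self):
    # derive the expected cache keys from default-constructed state
    # objects instead of the old Model().state dict
    state_classes = {
        'PEAK': PeakModelState,
        'JUMP': JumpModelState,
        'DROP': DropModelState,
        'TROUGH': TroughModelState,
        'GENERAL': GeneralModelState,
    }
    for pattern, state_class in state_classes.items():
        test_data = TestData(get_random_id(), pattern, *self._get_test_dataset(pattern))
        learn_task = self._get_learn_task(test_data)
        cache = await self._learn(learn_task)
        expected = state_class().to_json().keys()   # assumes default ctor + to_json()
        for key in expected:
            self.assertIn(key, cache.keys(), msg='{} not in cache keys: {}'.format(key, cache.keys()))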