From 6d5ec2b6fecd486193ffe4b6c05c169ebb58cd7c Mon Sep 17 00:00:00 2001 From: Alexandr Velikiy <39257464+VargBurz@users.noreply.github.com> Date: Mon, 11 Feb 2019 16:25:34 +0300 Subject: [PATCH] Webhooks don't find any patterns #401 (#404) --- analytics/analytics/models/drop_model.py | 18 +-- analytics/analytics/models/general_model.py | 19 ++-- analytics/analytics/models/jump_model.py | 18 +-- analytics/analytics/models/peak_model.py | 18 +-- analytics/analytics/models/trough_model.py | 18 +-- analytics/requirements.txt | 1 + analytics/tests/test_manager.py | 115 ++++++++++++++++++++ 7 files changed, 161 insertions(+), 46 deletions(-) create mode 100644 analytics/tests/test_manager.py diff --git a/analytics/analytics/models/drop_model.py b/analytics/analytics/models/drop_model.py index 5ef90df..d3d12d0 100644 --- a/analytics/analytics/models/drop_model.py +++ b/analytics/analytics/models/drop_model.py @@ -14,9 +14,9 @@ class DropModel(Model): def __init__(self): super() self.segments = [] - self.idrops = [] - self.model_drop = [] self.state = { + 'idrops': [], + 'model_drop': [], 'confidence': 1.5, 'convolve_max': 200, 'convolve_min': 200, @@ -42,10 +42,10 @@ class DropModel(Model): data = utils.cut_dataframe(dataframe) data = data['value'] window_size = self.state['WINDOW_SIZE'] - self.idrops = learning_info['segment_center_list'] - self.model_drop = utils.get_av_model(learning_info['patterns_list']) - convolve_list = utils.get_convolve(self.idrops, self.model_drop, data, window_size) - correlation_list = utils.get_correlation(self.idrops, self.model_drop, data, window_size) + self.state['idrops'] = learning_info['segment_center_list'] + self.state['model_drop'] = utils.get_av_model(learning_info['patterns_list']) + convolve_list = utils.get_convolve(self.state['idrops'], self.state['model_drop'], data, window_size) + correlation_list = utils.get_correlation(self.state['idrops'], self.state['model_drop'], data, window_size) del_conv_list = [] delete_pattern_timestamp = [] @@ -54,7 +54,7 @@ class DropModel(Model): delete_pattern_timestamp.append(segment.pattern_timestamp) deleted_drop = utils.get_interval(data, segment_cent_index, window_size) deleted_drop = utils.subtract_min_without_nan(deleted_drop) - del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.model_drop) + del_conv_drop = scipy.signal.fftconvolve(deleted_drop, self.state['model_drop']) if len(del_conv_drop): del_conv_list.append(max(del_conv_drop)) self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list) @@ -73,10 +73,10 @@ class DropModel(Model): variance_error = self.state['WINDOW_SIZE'] close_patterns = utils.close_filtering(segments, variance_error) segments = utils.best_pattern(close_patterns, data, 'min') - if len(segments) == 0 or len(self.idrops) == 0 : + if len(segments) == 0 or len(self.state['idrops']) == 0 : segments = [] return segments - pattern_data = self.model_drop + pattern_data = self.state['model_drop'] for segment in segments: if segment > self.state['WINDOW_SIZE'] and segment < (len(data) - self.state['WINDOW_SIZE']): convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE']) diff --git a/analytics/analytics/models/general_model.py b/analytics/analytics/models/general_model.py index 176a912..d0ebc80 100644 --- a/analytics/analytics/models/general_model.py +++ b/analytics/analytics/models/general_model.py @@ -15,10 +15,9 @@ class GeneralModel(Model): def __init__(self): super() - self.segments = [] - self.ipats = [] - self.model_gen = [] self.state = { + 'ipats': [], + 'model_gen': [], 'convolve_max': 240, 'convolve_min': 200, 'WINDOW_SIZE': 240, @@ -41,10 +40,10 @@ class GeneralModel(Model): def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: data = utils.cut_dataframe(dataframe) data = data['value'] - self.ipats = learning_info['segment_center_list'] - self.model_gen = utils.get_av_model(learning_info['patterns_list']) - convolve_list = utils.get_convolve(self.ipats, self.model_gen, data, self.state['WINDOW_SIZE']) - correlation_list = utils.get_correlation(self.ipats, self.model_gen, data, self.state['WINDOW_SIZE']) + self.state['ipats'] = learning_info['segment_center_list'] + self.state['model_gen'] = utils.get_av_model(learning_info['patterns_list']) + convolve_list = utils.get_convolve(self.state['ipats'], self.state['model_gen'], data, self.state['WINDOW_SIZE']) + correlation_list = utils.get_correlation(self.state['ipats'], self.state['model_gen'], data, self.state['WINDOW_SIZE']) del_conv_list = [] delete_pattern_timestamp = [] @@ -53,7 +52,7 @@ class GeneralModel(Model): delete_pattern_timestamp.append(segment.pattern_timestamp) deleted_pat = utils.get_interval(data, del_mid_index, self.state['WINDOW_SIZE']) deleted_pat = utils.subtract_min_without_nan(deleted_pat) - del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.model_gen) + del_conv_pat = scipy.signal.fftconvolve(deleted_pat, self.state['model_gen']) if len(del_conv_pat): del_conv_list.append(max(del_conv_pat)) self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3) @@ -62,7 +61,7 @@ class GeneralModel(Model): def do_detect(self, dataframe: pd.DataFrame) -> list: data = utils.cut_dataframe(dataframe) data = data['value'] - pat_data = self.model_gen + pat_data = self.state['model_gen'] if pat_data.count(0) == len(pat_data): raise ValueError('Labeled patterns must not be empty') @@ -78,7 +77,7 @@ class GeneralModel(Model): return set(item + self.state['WINDOW_SIZE'] for item in filtered) def __filter_detection(self, segments: list, data: list): - if len(segments) == 0 or len(self.ipats) == 0: + if len(segments) == 0 or len(self.state['ipats']) == 0: return [] delete_list = [] for val in segments: diff --git a/analytics/analytics/models/jump_model.py b/analytics/analytics/models/jump_model.py index deb08f8..ca27902 100644 --- a/analytics/analytics/models/jump_model.py +++ b/analytics/analytics/models/jump_model.py @@ -15,9 +15,9 @@ class JumpModel(Model): def __init__(self): super() self.segments = [] - self.ijumps = [] - self.model_jump = [] self.state = { + 'ijumps': [], + 'model_jump': [], 'confidence': 1.5, 'convolve_max': 230, 'convolve_min': 230, @@ -43,10 +43,10 @@ class JumpModel(Model): data = utils.cut_dataframe(dataframe) data = data['value'] window_size = self.state['WINDOW_SIZE'] - self.ijumps = learning_info['segment_center_list'] - self.model_jump = utils.get_av_model(learning_info['patterns_list']) - convolve_list = utils.get_convolve(self.ijumps, self.model_jump, data, window_size) - correlation_list = utils.get_correlation(self.ijumps, self.model_jump, data, window_size) + self.state['ijumps'] = learning_info['segment_center_list'] + self.state['model_jump'] = utils.get_av_model(learning_info['patterns_list']) + convolve_list = utils.get_convolve(self.state['ijumps'], self.state['model_jump'], data, window_size) + correlation_list = utils.get_correlation(self.state['ijumps'], self.state['model_jump'], data, window_size) del_conv_list = [] delete_pattern_timestamp = [] @@ -55,7 +55,7 @@ class JumpModel(Model): delete_pattern_timestamp.append(segment.pattern_timestamp) deleted_jump = utils.get_interval(data, segment_cent_index, window_size) deleted_jump = utils.subtract_min_without_nan(deleted_jump) - del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.model_jump) + del_conv_jump = scipy.signal.fftconvolve(deleted_jump, self.state['model_jump']) if len(del_conv_jump): del_conv_list.append(max(del_conv_jump)) self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list) @@ -75,10 +75,10 @@ class JumpModel(Model): close_patterns = utils.close_filtering(segments, variance_error) segments = utils.best_pattern(close_patterns, data, 'max') - if len(segments) == 0 or len(self.ijumps) == 0 : + if len(segments) == 0 or len(self.state['ijumps']) == 0 : segments = [] return segments - pattern_data = self.model_jump + pattern_data = self.state['model_jump'] upper_bound = self.state['convolve_max'] * 1.2 lower_bound = self.state['convolve_min'] * 0.8 delete_up_bound = self.state['conv_del_max'] * 1.02 diff --git a/analytics/analytics/models/peak_model.py b/analytics/analytics/models/peak_model.py index 1eb708a..009e909 100644 --- a/analytics/analytics/models/peak_model.py +++ b/analytics/analytics/models/peak_model.py @@ -16,9 +16,9 @@ class PeakModel(Model): def __init__(self): super() self.segments = [] - self.ipeaks = [] - self.model = [] self.state = { + 'ipeaks': [], + 'model_peak': [], 'confidence': 1.5, 'convolve_max': 570000, 'convolve_min': 530000, @@ -41,10 +41,10 @@ class PeakModel(Model): data = utils.cut_dataframe(dataframe) data = data['value'] window_size = self.state['WINDOW_SIZE'] - self.ipeaks = learning_info['segment_center_list'] - self.model = utils.get_av_model(learning_info['patterns_list']) - convolve_list = utils.get_convolve(self.ipeaks, self.model, data, window_size) - correlation_list = utils.get_correlation(self.ipeaks, self.model, data, window_size) + self.state['ipeaks'] = learning_info['segment_center_list'] + self.state['model_peak'] = utils.get_av_model(learning_info['patterns_list']) + convolve_list = utils.get_convolve(self.state['ipeaks'], self.state['model_peak'], data, window_size) + correlation_list = utils.get_correlation(self.state['ipeaks'], self.state['model_peak'], data, window_size) del_conv_list = [] delete_pattern_width = [] @@ -55,7 +55,7 @@ class PeakModel(Model): delete_pattern_timestamp.append(segment.pattern_timestamp) deleted = utils.get_interval(data, del_max_index, window_size) deleted = utils.subtract_min_without_nan(deleted) - del_conv = scipy.signal.fftconvolve(deleted, self.model) + del_conv = scipy.signal.fftconvolve(deleted, self.state['model_peak']) if len(del_conv): del_conv_list.append(max(del_conv)) delete_pattern_height.append(utils.find_confidence(deleted)[1]) delete_pattern_width.append(utils.find_width(deleted, True)) @@ -85,9 +85,9 @@ class PeakModel(Model): close_patterns = utils.close_filtering(segments, variance_error) segments = utils.best_pattern(close_patterns, data, 'max') - if len(segments) == 0 or len(self.ipeaks) == 0: + if len(segments) == 0 or len(self.state['ipeaks']) == 0: return [] - pattern_data = self.model + pattern_data = self.state['model_peak'] for segment in segments: if segment > self.state['WINDOW_SIZE']: convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE']) diff --git a/analytics/analytics/models/trough_model.py b/analytics/analytics/models/trough_model.py index 04736d0..98a58b0 100644 --- a/analytics/analytics/models/trough_model.py +++ b/analytics/analytics/models/trough_model.py @@ -16,9 +16,9 @@ class TroughModel(Model): def __init__(self): super() self.segments = [] - self.itroughs = [] - self.model = [] self.state = { + 'itroughs': [], + 'model_trough': [], 'confidence': 1.5, 'convolve_max': 570000, 'convolve_min': 530000, @@ -41,10 +41,10 @@ class TroughModel(Model): data = utils.cut_dataframe(dataframe) data = data['value'] window_size = self.state['WINDOW_SIZE'] - self.itroughs = learning_info['segment_center_list'] - self.model = utils.get_av_model(learning_info['patterns_list']) - convolve_list = utils.get_convolve(self.itroughs, self.model, data, window_size) - correlation_list = utils.get_correlation(self.itroughs, self.model, data, window_size) + self.state['itroughs'] = learning_info['segment_center_list'] + self.state['model_trough'] = utils.get_av_model(learning_info['patterns_list']) + convolve_list = utils.get_convolve(self.state['itroughs'], self.state['model_trough'], data, window_size) + correlation_list = utils.get_correlation(self.state['itroughs'], self.state['model_trough'], data, window_size) del_conv_list = [] delete_pattern_width = [] @@ -55,7 +55,7 @@ class TroughModel(Model): delete_pattern_timestamp.append(segment.pattern_timestamp) deleted = utils.get_interval(data, del_min_index, window_size) deleted = utils.subtract_min_without_nan(deleted) - del_conv = scipy.signal.fftconvolve(deleted, self.model) + del_conv = scipy.signal.fftconvolve(deleted, self.state['model_trough']) if len(del_conv): del_conv_list.append(max(del_conv)) delete_pattern_height.append(utils.find_confidence(deleted)[1]) delete_pattern_width.append(utils.find_width(deleted, False)) @@ -84,10 +84,10 @@ class TroughModel(Model): variance_error = self.state['WINDOW_SIZE'] close_patterns = utils.close_filtering(segments, variance_error) segments = utils.best_pattern(close_patterns, data, 'min') - if len(segments) == 0 or len(self.itroughs) == 0 : + if len(segments) == 0 or len(self.state['itroughs']) == 0 : segments = [] return segments - pattern_data = self.model + pattern_data = self.state['model_trough'] for segment in segments: if segment > self.state['WINDOW_SIZE']: convol_data = utils.get_interval(data, segment, self.state['WINDOW_SIZE']) diff --git a/analytics/requirements.txt b/analytics/requirements.txt index 49de724..c741599 100644 --- a/analytics/requirements.txt +++ b/analytics/requirements.txt @@ -1,5 +1,6 @@ altgraph==0.15 attrdict==2.0.0 +aiounittest==1.1.0 future==0.16.0 macholib==1.9 numpy==1.14.5 diff --git a/analytics/tests/test_manager.py b/analytics/tests/test_manager.py new file mode 100644 index 0000000..44be22a --- /dev/null +++ b/analytics/tests/test_manager.py @@ -0,0 +1,115 @@ +from models import PeakModel, DropModel, TroughModel, JumpModel, GeneralModel + +import aiounittest +from analytic_unit_manager import AnalyticUnitManager +from collections import namedtuple + +TestData = namedtuple('TestData', ['uid', 'type', 'values', 'segments']) + +def get_random_id() -> str: + return str(id(list())) + +class TestDataset(aiounittest.AsyncTestCase): + + timestep = 50 #ms + + def _fill_task(self, uid, data, task_type, analytic_unit_type, segments=None, cache=None): + task = { + 'analyticUnitId': uid, + 'type': task_type, + 'payload': { + 'data': data, + 'from': data[0][0], + 'to': data[-1][0], + 'analyticUnitType': analytic_unit_type, + 'detector': 'pattern', + 'cache': cache + }, + '_id': get_random_id() + } + if segments: task['payload']['segments'] = segments + + return task + + def _convert_values(self, values) -> list: + from_t = 0 + to_t = len(values) * self.timestep + return list(zip(range(from_t, to_t, self.timestep), values)) + + def _index_to_test_time(self, idx) -> int: + return idx * self.timestep + + def _get_learn_task(self, test_data): + uid, analytic_unit_type, values, segments = test_data + data = self._convert_values(values) + segments = [{ + 'analyticUnitId': uid, + 'from': self._index_to_test_time(s[0]), + 'to': self._index_to_test_time(s[1]), + 'labeled': True, + 'deleted': False + } for s in segments] + return self._fill_task(uid, data, 'LEARN', analytic_unit_type, segments=segments) + + def _get_detect_task(self, test_data, cache): + uid, analytic_unit_type, values, _ = test_data + data = self._convert_values(values) + return self._fill_task(uid, data, 'DETECT', analytic_unit_type, cache=cache) + + def _get_test_dataset(self, pattern) -> tuple: + """ + pattern name: ([dataset values], [list of segments]) + + segment - (begin, end) - indexes in dataset values + returns dataset in format (data: List[int], segments: List[List[int]]) + """ + datasets = { + 'PEAK': ([0, 0, 1, 2, 3, 4, 3, 2, 1, 0, 0], [[2, 8]]), + 'JUMP': ([0, 0, 1, 2, 3, 4, 4, 4], [[1, 6]]), + 'DROP': ([4, 4, 4, 3, 2, 1, 0, 0], [[1, 6]]), + 'TROUGH': ([4, 4, 3, 2, 1, 0, 1, 2, 3, 4, 4], [[1, 9]]), + 'GENERAL': ([0, 0, 1, 2, 3, 4, 3, 2, 1, 0, 0], [[2, 8]]) + } + return datasets[pattern] + + async def _learn(self, task, manager=None) -> dict: + if not manager: manager = AnalyticUnitManager() + result = await manager.handle_analytic_task(task) + return result['payload']['cache'] + + async def _detect(self, task, manager=None) -> dict: + if not manager: manager = AnalyticUnitManager() + result = await manager.handle_analytic_task(task) + return result + + async def _test_detect(self, test_data, manager=None): + learn_task = self._get_learn_task(test_data) + cache = await self._learn(learn_task, manager) + detect_task = self._get_detect_task(test_data, cache) + result = await self._detect(detect_task, manager) + return result + + async def test_unit_manager(self): + test_data = TestData(get_random_id(), 'PEAK', [0,1,2,5,10,5,2,1,1,1,0,0,0,0], [[1,7]]) + manager = AnalyticUnitManager() + + with_manager = await self._test_detect(test_data, manager) + without_manager = await self._test_detect(test_data) + self.assertEqual(with_manager, without_manager) + + async def test_cache(self): + cache_attrs = { + 'PEAK': PeakModel().state.keys(), + 'JUMP': JumpModel().state.keys(), + 'DROP': DropModel().state.keys(), + 'TROUGH': TroughModel().state.keys(), + 'GENERAL': GeneralModel().state.keys() + } + + for pattern, attrs in cache_attrs.items(): + test_data = TestData(get_random_id(), pattern, *self._get_test_dataset(pattern)) + learn_task = self._get_learn_task(test_data) + cache = await self._learn(learn_task) + + for a in attrs: + self.assertTrue(a in cache.keys(), msg='{} not in cache keys: {}'.format(a, cache.keys()))