diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index 733fc04..64b9f27 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -64,7 +64,7 @@ class AnalyticUnitManager:
         returns payload or None
         """
         analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
-
+        log.debug('Analytics got task with type: {} for unit: {}'.format(task['type'], analytic_unit_id))
         if task['type'] == 'CANCEL':
             if analytic_unit_id in self.analytic_workers:
                 self.analytic_workers[analytic_unit_id].cancel()
@@ -93,11 +93,13 @@ class AnalyticUnitManager:
 
     async def handle_analytic_task(self, task):
         try:
+            log.debug('Start handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
             result_payload = await self.__handle_analytic_task(task)
             result_message = {
                 'status': 'SUCCESS',
                 'payload': result_payload
             }
+            log.debug('End correctly handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
             return result_message
         except Exception as e:
             error_text = traceback.format_exc()
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index faf7a7b..032be69 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -27,7 +27,7 @@ class AnalyticUnitWorker:
         try:
             new_cache: ModelCache = self._training_future.result(timeout = config.LEARNING_TIMEOUT)
             return new_cache
-        except CancelledError as e:
+        except CancelledError:
             return cache
         except TimeoutError:
             raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index af895ae..b84a348 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -42,9 +42,9 @@ class PatternDetector(Detector):
 
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
         # TODO: pass only part of dataframe that has segments
-        new_cache = self.model.fit(dataframe, segments, cache)
+        new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id, cache)
         if new_cache == None or len(new_cache) == 0:
-            logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}'.format(dataframe, segments, cache))
+            logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id))
         return {
             'cache': new_cache
         }
@@ -52,7 +52,7 @@ class PatternDetector(Detector):
     def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
         logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
-        detected = self.model.detect(dataframe, cache)
+        detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
 
         segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
         newCache = detected['cache']
@@ -66,13 +66,16 @@ class PatternDetector(Detector):
         }
 
     def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+        logging.debug('Start recieve_data for analytic unit {}'.format(self.analytic_unit_id))
         data_without_nan = data.dropna()
 
         if len(data_without_nan) == 0:
             return None
 
         self.bucket.receive_data(data_without_nan)
-        if not cache: cache = {}
+        if cache == None:
+            logging.debug('Recieve_data cache is None for analytic unit {}'.format(self.analytic_unit_id))
+            cache = {}
 
         bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.min_bucket_size)
 
         res = self.detect(self.bucket.data, cache)
@@ -80,7 +83,7 @@ class PatternDetector(Detector):
         if len(self.bucket.data) > bucket_size:
             excess_data = len(self.bucket.data) - bucket_size
             self.bucket.drop_data(excess_data)
-
+        logging.debug('End recieve_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res))
         if res:
             return res
         else:
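
Side note on the cache check above: `if not cache` treated None and an empty dict the same, while the new `if cache == None` only replaces None. A minimal standalone sketch of the difference (min_bucket_size is an assumed value, not taken from this patch):

    # Sketch: only None is replaced now; an empty dict passes through,
    # which is still safe because .get() supplies the default below.
    min_bucket_size = 100  # assumed for illustration
    for cache in (None, {}, {'WINDOW_SIZE': 50}):
        if cache == None:
            cache = {}
        print(max(cache.get('WINDOW_SIZE', 0) * 3, min_bucket_size))  # 100, 100, 150
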
diff --git a/analytics/analytics/models/drop_model.py b/analytics/analytics/models/drop_model.py
index 77d81cb..72b17f8 100644
--- a/analytics/analytics/models/drop_model.py
+++ b/analytics/analytics/models/drop_model.py
@@ -38,7 +38,7 @@ class DropModel(Model):
         segment_center_index = utils.find_pattern_center(segment, start, 'drop')
         return segment_center_index
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = self.state['WINDOW_SIZE']
@@ -63,7 +63,7 @@ class DropModel(Model):
         self.state['DROP_HEIGHT'] = int(min(learning_info['pattern_height'], default = 1))
         self.state['DROP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
 
-    def do_detect(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)
diff --git a/analytics/analytics/models/general_model.py b/analytics/analytics/models/general_model.py
index 11a1b18..577df07 100644
--- a/analytics/analytics/models/general_model.py
+++ b/analytics/analytics/models/general_model.py
@@ -1,5 +1,5 @@
 from models import Model
-
+from typing import Union, List, Generator, Set, Tuple
 import utils
 import numpy as np
 import pandas as pd
@@ -10,8 +10,9 @@ from scipy.stats.stats import pearsonr
 import math
 from scipy.stats import gaussian_kde
 from scipy.stats import norm
+import logging
 
-PEARSON_COEFF = 0.7
+PEARSON_FACTOR = 0.7
 
 class GeneralModel(Model):
 
@@ -26,8 +27,6 @@ class GeneralModel(Model):
             'conv_del_min': 0,
             'conv_del_max': 0,
         }
-        self.all_conv = []
-        self.all_corr = []
 
     def get_model_type(self) -> (str, bool):
         model = 'general'
@@ -40,7 +39,8 @@ class GeneralModel(Model):
         center_ind = start + math.ceil((end - start) / 2)
         return center_ind
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, AnalyticUnitId: str) -> None:
+        logging.debug('Start method do_fit for analytic unit: {}'.format(AnalyticUnitId))
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         last_pattern_center = self.state.get('pattern_center', [])
@@ -61,44 +61,39 @@ class GeneralModel(Model):
         self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3)
         self.state['conv_del_min'], self.state['conv_del_max'] = utils.get_min_max(del_conv_list, self.state['WINDOW_SIZE'])
+        logging.debug('Method do_fit completed correctly for analytic unit: {}'.format(AnalyticUnitId))
 
-    def do_detect(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame, AnalyticUnitId: str) -> Set[int]:
+        logging.debug('Start method do_detect for analytic unit: {}'.format(AnalyticUnitId))
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         pat_data = self.state.get('pattern_model', [])
         if pat_data.count(0) == len(pat_data):
             raise ValueError('Labeled patterns must not be empty')
 
-        self.all_conv = []
-        self.all_corr = []
         window_size = self.state.get('WINDOW_SIZE', 0)
-        for i in range(window_size, len(data) - window_size):
-            watch_data = data[i - window_size: i + window_size + 1]
-            watch_data = utils.subtract_min_without_nan(watch_data)
-            conv = scipy.signal.fftconvolve(watch_data, pat_data)
-            correlation = pearsonr(watch_data, pat_data)
-            self.all_corr.append(correlation[0])
-            self.all_conv.append(max(conv))
-        all_conv_peaks = utils.peak_finder(self.all_conv, window_size * 2)
-        all_corr_peaks = utils.peak_finder(self.all_corr, window_size * 2)
+        all_corr = utils.get_correlation_gen(data, window_size, pat_data)
+        all_corr_peaks = utils.find_peaks(all_corr, window_size * 2)
         filtered = self.__filter_detection(all_corr_peaks, data)
+        filtered = list(filtered)
+        logging.debug('Method do_detect completed correctly for analytic unit: {}'.format(AnalyticUnitId))
         return set(item + window_size for item in filtered)
 
-    def __filter_detection(self, segments: list, data: list):
-        if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
+    def __filter_detection(self, segments: Generator[Tuple[int, float], None, None], data: pd.Series) -> Generator[int, None, None]:
+        if not self.state.get('pattern_center'):
             return []
-        delete_list = []
-        for val in segments:
-            if self.all_conv[val] < self.state['convolve_min'] * 0.8:
-                delete_list.append(val)
+        window_size = self.state.get('WINDOW_SIZE', 0)
+        pattern_model = self.state.get('pattern_model', [])
+        for ind, val in segments:
+            watch_data = data[ind - window_size: ind + window_size + 1]
+            watch_data = utils.subtract_min_without_nan(watch_data)
+            convolve_segment = scipy.signal.fftconvolve(watch_data, pattern_model)
+            if len(convolve_segment) > 0:
+                watch_conv = max(convolve_segment)
+            else:
                 continue
-            if self.all_corr[val] < PEARSON_COEFF:
-                delete_list.append(val)
+            if watch_conv < self.state['convolve_min'] * 0.8 or val < PEARSON_FACTOR:
                 continue
-            if (self.all_conv[val] < self.state['conv_del_max'] * 1.02 and self.all_conv[val] > self.state['conv_del_min'] * 0.98):
-                delete_list.append(val)
-
-        for item in delete_list:
-            segments.remove(item)
-
-        return set(segments)
+            if watch_conv < self.state['conv_del_max'] * 1.02 and watch_conv > self.state['conv_del_min'] * 0.98:
+                continue
+            yield ind
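
With the all_conv/all_corr lists gone, do_detect streams candidates through the two new utils helpers. A rough usage sketch of that pipeline (assumes analytics/analytics is on the import path; data and pattern values are made up for illustration):

    import pandas as pd
    import utils  # the module extended in this patch

    data = pd.Series([0.0, 1.0, 5.0, 1.0, 0.0] * 20)
    pattern_model = [0.0, 1.0, 5.0, 1.0, 0.0]
    window_size = 2

    all_corr = utils.get_correlation_gen(data, window_size, pattern_model)  # lazy Pearson r per window
    all_corr_peaks = utils.find_peaks(all_corr, window_size * 2)            # lazy (index, r) local maxima
    for ind, corr in all_corr_peaks:
        print(ind, corr)  # __filter_detection then convolves each candidate and applies thresholds

Nothing is computed until the final loop consumes the chain, which is the point of the refactor: detection no longer materializes one correlation and one convolution value per data point.
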
diff --git a/analytics/analytics/models/jump_model.py b/analytics/analytics/models/jump_model.py
index ada4a3e..68d04bd 100644
--- a/analytics/analytics/models/jump_model.py
+++ b/analytics/analytics/models/jump_model.py
@@ -39,7 +39,7 @@ class JumpModel(Model):
         segment_center_index = utils.find_pattern_center(segment, start, 'jump')
         return segment_center_index
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = self.state['WINDOW_SIZE']
@@ -64,7 +64,7 @@ class JumpModel(Model):
         self.state['JUMP_HEIGHT'] = float(min(learning_info['pattern_height'], default = 1))
         self.state['JUMP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
 
-    def do_detect(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1)
diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py
index 2d15591..93cdab3 100644
--- a/analytics/analytics/models/model.py
+++ b/analytics/analytics/models/model.py
@@ -62,8 +62,10 @@ class Model(ABC):
     @abstractmethod
     def get_model_type(self) -> (str, bool):
         pass
-
-    def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache:
+
+    # TODO: id: str -> id: AnalyticUnitId in all models
+    def fit(self, dataframe: pd.DataFrame, segments: list, id: str, cache: Optional[ModelCache]) -> ModelCache:
+        logging.debug('Start method fit for analytic unit {}'.format(id))
         data = dataframe['value']
         if cache != None and len(cache) > 0:
             self.state = cache
@@ -84,29 +86,29 @@ class Model(ABC):
         self.state['WINDOW_SIZE'] = math.ceil(max_length / 2) if max_length else 0
         model, model_type = self.get_model_type()
         learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type)
-        self.do_fit(dataframe, labeled, deleted, learning_info)
-        logging.debug('fit complete successful with self.state: {}'.format(self.state))
+        self.do_fit(dataframe, labeled, deleted, learning_info, id)
+        logging.debug('fit completed successfully with self.state: {} for analytic unit: {}'.format(self.state, id))
         return self.state
 
-    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
+    def detect(self, dataframe: pd.DataFrame, id: str, cache: Optional[ModelCache]) -> dict:
         #If cache is None or empty dict - default parameters will be used instead
         if cache != None and len(cache) > 0:
             self.state = cache
         else:
-            logging.debug('get empty cache in detect')
+            logging.debug('Get empty cache in detect')
         if not self.state:
             logging.warning('self.state is empty - skip do_detect')
             return {
                 'segments': [],
                 'cache': {},
             }
-        result = self.do_detect(dataframe)
+        result = self.do_detect(dataframe, id)
         segments = [(
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x - 1]),
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x + 1])
         ) for x in result]
         if not self.state:
-            logging.warning('return empty self.state after detect')
+            logging.warning('Return empty self.state after detect')
         return {
             'segments': segments,
             'cache': self.state,
@@ -122,6 +124,7 @@ class Model(ABC):
             raise ValueError('got non-dict as state for update fiting result: {}'.format(state))
 
     def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: list, deleted: list, model: str, model_type: bool) -> dict:
+        logging.debug('Start parsing segments')
         learning_info = {
             'confidence': [],
             'patterns_list': [],
@@ -150,5 +153,6 @@ class Model(ABC):
         learning_info['pattern_height'].append(pattern_height)
         learning_info['pattern_width'].append(pattern_length)
         learning_info['patterns_value'].append(aligned_segment.values[self.state['WINDOW_SIZE']])
+        logging.debug('Parsing segments ended correctly with learning_info: {}'.format(learning_info))
         return learning_info
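
Callers of the base class now pass the analytic unit id between segments and cache. A hedged sketch mirroring the fixtures in test_dataset.py (the dataframe/segment layout is assumed from those tests, not defined in this patch):

    import pandas as pd
    import models  # analytics/analytics/models

    timestamps = [1523889000000 + i for i in range(15)]
    values = [1, 1, 1, 9, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    dataframe = pd.DataFrame({'timestamp': pd.to_datetime(timestamps, unit='ms'), 'value': values})
    segments = [{'_id': 'q', 'analyticUnitId': 'test', 'from': 1523889000002,
                 'to': 1523889000004, 'labeled': True, 'deleted': False}]

    model = models.PeakModel()
    cache = model.fit(dataframe, segments, 'test', dict())  # id now precedes cache
    # detect takes the id in the same position:
    # result = model.detect(dataframe, 'test', cache)
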
diff --git a/analytics/analytics/models/peak_model.py b/analytics/analytics/models/peak_model.py
index 46fecd1..1af6711 100644
--- a/analytics/analytics/models/peak_model.py
+++ b/analytics/analytics/models/peak_model.py
@@ -39,7 +39,7 @@ class PeakModel(Model):
         segment = data[start: end]
         return segment.idxmax()
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = self.state['WINDOW_SIZE']
@@ -66,7 +66,7 @@ class PeakModel(Model):
 
         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
 
-    def do_detect(self, dataframe: pd.DataFrame):
+    def do_detect(self, dataframe: pd.DataFrame, id: str):
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
diff --git a/analytics/analytics/models/trough_model.py b/analytics/analytics/models/trough_model.py
index 9d58b7b..e7a7d5d 100644
--- a/analytics/analytics/models/trough_model.py
+++ b/analytics/analytics/models/trough_model.py
@@ -39,7 +39,7 @@ class TroughModel(Model):
         segment = data[start: end]
         return segment.idxmin()
 
-    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None:
+    def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
        data = utils.cut_dataframe(dataframe)
        data = data['value']
        window_size = self.state['WINDOW_SIZE']
@@ -66,7 +66,7 @@ class TroughModel(Model):
 
         self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
 
-    def do_detect(self, dataframe: pd.DataFrame):
+    def do_detect(self, dataframe: pd.DataFrame, id: str):
         data = utils.cut_dataframe(dataframe)
         data = data['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
diff --git a/analytics/analytics/services/server_service.py b/analytics/analytics/services/server_service.py
index 93c435a..26b8655 100644
--- a/analytics/analytics/services/server_service.py
+++ b/analytics/analytics/services/server_service.py
@@ -36,7 +36,6 @@ class ServerMessage:
         payload = json['payload']
         if 'requestId' in json:
             request_id = json['requestId']
-
         return ServerMessage(method, payload, request_id)
 
 class ServerService:
diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py
index 39bd8f5..e13e9cb 100644
--- a/analytics/analytics/utils/common.py
+++ b/analytics/analytics/utils/common.py
@@ -6,9 +6,11 @@ from scipy.signal import argrelextrema
 from scipy.stats import gaussian_kde
 from scipy.stats.stats import pearsonr
 import math
-from typing import Union
+from typing import Union, List, Generator, Tuple
 import utils
 import logging
+from itertools import islice
+from collections import deque
 
 SHIFT_FACTOR = 0.05
 CONFIDENCE_FACTOR = 0.2
@@ -72,12 +74,15 @@ def timestamp_to_index(dataframe, timestamp):
         raise ValueError('Dataframe has no appropriate timestamp {}'.format(timestamp))
     return time_ind
 
-def peak_finder(data, size):
-    all_max = []
-    for i in range(size, len(data) - size):
-        if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]:
-            all_max.append(i)
-    return all_max
+def find_peaks(data: Generator[float, None, None], size: int) -> Generator[Tuple[int, float], None, None]:
+    window = deque(islice(data, size * 2 + 1))
+    for i, v in enumerate(data, size):
+        current = window[size]
+        # TODO: remove max() from loop
+        if current == max(window) and current != window[size + 1]:
+            yield i, current
+        window.append(v)
+        window.popleft()
 
 def ar_mean(numbers):
     return float(sum(numbers)) / max(len(numbers), 1)
@@ -223,6 +228,14 @@ def get_convolve(segments: list, av_model: list, data: pd.Series, window_size: int) -> list:
             convolve_list.append(max(convolve_segment))
     return convolve_list
 
+def get_correlation_gen(data: pd.Series, window_size: int, pattern_model: List[float]) -> Generator[float, None, None]:
+    # Correlate a sliding window over the data with pattern_model
+    for i in range(window_size, len(data) - window_size):
+        watch_data = data[i - window_size: i + window_size + 1]
+        correlation = pearsonr(watch_data, pattern_model)
+        if len(correlation) > 0:
+            yield correlation[0]
+
 def get_correlation(segments: list, av_model: list, data: pd.Series, window_size: int) -> list:
     labeled_segment = []
     correlation_list = []
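
find_peaks replaces the index-based peak_finder with a deque window over a one-shot iterator, and it now yields (index, value) pairs instead of bare indexes. A self-contained rendition of the same logic (iter() is added here so plain lists behave like the generators utils feeds it):

    from collections import deque
    from itertools import islice

    def find_peaks_sketch(data, size):
        data = iter(data)                           # the utils version already receives a generator
        window = deque(islice(data, size * 2 + 1))  # 2*size+1 samples around the candidate
        for i, v in enumerate(data, size):
            current = window[size]                  # candidate at the window center
            if current == max(window) and current != window[size + 1]:
                yield i, current
            window.append(v)
            window.popleft()

    print(list(find_peaks_sketch([0, 1, 3, 7, 3, 1, 0, 2, 8, 2, 0], 2)))  # [(3, 7)]
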
diff --git a/analytics/tests/test_dataset.py b/analytics/tests/test_dataset.py
index 6d694eb..b59a1d4 100644
--- a/analytics/tests/test_dataset.py
+++ b/analytics/tests/test_dataset.py
@@ -23,7 +23,7 @@ class TestDataset(unittest.TestCase):
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.fit(dataframe, segments, dict())
+                model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -36,7 +36,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.PeakModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -49,7 +49,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.JumpModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -62,7 +62,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.TroughModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -75,7 +75,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -88,7 +88,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.GeneralModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -101,7 +101,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.JumpModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -114,7 +114,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -126,7 +126,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.JumpModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, dict())
+            model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -166,7 +166,7 @@ class TestDataset(unittest.TestCase):
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.fit(dataframe, segments, dict())
+                model.fit(dataframe, segments, 'test', dict())
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -175,11 +175,11 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}]
         model = models.GeneralModel()
-        model.fit(dataframe, segments, dict())
+        model.fit(dataframe, segments, 'test', dict())
         result = len(data_val) + 1
         for _ in range(2):
-            model.do_detect(dataframe)
-            max_pattern_index = max(model.do_detect(dataframe))
+            model.do_detect(dataframe, 'test')
+            max_pattern_index = max(model.do_detect(dataframe, 'test'))
         self.assertLessEqual(max_pattern_index, result)
 
     def test_peak_model_for_cache(self):
@@ -197,7 +197,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.PeakModel()
-        result = model.fit(dataframe, segments, cache)
+        result = model.fit(dataframe, segments, 'test', cache)
         self.assertEqual(len(result['pattern_center']), 3)
 
     def test_trough_model_for_cache(self):
@@ -215,7 +215,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
         model = models.TroughModel()
-        result = model.fit(dataframe, segments, cache)
+        result = model.fit(dataframe, segments, 'test', cache)
         self.assertEqual(len(result['pattern_center']), 3)
 
     def test_jump_model_for_cache(self):
@@ -233,7 +233,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}]
         model = models.JumpModel()
-        result = model.fit(dataframe, segments, cache)
+        result = model.fit(dataframe, segments, 'test', cache)
         self.assertEqual(len(result['pattern_center']), 3)
 
     def test_models_for_pattern_model_cache(self):
@@ -253,7 +253,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
-            model.fit(dataframe, segments, cache)
+            model.fit(dataframe, segments, 'test', cache)
         except ValueError:
             self.fail('Model {} raised unexpectedly'.format(model_name))
 
@@ -285,7 +285,7 @@ class TestDataset(unittest.TestCase):
         try:
             model = models.GeneralModel()
             model_name = model.__class__.__name__
-            model.detect(data, cache)
+            model.detect(data, 'test', cache)
         except ValueError:
             self.fail('Model {} raised unexpectedly with av_model {} and window size {}'.format(model_name, pattern_model, ws))
 
@@ -323,7 +323,7 @@ class TestDataset(unittest.TestCase):
         try:
             for model in model_instances:
                 model_name = model.__class__.__name__
-                model.detect(data, cache)
+                model.detect(data, 'test', cache)
         except ValueError:
             self.fail('Model {} raised unexpectedly with dataset {} and cache {}'.format(model_name, data['value'], cache))
diff --git a/analytics/tests/test_utils.py b/analytics/tests/test_utils.py
index ed4cceb..a5f230e 100644
--- a/analytics/tests/test_utils.py
+++ b/analytics/tests/test_utils.py
@@ -3,6 +3,7 @@ import unittest
 import numpy as np
 import pandas as pd
 import math
+import random
 
 RELATIVE_TOLERANCE = 1e-1
 
@@ -236,6 +237,17 @@ class TestUtils(unittest.TestCase):
         data = []
         result = []
         self.assertEqual(utils.find_nan_indexes(data), result)
+
+    def test_create_correlation_data(self):
+        data = [random.randint(10, 999) for _ in range(10000)]
+        data = pd.Series(data)
+        pattern_model = [100, 200, 500, 300, 100]
+        ws = 2
+        result = 6000
+        corr_data = utils.get_correlation_gen(data, ws, pattern_model)
+        corr_data = list(corr_data)
+        self.assertGreaterEqual(len(corr_data), result)
+
 if __name__ == '__main__':
     unittest.main()
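
Arithmetic behind the new assertion: get_correlation_gen iterates range(window_size, len(data) - window_size), so with len(data) = 10000 and ws = 2 it yields at most 10000 - 2 * 2 = 9996 coefficients; the asserted 6000 is a deliberately loose lower bound rather than an exact count.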