Browse Source

Not-ending learning #264 (#484)

- optimize general model
- add logs
pull/1/head
Alexandr Velikiy 5 years ago committed by rozetko
parent
commit
e0cc56d425
  1. 4
      analytics/analytics/analytic_unit_manager.py
  2. 2
      analytics/analytics/analytic_unit_worker.py
  3. 13
      analytics/analytics/detectors/pattern_detector.py
  4. 4
      analytics/analytics/models/drop_model.py
  5. 59
      analytics/analytics/models/general_model.py
  6. 4
      analytics/analytics/models/jump_model.py
  7. 20
      analytics/analytics/models/model.py
  8. 4
      analytics/analytics/models/peak_model.py
  9. 4
      analytics/analytics/models/trough_model.py
  10. 1
      analytics/analytics/services/server_service.py
  11. 27
      analytics/analytics/utils/common.py
  12. 38
      analytics/tests/test_dataset.py
  13. 12
      analytics/tests/test_utils.py

4
analytics/analytics/analytic_unit_manager.py

@ -64,7 +64,7 @@ class AnalyticUnitManager:
returns payload or None returns payload or None
""" """
analytic_unit_id: AnalyticUnitId = task['analyticUnitId'] analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
log.debug('Analytics get task with type: {} for unit: {}'.format(task['type'], analytic_unit_id))
if task['type'] == 'CANCEL': if task['type'] == 'CANCEL':
if analytic_unit_id in self.analytic_workers: if analytic_unit_id in self.analytic_workers:
self.analytic_workers[analytic_unit_id].cancel() self.analytic_workers[analytic_unit_id].cancel()
@ -93,11 +93,13 @@ class AnalyticUnitManager:
async def handle_analytic_task(self, task): async def handle_analytic_task(self, task):
try: try:
log.debug('Start handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
result_payload = await self.__handle_analytic_task(task) result_payload = await self.__handle_analytic_task(task)
result_message = { result_message = {
'status': 'SUCCESS', 'status': 'SUCCESS',
'payload': result_payload 'payload': result_payload
} }
log.debug('End correctly handle_analytic_task with anatytic unit: {}'.format(task['analyticUnitId']))
return result_message return result_message
except Exception as e: except Exception as e:
error_text = traceback.format_exc() error_text = traceback.format_exc()

2
analytics/analytics/analytic_unit_worker.py

@ -27,7 +27,7 @@ class AnalyticUnitWorker:
try: try:
new_cache: ModelCache = self._training_future.result(timeout = config.LEARNING_TIMEOUT) new_cache: ModelCache = self._training_future.result(timeout = config.LEARNING_TIMEOUT)
return new_cache return new_cache
except CancelledError as e: except CancelledError:
return cache return cache
except TimeoutError: except TimeoutError:
raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))

13
analytics/analytics/detectors/pattern_detector.py

@ -42,9 +42,9 @@ class PatternDetector(Detector):
def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache: def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
# TODO: pass only part of dataframe that has segments # TODO: pass only part of dataframe that has segments
new_cache = self.model.fit(dataframe, segments, cache) new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id, cache)
if new_cache == None or len(new_cache) == 0: if new_cache == None or len(new_cache) == 0:
logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}'.format(dataframe, segments, cache)) logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id))
return { return {
'cache': new_cache 'cache': new_cache
} }
@ -52,7 +52,7 @@ class PatternDetector(Detector):
def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
# TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
detected = self.model.detect(dataframe, cache) detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']] segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
newCache = detected['cache'] newCache = detected['cache']
@ -66,13 +66,16 @@ class PatternDetector(Detector):
} }
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
logging.debug('Start recieve_data for analytic unit {}'.format(self.analytic_unit_id))
data_without_nan = data.dropna() data_without_nan = data.dropna()
if len(data_without_nan) == 0: if len(data_without_nan) == 0:
return None return None
self.bucket.receive_data(data_without_nan) self.bucket.receive_data(data_without_nan)
if not cache: cache = {} if cache == None:
logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id))
cache = {}
bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.min_bucket_size) bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.min_bucket_size)
res = self.detect(self.bucket.data, cache) res = self.detect(self.bucket.data, cache)
@ -80,7 +83,7 @@ class PatternDetector(Detector):
if len(self.bucket.data) > bucket_size: if len(self.bucket.data) > bucket_size:
excess_data = len(self.bucket.data) - bucket_size excess_data = len(self.bucket.data) - bucket_size
self.bucket.drop_data(excess_data) self.bucket.drop_data(excess_data)
logging.debug('End recieve_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res))
if res: if res:
return res return res
else: else:

4
analytics/analytics/models/drop_model.py

@ -38,7 +38,7 @@ class DropModel(Model):
segment_center_index = utils.find_pattern_center(segment, start, 'drop') segment_center_index = utils.find_pattern_center(segment, start, 'drop')
return segment_center_index return segment_center_index
def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = self.state['WINDOW_SIZE'] window_size = self.state['WINDOW_SIZE']
@ -63,7 +63,7 @@ class DropModel(Model):
self.state['DROP_HEIGHT'] = int(min(learning_info['pattern_height'], default = 1)) self.state['DROP_HEIGHT'] = int(min(learning_info['pattern_height'], default = 1))
self.state['DROP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1)) self.state['DROP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
def do_detect(self, dataframe: pd.DataFrame) -> list: def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1) possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)

59
analytics/analytics/models/general_model.py

@ -1,5 +1,5 @@
from models import Model from models import Model
from typing import Union, List, Generator
import utils import utils
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@ -10,8 +10,9 @@ from scipy.stats.stats import pearsonr
import math import math
from scipy.stats import gaussian_kde from scipy.stats import gaussian_kde
from scipy.stats import norm from scipy.stats import norm
import logging
PEARSON_COEFF = 0.7 PEARSON_FACTOR = 0.7
class GeneralModel(Model): class GeneralModel(Model):
@ -26,8 +27,6 @@ class GeneralModel(Model):
'conv_del_min': 0, 'conv_del_min': 0,
'conv_del_max': 0, 'conv_del_max': 0,
} }
self.all_conv = []
self.all_corr = []
def get_model_type(self) -> (str, bool): def get_model_type(self) -> (str, bool):
model = 'general' model = 'general'
@ -40,7 +39,8 @@ class GeneralModel(Model):
center_ind = start + math.ceil((end - start) / 2) center_ind = start + math.ceil((end - start) / 2)
return center_ind return center_ind
def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, AnalyticUnitId: str) -> None:
logging.debug('Start method do_fit for analytic unit: {}'.format(AnalyticUnitId))
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
last_pattern_center = self.state.get('pattern_center', []) last_pattern_center = self.state.get('pattern_center', [])
@ -61,44 +61,39 @@ class GeneralModel(Model):
self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3) self.state['convolve_min'], self.state['convolve_max'] = utils.get_min_max(convolve_list, self.state['WINDOW_SIZE'] / 3)
self.state['conv_del_min'], self.state['conv_del_max'] = utils.get_min_max(del_conv_list, self.state['WINDOW_SIZE']) self.state['conv_del_min'], self.state['conv_del_max'] = utils.get_min_max(del_conv_list, self.state['WINDOW_SIZE'])
logging.debug('Method do_fit completed correctly for analytic unit: {}'.format(AnalyticUnitId))
def do_detect(self, dataframe: pd.DataFrame) -> list: def do_detect(self, dataframe: pd.DataFrame, AnalyticUnitId: str) -> List[int]:
logging.debug('Start method do_detect for analytic unit: {}'.format(AnalyticUnitId))
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
pat_data = self.state.get('pattern_model', []) pat_data = self.state.get('pattern_model', [])
if pat_data.count(0) == len(pat_data): if pat_data.count(0) == len(pat_data):
raise ValueError('Labeled patterns must not be empty') raise ValueError('Labeled patterns must not be empty')
self.all_conv = []
self.all_corr = []
window_size = self.state.get('WINDOW_SIZE', 0) window_size = self.state.get('WINDOW_SIZE', 0)
for i in range(window_size, len(data) - window_size): all_corr = utils.get_correlation_gen(data, window_size, pat_data)
watch_data = data[i - window_size: i + window_size + 1] all_corr_peaks = utils.find_peaks(all_corr, window_size * 2)
watch_data = utils.subtract_min_without_nan(watch_data)
conv = scipy.signal.fftconvolve(watch_data, pat_data)
correlation = pearsonr(watch_data, pat_data)
self.all_corr.append(correlation[0])
self.all_conv.append(max(conv))
all_conv_peaks = utils.peak_finder(self.all_conv, window_size * 2)
all_corr_peaks = utils.peak_finder(self.all_corr, window_size * 2)
filtered = self.__filter_detection(all_corr_peaks, data) filtered = self.__filter_detection(all_corr_peaks, data)
filtered = list(filtered)
logging.debug('Method do_detect completed correctly for analytic unit: {}'.format(AnalyticUnitId))
return set(item + window_size for item in filtered) return set(item + window_size for item in filtered)
def __filter_detection(self, segments: list, data: list): def __filter_detection(self, segments: Generator[int, None, None], data: pd.Series) -> Generator[int, None, None]:
if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0: if not self.state.get('pattern_center'):
return [] return []
delete_list = [] window_size = self.state.get('WINDOW_SIZE', 0)
for val in segments: pattern_model = self.state.get('pattern_model', [])
if self.all_conv[val] < self.state['convolve_min'] * 0.8: for ind, val in segments:
delete_list.append(val) watch_data = data[ind - window_size: ind + window_size + 1]
watch_data = utils.subtract_min_without_nan(watch_data)
convolve_segment = scipy.signal.fftconvolve(watch_data, pattern_model)
if len(convolve_segment) > 0:
watch_conv = max(convolve_segment)
else:
continue continue
if self.all_corr[val] < PEARSON_COEFF: if watch_conv < self.state['convolve_min'] * 0.8 or val < PEARSON_FACTOR:
delete_list.append(val)
continue continue
if (self.all_conv[val] < self.state['conv_del_max'] * 1.02 and self.all_conv[val] > self.state['conv_del_min'] * 0.98): if watch_conv < self.state['conv_del_max'] * 1.02 and watch_conv > self.state['conv_del_min'] * 0.98:
delete_list.append(val) continue
yield ind
for item in delete_list:
segments.remove(item)
return set(segments)

4
analytics/analytics/models/jump_model.py

@ -39,7 +39,7 @@ class JumpModel(Model):
segment_center_index = utils.find_pattern_center(segment, start, 'jump') segment_center_index = utils.find_pattern_center(segment, start, 'jump')
return segment_center_index return segment_center_index
def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = self.state['WINDOW_SIZE'] window_size = self.state['WINDOW_SIZE']
@ -64,7 +64,7 @@ class JumpModel(Model):
self.state['JUMP_HEIGHT'] = float(min(learning_info['pattern_height'], default = 1)) self.state['JUMP_HEIGHT'] = float(min(learning_info['pattern_height'], default = 1))
self.state['JUMP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1)) self.state['JUMP_LENGTH'] = int(max(learning_info['pattern_width'], default = 1))
def do_detect(self, dataframe: pd.DataFrame) -> list: def do_detect(self, dataframe: pd.DataFrame, id: str) -> list:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1) possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1)

20
analytics/analytics/models/model.py

@ -62,8 +62,10 @@ class Model(ABC):
@abstractmethod @abstractmethod
def get_model_type(self) -> (str, bool): def get_model_type(self) -> (str, bool):
pass pass
def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache: # TODO: id: str -> id: AnalyticUnitId in all models
def fit(self, dataframe: pd.DataFrame, segments: list, id: str, cache: Optional[ModelCache]) -> ModelCache:
logging.debug('Start method fit for analytic unit {}'.format(id))
data = dataframe['value'] data = dataframe['value']
if cache != None and len(cache) > 0: if cache != None and len(cache) > 0:
self.state = cache self.state = cache
@ -84,29 +86,29 @@ class Model(ABC):
self.state['WINDOW_SIZE'] = math.ceil(max_length / 2) if max_length else 0 self.state['WINDOW_SIZE'] = math.ceil(max_length / 2) if max_length else 0
model, model_type = self.get_model_type() model, model_type = self.get_model_type()
learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type) learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type)
self.do_fit(dataframe, labeled, deleted, learning_info) self.do_fit(dataframe, labeled, deleted, learning_info, id)
logging.debug('fit complete successful with self.state: {}'.format(self.state)) logging.debug('fit complete successful with self.state: {} for analytic unit: {}'.format(self.state, id))
return self.state return self.state
def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: def detect(self, dataframe: pd.DataFrame, id: str, cache: Optional[ModelCache]) -> dict:
#If cache is None or empty dict - default parameters will be used instead #If cache is None or empty dict - default parameters will be used instead
if cache != None and len(cache) > 0: if cache != None and len(cache) > 0:
self.state = cache self.state = cache
else: else:
logging.debug('get empty cache in detect') logging.debug('Get empty cache in detect')
if not self.state: if not self.state:
logging.warning('self.state is empty - skip do_detect') logging.warning('self.state is empty - skip do_detect')
return { return {
'segments': [], 'segments': [],
'cache': {}, 'cache': {},
} }
result = self.do_detect(dataframe) result = self.do_detect(dataframe, id)
segments = [( segments = [(
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x - 1]), utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x - 1]),
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x + 1]) utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x + 1])
) for x in result] ) for x in result]
if not self.state: if not self.state:
logging.warning('return empty self.state after detect') logging.warning('Return empty self.state after detect')
return { return {
'segments': segments, 'segments': segments,
'cache': self.state, 'cache': self.state,
@ -122,6 +124,7 @@ class Model(ABC):
raise ValueError('got non-dict as state for update fiting result: {}'.format(state)) raise ValueError('got non-dict as state for update fiting result: {}'.format(state))
def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: list, deleted: list, model: str, model_type: bool) -> dict: def get_parameters_from_segments(self, dataframe: pd.DataFrame, labeled: list, deleted: list, model: str, model_type: bool) -> dict:
logging.debug('Start parsing segments')
learning_info = { learning_info = {
'confidence': [], 'confidence': [],
'patterns_list': [], 'patterns_list': [],
@ -150,5 +153,6 @@ class Model(ABC):
learning_info['pattern_height'].append(pattern_height) learning_info['pattern_height'].append(pattern_height)
learning_info['pattern_width'].append(pattern_length) learning_info['pattern_width'].append(pattern_length)
learning_info['patterns_value'].append(aligned_segment.values[self.state['WINDOW_SIZE']]) learning_info['patterns_value'].append(aligned_segment.values[self.state['WINDOW_SIZE']])
logging.debug('Parsing segments ended correctly with learning_info: {}'.format(learning_info))
return learning_info return learning_info

4
analytics/analytics/models/peak_model.py

@ -39,7 +39,7 @@ class PeakModel(Model):
segment = data[start: end] segment = data[start: end]
return segment.idxmax() return segment.idxmax()
def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = self.state['WINDOW_SIZE'] window_size = self.state['WINDOW_SIZE']
@ -66,7 +66,7 @@ class PeakModel(Model):
self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list) self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
def do_detect(self, dataframe: pd.DataFrame): def do_detect(self, dataframe: pd.DataFrame, id: str):
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data

4
analytics/analytics/models/trough_model.py

@ -39,7 +39,7 @@ class TroughModel(Model):
segment = data[start: end] segment = data[start: end]
return segment.idxmin() return segment.idxmin()
def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict) -> None: def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segments: list, learning_info: dict, id: str) -> None:
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = self.state['WINDOW_SIZE'] window_size = self.state['WINDOW_SIZE']
@ -66,7 +66,7 @@ class TroughModel(Model):
self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list) self._update_fiting_result(self.state, learning_info['confidence'], convolve_list, del_conv_list, height_list)
def do_detect(self, dataframe: pd.DataFrame): def do_detect(self, dataframe: pd.DataFrame, id: str):
data = utils.cut_dataframe(dataframe) data = utils.cut_dataframe(dataframe)
data = data['value'] data = data['value']
window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data

1
analytics/analytics/services/server_service.py

@ -36,7 +36,6 @@ class ServerMessage:
payload = json['payload'] payload = json['payload']
if 'requestId' in json: if 'requestId' in json:
request_id = json['requestId'] request_id = json['requestId']
return ServerMessage(method, payload, request_id) return ServerMessage(method, payload, request_id)
class ServerService: class ServerService:

27
analytics/analytics/utils/common.py

@ -6,9 +6,11 @@ from scipy.signal import argrelextrema
from scipy.stats import gaussian_kde from scipy.stats import gaussian_kde
from scipy.stats.stats import pearsonr from scipy.stats.stats import pearsonr
import math import math
from typing import Union from typing import Union, List, Generator
import utils import utils
import logging import logging
from itertools import islice
from collections import deque
SHIFT_FACTOR = 0.05 SHIFT_FACTOR = 0.05
CONFIDENCE_FACTOR = 0.2 CONFIDENCE_FACTOR = 0.2
@ -72,12 +74,15 @@ def timestamp_to_index(dataframe, timestamp):
raise ValueError('Dataframe has no appropriate timestamp {}'.format(timestamp)) raise ValueError('Dataframe has no appropriate timestamp {}'.format(timestamp))
return time_ind return time_ind
def peak_finder(data, size): def find_peaks(data: Generator[float, None, None], size: int) -> Generator[float, None, None]:
all_max = [] window = deque(islice(data, size * 2 + 1))
for i in range(size, len(data) - size): for i, v in enumerate(data, size):
if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]: current = window[size]
all_max.append(i) #TODO: remove max() from loop
return all_max if current == max(window) and current != window[size + 1]:
yield i, current
window.append(v)
window.popleft()
def ar_mean(numbers): def ar_mean(numbers):
return float(sum(numbers)) / max(len(numbers), 1) return float(sum(numbers)) / max(len(numbers), 1)
@ -223,6 +228,14 @@ def get_convolve(segments: list, av_model: list, data: pd.Series, window_size: i
convolve_list.append(max(convolve_segment)) convolve_list.append(max(convolve_segment))
return convolve_list return convolve_list
def get_correlation_gen(data: pd.Series, window_size: int, pattern_model: List[float]) -> Generator[float, None, None]:
#Get a new dataset by correlating between a sliding window in data and pattern_model
for i in range(window_size, len(data) - window_size):
watch_data = data[i - window_size: i + window_size + 1]
correlation = pearsonr(watch_data, pattern_model)
if len(correlation) > 0:
yield(correlation[0])
def get_correlation(segments: list, av_model: list, data: pd.Series, window_size: int) -> list: def get_correlation(segments: list, av_model: list, data: pd.Series, window_size: int) -> list:
labeled_segment = [] labeled_segment = []
correlation_list = [] correlation_list = []

38
analytics/tests/test_dataset.py

@ -23,7 +23,7 @@ class TestDataset(unittest.TestCase):
try: try:
for model in model_instances: for model in model_instances:
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -36,7 +36,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.PeakModel() model = models.PeakModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -49,7 +49,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -62,7 +62,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.TroughModel() model = models.TroughModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -75,7 +75,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.DropModel() model = models.DropModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -88,7 +88,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.GeneralModel() model = models.GeneralModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -101,7 +101,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -114,7 +114,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.DropModel() model = models.DropModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -126,7 +126,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.JumpModel() model = models.JumpModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -166,7 +166,7 @@ class TestDataset(unittest.TestCase):
try: try:
for model in model_instances: for model in model_instances:
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments, 'test', dict())
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -175,11 +175,11 @@ class TestDataset(unittest.TestCase):
dataframe = create_dataframe(data_val) dataframe = create_dataframe(data_val)
segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}] segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}]
model = models.GeneralModel() model = models.GeneralModel()
model.fit(dataframe, segments, dict()) model.fit(dataframe, segments,'test', dict())
result = len(data_val) + 1 result = len(data_val) + 1
for _ in range(2): for _ in range(2):
model.do_detect(dataframe) model.do_detect(dataframe,'test')
max_pattern_index = max(model.do_detect(dataframe)) max_pattern_index = max(model.do_detect(dataframe, 'test'))
self.assertLessEqual(max_pattern_index, result) self.assertLessEqual(max_pattern_index, result)
def test_peak_model_for_cache(self): def test_peak_model_for_cache(self):
@ -197,7 +197,7 @@ class TestDataset(unittest.TestCase):
dataframe = create_dataframe(data_val) dataframe = create_dataframe(data_val)
segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}] segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
model = models.PeakModel() model = models.PeakModel()
result = model.fit(dataframe, segments, cache) result = model.fit(dataframe, segments, 'test', cache)
self.assertEqual(len(result['pattern_center']), 3) self.assertEqual(len(result['pattern_center']), 3)
def test_trough_model_for_cache(self): def test_trough_model_for_cache(self):
@ -215,7 +215,7 @@ class TestDataset(unittest.TestCase):
dataframe = create_dataframe(data_val) dataframe = create_dataframe(data_val)
segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}] segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
model = models.TroughModel() model = models.TroughModel()
result = model.fit(dataframe, segments, cache) result = model.fit(dataframe, segments, 'test', cache)
self.assertEqual(len(result['pattern_center']), 3) self.assertEqual(len(result['pattern_center']), 3)
def test_jump_model_for_cache(self): def test_jump_model_for_cache(self):
@ -233,7 +233,7 @@ class TestDataset(unittest.TestCase):
dataframe = create_dataframe(data_val) dataframe = create_dataframe(data_val)
segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}] segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}]
model = models.JumpModel() model = models.JumpModel()
result = model.fit(dataframe, segments, cache) result = model.fit(dataframe, segments, 'test', cache)
self.assertEqual(len(result['pattern_center']), 3) self.assertEqual(len(result['pattern_center']), 3)
def test_models_for_pattern_model_cache(self): def test_models_for_pattern_model_cache(self):
@ -253,7 +253,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.DropModel() model = models.DropModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.fit(dataframe, segments, cache) model.fit(dataframe, segments, 'test', cache)
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly'.format(model_name)) self.fail('Model {} raised unexpectedly'.format(model_name))
@ -285,7 +285,7 @@ class TestDataset(unittest.TestCase):
try: try:
model = models.GeneralModel() model = models.GeneralModel()
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.detect(data, cache) model.detect(data, 'test', cache)
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly with av_model {} and window size {}'.format(model_name, pattern_model, ws)) self.fail('Model {} raised unexpectedly with av_model {} and window size {}'.format(model_name, pattern_model, ws))
@ -323,7 +323,7 @@ class TestDataset(unittest.TestCase):
try: try:
for model in model_instances: for model in model_instances:
model_name = model.__class__.__name__ model_name = model.__class__.__name__
model.detect(data, cache) model.detect(data, 'test', cache)
except ValueError: except ValueError:
self.fail('Model {} raised unexpectedly with dataset {} and cache {}'.format(model_name, data['value'], cache)) self.fail('Model {} raised unexpectedly with dataset {} and cache {}'.format(model_name, data['value'], cache))

12
analytics/tests/test_utils.py

@ -3,6 +3,7 @@ import unittest
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import math import math
import random
RELATIVE_TOLERANCE = 1e-1 RELATIVE_TOLERANCE = 1e-1
@ -236,6 +237,17 @@ class TestUtils(unittest.TestCase):
data = [] data = []
result = [] result = []
self.assertEqual(utils.find_nan_indexes(data), result) self.assertEqual(utils.find_nan_indexes(data), result)
def test_create_correlation_data(self):
data = [random.randint(10, 999) for _ in range(10000)]
data = pd.Series(data)
pattern_model = [100, 200, 500, 300, 100]
ws = 2
result = 6000
corr_data = utils.get_correlation_gen(data, ws, pattern_model)
corr_data = list(corr_data)
self.assertGreaterEqual(len(corr_data), result)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

Loading…
Cancel
Save