Browse Source

Add some sort of verification of incoming and outgoing data in analytics #227 (#281)

* fix find_confidence

* fix get_interval

* fix subtract_min_without_nan

* fix substract and get_convolve
pull/1/head
Alexandr Velikiy 6 years ago committed by rozetko
parent
commit
3cad93ca62
  1. 34
      analytics/analytics/utils/common.py

34
analytics/analytics/utils/common.py

@ -4,6 +4,7 @@ import scipy.signal
from scipy.fftpack import fft from scipy.fftpack import fft
from scipy.signal import argrelextrema from scipy.signal import argrelextrema
from scipy.stats import gaussian_kde from scipy.stats import gaussian_kde
from typing import Union
import utils import utils
def exponential_smoothing(series, alpha): def exponential_smoothing(series, alpha):
@ -267,7 +268,7 @@ def best_pat(pat_list, data, dir):
new_pat_list.append(ind) new_pat_list.append(ind)
return new_pat_list return new_pat_list
def find_nan_indexes(segment): def find_nan_indexes(segment: pd.Series) -> list:
nan_list = np.isnan(segment) nan_list = np.isnan(segment)
nan_indexes = [] nan_indexes = []
for i, val in enumerate(nan_list): for i, val in enumerate(nan_list):
@ -275,12 +276,23 @@ def find_nan_indexes(segment):
nan_indexes.append(i) nan_indexes.append(i)
return nan_indexes return nan_indexes
def nan_to_zero(segment, nan_list): def check_nan_values(segment: Union[pd.Series, list]) -> Union[pd.Series, list]:
for val in nan_list: nan_list = utils.find_nan_indexes(segment)
segment[val] = 0 if len(nan_list) > 0:
segment = utils.nan_to_zero(segment, nan_list)
return segment
def nan_to_zero(segment: Union[pd.Series, list], nan_list: list) -> Union[pd.Series, list]:
if type(segment) == pd.Series:
for val in nan_list:
segment.values[val] = 0
else:
for val in nan_list:
segment[val] = 0
return segment return segment
def find_confidence(segment: pd.Series) -> float: def find_confidence(segment: pd.Series) -> float:
segment = utils.check_nan_values(segment)
segment_min = min(segment) segment_min = min(segment)
segment_max = max(segment) segment_max = max(segment)
return 0.2 * (segment_max - segment_min) return 0.2 * (segment_max - segment_min)
@ -288,10 +300,19 @@ def find_confidence(segment: pd.Series) -> float:
def get_interval(data: pd.Series, center: int, window_size: int) -> pd.Series: def get_interval(data: pd.Series, center: int, window_size: int) -> pd.Series:
left_bound = center - window_size left_bound = center - window_size
right_bound = center + window_size + 1 right_bound = center + window_size + 1
if left_bound < 0:
left_bound = 0
if right_bound > len(data):
right_bound = len(data)
return data[left_bound: right_bound] return data[left_bound: right_bound]
def subtract_min_without_nan(segment: list) -> list: def subtract_min_without_nan(segment: pd.Series) -> pd.Series:
if not np.isnan(min(segment)): if len(segment) == 0:
return []
nan_list = utils.find_nan_indexes(segment)
if len(nan_list) > 0:
return segment
else:
segment = segment - min(segment) segment = segment - min(segment)
return segment return segment
@ -301,6 +322,7 @@ def get_convolve(segments: list, av_model: list, data: pd.Series, window_size: i
for segment in segments: for segment in segments:
labeled_segment = utils.get_interval(data, segment, window_size) labeled_segment = utils.get_interval(data, segment, window_size)
labeled_segment = utils.subtract_min_without_nan(labeled_segment) labeled_segment = utils.subtract_min_without_nan(labeled_segment)
labeled_segment = utils.check_nan_values(labeled_segment)
auto_convolve = scipy.signal.fftconvolve(labeled_segment, labeled_segment) auto_convolve = scipy.signal.fftconvolve(labeled_segment, labeled_segment)
convolve_segment = scipy.signal.fftconvolve(labeled_segment, av_model) convolve_segment = scipy.signal.fftconvolve(labeled_segment, av_model)
convolve_list.append(max(auto_convolve)) convolve_list.append(max(auto_convolve))

Loading…
Cancel
Save