import numpy as np
import pandas as pd
import scipy.signal
from scipy.signal import argrelextrema
from scipy.stats import gaussian_kde
from scipy.stats import pearsonr
from typing import Tuple, Union

import utils

SHIFT_FACTOR = 0.05
CONFIDENCE_FACTOR = 0.2
SMOOTHING_FACTOR = 5

def exponential_smoothing(series, alpha):
    result = [series[0]]
    if np.isnan(result[0]):
        result[0] = 0
    for n in range(1, len(series)):
        if np.isnan(series[n]):
            series[n] = 0
        result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
    return result
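
# A minimal usage sketch (hypothetical values): exponential_smoothing([1, 2, 3], 0.5)
# yields [1, 1.5, 2.25]; each point blends the current value with the previous
# smoothed result, and NaNs are treated as zero.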

def anomalies_to_timestamp(anomalies):
    for anomaly in anomalies:
        anomaly['from'] = int(anomaly['from'].timestamp() * 1000)
        anomaly['to'] = int(anomaly['to'].timestamp() * 1000)
    return anomalies

def segments_box(segments):
    max_time = 0
    min_time = float("inf")
    for segment in segments:
        min_time = min(min_time, segment['from'])
        max_time = max(max_time, segment['to'])
    min_time = pd.to_datetime(min_time, unit='ms')
    max_time = pd.to_datetime(max_time, unit='ms')
    return min_time, max_time
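
# For example (hypothetical millisecond bounds):
# segments_box([{'from': 1000, 'to': 3000}, {'from': 500, 'to': 2000}])
# returns (Timestamp('1970-01-01 00:00:00.500'), Timestamp('1970-01-01 00:00:03')).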

def find_pattern(data: pd.Series, height: float, length: int, pattern_type: str) -> list:
    pattern_list = []
    right_bound = len(data) - length - 1
    for i in range(right_bound):
        for x in range(1, length):
            if pattern_type == 'jump':
                if data[i + x] > data[i] + height:
                    pattern_list.append(i)
            elif pattern_type == 'drop':
                if data[i + x] < data[i] - height:
                    pattern_list.append(i)
    return pattern_list
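
# A quick sketch (hypothetical series): find_pattern(pd.Series([0, 5, 0, 0]), 3, 2, 'jump')
# returns [0], since data[1] exceeds data[0] by more than the height of 3.
# An index is appended once per matching offset, so duplicates are possible;
# they are expected to be grouped later by close_filtering.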

def find_jump(data, height, length):
    j_list = []
    for i in range(len(data) - length - 1):
        for x in range(1, length):
            if data[i + x] > data[i] + height:
                j_list.append(i)
    return j_list

def find_drop(data, height, length):
    d_list = []
    for i in range(len(data) - length - 1):
        for x in range(1, length):
            if data[i + x] < data[i] - height:
                d_list.append(i)
    return d_list

def timestamp_to_index(dataframe, timestamp):
    data = dataframe['timestamp']
    for i in range(len(data)):
        if data[i] >= timestamp:
            return i

def peak_finder(data, size):
    all_max = []
    for i in range(size, len(data) - size):
        if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]:
            all_max.append(i)
    return all_max
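
# Sketch (hypothetical data): peak_finder([0, 1, 3, 1, 0], 1) returns [2],
# the index of the local maximum. The window data[i - size : i + size]
# excludes the right edge, so the extra check against data[i + 1] covers it.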

def ar_mean(numbers):
    return float(sum(numbers)) / max(len(numbers), 1)

def get_av_model(patterns_list):
    if len(patterns_list) == 0:
        return []
    x = len(patterns_list[0])
    if any(len(pattern) != x for pattern in patterns_list):
        raise ValueError('All elements of patterns_list should have the same length')
    model_pat = []
    for i in range(x):
        av_val = []
        for j in patterns_list:
            av_val.append(j.values[i])
        model_pat.append(ar_mean(av_val))
    return model_pat
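
# Sketch (hypothetical aligned segments):
# get_av_model([pd.Series([1, 2]), pd.Series([3, 4])]) returns [2.0, 3.0],
# the element-wise arithmetic mean used as the average pattern model.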

def close_filtering(pattern_list, win_size):
    if len(pattern_list) == 0:
        return []
    s = [[pattern_list[0]]]
    k = 0
    for i in range(1, len(pattern_list)):
        if pattern_list[i] - win_size <= s[k][-1]:
            s[k].append(pattern_list[i])
        else:
            k += 1
            s.append([pattern_list[i]])
    return s
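
# Sketch: close_filtering([1, 2, 10], 3) groups indexes that lie within
# win_size of the previous one, returning [[1, 2], [10]].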

def best_pattern(pattern_list: list, data: pd.Series, dir: str) -> list:
    new_pattern_list = []
    for val in pattern_list:
        max_val = data[val[0]]
        min_val = data[val[0]]
        ind = val[0]
        for i in val:
            if dir == 'max':
                if data[i] > max_val:
                    max_val = data[i]
                    ind = i
            else:
                if data[i] < min_val:
                    min_val = data[i]
                    ind = i
        new_pattern_list.append(ind)
    return new_pattern_list
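
# Sketch: for each group produced by close_filtering, keep the strongest
# candidate; best_pattern([[0, 1, 2]], pd.Series([1, 5, 2]), 'max') returns [1].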

def find_nan_indexes(segment: pd.Series) -> list:
    nan_list = np.isnan(segment)
    nan_indexes = []
    for i, val in enumerate(nan_list):
        if val:
            nan_indexes.append(i)
    return nan_indexes

def check_nan_values(segment: Union[pd.Series, list]) -> Union[pd.Series, list]:
    nan_list = utils.find_nan_indexes(segment)
    if len(nan_list) > 0:
        segment = utils.nan_to_zero(segment, nan_list)
    return segment
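
# Sketch: check_nan_values(pd.Series([1.0, np.nan, 2.0])) replaces the NaN
# in place and returns a series equal to [1.0, 0.0, 2.0].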

def nan_to_zero(segment: Union[pd.Series, list], nan_list: list) -> Union[pd.Series, list]:
    if isinstance(segment, pd.Series):
        for val in nan_list:
            segment.values[val] = 0
    else:
        for val in nan_list:
            segment[val] = 0
    return segment

def find_confidence(segment: pd.Series) -> Tuple[float, float]:
    segment = utils.check_nan_values(segment)
    segment_min = min(segment)
    segment_max = max(segment)
    height = segment_max - segment_min
    if height:
        return (CONFIDENCE_FACTOR * height, height)
    else:
        return (0, 0)
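
# Sketch: with CONFIDENCE_FACTOR = 0.2, find_confidence(pd.Series([1, 5]))
# returns (0.8, 4): the confidence margin and the raw segment height.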

def find_width(pattern: pd.Series, selector) -> int:
    pattern = pattern.values
    center = utils.find_extremum_index(pattern, selector)
    pattern_left = pattern[:center]
    pattern_right = pattern[center:]
    left_extremum_index = utils.find_last_extremum(pattern_left, selector)
    right_extremum_index = utils.find_extremum_index(pattern_right, not selector)
    left_width = center - left_extremum_index
    right_width = right_extremum_index + 1
    return right_width + left_width

def find_last_extremum(segment: np.ndarray, selector: bool) -> int:
    segment = segment[::-1]
    first_extremum_ind = find_extremum_index(segment, not selector)
    last_extremum_ind = len(segment) - first_extremum_ind - 1
    return last_extremum_ind

def find_extremum_index(segment: np.ndarray, selector: bool) -> int:
    if selector:
        return segment.argmax()
    else:
        return segment.argmin()

def get_interval(data: pd.Series, center: int, window_size: int) -> pd.Series:
    left_bound = center - window_size
    right_bound = center + window_size + 1
    if left_bound < 0:
        left_bound = 0
    if right_bound > len(data):
        right_bound = len(data)
    return data[left_bound: right_bound]
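
# Sketch: get_interval(pd.Series(range(10)), 5, 2) returns the slice holding
# values 3..7, clamping the window at the series boundaries.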

def subtract_min_without_nan(segment: pd.Series) -> pd.Series:
    if len(segment) == 0:
        return []
    nan_list = utils.find_nan_indexes(segment)
    if len(nan_list) > 0:
        return segment
    else:
        segment = segment - min(segment)
    return segment

def get_convolve(segments: list, av_model: list, data: pd.Series, window_size: int) -> list:
    labeled_segment = []
    convolve_list = []
    for segment in segments:
        labeled_segment = utils.get_interval(data, segment, window_size)
        labeled_segment = utils.subtract_min_without_nan(labeled_segment)
        labeled_segment = utils.check_nan_values(labeled_segment)
        auto_convolve = scipy.signal.fftconvolve(labeled_segment, labeled_segment)
        convolve_segment = scipy.signal.fftconvolve(labeled_segment, av_model)
        convolve_list.append(max(auto_convolve))
        convolve_list.append(max(convolve_segment))
    return convolve_list
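
# Note: the result interleaves two numbers per segment, the peak of the
# segment's self-convolution followed by the peak of its convolution with
# av_model, so convolve_list has twice as many entries as segments.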

def get_correlation(segments: list, av_model: list, data: pd.Series, window_size: int) -> list:
    labeled_segment = []
    correlation_list = []
    p_value_list = []
    for segment in segments:
        labeled_segment = utils.get_interval(data, segment, window_size)
        labeled_segment = utils.subtract_min_without_nan(labeled_segment)
        labeled_segment = utils.check_nan_values(labeled_segment)
        correlation = pearsonr(labeled_segment, av_model)
        correlation_list.append(correlation[0])
        p_value_list.append(correlation[1])
    return correlation_list

def get_distribution_density(segment: pd.Series) -> Tuple[float, float, float]:
    if len(segment) < 2:
        return (0, 0, 0)
    min_jump = min(segment)
    max_jump = max(segment)
    pdf = gaussian_kde(segment)
    x = np.linspace(segment.min() - 1, segment.max() + 1, len(segment))
    y = pdf(x)
    ax_list = np.array(list(zip(x, y)), np.float32)
    antipeaks_kde = argrelextrema(ax_list[:, 1], np.less)[0]
    peaks_kde = argrelextrema(ax_list[:, 1], np.greater)[0]
    try:
        min_peak_index = peaks_kde[0]
        segment_min_line = ax_list[min_peak_index, 0]
        max_peak_index = peaks_kde[1]
        segment_max_line = ax_list[max_peak_index, 0]
        segment_median = ax_list[antipeaks_kde[0], 0]
    except IndexError:
        segment_max_line = max_jump * (1 - SHIFT_FACTOR)
        segment_min_line = min_jump * (1 - SHIFT_FACTOR)
        segment_median = (max_jump - min_jump) / 2 + min_jump
    return segment_median, segment_max_line, segment_min_line
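
# The idea: fit a KDE over the segment values; for a two-level pattern the
# density is bimodal, so the first two KDE peaks approximate the low and high
# levels, and the dip between them approximates the median crossing line.
# When the density lacks enough extrema, the fallback derives all three lines
# from the raw min and max shifted by SHIFT_FACTOR.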

def find_parameters(segment_data: pd.Series, segment_from_index: int, pat_type: str) -> Tuple[float, int]:
    segment = segment_data
    if len(segment_data) > SMOOTHING_FACTOR * 3:
        flat_segment = segment_data.rolling(window=SMOOTHING_FACTOR).mean()
        segment = flat_segment.dropna()
    segment_median, segment_max_line, segment_min_line = utils.get_distribution_density(segment)
    height = 0.95 * (segment_max_line - segment_min_line)
    length = utils.find_length(segment_data, segment_min_line, segment_max_line, pat_type)
    return height, length

def find_pattern_center(segment_data: pd.Series, segment_from_index: int, pattern_type: str):
    segment_median = utils.get_distribution_density(segment_data)[0]
    cen_ind = utils.pattern_intersection(segment_data.tolist(), segment_median, pattern_type)
    pat_center = cen_ind[0]
    segment_cent_index = pat_center + segment_from_index
    return segment_cent_index

def find_length(segment_data: pd.Series, segment_min_line: float, segment_max_line: float, pat_type: str) -> int:
    segment_max = max(segment_data)
    segment_min = min(segment_data)
    if segment_min_line <= segment_min:
        segment_min_line = segment_min * 1.05
    if segment_max_line >= segment_max:
        segment_max_line = segment_max * 0.95
    min_line = np.full(len(segment_data), segment_min_line)
    max_line = np.full(len(segment_data), segment_max_line)
    segment_array = np.array(segment_data.tolist())
    idmin = np.argwhere(np.diff(np.sign(min_line - segment_array)) != 0).reshape(-1)
    idmax = np.argwhere(np.diff(np.sign(max_line - segment_array)) != 0).reshape(-1)
    if len(idmin) > 0 and len(idmax) > 0:
        if pat_type == 'jump':
            result_length = idmax[0] - idmin[-1] + 1
        elif pat_type == 'drop':
            result_length = idmin[0] - idmax[-1] + 1
        else:
            return 0
        return result_length if result_length > 0 else 0
    else:
        return 0
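
# The length is estimated from the crossing points of the series with the two
# horizontal level lines: for a jump it spans from the last crossing of the
# low line to the first crossing of the high line, and symmetrically for a drop.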

def pattern_intersection(segment_data: list, median: float, pattern_type: str) -> list:
    center_index = []
    if pattern_type == 'jump':
        for i in range(1, len(segment_data) - 1):
            if segment_data[i - 1] < median and segment_data[i + 1] > median:
                center_index.append(i)
    elif pattern_type == 'drop':
        for i in range(1, len(segment_data) - 1):
            if segment_data[i - 1] > median and segment_data[i + 1] < median:
                center_index.append(i)
    delete_index = []
    for i in range(1, len(center_index)):
        if center_index[i] == center_index[i - 1] + 1:
            delete_index.append(i - 1)
    return [x for (idx, x) in enumerate(center_index) if idx not in delete_index]
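
# Sketch: pattern_intersection([0, 0, 1, 1], 0.5, 'jump') finds candidate
# crossings at indexes 1 and 2, then keeps only the last of each consecutive
# run, returning [2].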

def cut_dataframe(data: pd.DataFrame) -> pd.DataFrame:
    data_min = data['value'].min()
    if not np.isnan(data_min) and data_min > 0:
        data['value'] = data['value'] - data_min
    return data

def get_min_max(array, default):
    return float(min(array, default=default)), float(max(array, default=default))
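
# Sketch: get_min_max([3, 1, 2], 0) returns (1.0, 3.0); on an empty array the
# default is used for both bounds, so get_min_max([], 0) returns (0.0, 0.0).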