from models import Model, ModelState, AnalyticSegment, ModelType
from analytic_types import TimeSeries
from analytic_types.learning_info import LearningInfo

from typing import Optional, List

import operator

import numpy as np
import pandas as pd
import scipy.signal

import utils
import utils.meta
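
# Relative tolerances used in __filter_detection: a detection's convolution
# peak may deviate up to 20% from the learned positive bounds, while peaks
# falling within 2% of the learned deleted-segment bounds are rejected.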
POSITIVE_SEGMENT_MEASUREMENT_ERROR = 0.2
NEGATIVE_SEGMENT_MEASUREMENT_ERROR = 0.02


@utils.meta.JSONClass
class StairModelState(ModelState):
    """Serializable state of a stair model: detection confidence plus the learned stair height and length."""

    def __init__(
        self,
        confidence: float = 0,
        stair_height: float = 0,
        stair_length: float = 0,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.confidence = confidence
        self.stair_height = stair_height
        self.stair_length = stair_length


class StairModel(Model):
    """Base model for stair patterns: a JUMP is a step up, a DROP is a step down."""

    def get_state(self, cache: Optional[dict] = None) -> StairModelState:
        return StairModelState.from_json(cache)

    def get_stair_indexes(self, data: pd.Series, height: float, length: int) -> List[int]:
        """Get a list of indexes where stair segments start.

        Keyword arguments:
        data -- data that contains stair (jump or drop) segments
        height -- the difference between the stair's max_line and min_line (see utils.find_parameters)
        length -- maximum number of values in the stair
        """
        indexes = []
        for i in range(len(data) - length - 1):
            if self.is_stair_in_segment(data.values[i:i + length + 1], height):
                indexes.append(i)
        return indexes
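
    # Toy example (hypothetical values): for a JUMP model,
    # get_stair_indexes(pd.Series([0, 0, 5, 5]), height=4, length=1)
    # returns [1], because data[2] >= data[1] + 4.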

    def is_stair_in_segment(self, segment: np.ndarray, height: float) -> bool:
        if len(segment) < 2:
            return False
        comparison_operator = operator.ge
        if self.get_model_type() == ModelType.DROP:
            # for drops, look for a fall of at least `height`:
            # flip both the comparison and the sign of the height
            comparison_operator = operator.le
            height = -height
        return comparison_operator(max(segment[1:]), segment[0] + height)
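
    # E.g. (hypothetical values): with a DROP model,
    # is_stair_in_segment(np.array([5, 1]), height=3) checks
    # max([1]) <= 5 - 3, i.e. 1 <= 2, and returns True.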

    def find_segment_center(self, dataframe: pd.DataFrame, start: int, end: int) -> int:
        data = dataframe['value']
        segment = data[start:end]
        segment_center_index = utils.find_pattern_center(segment, start, self.get_model_type().value)
        return segment_center_index

    def do_fit(
        self,
        dataframe: pd.DataFrame,
        labeled_segments: List[AnalyticSegment],
        deleted_segments: List[AnalyticSegment],
        learning_info: LearningInfo
    ) -> None:
        data = utils.cut_dataframe(dataframe)
        data = data['value']
        window_size = self.state.window_size
        last_pattern_center = self.state.pattern_center
        self.state.pattern_center = utils.remove_duplicates_and_sort(last_pattern_center + learning_info.segment_center_list)
        self.state.pattern_model = utils.get_av_model(learning_info.patterns_list)
        convolve_list = utils.get_convolve(self.state.pattern_center, self.state.pattern_model, data, window_size)
        correlation_list = utils.get_correlation(self.state.pattern_center, self.state.pattern_model, data, window_size)
        height_list = learning_info.patterns_value
        del_conv_list = []
        delete_pattern_timestamp = []
        for segment in deleted_segments:
            segment_cent_index = segment.center_index
            delete_pattern_timestamp.append(segment.pattern_timestamp)
            deleted_stair = utils.get_interval(data, segment_cent_index, window_size)
            deleted_stair = utils.subtract_min_without_nan(deleted_stair)
            del_conv_stair = scipy.signal.fftconvolve(deleted_stair, self.state.pattern_model)
            if len(del_conv_stair) > 0:
                del_conv_list.append(max(del_conv_stair))
        self._update_fitting_result(self.state, learning_info.confidence, convolve_list, del_conv_list)
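
        # Keep detection permissive: accept anything at least as tall as the
        # shortest labeled stair and no longer than the widest one.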
        self.state.stair_height = int(min(learning_info.pattern_height, default=1))
        self.state.stair_length = int(max(learning_info.pattern_width, default=1))

    def do_detect(self, dataframe: pd.DataFrame) -> TimeSeries:
        data = utils.cut_dataframe(dataframe)
        data = data['value']
        possible_stairs = self.get_stair_indexes(data, self.state.stair_height, self.state.stair_length + 1)
        result = self.__filter_detection(possible_stairs, data)
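        # Each detection is a (start, end) window bracketing the index where
        # the stair begins, e.g. index 42 becomes (41, 43).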
        return [(val - 1, val + 1) for val in result]

    def __filter_detection(self, segments_indexes: List[int], data: pd.Series) -> List[int]:
        delete_list = []
        variance_error = self.state.window_size
        close_segments = utils.close_filtering(segments_indexes, variance_error)
        segments_indexes = utils.best_pattern(close_segments, data, self.get_extremum_type().value)
        if len(segments_indexes) == 0 or len(self.state.pattern_center) == 0:
            return []
        for segment_index in segments_indexes:
            if segment_index <= self.state.window_size or segment_index >= (len(data) - self.state.window_size):
                delete_list.append(segment_index)
                continue
            # take a fresh copy of the pattern for each segment so the NaN
            # masking below does not leak into later iterations
            pattern_data = self.state.pattern_model
            convol_data = utils.get_interval(data, segment_index, self.state.window_size)
            percent_of_nans = convol_data.isnull().sum() / len(convol_data)
            if len(convol_data) == 0 or percent_of_nans > 0.5:
                delete_list.append(segment_index)
                continue
            if 0 < percent_of_nans <= 0.5:
                nan_list = utils.find_nan_indexes(convol_data)
                convol_data = utils.nan_to_zero(convol_data, nan_list)
                pattern_data = utils.nan_to_zero(pattern_data, nan_list)
            conv = scipy.signal.fftconvolve(convol_data, pattern_data)
            if len(conv) == 0:
                delete_list.append(segment_index)
                continue
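            # Accept the match only if its convolution peak falls inside the
            # learned positive band and outside the deleted-segment band.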
            upper_bound = self.state.convolve_max * (1 + POSITIVE_SEGMENT_MEASUREMENT_ERROR)
            lower_bound = self.state.convolve_min * (1 - POSITIVE_SEGMENT_MEASUREMENT_ERROR)
            delete_up_bound = self.state.conv_del_max * (1 + NEGATIVE_SEGMENT_MEASUREMENT_ERROR)
            delete_low_bound = self.state.conv_del_min * (1 - NEGATIVE_SEGMENT_MEASUREMENT_ERROR)
            max_conv = max(conv)
            if max_conv > upper_bound or max_conv < lower_bound:
                delete_list.append(segment_index)
            elif max_conv < delete_up_bound and max_conv > delete_low_bound:
                delete_list.append(segment_index)
        for item in delete_list:
            segments_indexes.remove(item)
        segments_indexes = utils.remove_duplicates_and_sort(segments_indexes)
        return segments_indexes