
refactor common functions from detectors to utils folder

Branch: pull/1/head
Alexey Velikiy, 6 years ago
commit 7527faf068
  1. analytics/detectors/general_detector.py (31 changed lines)
  2. analytics/detectors/jump_detector.py (20 changed lines)
  3. analytics/detectors/pattern_detector.py (10 changed lines)
  4. analytics/detectors/peaks_detector.py (33 changed lines)
  5. analytics/detectors/step_detector.py (24 changed lines)
  6. analytics/utils/__init__.py (61 changed lines)

analytics/detectors/general_detector.py (31 changed lines)

@@ -1,3 +1,4 @@
+import utils
 from grafana_data_provider import GrafanaDataProvider
 from data_preprocessor import data_preprocessor
 import pandas as pd
@@ -7,18 +8,12 @@ import config
 import os.path
 import json
-NANOSECONDS_IN_MS = 1000000
 logger = logging.getLogger('analytic_toolset')
-def anomalies_to_timestamp(anomalies):
-    for anomaly in anomalies:
-        anomaly['start'] = int(anomaly['start'].timestamp() * 1000)
-        anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000)
-    return anomalies
 class GeneralDetector:
     def __init__(self, anomaly_name):
@@ -46,37 +41,27 @@ class GeneralDetector:
         self.__load_model()
-    def anomalies_box(self, anomalies):
-        max_time = 0
-        min_time = float("inf")
-        for anomaly in anomalies:
-            max_time = max(max_time, anomaly['finish'])
-            min_time = min(min_time, anomaly['start'])
-        min_time = pd.to_datetime(min_time, unit='ms')
-        max_time = pd.to_datetime(max_time, unit='ms')
-        return min_time, max_time
-    async def learn(self, anomalies):
+    async def learn(self, segments):
         logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name)
         confidence = 0.02
         dataframe = self.data_prov.get_dataframe()
         start_index, stop_index = 0, len(dataframe)
-        if len(anomalies) > 0:
+        if len(segments) > 0:
             confidence = 0.0
-            min_time, max_time = self.anomalies_box(anomalies)
+            min_time, max_time = utils.segments_box(segments)
             dataframe = dataframe[dataframe['timestamp'] <= max_time]
             dataframe = dataframe[dataframe['timestamp'] >= min_time]
         train_augmented = self.preprocessor.get_augmented_data(
             dataframe.index[0],
             dataframe.index[-1],
-            anomalies
+            segments
         )
         self.model = self.create_algorithm()
         await self.model.fit(train_augmented, confidence)
-        if len(anomalies) > 0:
+        if len(segments) > 0:
             last_dataframe_time = dataframe.iloc[-1]['timestamp']
             last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
         else:
@@ -112,7 +97,7 @@ class GeneralDetector:
         last_row = self.data_prov.get_data_range(stop_index - 1, stop_index)
         last_dataframe_time = last_row.iloc[0]['timestamp']
-        predicted_anomalies = anomalies_to_timestamp(predicted_anomalies)
+        predicted_anomalies = utils.anomalies_to_timestamp(predicted_anomalies)
         last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
         logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name)

analytics/detectors/jump_detector.py (20 changed lines)

@@ -1,3 +1,4 @@
+import utils
 import numpy as np
 import pickle
 import scipy.signal
@@ -6,20 +7,6 @@ from scipy.signal import argrelextrema
 import math
-def is_intersect(target_segment, segments):
-    for segment in segments:
-        start = max(segment['start'], target_segment[0])
-        finish = min(segment['finish'], target_segment[1])
-        if start <= finish:
-            return True
-    return False
-def exponential_smoothing(series, alpha):
-    result = [series[0]]
-    for n in range(1, len(series)):
-        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
-    return result
 class Jumpdetector:
     def __init__(self):
@@ -56,6 +43,7 @@ Jumpdetector:
     async def fit(self, dataframe, segments):
         #self.alpha_finder()
         data = dataframe['value']
         confidences = []
         convolve_list = []
@@ -131,7 +119,7 @@
         result.sort()
         if len(self.segments) > 0:
-            result = [segment for segment in result if not is_intersect(segment, self.segments)]
+            result = [segment for segment in result if not utils.is_intersect(segment, self.segments)]
         return result
     def __predict(self, data):
@@ -140,7 +128,7 @@
         extrema_list = []
         # add all intersections of the exponential with the smoothed graph
-        for i in exponential_smoothing(data + self.confidence, 0.02):
+        for i in utils.exponential_smoothing(data + self.confidence, 0.02):
             extrema_list.append(i)
         segments = []
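
utils.exponential_smoothing, which this predictor now calls with alpha 0.02, is a standard single-exponential filter: each output mixes the newest point with the previous smoothed value. A small sketch with invented numbers, alpha raised to 0.5 so the effect is visible:

import utils

series = [0, 0, 0, 10, 10, 10]
# Each step: alpha * new_point + (1 - alpha) * previous smoothed value.
print(utils.exponential_smoothing(series, 0.5))
# [0, 0.0, 0.0, 5.0, 7.5, 8.75]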

analytics/detectors/pattern_detector.py (10 changed lines)

@@ -1,4 +1,5 @@
 import detectors
+import utils
 from grafana_data_provider import GrafanaDataProvider
@@ -14,15 +15,6 @@ import pandas as pd
 logger = logging.getLogger('analytic_toolset')
-def segments_box(segments):
-    max_time = 0
-    min_time = float("inf")
-    for segment in segments:
-        min_time = min(min_time, segment['start'])
-        max_time = max(max_time, segment['finish'])
-    min_time = pd.to_datetime(min_time, unit='ms')
-    max_time = pd.to_datetime(max_time, unit='ms')
-    return min_time, max_time
 def resolve_detector_by_pattern(pattern):
     if pattern == "peak":
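
The hunk cuts off inside resolve_detector_by_pattern, so only the "peak" branch is visible. As a sketch only, a resolver of this shape would map each pattern name to one of the detector classes touched by this commit; every branch after "peak" below is an assumption, not part of the diff:

def resolve_detector_by_pattern(pattern):
    if pattern == "peak":
        return detectors.PeaksDetector()
    # The remaining branches are hypothetical illustrations.
    if pattern == "jump":
        return detectors.Jumpdetector()
    if pattern == "step":
        return detectors.StepDetector()
    raise ValueError('Unknown pattern "%s"' % pattern)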

analytics/detectors/peaks_detector.py (33 changed lines)

@@ -1,37 +1,8 @@
+import utils
 from scipy import signal
 import numpy as np
-def find_steps(array, threshold):
-    """
-    Finds local maxima by segmenting array based on positions at which
-    the threshold value is crossed. Note that this thresholding is
-    applied after the absolute value of the array is taken. Thus,
-    the distinction between upward and downward steps is lost. However,
-    get_step_sizes can be used to determine directionality after the
-    fact.
-    Parameters
-    ----------
-    array : numpy array
-        1 dimensional array that represents time series of data points
-    threshold : int / float
-        Threshold value that defines a step
-    Returns
-    -------
-    steps : list
-        List of indices of the detected steps
-    """
-    steps = []
-    array = np.abs(array)
-    above_points = np.where(array > threshold, 1, 0)
-    ap_dif = np.diff(above_points)
-    cross_ups = np.where(ap_dif == 1)[0]
-    cross_dns = np.where(ap_dif == -1)[0]
-    for upi, dni in zip(cross_ups, cross_dns):
-        steps.append(np.argmax(array[upi:dni]) + upi)
-    return steps
 class PeaksDetector:
     def __init__(self):
         pass
@@ -80,7 +51,7 @@ class PeaksDetector:
         data = filtered
         data /= data.max()
-        result = find_steps(data, 0.1)
+        result = utils.find_steps(data, 0.1)
         return [(dataframe.index[x], dataframe.index[x + window_size]) for x in result]
     def save(self, model_filename):
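
To see what the relocated utils.find_steps returns at this call site: it thresholds |data| and reports the index of the maximum inside each above-threshold run. A toy example with invented values:

import numpy as np
import utils

# Two bumps crossing the 0.1 threshold.
data = np.array([0.0, 0.0, 0.5, 0.9, 0.4, 0.0, 0.0, 0.3, 0.8, 0.2, 0.0])
print(utils.find_steps(data, 0.1))
# [3, 8]: the peak index inside each bump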

analytics/detectors/step_detector.py (24 changed lines)

@@ -2,25 +2,11 @@ import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
+import utils
 import numpy as np
 import pickle
-def is_intersect(target_segment, segments):
-    for segment in segments:
-        start = max(segment['start'], target_segment[0])
-        finish = min(segment['finish'], target_segment[1])
-        if start <= finish:
-            return True
-    return False
-def exponential_smoothing(series, alpha):
-    result = [series[0]]
-    for n in range(1, len(series)):
-        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
-    return result
 class StepDetector:
     def __init__(self):
@@ -58,20 +44,20 @@ class StepDetector:
     async def predict(self, dataframe):
         data = dataframe['value']
-        result = self.__predict(data)
+        result = await self.__predict(data)
         result.sort()
         if len(self.segments) > 0:
-            result = [segment for segment in result if not is_intersect(segment, self.segments)]
+            result = [segment for segment in result if not utils.is_intersect(segment, self.segments)]
         return result
-    def __predict(self, data):
+    async def __predict(self, data):
         window_size = 24
         all_max_flatten_data = data.rolling(window=window_size).mean()
         all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0]
         extrema_list = []
-        for i in exponential_smoothing(data - self.confidence, 0.03):
+        for i in utils.exponential_smoothing(data - self.confidence, 0.03):
             extrema_list.append(i)
         segments = []
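
utils.is_intersect, now shared with jump_detector.py, filters out predicted (start, finish) tuples that overlap any labeled {'start', 'finish'} segment. A quick sketch with made-up bounds:

import utils

labeled = [{'start': 100, 'finish': 200}]

print(utils.is_intersect((150, 250), labeled))  # True: overlaps [100, 200]
print(utils.is_intersect((201, 300), labeled))  # False: starts after it ends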

analytics/utils/__init__.py (61 changed lines)

@@ -0,0 +1,61 @@
+import numpy as np
+import pandas as pd
+def is_intersect(target_segment, segments):
+    for segment in segments:
+        start = max(segment['start'], target_segment[0])
+        finish = min(segment['finish'], target_segment[1])
+        if start <= finish:
+            return True
+    return False
+def exponential_smoothing(series, alpha):
+    result = [series[0]]
+    for n in range(1, len(series)):
+        result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
+    return result
+def find_steps(array, threshold):
+    """
+    Finds local maxima by segmenting array based on positions at which
+    the threshold value is crossed. Note that this thresholding is
+    applied after the absolute value of the array is taken. Thus,
+    the distinction between upward and downward steps is lost. However,
+    get_step_sizes can be used to determine directionality after the
+    fact.
+    Parameters
+    ----------
+    array : numpy array
+        1 dimensional array that represents time series of data points
+    threshold : int / float
+        Threshold value that defines a step
+    Returns
+    -------
+    steps : list
+        List of indices of the detected steps
+    """
+    steps = []
+    array = np.abs(array)
+    above_points = np.where(array > threshold, 1, 0)
+    ap_dif = np.diff(above_points)
+    cross_ups = np.where(ap_dif == 1)[0]
+    cross_dns = np.where(ap_dif == -1)[0]
+    for upi, dni in zip(cross_ups, cross_dns):
+        steps.append(np.argmax(array[upi:dni]) + upi)
+    return steps
+def anomalies_to_timestamp(anomalies):
+    for anomaly in anomalies:
+        anomaly['start'] = int(anomaly['start'].timestamp() * 1000)
+        anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000)
+    return anomalies
+def segments_box(segments):
+    max_time = 0
+    min_time = float("inf")
+    for segment in segments:
+        min_time = min(min_time, segment['start'])
+        max_time = max(max_time, segment['finish'])
+    min_time = pd.to_datetime(min_time, unit='ms')
+    max_time = pd.to_datetime(max_time, unit='ms')
+    return min_time, max_time
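
A few smoke checks for the new module, not part of this commit; they assume you run from the analytics/ directory so that `import utils` resolves:

import numpy as np
import pandas as pd
import utils

# Smoothing a constant series leaves it unchanged.
assert utils.exponential_smoothing([1.0, 1.0, 1.0], 0.5) == [1.0, 1.0, 1.0]

# segments_box returns the outermost bounds as pandas Timestamps.
lo, hi = utils.segments_box([{'start': 0, 'finish': 10}, {'start': 5, 'finish': 20}])
assert lo == pd.to_datetime(0, unit='ms') and hi == pd.to_datetime(20, unit='ms')

# is_intersect and find_steps behave as the detector call sites expect.
assert utils.is_intersect((5, 6), [{'start': 0, 'finish': 10}])
assert utils.find_steps(np.array([0.0, 0.9, 1.0, 0.9, 0.0]), 0.5) == [2]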