
general predictor -> general model (#130)

rozetko authored 6 years ago, committed by GitHub
commit 942c4ca112
  1. analytics/analytic_unit_manager.py (6 changed lines)
  2. analytics/detectors/__init__.py (1 changed line)
  3. analytics/detectors/general_detector/__init__.py (1 changed line)
  4. analytics/detectors/general_detector/general_detector.py (80 changed lines)
  5. analytics/detectors/general_detector/supervised_algorithm.py (61 changed lines)
  6. analytics/detectors/pattern_detector.py (2 changed lines)
  7. analytics/models/__init__.py (3 changed lines)
  8. analytics/models/general_model.py (97 changed lines)
  9. analytics/utils/__init__.py (7 changed lines)

analytics/analytic_unit_manager.py (6 changed lines)

@@ -12,11 +12,7 @@ analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
 def get_detector_by_type(analytic_unit_type) -> detectors.Detector:
-    if analytic_unit_type == 'GENERAL':
-        detector = detectors.GeneralDetector()
-    else:
-        detector = detectors.PatternDetector(analytic_unit_type)
-    return detector
+    return detectors.PatternDetector(analytic_unit_type)


 def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
     if analytic_unit_id in analytic_workers:
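With this change the manager no longer special-cases 'GENERAL': every analytic unit type is served by a PatternDetector, which resolves the concrete model from the pattern name (see pattern_detector.py below). A minimal sketch of the new routing, assuming the analytics package is on the import path (the call site itself is hypothetical):

    import detectors
    from analytic_unit_manager import get_detector_by_type

    # 'GENERAL' now goes through the same code path as every other pattern type
    detector = get_detector_by_type('GENERAL')
    assert isinstance(detector, detectors.PatternDetector)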

analytics/detectors/__init__.py (1 changed line)

@@ -1,3 +1,2 @@
 from detectors.detector import Detector
 from detectors.pattern_detector import PatternDetector
-from detectors.general_detector import GeneralDetector

analytics/detectors/general_detector/__init__.py (1 changed line)

@@ -1 +0,0 @@
-from detectors.general_detector.general_detector import GeneralDetector

analytics/detectors/general_detector/general_detector.py (80 changed lines)

@@ -1,80 +0,0 @@
-from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm
-from detectors import Detector
-from models import AnalyticUnitCache
-import utils
-import pandas as pd
-import logging
-import config
-import json
-from typing import Optional
-
-NANOSECONDS_IN_MS = 1000000
-
-logger = logging.getLogger('GENERAL_DETECTOR')
-
-
-class GeneralDetector(Detector):
-
-    def __init__(self):
-        self.model = None
-
-    async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
-        confidence = 0.02
-        start_index, stop_index = 0, len(dataframe)
-        if len(segments) > 0:
-            confidence = 0.0
-            min_time, max_time = utils.segments_box(segments)
-            dataframe = dataframe[dataframe['timestamp'] <= max_time]
-            dataframe = dataframe[dataframe['timestamp'] >= min_time]
-
-        train_augmented = self.preprocessor.get_augmented_data(
-            dataframe.index[0],
-            dataframe.index[-1],
-            segments
-        )
-
-        self.model = SupervisedAlgorithm()
-        await self.model.fit(train_augmented, confidence)
-        if len(segments) > 0:
-            last_dataframe_time = dataframe.iloc[-1]['timestamp']
-            last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
-        else:
-            last_prediction_time = 0
-
-        logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
-        return cache
-
-    async def predict(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
-        logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name)
-        last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
-        start_index = self.data_prov.get_upper_bound(last_prediction_time)
-        stop_index = self.data_prov.size()
-        last_prediction_time = int(last_prediction_time.value / NANOSECONDS_IN_MS)
-
-        predicted_anomalies = []
-        if start_index < stop_index:
-            max_chunk_size = 50000
-            predicted = pd.Series()
-            for index in range(start_index, stop_index, max_chunk_size):
-                chunk_start = index
-                chunk_finish = min(index + max_chunk_size, stop_index)
-                predict_augmented = self.preprocessor.get_augmented_data(chunk_start, chunk_finish)
-                assert(len(predict_augmented) == chunk_finish - chunk_start)
-
-                predicted_current = await self.model.predict(predict_augmented)
-                predicted = pd.concat([predicted, predicted_current])
-            predicted_anomalies = self.preprocessor.inverse_transform_anomalies(predicted)
-
-            last_row = self.data_prov.get_data_range(stop_index - 1, stop_index)
-            last_dataframe_time = last_row.iloc[0]['timestamp']
-            predicted_anomalies = utils.anomalies_to_timestamp(predicted_anomalies)
-            last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
-
-        logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name)
-        return predicted_anomalies, last_prediction_time

analytics/detectors/general_detector/supervised_algorithm.py (61 changed lines)

@@ -1,61 +0,0 @@
-import pickle
-
-from tsfresh.transformers.feature_selector import FeatureSelector
-from sklearn.preprocessing import MinMaxScaler
-from sklearn.ensemble import IsolationForest
-import pandas as pd
-
-
-class SupervisedAlgorithm(object):
-    frame_size = 16
-    good_features = [
-        # "value__agg_linear_trend__f_agg_\"max\"__chunk_len_5__attr_\"intercept\"",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_12__w_20",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_13__w_5",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_10",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_20",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_8__w_20",
-        # "value__fft_coefficient__coeff_3__attr_\"abs\"",
-        "time_of_day_column_x",
-        "time_of_day_column_y",
-        "value__abs_energy",
-        # "value__absolute_sum_of_changes",
-        # "value__sum_of_reoccurring_data_points",
-    ]
-    clf = None
-    scaler = None
-
-    def __init__(self):
-        self.features = []
-        self.col_to_max, self.col_to_min, self.col_to_median = None, None, None
-        self.augmented_path = None
-
-    async def fit(self, dataset, contamination=0.005):
-        dataset = dataset[self.good_features]
-        dataset = dataset[-100000:]
-        self.scaler = MinMaxScaler(feature_range=(-1, 1))
-        # self.clf = svm.OneClassSVM(nu=contamination, kernel="rbf", gamma=0.1)
-        self.clf = IsolationForest(contamination=contamination)
-        self.scaler.fit(dataset)
-        dataset = self.scaler.transform(dataset)
-        self.clf.fit(dataset)
-
-    async def predict(self, dataframe):
-        dataset = dataframe[self.good_features]
-        dataset = self.scaler.transform(dataset)
-        prediction = self.clf.predict(dataset)
-        # for i in range(len(dataset)):
-        #     print(str(dataset[i]) + " " + str(prediction[i]))
-        prediction = [x < 0.0 for x in prediction]
-        return pd.Series(prediction, index=dataframe.index)
-
-    def __select_features(self, x, y):
-        # feature_selector = FeatureSelector()
-        feature_selector = FeatureSelector()
-        feature_selector.fit(x, y)
-        return feature_selector.relevant_features

analytics/detectors/pattern_detector.py (2 changed lines)

@@ -13,6 +13,8 @@ logger = logging.getLogger('PATTERN_DETECTOR')
 def resolve_model_by_pattern(pattern: str) -> models.Model:
+    if pattern == 'GENERAL':
+        return models.GeneralModel()
     if pattern == 'PEAK':
         return models.PeakModel()
     if pattern == 'REVERSE_PEAK':
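After this hunk the resolver handles 'GENERAL' like any other pattern name and returns the new GeneralModel. A short, hedged check of the dispatch (imports assume the analytics package is on the path):

    import models
    from detectors.pattern_detector import resolve_model_by_pattern

    model = resolve_model_by_pattern('GENERAL')
    assert isinstance(model, models.GeneralModel)   # 'GENERAL' now maps to the new model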

analytics/models/__init__.py (3 changed lines)

@@ -2,7 +2,6 @@ from models.model import Model, AnalyticUnitCache
 from models.drop_model import DropModel
 from models.peak_model import PeakModel
 from models.jump_model import JumpModel
-from models.custom_model import CustomModel
 from models.custom_model import CustomModel
 from models.reverse_peak_model import ReversePeakModel
+from models.general_model import GeneralModel

analytics/models/general_model.py (97 changed lines)

@@ -0,0 +1,97 @@
+from models import Model, AnalyticUnitCache
+
+import utils
+import numpy as np
+import pandas as pd
+import scipy.signal
+from scipy.fftpack import fft
+from scipy.signal import argrelextrema
+import math
+from scipy.stats import gaussian_kde
+from scipy.stats import norm
+from typing import Optional
+
+WINDOW_SIZE = 350
+
+
+class GeneralModel(Model):
+
+    def __init__(self):
+        super()
+        self.segments = []
+        self.ipats = []
+        self.state = {
+            'convolve_max': WINDOW_SIZE,
+        }
+        self.all_conv = []
+
+    def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
+        if type(cache) is AnalyticUnitCache:
+            self.state = cache
+        self.segments = segments
+
+        data = dataframe['value']
+        convolve_list = []
+        for segment in segments:
+            if segment['labeled']:
+                segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from']))
+                segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to']))
+                segment_data = data[segment_from_index: segment_to_index + 1]
+                if len(segment_data) == 0:
+                    continue
+                self.ipats.append(segment_from_index + int((segment_to_index - segment_from_index) / 2))
+                segment_min = min(segment_data)
+                segment_data = segment_data - segment_min
+                segment_max = max(segment_data)
+                segment_data = segment_data / segment_max
+                convolve = scipy.signal.fftconvolve(segment_data, segment_data)
+                convolve_list.append(max(convolve))
+
+        if len(convolve_list) > 0:
+            self.state['convolve_max'] = float(max(convolve_list))
+        else:
+            self.state['convolve_max'] = WINDOW_SIZE / 3
+
+        return self.state
+
+    def do_predict(self, dataframe: pd.DataFrame):
+        data = dataframe['value']
+        pat_data = data[self.ipats[0] - WINDOW_SIZE: self.ipats[0] + WINDOW_SIZE]
+        x = min(pat_data)
+        pat_data = pat_data - x
+        y = max(pat_data)
+        pat_data = pat_data / y
+
+        for i in range(WINDOW_SIZE * 2, len(data)):
+            watch_data = data[i - WINDOW_SIZE * 2: i]
+            w = min(watch_data)
+            watch_data = watch_data - w
+            r = max(watch_data)
+            if r < y:
+                watch_data = watch_data / y
+            else:
+                watch_data = watch_data / r
+            conv = scipy.signal.fftconvolve(pat_data, watch_data)
+            self.all_conv.append(max(conv))
+        all_conv_peaks = utils.peak_finder(self.all_conv, WINDOW_SIZE * 2)
+
+        filtered = self.__filter_prediction(all_conv_peaks, data)
+        return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered]
+
+    def __filter_prediction(self, segments: list, data: list):
+        if len(segments) == 0 or len(self.ipats) == 0:
+            segments = []
+            return segments
+        delete_list = []
+        for val in segments:
+            if self.all_conv[val] < self.state['convolve_max'] * 0.8:
+                delete_list.append(val)
+        for item in delete_list:
+            segments.remove(item)
+
+        return segments
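The new GeneralModel follows the same interface as the other pattern models: fit() learns a cross-correlation ceiling ('convolve_max') from the labeled segments, and do_predict() slides a 2 * WINDOW_SIZE window over the series, keeping convolution peaks that reach at least 80% of that ceiling. A minimal usage sketch; the column names 'timestamp'/'value' and the segment fields 'from'/'to'/'labeled' come from this diff, while the synthetic series and timestamps are purely illustrative:

    import numpy as np
    import pandas as pd
    from models import GeneralModel   # assuming the analytics package is on the import path

    # one-second samples with a single labeled bump around 00:20:00
    values = np.zeros(5000)
    values[1200:1260] = np.hanning(60)
    dataframe = pd.DataFrame({
        'timestamp': pd.date_range('2018-01-01', periods=5000, freq='S'),
        'value': values,
    })
    segments = [{'from': '2018-01-01 00:20:00', 'to': '2018-01-01 00:21:00', 'labeled': True}]

    model = GeneralModel()
    cache = model.fit(dataframe, segments, None)   # returns the state dict used as the unit cache
    found = model.do_predict(dataframe)            # list of (start, end) timestamps in nanoseconds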

analytics/utils/__init__.py (7 changed lines)

@@ -207,3 +207,10 @@ def timestamp_to_index(dataframe, timestamp):
     for i in range(len(data)):
         if data[i] >= timestamp:
             return i
+
+def peak_finder(data, size):
+    all_max = []
+    for i in range(size, len(data) - size):
+        if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]:
+            all_max.append(i)
+    return all_max
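peak_finder is a plain local-maximum scan: index i is kept when data[i] equals the maximum of the surrounding window data[i - size : i + size] and is strictly greater than its right neighbour (which suppresses flat plateaus). A small self-contained check of that behaviour (import path assumed as above):

    from utils import peak_finder

    data = [0, 1, 3, 1, 0, 2, 5, 2, 0, 1]
    print(peak_finder(data, 2))   # [2, 6]: the values 3 and 5 each dominate their window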
