
analytics clearup

pull/1/head
Coin de Gamma, 6 years ago
commit f4376ccf29
  1. analytics/analytic_unit_manager.py (20)
  2. analytics/data_preprocessor.py (253)
  3. analytics/detectors/general_detector/general_detector.py (6)
  4. analytics/detectors/general_detector/supervised_algorithm.py (8)
  5. analytics/detectors/pattern_detector.py (6)
  6. analytics/services/data_service.py (2)

analytics/analytic_unit_manager.py (20)

@@ -7,10 +7,11 @@ from analytic_unit_worker import AnalyticUnitWorker
 logger = logging.getLogger('AnalyticUnitManager')
-analytic_unit_id = str
-analytic_workers: Dict[analytic_unit_id, AnalyticUnitWorker] = dict()
+AnalyticUnitId = str
+analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
-def get_detector(analytic_unit_type) -> detectors.Detector:
+def get_detector_by_type(analytic_unit_type) -> detectors.Detector:
     if analytic_unit_type == 'GENERAL':
         detector = detectors.GeneralDetector()
     else:
@@ -21,7 +22,7 @@ def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
     if analytic_unit_id in analytic_workers:
         # TODO: check that type is the same
         return analytic_workers[analytic_unit_id]
-    detector = get_detector(analytic_unit_type)
+    detector = get_detector_by_type(analytic_unit_type)
     worker = AnalyticUnitWorker(analytic_unit_id, detector)
     analytic_workers[analytic_unit_id] = worker
     return worker
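ensure_worker above is a get-or-create cache keyed by analytic unit id. A minimal standalone sketch of the same pattern, assuming a Worker stand-in class rather than the real AnalyticUnitWorker:

    from typing import Dict

    class Worker:
        # Stand-in for AnalyticUnitWorker; only remembers its id here.
        def __init__(self, unit_id: str):
            self.unit_id = unit_id

    workers: Dict[str, Worker] = {}

    def get_or_create_worker(unit_id: str) -> Worker:
        # Reuse the existing worker for this unit, creating one on first request.
        if unit_id not in workers:
            workers[unit_id] = Worker(unit_id)
        return workers[unit_id]

    assert get_or_create_worker('unit-1') is get_or_create_worker('unit-1')  # same instance reused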
@@ -33,15 +34,12 @@ async def handle_analytic_task(task):
     worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
     result_payload = {}
-    print(task['type'])
-    if task['type'] == "PREDICT":
-        result_payload = await worker.do_predict(analytic_unit_id, payload)
-        print(result_payload)
-    elif task['type'] == "LEARN":
-        await worker.do_learn(analytic_unit_id, payload)
+    if task['type'] == "LEARN":
+        await worker.do_learn(AnalyticUnitId, payload)
+    elif task['type'] == "PREDICT":
+        result_payload = await worker.do_predict(AnalyticUnitId, payload)
     else:
         raise ValueError('Unknown task type "%s"' % task['type'])
-    print(result_payload)
     return {
         'status': 'SUCCESS',
         'payload': result_payload
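For context, the task handled above is a plain dict with an 'analyticUnitId', a 'type' of either "LEARN" or "PREDICT", and a 'payload'. A minimal sketch of driving such a task, assuming the import path, the sample field values, and the asyncio entry point (none of which are part of this commit):

    import asyncio

    from analytic_unit_manager import handle_analytic_task  # assumed import path

    # Hypothetical task; field names mirror those used in handle_analytic_task.
    sample_task = {
        'analyticUnitId': 'unit-42',   # assumed id value
        'type': 'PREDICT',             # one of "LEARN" / "PREDICT"
        'payload': {
            'pattern': 'GENERAL',      # selects the detector in get_detector_by_type
        },
    }

    async def main():
        # On success handle_analytic_task returns {'status': 'SUCCESS', 'payload': ...}
        result = await handle_analytic_task(sample_task)
        print(result['status'])

    # asyncio.run(main())  # run once the analytics service modules are importable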

analytics/data_preprocessor.py (253)

@@ -1,253 +0,0 @@
-import os.path
-import pandas as pd
-import numpy as np
-import math
-import time
-from tsfresh.transformers.feature_augmenter import FeatureAugmenter
-from tsfresh.feature_extraction.settings import from_columns
-from pytz import timezone
-
-class data_preprocessor:
-    # augmented = None
-    frame_size = 16
-    calc_features = [
-        # "value__agg_linear_trend__f_agg_\"max\"__chunk_len_5__attr_\"intercept\"",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_12__w_20",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_13__w_5",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_10",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_2__w_20",
-        # "value__cwt_coefficients__widths_(2, 5, 10, 20)__coeff_8__w_20",
-        # "value__fft_coefficient__coeff_3__attr_\"abs\"",
-        "time_of_day_column_x",
-        "time_of_day_column_y",
-        "value__abs_energy",
-        "value__absolute_sum_of_changes",
-        "value__sum_of_reoccurring_data_points",
-    ]
-    time_features = [
-        'time_of_day_column_x',
-        'time_of_day_column_y'
-    ]
-    chunk_size = 50000
-
-    def __init__(self, data_provider, augmented_path):
-        self.data_provider = data_provider
-        self.augmented_path = augmented_path
-        self.last_chunk_index = 0
-        self.total_size = 0
-        self.__init_chunks()
-        self.synchronize()
-
-    def set_data_provider(self, data_provider):
-        self.data_provider = data_provider
-
-    def synchronize(self):
-        start_frame = self.total_size
-        stop_frame = self.data_provider.size()
-        max_chunk_size = 30000
-        for frame in range(start_frame, stop_frame, max_chunk_size):
-            data = self.__get_source_frames(frame, min(stop_frame, frame + max_chunk_size))
-            if len(data) == 0:
-                return
-            append_augmented = self.__extract_features(data, self.calc_features)
-            self.__append_data(append_augmented)
-
-    def expand_indexes(self, start_index, stop_index):
-        return start_index, stop_index
-
-    def get_augmented_data(self, start_index, stop_index, anomalies=[]):
-        start_frame = start_index
-        stop_frame = stop_index
-        augmented = self.__get_data(start_frame, stop_frame)
-        if len(anomalies) > 0:
-            anomalies_indexes = self.transform_anomalies(anomalies)
-            augmented = augmented.drop(anomalies_indexes[:-1])
-        return augmented
-
-    def transform_anomalies(self, anomalies):
-        anomaly_index = None
-        dataframe = self.data_provider.get_dataframe(None)
-        for anomaly in anomalies:
-            start_time = pd.to_datetime(anomaly['from'], unit='ms')
-            finish_time = pd.to_datetime(anomaly['to'], unit='ms')
-            current_index = (dataframe['timestamp'] >= start_time) & (dataframe['timestamp'] <= finish_time)
-            if anomaly_index is not None:
-                anomaly_index = (anomaly_index | current_index)
-            else:
-                anomaly_index = current_index
-        rows = dataframe[anomaly_index]
-        indexes = np.unique(rows.index)
-        return indexes
-
-    def inverse_transform_anomalies(self, prediction):
-        anomalies = []
-        cur_anomaly = None
-        source_dataframe = self.data_provider.get_dataframe(None)
-        for i in prediction.index:
-            if prediction[i]:
-                start_frame_index = max(0, i - self.frame_size + 1)
-                finish_frame_index = i
-                start = source_dataframe['timestamp'][start_frame_index]
-                finish = source_dataframe['timestamp'][finish_frame_index]
-                if cur_anomaly is None:
-                    if len(anomalies) > 0 and start <= anomalies[len(anomalies) - 1]['to']:
-                        cur_anomaly = anomalies[len(anomalies) - 1]
-                        anomalies.pop()
-                    else:
-                        cur_anomaly = {'start': start, 'finish': finish}
-                cur_anomaly['to'] = finish
-            elif cur_anomaly is not None:
-                anomalies.append(cur_anomaly)
-                cur_anomaly = None
-        if cur_anomaly:
-            anomalies.append(cur_anomaly)
-        return anomalies
-
-    def __get_data(self, start_index, stop_index):
-        result = pd.DataFrame()
-        start_chunk = start_index // self.chunk_size
-        finish_chunk = stop_index // self.chunk_size
-        for chunk_num in range(start_chunk, finish_chunk + 1):
-            chunk = self.__load_chunk(chunk_num)
-            if chunk_num == finish_chunk:
-                chunk = chunk[:stop_index % self.chunk_size]
-            if chunk_num == start_chunk:
-                chunk = chunk[start_index % self.chunk_size:]
-            result = pd.concat([result, chunk])
-        return result
-
-    def __init_chunks(self):
-        chunk_index = 0
-        self.last_chunk_index = 0
-        while True:
-            filename = self.augmented_path
-            if chunk_index > 0:
-                filename += "." + str(chunk_index)
-            if os.path.exists(filename):
-                self.last_chunk_index = chunk_index
-            else:
-                break
-            chunk_index += 1
-        self.total_size = self.last_chunk_index * self.chunk_size
-        last_chunk = self.__load_chunk(self.last_chunk_index)
-        self.total_size += len(last_chunk)
-
-    def __append_data(self, dataframe):
-        while len(dataframe) > 0:
-            chunk = self.__load_chunk(self.last_chunk_index)
-            rows_count = min(self.chunk_size - len(chunk), len(dataframe))
-            rows = dataframe.iloc[0:rows_count]
-            self.__save_chunk(self.last_chunk_index, rows)
-            self.total_size += rows_count
-            dataframe = dataframe[rows_count:]
-            if len(dataframe) > 0:
-                self.last_chunk_index += 1
-
-    def __load_chunk(self, index):
-        filename = self.augmented_path
-        if index > 0:
-            filename += "." + str(index)
-        if os.path.exists(filename):
-            chunk = pd.read_csv(filename)
-            frame_index = np.arange(index * self.chunk_size, index * self.chunk_size + len(chunk))
-            chunk = chunk.set_index(frame_index)
-            return chunk
-        return pd.DataFrame()
-
-    def __save_chunk(self, index, dataframe):
-        filename = self.augmented_path
-        if index > 0:
-            filename += "." + str(index)
-        if os.path.exists(filename):
-            dataframe.to_csv(filename, mode='a', index=False, header=False)
-        else:
-            dataframe.to_csv(filename, mode='w', index=False, header=True)
-
-    def __get_source_frames(self, start_frame, stop_frame):
-        start_index = start_frame
-        stop_index = stop_frame
-        # frame = self.source_dataframe[start_index:stop_index]
-        # mat = frame.as_matrix()
-        source_dataframe = self.data_provider.get_data_range(max(start_index - self.frame_size + 1, 0), stop_index)
-        dataframe = None
-        for i in range(start_index, stop_index):
-            mini = max(0, i - self.frame_size + 1)
-            frame = source_dataframe.loc[mini:i + 1].copy()
-            frame['id'] = i
-            if dataframe is None:
-                dataframe = frame
-            else:
-                dataframe = dataframe.append(frame, ignore_index=True)
-        #dataframe = self.source_dataframe[start_index:stop_index].copy()
-        #dataframe['id'] = np.floor_divide(dataframe.index, self.frame_size)
-        dataframe.reset_index(drop=True, inplace=True)
-        return dataframe
-
-    def __extract_features(self, data, features=None):
-        start_frame = data['id'][0]
-        stop_frame = data['id'][len(data)-1] + 1
-        augmented = pd.DataFrame(index=np.arange(start_frame, stop_frame))
-        # tsfresh features
-        tsfresh_features = None
-        if features is not None:
-            tsfresh_features = set(features) - set(self.time_features)
-        augmented = self.__extract_tfresh_features(data, augmented, tsfresh_features)
-        # time features
-        augmented = self.__extract_time_features(data, augmented, features)
-        return augmented
-
-    def __extract_tfresh_features(self, data, augmented, features):
-        relevant_extraction_settings = None
-        if features is not None:
-            augmented_features = set(features)
-            relevant_extraction_settings = from_columns(augmented_features)
-        #impute_function = partial(impute_dataframe_range, col_to_max=self.col_to_max,
-        #                          col_to_min=self.col_to_min, col_to_median=self.col_to_median)
-        feature_extractor = FeatureAugmenter(
-            kind_to_fc_parameters=relevant_extraction_settings,
-            column_id='id',
-            column_sort='timestamp')
-        feature_extractor.set_timeseries_container(data)
-        return feature_extractor.transform(augmented)
-
-    def __extract_time_features(self, data, augmented, features):
-        if features is None:
-            features = self.time_features
-        seconds = np.zeros(len(augmented))
-        first_id = data['id'][0]
-        for i in range(len(data)):
-            id = data['id'][i] - first_id
-            timeobj = data['timestamp'][i].time()
-            seconds[id] = timeobj.second + 60 * (timeobj.minute + 60 * timeobj.hour)
-        norm_seconds = 2 * math.pi * seconds / (24 * 3600)
-        if 'time_of_day_column_x' in features:
-            augmented['time_of_day_column_x'] = np.cos(norm_seconds)
-        if 'time_of_day_column_y' in features:
-            augmented['time_of_day_column_y'] = np.sin(norm_seconds)
-        return augmented
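The only non-tsfresh features in the removed preprocessor are the cyclical time-of-day columns, which map the clock onto the unit circle so that 23:59 and 00:00 end up close together. A standalone sketch of that encoding, assuming a pandas DataFrame with a datetime 'timestamp' column (the column name and sample data are assumptions):

    import math
    import numpy as np
    import pandas as pd

    def add_time_of_day_features(df: pd.DataFrame) -> pd.DataFrame:
        # Seconds since midnight for each timestamp.
        t = df['timestamp'].dt
        seconds = t.hour * 3600 + t.minute * 60 + t.second
        # Normalize onto [0, 2*pi) and take cos/sin, matching the removed
        # time_of_day_column_x / time_of_day_column_y features.
        angle = 2 * math.pi * seconds / (24 * 3600)
        df = df.copy()
        df['time_of_day_column_x'] = np.cos(angle)
        df['time_of_day_column_y'] = np.sin(angle)
        return df

    # Example with assumed data:
    # frame = pd.DataFrame({'timestamp': pd.to_datetime([0, 43200000], unit='ms')})
    # print(add_time_of_day_features(frame))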

analytics/detectors/general_detector/general_detector.py (6)

@@ -1,18 +1,16 @@
 from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm
 from detectors import Detector
 import utils
-from data_preprocessor import data_preprocessor
 import pandas as pd
 import logging
-from urllib.parse import urlparse
 import config
-import os.path
 import json
 NANOSECONDS_IN_MS = 1000000
-logger = logging.getLogger('analytic_toolset')
+logger = logging.getLogger('GENERAL_DETECTOR')
 class GeneralDetector(Detector):

analytics/detectors/general_detector/supervised_algorithm.py (8)

@@ -53,14 +53,6 @@ class SupervisedAlgorithm(object):
         prediction = [x < 0.0 for x in prediction]
         return pd.Series(prediction, index=dataframe.index)
 
-    def save(self, model_filename):
-        with open(model_filename, 'wb') as file:
-            pickle.dump((self.clf, self.scaler), file)
-
-    def load(self, model_filename):
-        with open(model_filename, 'rb') as file:
-            self.clf, self.scaler = pickle.load(file)
-
     def __select_features(self, x, y):
         # feature_selector = FeatureSelector()
         feature_selector = FeatureSelector()
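The dropped save/load pair persisted the classifier and scaler together with pickle. A minimal sketch of that persistence pattern outside the class, assuming stand-in objects and a hypothetical file path (the real clf and scaler are whatever SupervisedAlgorithm holds):

    import pickle

    # Stand-ins for the trained classifier and scaler (assumed objects).
    clf = {'weights': [0.1, 0.2]}
    scaler = {'mean': 0.0, 'std': 1.0}

    def save_model(model_filename, clf, scaler):
        # Persist both objects in one file, as the removed save() did.
        with open(model_filename, 'wb') as file:
            pickle.dump((clf, scaler), file)

    def load_model(model_filename):
        with open(model_filename, 'rb') as file:
            return pickle.load(file)

    # save_model('/tmp/model.pkl', clf, scaler)      # assumed path
    # clf, scaler = load_model('/tmp/model.pkl')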

analytics/detectors/pattern_detector.py (6)

@@ -1,10 +1,6 @@
 import models
-import utils
 import logging
-from urllib.parse import urlparse
-import os.path
-import json
 import config
 import pandas as pd
@@ -12,7 +8,7 @@ import pandas as pd
 from detectors import Detector
-logger = logging.getLogger('analytic_toolset')
+logger = logging.getLogger('PATTERN_DETECTOR')
 def resolve_model_by_pattern(pattern: str) -> models.Model:

analytics/services/data_service.py (2)

@@ -83,5 +83,3 @@ class DataService:
         filename = file_descriptor.filename
         if filename not in self.locks:
             raise RuntimeError('No lock for file %s' % filename)
