diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py index 3da52a0..a4b9516 100644 --- a/analytics/analytic_unit_manager.py +++ b/analytics/analytic_unit_manager.py @@ -37,9 +37,9 @@ async def handle_analytic_task(task): data['timestamp'] = pd.to_datetime(data['timestamp']) result_payload = {} if task['type'] == "LEARN": - await worker.do_learn(payload['segments'], data) + result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) elif task['type'] == "PREDICT": - result_payload = await worker.do_predict(data) + result_payload = await worker.do_predict(data, payload['cache']) else: raise ValueError('Unknown task type "%s"' % task['type']) return { diff --git a/analytics/detectors/detector.py b/analytics/detectors/detector.py index 5489fd6..b5bed34 100644 --- a/analytics/detectors/detector.py +++ b/analytics/detectors/detector.py @@ -5,9 +5,9 @@ from pandas import DataFrame class Detector(ABC): @abstractmethod - async def train(self, dataframe: DataFrame, segments: list): + async def train(self, dataframe: DataFrame, segments: list, cache: dict) -> dict: pass @abstractmethod - async def predict(self, dataframe: DataFrame) -> list: + async def predict(self, dataframe: DataFrame, cache: dict) -> dict: pass diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py index dc4e49b..275ef1a 100644 --- a/analytics/detectors/general_detector/general_detector.py +++ b/analytics/detectors/general_detector/general_detector.py @@ -18,7 +18,7 @@ class GeneralDetector(Detector): def __init__(self): self.model = None - async def train(self, dataframe, segments): + async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict): confidence = 0.02 start_index, stop_index = 0, len(dataframe) @@ -45,7 +45,7 @@ class GeneralDetector(Detector): logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name) return last_prediction_time - 
async def predict(self, data): + async def predict(self, dataframe: pd.DataFrame, cache: dict): logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name) last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms') diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 2ec407b..a6f3340 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -30,13 +30,13 @@ class PatternDetector(Detector): self.model = resolve_model_by_pattern(self.pattern_type) window_size = 100 - async def train(self, dataframe: pd.DataFrame, segments: list): + async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict): # TODO: pass only part of dataframe that has segments self.model.fit(dataframe, segments) # TODO: save model after fit - return 0 + return cache - async def predict(self, dataframe: pd.DataFrame): + async def predict(self, dataframe: pd.DataFrame, cache: dict): predicted_indexes = await self.model.predict(dataframe) segments = [] @@ -50,4 +50,8 @@ class PatternDetector(Detector): last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) - return segments, last_prediction_time + return { + 'cache': cache, + 'segments': segments, + 'last_prediction_time': last_prediction_time + } diff --git a/analytics/models/custom_model.py b/analytics/models/custom_model.py index 8c8f5a8..db9fbcf 100644 --- a/analytics/models/custom_model.py +++ b/analytics/models/custom_model.py @@ -1,5 +1,6 @@ from models import Model import utils +import pandas as pd # Paste your model here: class CustomModel(Model): @@ -10,8 +11,8 @@ class CustomModel(Model): # It will be saved in filesystem and loaded after server restart self.state = {} - def fit(self, dataframe, segments): + def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: pass - def predict(self, dataframe): + def predict(self, 
dataframe, cache: dict): return [] diff --git a/analytics/models/jump_model.py b/analytics/models/jump_model.py index 3ae3b37..6391a5b 100644 --- a/analytics/models/jump_model.py +++ b/analytics/models/jump_model.py @@ -2,6 +2,7 @@ from models import Model import utils import numpy as np +import pandas as pd import scipy.signal from scipy.fftpack import fft from scipy.signal import argrelextrema @@ -25,7 +26,7 @@ class JumpModel(Model): 'JUMP_LENGTH': 1, } - def fit(self, dataframe, segments): + def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: self.segments = segments data = dataframe['value'] confidences = [] @@ -87,9 +88,9 @@ class JumpModel(Model): if len(jump_length_list) > 0: self.state['JUMP_LENGTH'] = max(jump_length_list) else: - self.state['JUMP_LENGTH'] = 1 - - def predict(self, dataframe): + self.state['JUMP_LENGTH'] = 1 + + def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: data = dataframe['value'] result = self.__predict(data) diff --git a/analytics/models/model.py b/analytics/models/model.py index 05dbf1e..35dba6f 100644 --- a/analytics/models/model.py +++ b/analytics/models/model.py @@ -5,9 +5,9 @@ from pandas import DataFrame class Model(ABC): @abstractmethod - def fit(self, dataframe: DataFrame, segments: list): + def fit(self, dataframe: DataFrame, segments: list, cache: dict) -> dict: pass @abstractmethod - def predict(self, dataframe: DataFrame) -> list: + def predict(self, dataframe: DataFrame, cache: dict) -> dict: pass diff --git a/analytics/models/peaks_model.py b/analytics/models/peaks_model.py index f65ea3c..0c2b7c5 100644 --- a/analytics/models/peaks_model.py +++ b/analytics/models/peaks_model.py @@ -3,6 +3,7 @@ from models import Model import utils from scipy import signal import numpy as np +import pandas as pd class PeaksModel(Model): @@ -10,10 +11,10 @@ class PeaksModel(Model): def __init__(self): super() - def fit(self, dataset, contamination=0.005): + def fit(self, dataframe: pd.DataFrame, 
segments: list, cache: dict) -> dict: pass - def predict(self, dataframe): + def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: array = dataframe['value'].as_matrix() window_size = 20 # window = np.ones(101) diff --git a/analytics/models/step_model.py b/analytics/models/step_model.py index 9b95c46..d735755 100644 --- a/analytics/models/step_model.py +++ b/analytics/models/step_model.py @@ -23,7 +23,7 @@ class StepModel(Model): 'DROP_LENGTH': 1, } - def fit(self, dataframe, segments): + def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict: self.segments = segments d_min = min(dataframe['value']) for i in range(0,len(dataframe['value'])): @@ -96,8 +96,7 @@ class StepModel(Model): else: self.state['DROP_LENGTH'] = 1 - - async def predict(self, dataframe): + async def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict: d_min = min(dataframe['value']) for i in range(0,len(dataframe['value'])): dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min diff --git a/analytics/server.py b/analytics/server.py index 2b49700..b4e1ed6 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -40,6 +40,7 @@ async def handle_task(task: object): '_id': task['_id'], 'task': task['type'], 'analyticUnitId': task['analyticUnitId'], + 'cache': task['cache'], 'status': "IN_PROGRESS" } diff --git a/server/src/config.ts b/server/src/config.ts index 66fc89d..652755d 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -13,6 +13,8 @@ export const DATA_PATH = path.join(__dirname, '../../data'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); +export const ANALYTIC_UNIT_CACHES_DATABASE_PATCH = path.join(DATA_PATH, 'analytic_unit_caches.db'); + export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); diff --git 
a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index e390bc5..2108fe6 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -111,12 +111,14 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { } let pattern = analyticUnit.type; + // TODO: add cache let task = new AnalyticsTask( id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data } ); AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); let result = await runTask(task); - let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result); + let { lastPredictionTime, segments: predictedSegments, cache } = await processLearningResult(result); + // TODO: save cache previousLastPredictionTime = analyticUnit.lastPredictionTime; await Promise.all([ @@ -136,7 +138,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { async function processLearningResult(taskResult: any): Promise<{ lastPredictionTime: number, - segments: Segment.Segment[] + segments: Segment.Segment[], + cache: any }> { if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { return Promise.reject(taskResult.error); @@ -153,7 +156,8 @@ async function processLearningResult(taskResult: any): Promise<{ return { lastPredictionTime: 0, - segments: [] + segments: [], + cache: {} }; } diff --git a/server/src/models/analytic_unit_cache_model.ts b/server/src/models/analytic_unit_cache_model.ts new file mode 100644 index 0000000..c916082 --- /dev/null +++ b/server/src/models/analytic_unit_cache_model.ts @@ -0,0 +1,56 @@ +import { AnalyticUnitId } from "./analytic_unit_model"; +import { Collection, makeDBQ } from '../services/data_service'; + + +let db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES); + + +export type AnalyticUnitCacheId = string; + +export class AnalyticUnitCache { + public constructor( + public analyticUnitId: AnalyticUnitId, + public 
data: any, + public id?: AnalyticUnitCacheId, + ) { + + } + + public toObject() { + return { + _id: this.id, + analyticUnitId: this.analyticUnitId, + data: this.data, + + }; + } + + static fromObject(obj: any): AnalyticUnitCache { + if(obj.analyticUnitId === undefined) { + throw new Error('No analyticUnitId in obj:' + obj); + } + return new AnalyticUnitCache( + obj.analyticUnitId, + obj.data, + obj._id + ); + } +} + +export async function findById(id: AnalyticUnitCacheId): Promise<AnalyticUnitCache> { + let obj = await db.findOne(id); + return AnalyticUnitCache.fromObject(obj); +} + +export async function create(unit: AnalyticUnitCache): Promise<AnalyticUnitCacheId> { + let obj = unit.toObject(); + return db.insertOne(obj); +} + +export async function setData(id: AnalyticUnitCacheId, data: any) { + return db.updateOne(id, { data }); +} + +export async function remove(id: AnalyticUnitCacheId): Promise<void> { + await db.removeOne(id); +} diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 559d4e7..091c4e9 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -89,6 +89,7 @@ export async function create(unit: AnalyticUnit): Promise { export async function remove(id: AnalyticUnitId): Promise { // TODO: remove it`s segments + // TODO: remove it`s cache await db.removeOne(id); } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 799b3e6..f9d1210 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -4,7 +4,7 @@ import * as nedb from 'nedb'; import * as fs from 'fs'; -export enum Collection { ANALYTIC_UNITS, SEGMENTS }; +export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES }; /** @@ -180,3 +180,4 @@ checkDataFolders(); // TODO: it's better if models request db which we create if it`s needed db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true })); db.set(Collection.SEGMENTS, new 
nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true })); +db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATCH, autoload: true }));