
Analytic unit cache start #117 (#120)

Alexey Velikiy 6 years ago committed by rozetko
commit 955f2821e3
Changed files:
  1. analytics/analytic_unit_manager.py (4 changes)
  2. analytics/detectors/detector.py (4 changes)
  3. analytics/detectors/general_detector/general_detector.py (4 changes)
  4. analytics/detectors/pattern_detector.py (12 changes)
  5. analytics/models/custom_model.py (5 changes)
  6. analytics/models/jump_model.py (9 changes)
  7. analytics/models/model.py (4 changes)
  8. analytics/models/peaks_model.py (5 changes)
  9. analytics/models/step_model.py (5 changes)
  10. analytics/server.py (1 change)
  11. server/src/config.ts (2 changes)
  12. server/src/controllers/analytics_controller.ts (10 changes)
  13. server/src/models/analytic_unit_cache_model.ts (56 changes)
  14. server/src/models/analytic_unit_model.ts (1 change)
  15. server/src/services/data_service.ts (3 changes)

analytics/analytic_unit_manager.py (4 changes)

@@ -37,9 +37,9 @@ async def handle_analytic_task(task):
     data['timestamp'] = pd.to_datetime(data['timestamp'])
     result_payload = {}
     if task['type'] == "LEARN":
-        await worker.do_learn(payload['segments'], data)
+        result_payload = await worker.do_learn(payload['segments'], data, payload['cache'])
     elif task['type'] == "PREDICT":
-        result_payload = await worker.do_predict(data)
+        result_payload = await worker.do_predict(data, payload['cache'])
     else:
         raise ValueError('Unknown task type "%s"' % task['type'])
     return {
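For orientation, a sketch of the task message this handler now expects; the 'cache' field inside 'payload' is the new piece. Only the key names come from this commit, the values below are hypothetical:

    # A hypothetical LEARN task as handle_analytic_task might receive it.
    # 'cache' inside the payload is new; all values here are made up.
    task = {
        '_id': '1',
        'type': 'LEARN',
        'analyticUnitId': 'some-unit-id',
        'payload': {
            'pattern': 'JUMP',   # assumed pattern name, for illustration
            'segments': [],      # labeled segments to learn from
            'data': [],          # timestamp/value rows
            'cache': {}          # state returned by a previous LEARN
        }
    }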

analytics/detectors/detector.py (4 changes)

@@ -5,9 +5,9 @@ from pandas import DataFrame
 class Detector(ABC):

     @abstractmethod
-    async def train(self, dataframe: DataFrame, segments: list):
+    async def train(self, dataframe: DataFrame, segments: list, cache: dict) -> dict:
         pass

     @abstractmethod
-    async def predict(self, dataframe: DataFrame) -> list:
+    async def predict(self, dataframe: DataFrame, cache: dict) -> dict:
         pass

analytics/detectors/general_detector/general_detector.py (4 changes)

@@ -18,7 +18,7 @@ class GeneralDetector(Detector):
     def __init__(self):
         self.model = None

-    async def train(self, dataframe, segments):
+    async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict):
         confidence = 0.02
         start_index, stop_index = 0, len(dataframe)
@@ -45,7 +45,7 @@ class GeneralDetector(Detector):
         logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
         return last_prediction_time

-    async def predict(self, data):
+    async def predict(self, dataframe: pd.DataFrame, cache: dict):
         logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name)
         last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')

analytics/detectors/pattern_detector.py (12 changes)

@@ -30,13 +30,13 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         window_size = 100

-    async def train(self, dataframe: pd.DataFrame, segments: list):
+    async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict):
         # TODO: pass only part of dataframe that has segments
         self.model.fit(dataframe, segments)
         # TODO: save model after fit
-        return 0
+        return cache

-    async def predict(self, dataframe: pd.DataFrame):
+    async def predict(self, dataframe: pd.DataFrame, cache: dict):
         predicted_indexes = await self.model.predict(dataframe)
         segments = []
@@ -50,4 +50,8 @@ class PatternDetector(Detector):
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
-        return segments, last_prediction_time
+        return {
+            'cache': cache,
+            'segments': segments,
+            'last_prediction_time': last_prediction_time
+        }
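Since predict no longer returns a (segments, last_prediction_time) tuple, callers unpack the dict instead. A minimal sketch of a consumer, assuming the worker simply forwards this dict as the task's result payload:

    import pandas as pd

    async def run_predict(detector, dataframe: pd.DataFrame, cache: dict):
        # predict() now returns a dict rather than a tuple
        result = await detector.predict(dataframe, cache)
        new_cache = result['cache']  # persist and send with the next task
        return result['segments'], result['last_prediction_time'], new_cache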

analytics/models/custom_model.py (5 changes)

@@ -1,5 +1,6 @@
 from models import Model
 import utils
+import pandas as pd

 # Paste your model here:
 class CustomModel(Model):
@@ -10,8 +11,8 @@ class CustomModel(Model):
         # It will be saved in filesystem and loaded after server restart
         self.state = {}

-    def fit(self, dataframe, segments):
+    def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict:
         pass

-    def predict(self, dataframe):
+    def predict(self, dataframe, cache: dict):
         return []

analytics/models/jump_model.py (9 changes)

@@ -2,6 +2,7 @@ from models import Model
 import utils

 import numpy as np
+import pandas as pd
 import scipy.signal
 from scipy.fftpack import fft
 from scipy.signal import argrelextrema
@@ -25,7 +26,7 @@ class JumpModel(Model):
             'JUMP_LENGTH': 1,
         }

-    def fit(self, dataframe, segments):
+    def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict:
         self.segments = segments
         data = dataframe['value']
         confidences = []
@@ -87,9 +88,9 @@ class JumpModel(Model):
         if len(jump_length_list) > 0:
             self.state['JUMP_LENGTH'] = max(jump_length_list)
         else:
             self.state['JUMP_LENGTH'] = 1

-    def predict(self, dataframe):
+    def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict:
         data = dataframe['value']
         result = self.__predict(data)

analytics/models/model.py (4 changes)

@@ -5,9 +5,9 @@ from pandas import DataFrame
 class Model(ABC):

     @abstractmethod
-    def fit(self, dataframe: DataFrame, segments: list):
+    def fit(self, dataframe: DataFrame, segments: list, cache: dict) -> dict:
         pass

     @abstractmethod
-    def predict(self, dataframe: DataFrame) -> list:
+    def predict(self, dataframe: DataFrame, cache: dict) -> dict:
         pass
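The concrete models in this commit accept cache but do not read or populate it yet. A minimal sketch of what the intended contract presumably looks like, using a hypothetical ThresholdModel: fit returns its learned state as the cache, and predict restores from it so detection can run without re-fitting:

    from pandas import DataFrame
    from models import Model  # the abstract base shown above

    class ThresholdModel(Model):  # hypothetical, for illustration only
        def __init__(self):
            self.state = {'THRESHOLD': 0}

        def fit(self, dataframe: DataFrame, segments: list, cache: dict) -> dict:
            self.state['THRESHOLD'] = dataframe['value'].max()
            return self.state  # becomes the analytic unit's cache

        def predict(self, dataframe: DataFrame, cache: dict) -> dict:
            if cache:
                self.state = cache  # restore learned state, skip re-fitting
            hits = dataframe[dataframe['value'] > self.state['THRESHOLD']]
            return {'cache': self.state, 'segments': list(hits.index)}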

analytics/models/peaks_model.py (5 changes)

@@ -3,6 +3,7 @@ from models import Model
 import utils
 from scipy import signal
 import numpy as np
+import pandas as pd

 class PeaksModel(Model):
@@ -10,10 +11,10 @@ class PeaksModel(Model):
     def __init__(self):
         super()

-    def fit(self, dataset, contamination=0.005):
+    def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict:
         pass

-    def predict(self, dataframe):
+    def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict:
         array = dataframe['value'].as_matrix()
         window_size = 20
         # window = np.ones(101)

analytics/models/step_model.py (5 changes)

@@ -23,7 +23,7 @@ class StepModel(Model):
             'DROP_LENGTH': 1,
         }

-    def fit(self, dataframe, segments):
+    def fit(self, dataframe: pd.DataFrame, segments: list, cache: dict) -> dict:
         self.segments = segments
         d_min = min(dataframe['value'])
         for i in range(0,len(dataframe['value'])):
@@ -96,8 +96,7 @@ class StepModel(Model):
         else:
             self.state['DROP_LENGTH'] = 1

-
-    async def predict(self, dataframe):
+    async def predict(self, dataframe: pd.DataFrame, cache: dict) -> dict:
         d_min = min(dataframe['value'])
         for i in range(0,len(dataframe['value'])):
             dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min

analytics/server.py (1 change)

@@ -40,6 +40,7 @@ async def handle_task(task: object):
         '_id': task['_id'],
         'task': task['type'],
         'analyticUnitId': task['analyticUnitId'],
+        'cache': task['cache'],
         'status': "IN_PROGRESS"
     }

server/src/config.ts (2 changes)

@@ -13,6 +13,8 @@ export const DATA_PATH = path.join(__dirname, '../../data');
 export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db');
 export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
+export const ANALYTIC_UNIT_CACHES_DATABASE_PATCH = path.join(DATA_PATH, 'analytic_unit_caches.db');

 export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
 export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null);

server/src/controllers/analytics_controller.ts (10 changes)

@@ -111,12 +111,14 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
   }

   let pattern = analyticUnit.type;
+  // TODO: add cache
   let task = new AnalyticsTask(
     id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data }
   );
   AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
   let result = await runTask(task);
-  let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result);
+  let { lastPredictionTime, segments: predictedSegments, cache } = await processLearningResult(result);
+  // TODO: save cache

   previousLastPredictionTime = analyticUnit.lastPredictionTime;
   await Promise.all([
@@ -136,7 +138,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
 async function processLearningResult(taskResult: any): Promise<{
   lastPredictionTime: number,
-  segments: Segment.Segment[]
+  segments: Segment.Segment[],
+  cache: any
 }> {
   if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
     return Promise.reject(taskResult.error);
@@ -153,7 +156,8 @@ async function processLearningResult(taskResult: any): Promise<{
   return {
     lastPredictionTime: 0,
-    segments: []
+    segments: [],
+    cache: {}
   };
 }

server/src/models/analytic_unit_cache_model.ts (56 changes)

@@ -0,0 +1,56 @@
+import { AnalyticUnitId } from "./analytic_unit_model";
+
+import { Collection, makeDBQ } from '../services/data_service';
+
+let db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES);
+
+export type AnalyticUnitCacheId = string;
+
+export class AnalyticUnitCache {
+  public constructor(
+    public analyticUnitId: AnalyticUnitId,
+    public data: any,
+    public id?: AnalyticUnitCacheId
+  ) {
+  }
+
+  public toObject() {
+    return {
+      _id: this.id,
+      analyticUnitId: this.analyticUnitId,
+      data: this.data
+    };
+  }
+
+  static fromObject(obj: any): AnalyticUnitCache {
+    if(obj.analyticUnitId === undefined) {
+      throw new Error('No analyticUnitId in obj: ' + obj);
+    }
+    return new AnalyticUnitCache(
+      obj.analyticUnitId,
+      obj.data,
+      obj._id
+    );
+  }
+}
+
+export async function findById(id: AnalyticUnitCacheId): Promise<AnalyticUnitCache> {
+  let obj = await db.findOne(id);
+  return AnalyticUnitCache.fromObject(obj);
+}
+
+export async function create(unit: AnalyticUnitCache): Promise<AnalyticUnitCacheId> {
+  let obj = unit.toObject();
+  return db.insertOne(obj);
+}
+
+export async function setData(id: AnalyticUnitCacheId, data: any) {
+  return db.updateOne(id, { data });
+}
+
+export async function remove(id: AnalyticUnitCacheId): Promise<void> {
+  await db.removeOne(id);
+}

server/src/models/analytic_unit_model.ts (1 change)

@@ -89,6 +89,7 @@ export async function create(unit: AnalyticUnit): Promise<AnalyticUnitId> {

 export async function remove(id: AnalyticUnitId): Promise<void> {
   // TODO: remove its segments
+  // TODO: remove its cache
   await db.removeOne(id);
 }

server/src/services/data_service.ts (3 changes)

@@ -4,7 +4,7 @@ import * as nedb from 'nedb';
 import * as fs from 'fs';

-export enum Collection { ANALYTIC_UNITS, SEGMENTS };
+export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES };

 /**
@@ -180,3 +180,4 @@ checkDataFolders();
 // TODO: it's better if models request db which we create if it's needed
 db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }));
 db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }));
+db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATCH, autoload: true }));
