Browse Source

Threshold detector #324 (#330)

pull/1/head
rozetko 6 years ago committed by GitHub
parent
commit
68f47f78f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      analytics/analytics/analytic_unit_manager.py
  2. 6
      analytics/analytics/analytic_unit_worker.py
  3. 1
      analytics/analytics/detectors/__init__.py
  4. 4
      analytics/analytics/detectors/detector.py
  5. 60
      analytics/analytics/detectors/threshold_detector.py
  6. 1
      server/src/config.ts
  7. 133
      server/src/controllers/analytics_controller.ts
  8. 2
      server/src/index.ts
  9. 20
      server/src/models/analytic_unit_model.ts
  10. 70
      server/src/models/threshold_model.ts
  11. 59
      server/src/routes/threshold_router.ts
  12. 3
      server/src/services/data_service.ts

27
analytics/analytics/analytic_unit_manager.py

@ -15,8 +15,13 @@ WORKERS_EXECUTORS = 20
AnalyticUnitId = str
def get_detector_by_type(analytic_unit_type) -> detectors.Detector:
return detectors.PatternDetector(analytic_unit_type)
def get_detector_by_type(detector_type: str, analytic_unit_type: str) -> detectors.Detector:
if detector_type == 'pattern':
return detectors.PatternDetector(analytic_unit_type)
elif detector_type == 'threshold':
return detectors.ThresholdDetector()
raise ValueError('Unknown detector type "%s"' % detector_type)
def prepare_data(data: list):
"""
@ -40,11 +45,16 @@ class AnalyticUnitManager:
self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS)
def __ensure_worker(self, analytic_unit_id: AnalyticUnitId, analytic_unit_type) -> AnalyticUnitWorker:
def __ensure_worker(
self,
analytic_unit_id: AnalyticUnitId,
detector_type: str,
analytic_unit_type: str
) -> AnalyticUnitWorker:
if analytic_unit_id in self.analytic_workers:
# TODO: check that type is the same
return self.analytic_workers[analytic_unit_id]
detector = get_detector_by_type(analytic_unit_type)
detector = get_detector_by_type(detector_type, analytic_unit_type)
worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor)
self.analytic_workers[analytic_unit_id] = worker
return worker
@ -61,12 +71,17 @@ class AnalyticUnitManager:
return
payload = task['payload']
worker = self.__ensure_worker(analytic_unit_id, payload['pattern'])
worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
data = prepare_data(payload['data'])
if task['type'] == 'PUSH':
return await worker.recieve_data(data, payload['cache'])
elif task['type'] == 'LEARN':
return await worker.do_train(payload['segments'], data, payload['cache'])
if 'segments' in payload:
return await worker.do_train(payload['segments'], data, payload['cache'])
elif 'threshold' in payload:
return await worker.do_train(payload['threshold'], data, payload['cache'])
else:
raise ValueError('No segments or threshold in LEARN payload')
elif task['type'] == 'DETECT':
return await worker.do_detect(data, payload['cache'])

6
analytics/analytics/analytic_unit_worker.py

@ -2,7 +2,7 @@ import config
import detectors
import logging
import pandas as pd
from typing import Optional
from typing import Optional, Union
from models import ModelCache
from concurrent.futures import Executor, CancelledError
import asyncio
@ -19,10 +19,10 @@ class AnalyticUnitWorker:
self._training_feature: asyncio.Future = None
async def do_train(
self, segments: list, data: pd.DataFrame, cache: Optional[ModelCache]
self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache]
) -> ModelCache:
self._training_feature = asyncio.get_event_loop().run_in_executor(
self._executor, self._detector.train, data, segments, cache
self._executor, self._detector.train, data, payload, cache
)
try:
new_cache: ModelCache = await self._training_feature

1
analytics/analytics/detectors/__init__.py

@ -1,2 +1,3 @@
from detectors.detector import Detector
from detectors.pattern_detector import PatternDetector
from detectors.threshold_detector import ThresholdDetector

4
analytics/analytics/detectors/detector.py

@ -1,13 +1,13 @@
from models import ModelCache
from abc import ABC, abstractmethod
from pandas import DataFrame
from typing import Optional
from typing import Optional, Union
class Detector(ABC):
@abstractmethod
def train(self, dataframe: DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache:
def train(self, dataframe: DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
"""
Should be thread-safe to other detectors' train method
"""

60
analytics/analytics/detectors/threshold_detector.py

@ -0,0 +1,60 @@
import logging
import pandas as pd
from typing import Optional
from detectors import Detector
from models import ModelCache
logger = logging.getLogger('THRESHOLD_DETECTOR')
class ThresholdDetector(Detector):
def __init__(self):
pass
def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
return {
'cache': {
'value': threshold['value'],
'condition': threshold['condition']
}
}
def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
value = cache['value']
condition = cache['condition']
last_entry = dataframe.iloc[-1]
last_value = last_entry['value']
# TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
last_time = last_entry['timestamp'].value / 1000000
segment = (last_time, last_time)
segments = []
if condition == '>':
if last_value > value:
segments.append(segment)
elif condition == '>=':
if last_value >= value:
segments.append(segment)
elif condition == '=':
if last_value == value:
segments.append(segment)
elif condition == '<=':
if last_value <= value:
segments.append(segment)
elif condition == '<':
if last_value < value:
segments.append(segment)
return {
'cache': cache,
'segments': segments,
'lastDetectionTime': last_time
}
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
return self.detect(self.bucket.data, cache)

1
server/src/config.ts

@ -15,6 +15,7 @@ export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db');
export const PANELS_DATABASE_PATH = path.join(DATA_PATH, 'panels.db');
export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');

133
server/src/controllers/analytics_controller.ts

@ -2,6 +2,7 @@ import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_me
import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/analytics_task_model';
import * as AnalyticUnitCache from '../models/analytic_unit_cache_model';
import * as Segment from '../models/segment_model';
import * as Threshold from '../models/threshold_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service';
import { sendWebhook } from '../services/notification_service';
@ -13,6 +14,7 @@ import { queryByMetric } from 'grafana-datasource-kit';
import * as _ from 'lodash';
const SECONDS_IN_MINUTE = 60;
type TaskResult = any;
type DetectionResult = any;
@ -87,6 +89,37 @@ async function runTask(task: AnalyticsTask): Promise<TaskResult> {
});
}
async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: AnalyticUnit.DetectorType) {
let range;
if(detector === AnalyticUnit.DetectorType.PATTERN) {
const segments = await Segment.findMany(analyticUnit.id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
range = getQueryRangeForLearningBySegments(segments);
} else if(detector === AnalyticUnit.DetectorType.THRESHOLD) {
const now = Date.now();
range = {
from: now - 5 * SECONDS_IN_MINUTE,
to: now
};
}
console.debug(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`);
const queryResult = await queryByMetric(
analyticUnit.metric,
analyticUnit.panelUrl,
range.from,
range.to,
HASTIC_API_KEY
);
const data = queryResult.values;
if(data.length === 0) {
throw new Error('Empty data to detect on');
}
return data;
}
/**
* Finds range for selecting subset for learning
* @param segments labeled segments
@ -112,25 +145,9 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let analyticUnit = await AnalyticUnit.findById(id);
if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
throw new Error('Can`t starn learning when it`s already started [' + id + ']');
throw new Error('Can`t start learning when it`s already started [' + id + ']');
}
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
let segmentObjs = segments.map(s => s.toObject());
let { from, to } = getQueryRangeForLearningBySegments(segments);
console.debug(`query time range: from ${new Date(from)} to ${new Date(to)}`);
let queryResult = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl, from, to, HASTIC_API_KEY);
let data = queryResult.values;
if(data.length === 0) {
throw new Error('Empty data to learn on');
}
let pattern = analyticUnit.type;
let oldCache = await AnalyticUnitCache.findById(id);
if(oldCache !== null) {
oldCache = oldCache.data;
@ -138,12 +155,31 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
await AnalyticUnitCache.create(id);
}
let deletedSegments = await Segment.findMany(id, { deleted: true });
let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
let analyticUnitType = analyticUnit.type;
let detector = AnalyticUnit.getDetectorByType(analyticUnitType);
let taskPayload: any = { detector, analyticUnitType, cache: oldCache };
if(detector === AnalyticUnit.DetectorType.PATTERN) {
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
let segmentObjs = segments.map(s => s.toObject());
let deletedSegments = await Segment.findMany(id, { deleted: true });
let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
taskPayload.segments = segmentObjs;
} else if(detector === AnalyticUnit.DetectorType.THRESHOLD) {
const threshold = await Threshold.findOne(id);
taskPayload.threshold = threshold;
}
taskPayload.data = await query(analyticUnit, detector);
let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data, cache: oldCache }
id, AnalyticsTaskType.LEARN, taskPayload
);
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
console.debug(`run task, id:${id}`);
@ -165,20 +201,10 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
try {
let unit = await AnalyticUnit.findById(id);
previousLastDetectionTime = unit.lastDetectionTime;
let pattern = unit.type;
let analyticUnitType = unit.type;
let detector = AnalyticUnit.getDetectorByType(analyticUnitType);
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
let { from, to } = getQueryRangeForLearningBySegments(segments);
console.debug(`query time range: from ${new Date(from)} to ${new Date(to)}`);
let queryResult = await queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY);
let data = queryResult.values;
if(data.length === 0) {
throw new Error('Empty data to detect on');
}
const data = await query(unit, detector);
let oldCache = await AnalyticUnitCache.findById(id);
if(oldCache !== null) {
@ -189,7 +215,7 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
let task = new AnalyticsTask(
id,
AnalyticsTaskType.DETECT,
{ pattern, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache }
{ detector, analyticUnitType, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache }
);
console.debug(`run task, id:${id}`);
let result = await runTask(task);
@ -215,7 +241,7 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
await Promise.all([
Segment.insertSegments(payload.segments),
AnalyticUnitCache.setData(id, payload.cache),
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch(err) {
@ -245,7 +271,7 @@ export async function deleteNonDetectedSegments(id, payload) {
Segment.removeSegments(segmentsToRemove.map(s => s.id));
}
async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult):
async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult):
Promise<{
lastDetectionTime: number,
segments: Segment.Segment[],
@ -264,18 +290,16 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI
segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false)
);
const analyticUnit = await AnalyticUnit.findById(analyticUnitId);
if(analyticUnit.alert) {
if(!_.isEmpty(segments)) {
try {
sendWebhook(analyticUnit.name, _.last(segments));
} catch(err) {
console.error(`Error while sending webhook: ${err.message}`);
}
if (!_.isEmpty(segments) && analyticUnit.alert) {
try {
sendWebhook(analyticUnit.name, _.last(segments));
} catch(err) {
console.error(`Error while sending webhook: ${err.message}`);
}
}
return {
lastDetectionTime: detectionResult.lastDetectionTime,
segments: segments,
segments,
cache: detectionResult.cache
};
@ -319,8 +343,23 @@ export async function updateSegments(
]);
removed = removed.map(s => s._id);
runFirstLearning(id);
return { addedIds, removed };
}
export async function updateThreshold(
id: AnalyticUnit.AnalyticUnitId,
value: number,
condition: Threshold.Condition
) {
await Threshold.updateThreshold(id, value, condition);
runFirstLearning(id);
}
async function runFirstLearning(id: AnalyticUnit.AnalyticUnitId) {
// TODO: move setting status somehow "inside" learning
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING);
runLearning(id).then(() => runDetect(id));
return { addedIds, removed };
runLearning(id)
.then(() => runDetect(id));
}

2
server/src/index.ts

@ -1,6 +1,7 @@
import { router as analyticUnitsRouter } from './routes/analytic_units_router';
import { router as segmentsRouter } from './routes/segments_router';
import { router as panelRouter } from './routes/panel_router';
import { router as thresholdRouter } from './routes/threshold_router';
import * as AnalyticsController from './controllers/analytics_controller';
@ -38,6 +39,7 @@ var rootRouter = new Router();
rootRouter.use('/analyticUnits', analyticUnitsRouter.routes(), analyticUnitsRouter.allowedMethods());
rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods());
rootRouter.use('/panel', panelRouter.routes(), panelRouter.allowedMethods());
rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMethods());
rootRouter.get('/', async (ctx) => {
ctx.response.body = {

20
server/src/models/analytic_unit_model.ts

@ -2,6 +2,7 @@ import { Collection, makeDBQ } from '../services/data_service';
import { Metric } from 'grafana-datasource-kit';
import * as _ from 'lodash';
let db = makeDBQ(Collection.ANALYTIC_UNITS);
@ -37,6 +38,11 @@ export const ANALYTIC_UNIT_TYPES = {
]
};
export enum DetectorType {
PATTERN = 'pattern',
THRESHOLD = 'threshold'
};
export type AnalyticUnitId = string;
export enum AnalyticUnitStatus {
READY = 'READY',
@ -167,3 +173,17 @@ export async function setDetectionTime(id: AnalyticUnitId, lastDetectionTime: nu
export async function setAlert(id: AnalyticUnitId, alert: boolean) {
return db.updateOne(id, { alert });
}
export function getDetectorByType(analyticUnitType: string): DetectorType {
let detector;
_.forOwn(ANALYTIC_UNIT_TYPES, (types, detectorType) => {
if(_.find(types, { value: analyticUnitType }) !== undefined) {
detector = detectorType;
}
});
if(detector === undefined) {
throw new Error(`Can't find detector for analytic unit of type "${analyticUnitType}"`);
}
return detector;
}

70
server/src/models/threshold_model.ts

@ -0,0 +1,70 @@
import { AnalyticUnitId } from './analytic_unit_model';
import { Collection, makeDBQ } from '../services/data_service';
import * as _ from 'lodash';
let db = makeDBQ(Collection.THRESHOLD);
export enum Condition {
ABOVE = '>',
ABOVE_OR_EQUAL = '>=',
EQUAL = '=',
LESS_OR_EQUAL = '<=',
LESS = '<'
};
export class Threshold {
constructor(
public id: AnalyticUnitId,
public value: number,
public condition: Condition
) {
if(id === undefined) {
throw new Error('id is undefined');
}
if(value === undefined) {
throw new Error('condition is undefined');
}
if(condition === undefined) {
throw new Error('condition is undefined');
}
}
public toObject() {
return {
_id: this.id,
value: this.value,
condition: this.condition
};
}
static fromObject(obj: any): Threshold {
if(obj === undefined) {
throw new Error('obj is undefined');
}
return new Threshold(obj._id, +obj.value, obj.condition);
}
}
export async function findOne(id: AnalyticUnitId): Promise<Threshold | null> {
const query: any = { id };
const threshold = await db.findOne(query);
if(threshold === null) {
return null;
}
return Threshold.fromObject(threshold);
}
export async function updateThreshold(id: AnalyticUnitId, value: number, condition: Condition) {
if(findOne(id) === null) {
return db.insertOne({ id, value, condition });
}
return db.updateOne({ id }, { value, condition });
}
export async function removeThreshold(id: AnalyticUnitId) {
return db.removeOne(id);
}

59
server/src/routes/threshold_router.ts

@ -0,0 +1,59 @@
import * as AnalyticsController from '../controllers/analytics_controller';
import { AnalyticUnitId } from '../models/analytic_unit_model';
import * as Threshold from '../models/threshold_model';
import * as Router from 'koa-router';
import * as _ from 'lodash';
async function getThresholds(ctx: Router.IRouterContext) {
try {
const ids: AnalyticUnitId = ctx.request.query.ids;
if(ids === undefined || _.isEmpty(ids)) {
throw new Error('analyticUnitIds (ids) are missing');
}
const thresholds = await Promise.all(
_.map(ids, id => Threshold.findOne(id))
);
ctx.response.body = { thresholds };
} catch(e) {
console.error(e);
ctx.response.status = 500;
ctx.response.body = {
code: 500,
message: `GET /threshold error: ${e.message}`
};
}
}
async function updateThreshold(ctx: Router.IRouterContext) {
try {
const {
id, value, condition
} = ctx.request.body as {
id: AnalyticUnitId, value: number, condition: Threshold.Condition
};
await AnalyticsController.updateThreshold(id, value, condition);
ctx.response.body = {
code: 200,
message: 'Success'
};
} catch(e) {
console.error(e);
ctx.response.status = 500;
ctx.response.body = {
code: 500,
message: `PATCH /threshold error: ${e.message}`
};
}
}
export const router = new Router();
router.get('/', getThresholds);
router.patch('/', updateThreshold);

3
server/src/services/data_service.ts

@ -4,7 +4,7 @@ import * as nedb from 'nedb';
import * as fs from 'fs';
export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES, PANELS };
export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES, PANELS, THRESHOLD };
/**
@ -212,3 +212,4 @@ db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DAT
db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }));
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true }));
db.set(Collection.PANELS, new nedb({ filename: config.PANELS_DATABASE_PATH, autoload: true }));
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true }));

Loading…
Cancel
Save