diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 2f0938d..2111159 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -15,8 +15,13 @@ WORKERS_EXECUTORS = 20 AnalyticUnitId = str -def get_detector_by_type(analytic_unit_type) -> detectors.Detector: - return detectors.PatternDetector(analytic_unit_type) +def get_detector_by_type(detector_type: str, analytic_unit_type: str) -> detectors.Detector: + if detector_type == 'pattern': + return detectors.PatternDetector(analytic_unit_type) + elif detector_type == 'threshold': + return detectors.ThresholdDetector() + + raise ValueError('Unknown detector type "%s"' % detector_type) def prepare_data(data: list): """ @@ -40,11 +45,16 @@ class AnalyticUnitManager: self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS) - def __ensure_worker(self, analytic_unit_id: AnalyticUnitId, analytic_unit_type) -> AnalyticUnitWorker: + def __ensure_worker( + self, + analytic_unit_id: AnalyticUnitId, + detector_type: str, + analytic_unit_type: str + ) -> AnalyticUnitWorker: if analytic_unit_id in self.analytic_workers: # TODO: check that type is the same return self.analytic_workers[analytic_unit_id] - detector = get_detector_by_type(analytic_unit_type) + detector = get_detector_by_type(detector_type, analytic_unit_type) worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor) self.analytic_workers[analytic_unit_id] = worker return worker @@ -61,12 +71,17 @@ class AnalyticUnitManager: return payload = task['payload'] - worker = self.__ensure_worker(analytic_unit_id, payload['pattern']) + worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType']) data = prepare_data(payload['data']) if task['type'] == 'PUSH': return await worker.recieve_data(data, payload['cache']) elif task['type'] == 'LEARN': - return await worker.do_train(payload['segments'], data, payload['cache']) + if 'segments' in payload: + return await worker.do_train(payload['segments'], data, payload['cache']) + elif 'threshold' in payload: + return await worker.do_train(payload['threshold'], data, payload['cache']) + else: + raise ValueError('No segments or threshold in LEARN payload') elif task['type'] == 'DETECT': return await worker.do_detect(data, payload['cache']) diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 4fa3d75..02f7cff 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -2,7 +2,7 @@ import config import detectors import logging import pandas as pd -from typing import Optional +from typing import Optional, Union from models import ModelCache from concurrent.futures import Executor, CancelledError import asyncio @@ -19,10 +19,10 @@ class AnalyticUnitWorker: self._training_feature: asyncio.Future = None async def do_train( - self, segments: list, data: pd.DataFrame, cache: Optional[ModelCache] + self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] ) -> ModelCache: self._training_feature = asyncio.get_event_loop().run_in_executor( - self._executor, self._detector.train, data, segments, cache + self._executor, self._detector.train, data, payload, cache ) try: new_cache: ModelCache = await self._training_feature diff --git a/analytics/analytics/detectors/__init__.py b/analytics/analytics/detectors/__init__.py index 1357706..7c99479 100644 --- a/analytics/analytics/detectors/__init__.py +++ b/analytics/analytics/detectors/__init__.py @@ -1,2 +1,3 @@ from detectors.detector import Detector from detectors.pattern_detector import PatternDetector +from detectors.threshold_detector import ThresholdDetector diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 063f6a7..b0738a4 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -1,13 +1,13 @@ from models import ModelCache from abc import ABC, abstractmethod from pandas import DataFrame -from typing import Optional +from typing import Optional, Union class Detector(ABC): @abstractmethod - def train(self, dataframe: DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache: + def train(self, dataframe: DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: """ Should be thread-safe to other detectors' train method """ diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py new file mode 100644 index 0000000..b2c1716 --- /dev/null +++ b/analytics/analytics/detectors/threshold_detector.py @@ -0,0 +1,60 @@ +import logging + +import pandas as pd +from typing import Optional + +from detectors import Detector +from models import ModelCache + + +logger = logging.getLogger('THRESHOLD_DETECTOR') + + +class ThresholdDetector(Detector): + + def __init__(self): + pass + + def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache: + return { + 'cache': { + 'value': threshold['value'], + 'condition': threshold['condition'] + } + } + + def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: + value = cache['value'] + condition = cache['condition'] + + last_entry = dataframe.iloc[-1] + last_value = last_entry['value'] + # TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6 + last_time = last_entry['timestamp'].value / 1000000 + + segment = (last_time, last_time) + segments = [] + if condition == '>': + if last_value > value: + segments.append(segment) + elif condition == '>=': + if last_value >= value: + segments.append(segment) + elif condition == '=': + if last_value == value: + segments.append(segment) + elif condition == '<=': + if last_value <= value: + segments.append(segment) + elif condition == '<': + if last_value < value: + segments.append(segment) + + return { + 'cache': cache, + 'segments': segments, + 'lastDetectionTime': last_time + } + + def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + return self.detect(self.bucket.data, cache) diff --git a/server/src/config.ts b/server/src/config.ts index 9a07d3e..adf2ea3 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -15,6 +15,7 @@ export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db'); export const PANELS_DATABASE_PATH = path.join(DATA_PATH, 'panels.db'); +export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 65e6fe9..a8e95e5 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -2,6 +2,7 @@ import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_me import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/analytics_task_model'; import * as AnalyticUnitCache from '../models/analytic_unit_cache_model'; import * as Segment from '../models/segment_model'; +import * as Threshold from '../models/threshold_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; import { sendWebhook } from '../services/notification_service'; @@ -13,6 +14,7 @@ import { queryByMetric } from 'grafana-datasource-kit'; import * as _ from 'lodash'; +const SECONDS_IN_MINUTE = 60; type TaskResult = any; type DetectionResult = any; @@ -87,6 +89,37 @@ async function runTask(task: AnalyticsTask): Promise { }); } +async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: AnalyticUnit.DetectorType) { + let range; + if(detector === AnalyticUnit.DetectorType.PATTERN) { + const segments = await Segment.findMany(analyticUnit.id, { labeled: true }); + if(segments.length === 0) { + throw new Error('Need at least 1 labeled segment'); + } + + range = getQueryRangeForLearningBySegments(segments); + } else if(detector === AnalyticUnit.DetectorType.THRESHOLD) { + const now = Date.now(); + range = { + from: now - 5 * SECONDS_IN_MINUTE, + to: now + }; + } + console.debug(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`); + const queryResult = await queryByMetric( + analyticUnit.metric, + analyticUnit.panelUrl, + range.from, + range.to, + HASTIC_API_KEY + ); + const data = queryResult.values; + if(data.length === 0) { + throw new Error('Empty data to detect on'); + } + return data; +} + /** * Finds range for selecting subset for learning * @param segments labeled segments @@ -112,25 +145,9 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { let analyticUnit = await AnalyticUnit.findById(id); if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { - throw new Error('Can`t starn learning when it`s already started [' + id + ']'); + throw new Error('Can`t start learning when it`s already started [' + id + ']'); } - let segments = await Segment.findMany(id, { labeled: true }); - if(segments.length === 0) { - throw new Error('Need at least 1 labeled segment'); - } - - let segmentObjs = segments.map(s => s.toObject()); - - let { from, to } = getQueryRangeForLearningBySegments(segments); - console.debug(`query time range: from ${new Date(from)} to ${new Date(to)}`); - let queryResult = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl, from, to, HASTIC_API_KEY); - let data = queryResult.values; - if(data.length === 0) { - throw new Error('Empty data to learn on'); - } - - let pattern = analyticUnit.type; let oldCache = await AnalyticUnitCache.findById(id); if(oldCache !== null) { oldCache = oldCache.data; @@ -138,12 +155,31 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { await AnalyticUnitCache.create(id); } - let deletedSegments = await Segment.findMany(id, { deleted: true }); - let deletedSegmentsObjs = deletedSegments.map(s => s.toObject()); - segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs); + let analyticUnitType = analyticUnit.type; + let detector = AnalyticUnit.getDetectorByType(analyticUnitType); + let taskPayload: any = { detector, analyticUnitType, cache: oldCache }; + + if(detector === AnalyticUnit.DetectorType.PATTERN) { + let segments = await Segment.findMany(id, { labeled: true }); + if(segments.length === 0) { + throw new Error('Need at least 1 labeled segment'); + } + + let segmentObjs = segments.map(s => s.toObject()); + + let deletedSegments = await Segment.findMany(id, { deleted: true }); + let deletedSegmentsObjs = deletedSegments.map(s => s.toObject()); + segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs); + taskPayload.segments = segmentObjs; + } else if(detector === AnalyticUnit.DetectorType.THRESHOLD) { + const threshold = await Threshold.findOne(id); + taskPayload.threshold = threshold; + } + + taskPayload.data = await query(analyticUnit, detector); let task = new AnalyticsTask( - id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data, cache: oldCache } + id, AnalyticsTaskType.LEARN, taskPayload ); AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); console.debug(`run task, id:${id}`); @@ -165,20 +201,10 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { try { let unit = await AnalyticUnit.findById(id); previousLastDetectionTime = unit.lastDetectionTime; - let pattern = unit.type; + let analyticUnitType = unit.type; + let detector = AnalyticUnit.getDetectorByType(analyticUnitType); - let segments = await Segment.findMany(id, { labeled: true }); - if(segments.length === 0) { - throw new Error('Need at least 1 labeled segment'); - } - - let { from, to } = getQueryRangeForLearningBySegments(segments); - console.debug(`query time range: from ${new Date(from)} to ${new Date(to)}`); - let queryResult = await queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); - let data = queryResult.values; - if(data.length === 0) { - throw new Error('Empty data to detect on'); - } + const data = await query(unit, detector); let oldCache = await AnalyticUnitCache.findById(id); if(oldCache !== null) { @@ -189,7 +215,7 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { let task = new AnalyticsTask( id, AnalyticsTaskType.DETECT, - { pattern, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache } + { detector, analyticUnitType, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache } ); console.debug(`run task, id:${id}`); let result = await runTask(task); @@ -215,7 +241,7 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { await Promise.all([ Segment.insertSegments(payload.segments), AnalyticUnitCache.setData(id, payload.cache), - AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime), + AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime), ]); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); } catch(err) { @@ -245,7 +271,7 @@ export async function deleteNonDetectedSegments(id, payload) { Segment.removeSegments(segmentsToRemove.map(s => s.id)); } -async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult): +async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult): Promise<{ lastDetectionTime: number, segments: Segment.Segment[], @@ -264,18 +290,16 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false) ); const analyticUnit = await AnalyticUnit.findById(analyticUnitId); - if(analyticUnit.alert) { - if(!_.isEmpty(segments)) { - try { - sendWebhook(analyticUnit.name, _.last(segments)); - } catch(err) { - console.error(`Error while sending webhook: ${err.message}`); - } + if (!_.isEmpty(segments) && analyticUnit.alert) { + try { + sendWebhook(analyticUnit.name, _.last(segments)); + } catch(err) { + console.error(`Error while sending webhook: ${err.message}`); } } return { lastDetectionTime: detectionResult.lastDetectionTime, - segments: segments, + segments, cache: detectionResult.cache }; @@ -319,8 +343,23 @@ export async function updateSegments( ]); removed = removed.map(s => s._id); + runFirstLearning(id); + return { addedIds, removed }; +} + +export async function updateThreshold( + id: AnalyticUnit.AnalyticUnitId, + value: number, + condition: Threshold.Condition +) { + await Threshold.updateThreshold(id, value, condition); + + runFirstLearning(id); +} + +async function runFirstLearning(id: AnalyticUnit.AnalyticUnitId) { // TODO: move setting status somehow "inside" learning await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING); - runLearning(id).then(() => runDetect(id)); - return { addedIds, removed }; + runLearning(id) + .then(() => runDetect(id)); } diff --git a/server/src/index.ts b/server/src/index.ts index 2c830f6..a7d3b22 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,6 +1,7 @@ import { router as analyticUnitsRouter } from './routes/analytic_units_router'; import { router as segmentsRouter } from './routes/segments_router'; import { router as panelRouter } from './routes/panel_router'; +import { router as thresholdRouter } from './routes/threshold_router'; import * as AnalyticsController from './controllers/analytics_controller'; @@ -38,6 +39,7 @@ var rootRouter = new Router(); rootRouter.use('/analyticUnits', analyticUnitsRouter.routes(), analyticUnitsRouter.allowedMethods()); rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods()); rootRouter.use('/panel', panelRouter.routes(), panelRouter.allowedMethods()); +rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMethods()); rootRouter.get('/', async (ctx) => { ctx.response.body = { diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 24531f9..9cdaae0 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -2,6 +2,7 @@ import { Collection, makeDBQ } from '../services/data_service'; import { Metric } from 'grafana-datasource-kit'; +import * as _ from 'lodash'; let db = makeDBQ(Collection.ANALYTIC_UNITS); @@ -37,6 +38,11 @@ export const ANALYTIC_UNIT_TYPES = { ] }; +export enum DetectorType { + PATTERN = 'pattern', + THRESHOLD = 'threshold' +}; + export type AnalyticUnitId = string; export enum AnalyticUnitStatus { READY = 'READY', @@ -167,3 +173,17 @@ export async function setDetectionTime(id: AnalyticUnitId, lastDetectionTime: nu export async function setAlert(id: AnalyticUnitId, alert: boolean) { return db.updateOne(id, { alert }); } + +export function getDetectorByType(analyticUnitType: string): DetectorType { + let detector; + _.forOwn(ANALYTIC_UNIT_TYPES, (types, detectorType) => { + if(_.find(types, { value: analyticUnitType }) !== undefined) { + detector = detectorType; + } + }); + + if(detector === undefined) { + throw new Error(`Can't find detector for analytic unit of type "${analyticUnitType}"`); + } + return detector; +} diff --git a/server/src/models/threshold_model.ts b/server/src/models/threshold_model.ts new file mode 100644 index 0000000..611ff15 --- /dev/null +++ b/server/src/models/threshold_model.ts @@ -0,0 +1,70 @@ +import { AnalyticUnitId } from './analytic_unit_model'; + +import { Collection, makeDBQ } from '../services/data_service'; + +import * as _ from 'lodash'; + +let db = makeDBQ(Collection.THRESHOLD); + + +export enum Condition { + ABOVE = '>', + ABOVE_OR_EQUAL = '>=', + EQUAL = '=', + LESS_OR_EQUAL = '<=', + LESS = '<' +}; + +export class Threshold { + constructor( + public id: AnalyticUnitId, + public value: number, + public condition: Condition + ) { + if(id === undefined) { + throw new Error('id is undefined'); + } + if(value === undefined) { + throw new Error('condition is undefined'); + } + if(condition === undefined) { + throw new Error('condition is undefined'); + } + } + + public toObject() { + return { + _id: this.id, + value: this.value, + condition: this.condition + }; + } + + static fromObject(obj: any): Threshold { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Threshold(obj._id, +obj.value, obj.condition); + } +} + +export async function findOne(id: AnalyticUnitId): Promise { + const query: any = { id }; + + const threshold = await db.findOne(query); + if(threshold === null) { + return null; + } + return Threshold.fromObject(threshold); +} + +export async function updateThreshold(id: AnalyticUnitId, value: number, condition: Condition) { + if(findOne(id) === null) { + return db.insertOne({ id, value, condition }); + } + return db.updateOne({ id }, { value, condition }); +} + +export async function removeThreshold(id: AnalyticUnitId) { + return db.removeOne(id); +} diff --git a/server/src/routes/threshold_router.ts b/server/src/routes/threshold_router.ts new file mode 100644 index 0000000..aeeec65 --- /dev/null +++ b/server/src/routes/threshold_router.ts @@ -0,0 +1,59 @@ +import * as AnalyticsController from '../controllers/analytics_controller'; + +import { AnalyticUnitId } from '../models/analytic_unit_model'; +import * as Threshold from '../models/threshold_model'; + +import * as Router from 'koa-router'; +import * as _ from 'lodash'; + + +async function getThresholds(ctx: Router.IRouterContext) { + try { + const ids: AnalyticUnitId = ctx.request.query.ids; + if(ids === undefined || _.isEmpty(ids)) { + throw new Error('analyticUnitIds (ids) are missing'); + } + + const thresholds = await Promise.all( + _.map(ids, id => Threshold.findOne(id)) + ); + + ctx.response.body = { thresholds }; + } catch(e) { + console.error(e); + ctx.response.status = 500; + ctx.response.body = { + code: 500, + message: `GET /threshold error: ${e.message}` + }; + } +} + +async function updateThreshold(ctx: Router.IRouterContext) { + try { + const { + id, value, condition + } = ctx.request.body as { + id: AnalyticUnitId, value: number, condition: Threshold.Condition + }; + + await AnalyticsController.updateThreshold(id, value, condition); + + ctx.response.body = { + code: 200, + message: 'Success' + }; + } catch(e) { + console.error(e); + ctx.response.status = 500; + ctx.response.body = { + code: 500, + message: `PATCH /threshold error: ${e.message}` + }; + } +} + +export const router = new Router(); + +router.get('/', getThresholds); +router.patch('/', updateThreshold); diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index b73d663..22aca70 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -4,7 +4,7 @@ import * as nedb from 'nedb'; import * as fs from 'fs'; -export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES, PANELS }; +export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES, PANELS, THRESHOLD }; /** @@ -212,3 +212,4 @@ db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DAT db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true })); db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true })); db.set(Collection.PANELS, new nedb({ filename: config.PANELS_DATABASE_PATH, autoload: true })); +db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true }));