From a9904badc9846fc1bf918a66c29e5b4e1dce1490 Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Fri, 18 Jan 2019 20:43:01 +0300 Subject: [PATCH] Save threshold alert state #340 (#342) * threshold fixes * rm webhook debug logging * fix threshold detection * .env -> .gitignore * add alert service * fix * fix --- .../src/controllers/analytics_controller.ts | 12 ++- server/src/services/alert_service.ts | 90 +++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 server/src/services/alert_service.ts diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index b437d64..3540abb 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -5,7 +5,7 @@ import * as Segment from '../models/segment_model'; import * as Threshold from '../models/threshold_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; -import { sendWebhook } from '../services/notification_service'; +import { AlertService } from '../services/alert_service'; import { HASTIC_API_KEY, GRAFANA_URL } from '../config'; import { DataPuller } from '../services/data_puller'; @@ -23,6 +23,7 @@ export type TaskResolver = (taskResult: TaskResult) => void; const taskResolvers = new Map(); let analyticsService: AnalyticsService = undefined; +let alertService: AlertService = undefined; let dataPuller: DataPuller; @@ -74,12 +75,17 @@ async function onMessage(message: AnalyticsMessage) { export function init() { analyticsService = new AnalyticsService(onMessage); + + alertService = new AlertService(); + alertService.startAlerting(); + dataPuller = new DataPuller(analyticsService); dataPuller.runPuller(); } export function terminate() { analyticsService.close(); + alertService.stopAlerting(); } async function runTask(task: AnalyticsTask): Promise { @@ -300,7 +306,7 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI const analyticUnit = await AnalyticUnit.findById(analyticUnitId); if (!_.isEmpty(segments) && analyticUnit.alert) { try { - sendWebhook(analyticUnit.name, _.last(segments)); + alertService.recieveAlert(analyticUnit, _.last(segments)); } catch(err) { console.error(`error while sending webhook: ${err.message}`); } @@ -336,8 +342,10 @@ export async function setAlert(analyticUnitId: AnalyticUnit.AnalyticUnitId, aler if(alert) { const analyticUnit = await AnalyticUnit.findById(analyticUnitId); dataPuller.addUnit(analyticUnit); + alertService.addAnalyticUnit(analyticUnit); } else { dataPuller.deleteUnit(analyticUnitId); + alertService.removeAnalyticUnit(analyticUnitId); } } } diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts new file mode 100644 index 0000000..73da7cd --- /dev/null +++ b/server/src/services/alert_service.ts @@ -0,0 +1,90 @@ +import { sendWebhook } from './notification_service'; + +import * as _ from 'lodash'; +import * as AnalyticUnit from '../models/analytic_unit_model'; +import { Segment } from '../models/segment_model'; + + +export class Alert { + constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit, protected sender) {}; + public recieve(segment: Segment) { + this.sender(this.analyticUnit, segment); + }; + public update(now: number) {}; +} + +class PatternAlert extends Alert {}; + + +class ThresholdAlert extends Alert { + EXPIRE_PERIOD_MS = 60000; + lastOccurence = 0; + + public recieve(segment: Segment) { + if(this.lastOccurence === 0) { + this.lastOccurence = segment.from; + this.sender(this.analyticUnit, segment); + } else { + + if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { + console.debug(`difference detween threshold occurences ${segment.from - this.lastOccurence}, send alert`); + this.sender(this.analyticUnit, segment); + } + + this.lastOccurence = segment.from; + } + } + + public update(now: number) {} +} + + +export class AlertService { + + private _alerts: { [id: string]: Alert; }; + private _alertingEnable: boolean; + private _sender: any; + + constructor() { + this._alerts = {} + this._alertingEnable = false; + this._sender = (analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) => { + if(this._alertingEnable) { + sendWebhook(analyticUnit.name, segment); + } + } + } + + public recieveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) { + let id = analyticUnit.id; + + if(!_.has(this._alerts, id)) { + this.addAnalyticUnit(analyticUnit); + } + + this._alerts[id].recieve(segment); + }; + + public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { + let detector = AnalyticUnit.getDetectorByType(analyticUnit.type); + let alertsType = {}; + + alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert; + alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert; + + this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit, this._sender); + } + + public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { + delete this._alerts[analyticUnitId]; + } + + public stopAlerting() { + this._alertingEnable = false; + this._alerts = {}; + } + + public startAlerting() { + this._alertingEnable = true; + } +}