Browse Source

Save threshold alert state #340 (#342)

* threshold fixes

* rm webhook debug logging

* fix threshold detection

* .env -> .gitignore

* add alert service

* fix

* fix
pull/1/head
Evgeny Smyshlyaev 6 years ago committed by Alexey Velikiy
parent
commit
a9904badc9
  1. 12
      server/src/controllers/analytics_controller.ts
  2. 90
      server/src/services/alert_service.ts

12
server/src/controllers/analytics_controller.ts

@ -5,7 +5,7 @@ import * as Segment from '../models/segment_model';
import * as Threshold from '../models/threshold_model'; import * as Threshold from '../models/threshold_model';
import * as AnalyticUnit from '../models/analytic_unit_model'; import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service'; import { AnalyticsService } from '../services/analytics_service';
import { sendWebhook } from '../services/notification_service'; import { AlertService } from '../services/alert_service';
import { HASTIC_API_KEY, GRAFANA_URL } from '../config'; import { HASTIC_API_KEY, GRAFANA_URL } from '../config';
import { DataPuller } from '../services/data_puller'; import { DataPuller } from '../services/data_puller';
@ -23,6 +23,7 @@ export type TaskResolver = (taskResult: TaskResult) => void;
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>(); const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined; let analyticsService: AnalyticsService = undefined;
let alertService: AlertService = undefined;
let dataPuller: DataPuller; let dataPuller: DataPuller;
@ -74,12 +75,17 @@ async function onMessage(message: AnalyticsMessage) {
export function init() { export function init() {
analyticsService = new AnalyticsService(onMessage); analyticsService = new AnalyticsService(onMessage);
alertService = new AlertService();
alertService.startAlerting();
dataPuller = new DataPuller(analyticsService); dataPuller = new DataPuller(analyticsService);
dataPuller.runPuller(); dataPuller.runPuller();
} }
export function terminate() { export function terminate() {
analyticsService.close(); analyticsService.close();
alertService.stopAlerting();
} }
async function runTask(task: AnalyticsTask): Promise<TaskResult> { async function runTask(task: AnalyticsTask): Promise<TaskResult> {
@ -300,7 +306,7 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI
const analyticUnit = await AnalyticUnit.findById(analyticUnitId); const analyticUnit = await AnalyticUnit.findById(analyticUnitId);
if (!_.isEmpty(segments) && analyticUnit.alert) { if (!_.isEmpty(segments) && analyticUnit.alert) {
try { try {
sendWebhook(analyticUnit.name, _.last(segments)); alertService.recieveAlert(analyticUnit, _.last(segments));
} catch(err) { } catch(err) {
console.error(`error while sending webhook: ${err.message}`); console.error(`error while sending webhook: ${err.message}`);
} }
@ -336,8 +342,10 @@ export async function setAlert(analyticUnitId: AnalyticUnit.AnalyticUnitId, aler
if(alert) { if(alert) {
const analyticUnit = await AnalyticUnit.findById(analyticUnitId); const analyticUnit = await AnalyticUnit.findById(analyticUnitId);
dataPuller.addUnit(analyticUnit); dataPuller.addUnit(analyticUnit);
alertService.addAnalyticUnit(analyticUnit);
} else { } else {
dataPuller.deleteUnit(analyticUnitId); dataPuller.deleteUnit(analyticUnitId);
alertService.removeAnalyticUnit(analyticUnitId);
} }
} }
} }

90
server/src/services/alert_service.ts

@ -0,0 +1,90 @@
import { sendWebhook } from './notification_service';
import * as _ from 'lodash';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { Segment } from '../models/segment_model';
export class Alert {
constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit, protected sender) {};
public recieve(segment: Segment) {
this.sender(this.analyticUnit, segment);
};
public update(now: number) {};
}
class PatternAlert extends Alert {};
class ThresholdAlert extends Alert {
EXPIRE_PERIOD_MS = 60000;
lastOccurence = 0;
public recieve(segment: Segment) {
if(this.lastOccurence === 0) {
this.lastOccurence = segment.from;
this.sender(this.analyticUnit, segment);
} else {
if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) {
console.debug(`difference detween threshold occurences ${segment.from - this.lastOccurence}, send alert`);
this.sender(this.analyticUnit, segment);
}
this.lastOccurence = segment.from;
}
}
public update(now: number) {}
}
export class AlertService {
private _alerts: { [id: string]: Alert; };
private _alertingEnable: boolean;
private _sender: any;
constructor() {
this._alerts = {}
this._alertingEnable = false;
this._sender = (analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) => {
if(this._alertingEnable) {
sendWebhook(analyticUnit.name, segment);
}
}
}
public recieveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) {
let id = analyticUnit.id;
if(!_.has(this._alerts, id)) {
this.addAnalyticUnit(analyticUnit);
}
this._alerts[id].recieve(segment);
};
public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
let detector = AnalyticUnit.getDetectorByType(analyticUnit.type);
let alertsType = {};
alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert;
alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert;
this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit, this._sender);
}
public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) {
delete this._alerts[analyticUnitId];
}
public stopAlerting() {
this._alertingEnable = false;
this._alerts = {};
}
public startAlerting() {
this._alertingEnable = true;
}
}
Loading…
Cancel
Save