diff --git a/analytics/analytics/server.py b/analytics/analytics/server.py index 3ab76b4..c32ed01 100644 --- a/analytics/analytics/server.py +++ b/analytics/analytics/server.py @@ -53,7 +53,7 @@ async def handle_data(task: object): if res['status'] == 'SUCCESS' and res['payload'] is not None: res['_id'] = task['_id'] - message = services.server_service.ServerMessage('DETECT', res) + message = services.server_service.ServerMessage('PUSH_DETECT', res) await server_service.send_message_to_server(message) async def handle_message(message: services.ServerMessage): diff --git a/server/src/config.ts b/server/src/config.ts index c385c3b..aa389d4 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -26,6 +26,8 @@ export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); export const ZMQ_HOST = getConfigField('ZMQ_HOST', '127.0.0.1'); export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY'); export const GRAFANA_URL = normalizeUrl(getConfigField('GRAFANA_URL', null)); +// TODO: save orgId in analytic_units.db +export const ORG_ID = getConfigField('ORG_ID', 1); export const HASTIC_WEBHOOK_URL = getConfigField('HASTIC_WEBHOOK_URL', null); export const HASTIC_WEBHOOK_TYPE = getConfigField('HASTIC_WEBHOOK_TYPE', 'application/x-www-form-urlencoded'); export const HASTIC_WEBHOOK_SECRET = getConfigField('HASTIC_WEBHOOK_SECRET', null); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index b91d339..40051ca 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -34,7 +34,6 @@ const taskResolvers = new Map(); let analyticsService: AnalyticsService = undefined; let alertService: AlertService = undefined; -let grafanaAvailableWebhok: Function = undefined; let dataPuller: DataPuller; let detectionsCount: number = 0; @@ -68,6 +67,27 @@ async function onDetect(detectionResult: DetectionResult) { ]); } +async function onPushDetect(detectionResult: DetectionResult) { + const analyticUnit = await AnalyticUnit.findById(detectionResult.analyticUnitId); + if (!_.isEmpty(detectionResult.segments) && analyticUnit.alert) { + try { + alertService.receiveAlert(analyticUnit, _.last(detectionResult.segments)); + } catch(err) { + console.error(`error while sending webhook: ${err.message}`); + } + } else { + let reasons = []; + if(!analyticUnit.alert) { + reasons.push('alerting disabled'); + } + if(_.isEmpty(detectionResult.segments)) { + reasons.push('segments empty'); + } + console.log(`skip sending webhook for ${analyticUnit.id}, ${reasons.join(', ')}`); + } + await onDetect(detectionResult); +} + async function onMessage(message: AnalyticsMessage) { let responsePayload = null; let methodResolved = false; @@ -82,6 +102,11 @@ async function onMessage(message: AnalyticsMessage) { methodResolved = true; } + if(message.method === AnalyticsMessageMethod.PUSH_DETECT) { + await onPushDetect(message.payload.payload); + methodResolved = true; + } + if(!methodResolved) { throw new TypeError('Unknown method ' + message.method); } @@ -97,10 +122,9 @@ export function init() { analyticsService = new AnalyticsService(onMessage); alertService = new AlertService(); - grafanaAvailableWebhok = alertService.getGrafanaAvailableReporter(); alertService.startAlerting(); - dataPuller = new DataPuller(analyticsService); + dataPuller = new DataPuller(analyticsService, alertService); dataPuller.runPuller(); } @@ -172,15 +196,16 @@ async function query( HASTIC_API_KEY ); data = queryResult.values; - grafanaAvailableWebhok(true); + alertService.sendGrafanaAvailableWebhook(); + alertService.sendDatasourceAvailableWebhook(analyticUnit.metric.datasource.url); } catch(e) { if(e instanceof GrafanaUnavailable) { const msg = `Can't connect Grafana: ${e.message}, check GRAFANA_URL`; - grafanaAvailableWebhok(false); + alertService.sendGrafanaUnavailableWebhook(); throw new Error(msg); } if(e instanceof DatasourceUnavailable) { - alertService.sendMsg(e.message, WebhookType.FAILURE); + alertService.sendDatasourceUnavailableWebhook(analyticUnit.metric.datasource.url); throw new Error(e.message); } throw e; @@ -429,23 +454,7 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI const segments = sortedSegments.map( segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false) ); - const analyticUnit = await AnalyticUnit.findById(analyticUnitId); - if (!_.isEmpty(segments) && analyticUnit.alert) { - try { - alertService.receiveAlert(analyticUnit, _.last(segments)); - } catch(err) { - console.error(`error while sending webhook: ${err.message}`); - } - } else { - let reasons = []; - if(!analyticUnit.alert) { - reasons.push('alerting disabled'); - } - if(_.isEmpty(segments)) { - reasons.push('segments empty'); - } - console.log(`skip sending webhook for ${analyticUnit.id}, ${reasons.join(', ')}`); - } + return { lastDetectionTime: detectionResult.lastDetectionTime, segments, diff --git a/server/src/models/analytics_message_model.ts b/server/src/models/analytics_message_model.ts index 49ec62a..f71075c 100644 --- a/server/src/models/analytics_message_model.ts +++ b/server/src/models/analytics_message_model.ts @@ -2,6 +2,7 @@ export enum AnalyticsMessageMethod { TASK = 'TASK', TASK_RESULT = 'TASK_RESULT', DETECT = 'DETECT', + PUSH_DETECT = 'PUSH_DETECT', DATA = 'DATA' } diff --git a/server/src/models/analytics_task_model.ts b/server/src/models/analytics_task_model.ts index ec47f21..d53e62a 100644 --- a/server/src/models/analytics_task_model.ts +++ b/server/src/models/analytics_task_model.ts @@ -9,6 +9,7 @@ export type AnalyticsTaskId = string; export enum AnalyticsTaskType { LEARN = 'LEARN', DETECT = 'DETECT', + PUSH_DETECT = 'PUSH_DETECT', CANCEL = 'CANCEL', PUSH = 'PUSH', PROCESS = 'PROCESS' diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts index 746429f..6aa6f54 100644 --- a/server/src/services/alert_service.ts +++ b/server/src/services/alert_service.ts @@ -1,9 +1,10 @@ -import { sendAnalyticWebhook, sendInfoWebhook, InfoAlert, AnalyticAlert, WebhookType } from './notification_service'; +import { sendNotification, InfoMeta, AnalyticMeta, WebhookType, Notification } from './notification_service'; import * as _ from 'lodash'; import * as AnalyticUnit from '../models/analytic_units'; import { Segment } from '../models/segment_model'; import { availableReporter } from '../utils/reporter'; +import { ORG_ID } from '../config'; export class Alert { @@ -11,24 +12,43 @@ export class Alert { constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {}; public receive(segment: Segment) { if(this.enabled) { - const alert = this.makeAlert(segment); - sendAnalyticWebhook(alert); + sendNotification(this.makeNotification(segment)); } }; - protected makeAlert(segment): AnalyticAlert { - const alert: AnalyticAlert = { + protected makeNotification(segment: Segment): Notification { + const meta = this.makeMeta(segment); + const message = this.makeMessage(meta); + return { meta, message }; + } + + protected makeMeta(segment: Segment): AnalyticMeta { + const datshdoardId = this.analyticUnit.panelId.split('/')[0]; + const panelId = this.analyticUnit.panelId.split('/')[1]; + const grafanaUrl = `${this.analyticUnit.grafanaUrl}/d/${datshdoardId}?panelId=${panelId}&edit=true&fullscreen=true?orgId=${ORG_ID}`; + const alert: AnalyticMeta = { type: WebhookType.DETECT, analyticUnitType: this.analyticUnit.type, analyticUnitName: this.analyticUnit.name, analyticUnitId: this.analyticUnit.id, - grafanaUrl: this.analyticUnit.grafanaUrl, + grafanaUrl, from: segment.from, to: segment.to }; return alert; } + + protected makeMessage(meta: AnalyticMeta): string { + return [ + `[${meta.analyticUnitType.toUpperCase()} ALERTING] ${meta.analyticUnitName}`, + `URL: ${meta.grafanaUrl}`, + ``, + `From: ${new Date(meta.from)}`, + `To: ${new Date(meta.to)}`, + `ID: ${meta.analyticUnitId}` + ].join('\n'); + } } class PatternAlert extends Alert { @@ -39,14 +59,27 @@ class PatternAlert extends Alert { if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) { this.lastSentSegment = segment; if(this.enabled) { - sendAnalyticWebhook(this.makeAlert(segment)); + sendNotification(this.makeNotification(segment)); } } } + + protected makeMessage(meta: AnalyticMeta): string { + return [ + `[PATTERN DETECTED] ${meta.analyticUnitName}`, + `URL: ${meta.grafanaUrl}`, + ``, + `From: ${new Date(meta.from)}`, + `To: ${new Date(meta.to)}`, + `ID: ${meta.analyticUnitId}` + ].join('\n'); + } }; class ThresholdAlert extends Alert { + // TODO: configure threshold timing in panel like Grafana's alerts (`evaluate` time, `for` time) + // TODO: make events for start and end of threshold EXPIRE_PERIOD_MS = 60000; lastOccurence = 0; @@ -54,31 +87,60 @@ class ThresholdAlert extends Alert { if(this.lastOccurence === 0) { this.lastOccurence = segment.from; if(this.enabled) { - sendAnalyticWebhook(this.makeAlert(segment)); + sendNotification(this.makeNotification(segment)); } } else { if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { if(this.enabled) { console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`); - sendAnalyticWebhook(this.makeAlert(segment)); + sendNotification(this.makeNotification(segment)); } } this.lastOccurence = segment.from; } } + + protected makeMessage(meta: AnalyticMeta): string { + let message = [ + `[THRESHOLD ALERTING] ${meta.analyticUnitName}`, + `URL: ${meta.grafanaUrl}`, + ``, + `Starts at: ${new Date(meta.from)}`, + `ID: ${meta.analyticUnitId}` + ].join('\n'); + + if(meta.params !== undefined) { + const metrics = ` + Metrics: + ${this.analyticUnit.metric.targets[0].expr}: ${meta.params.value} + `; + message += metrics; + } + return message; + } } export class AlertService { - private _alerts: { [id: string]: Alert; }; + // TODO: object -> Map + private _alerts: { [id: string]: Alert }; private _alertingEnable: boolean; private _grafanaAvailableReporter: Function; + private _datasourceAvailableReporters: Map; constructor() { - this._alerts = {} + this._alerts = {}; + this._datasourceAvailableReporters = new Map(); + + this._grafanaAvailableReporter = availableReporter( + ['[OK] Grafana available', WebhookType.RECOVERY], + ['[FAILURE] Grafana unavailable for pulling data', WebhookType.FAILURE], + this.sendMsg, + this.sendMsg + ); } public receiveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) { @@ -97,46 +159,41 @@ export class AlertService { public sendMsg(message: string, type: WebhookType, optionalInfo = {}) { const now = Date.now(); - const infoAlert: InfoAlert = { - message, + const infoAlert: InfoMeta = { params: optionalInfo, type, from: now, to: now } - sendInfoWebhook(infoAlert); + sendNotification({ message, meta: infoAlert }); } - public getGrafanaAvailableReporter() { - if(!this._grafanaAvailableReporter) { - this._grafanaAvailableReporter = availableReporter( - ['Grafana available', WebhookType.RECOVERY], - ['Grafana unavailable for pulling data', WebhookType.FAILURE], - this.sendMsg, - this.sendMsg - ); - } - return this._grafanaAvailableReporter; + public sendGrafanaAvailableWebhook() { + this._grafanaAvailableReporter(true); + } + + public sendGrafanaUnavailableWebhook() { + this._grafanaAvailableReporter(false); } - public getAvailableWebhook(recoveryMsg: string, failureMsg: string) { - return availableReporter( - [recoveryMsg, WebhookType.RECOVERY], - [failureMsg, WebhookType.FAILURE], - this.sendMsg, - this.sendMsg - ); + public sendDatasourceAvailableWebhook(url: string) { + const reporter = this._getDatasourceAvailableReporter(url); + reporter(true); + } + + public sendDatasourceUnavailableWebhook(url: string) { + const reporter = this._getDatasourceAvailableReporter(url); + reporter(false); } public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { - const detector = analyticUnit.detectorType; let alertsType = {}; alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert; alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert; alertsType[AnalyticUnit.DetectorType.ANOMALY] = Alert; - this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit); + this._alerts[analyticUnit.id] = new alertsType[analyticUnit.detectorType](analyticUnit); } public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { @@ -145,10 +202,21 @@ export class AlertService { public stopAlerting() { this._alertingEnable = false; - this._alerts = {}; } public startAlerting() { this._alertingEnable = true; } + + private _getDatasourceAvailableReporter(url: string) { + if(!_.has(this._datasourceAvailableReporters, url)) { + this._datasourceAvailableReporters[url] = availableReporter( + [`[OK] Datasource ${url} available`, WebhookType.RECOVERY], + [`[FAILURE] Datasource ${url} unavailable`, WebhookType.FAILURE], + this.sendMsg, + this.sendMsg + ); + } + return this._datasourceAvailableReporters[url]; + } } diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 32ea88b..8889755 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -30,33 +30,17 @@ export class DataPuller { ); private _unitTimes: { [analyticUnitId: string]: number } = {}; - private _alertService: AlertService; - private _grafanaAvailableWebhook: Function; - private _datasourceAvailableWebhook: { [analyticUnitId: string]: Function } = {}; - - constructor(private analyticsService: AnalyticsService) { - this._alertService = new AlertService(); - this._grafanaAvailableWebhook = this._alertService.getGrafanaAvailableReporter(); - }; - - private _makeDatasourceAvailableWebhook(analyticUnit: AnalyticUnit.AnalyticUnit) { - const datasourceInfo = `${analyticUnit.metric.datasource.url} (${analyticUnit.metric.datasource.type})`; - return this._alertService.getAvailableWebhook( - `datasource ${datasourceInfo} available`, - `datasource ${datasourceInfo} unavailable` - ); - } + + constructor(private analyticsService: AnalyticsService, private alertService: AlertService) {}; public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { console.log(`start pulling analytic unit ${analyticUnit.id}`); - this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit); this._runAnalyticUnitPuller(analyticUnit); } public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { if(_.has(this._unitTimes, analyticUnitId)) { delete this._unitTimes[analyticUnitId]; - delete this._datasourceAvailableWebhook[analyticUnitId]; console.log(`analytic unit ${analyticUnitId} deleted from data puller`); } } @@ -97,7 +81,6 @@ export class DataPuller { console.log(`starting data puller with ${JSON.stringify(analyticUnits.map(u => u.id))} analytic units`); _.each(analyticUnits, analyticUnit => { - this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit); this._runAnalyticUnitPuller(analyticUnit); }); @@ -169,24 +152,21 @@ export class DataPuller { const now = Date.now(); const res = await this.pullData(analyticUnit, time, now); this._grafanaAvailableConsoleReporter(true); - this._grafanaAvailableWebhook(true); - this._datasourceAvailableWebhook[analyticUnit.id](true); + this.alertService.sendGrafanaAvailableWebhook(); + this.alertService.sendDatasourceAvailableWebhook(analyticUnit.metric.datasource.url); return res; } catch(err) { let errorResolved = false; if(err instanceof GrafanaUnavailable) { errorResolved = true; - this._grafanaAvailableConsoleReporter(false); - this._grafanaAvailableWebhook(false); + this.alertService.sendGrafanaUnavailableWebhook(); } else { - this._grafanaAvailableWebhook(true); + this.alertService.sendGrafanaAvailableWebhook(); } if(err instanceof DatasourceUnavailable) { errorResolved = true; - if(_.has(this._datasourceAvailableWebhook, analyticUnit.id)) { - this._datasourceAvailableWebhook[analyticUnit.id](false); - } + this.alertService.sendDatasourceUnavailableWebhook(analyticUnit.metric.datasource.url); } if(!errorResolved) { diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 708c9bb..de3652d 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -3,6 +3,7 @@ import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET, HASTIC_ import axios from 'axios'; import * as querystring from 'querystring'; +import * as _ from 'lodash'; enum ContentType { JSON = 'application/json', @@ -16,7 +17,7 @@ export enum WebhookType { MESSAGE = 'MESSAGE' } -export declare type AnalyticAlert = { +export declare type AnalyticMeta = { type: WebhookType, analyticUnitType: string, analyticUnitName: string, @@ -28,44 +29,30 @@ export declare type AnalyticAlert = { regionImage?: any } -export declare type InfoAlert = { +export declare type InfoMeta = { type: WebhookType, - message: string, from: number, to: number, params?: any } -// TODO: send webhook with payload without dep to AnalyticUnit -export async function sendAnalyticWebhook(alert: AnalyticAlert) { - const fromTime = new Date(alert.from).toLocaleTimeString(); - const toTime = new Date(alert.to).toLocaleTimeString(); - console.log(`Sending alert unit: ${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`); - - sendWebhook(alert); -} - -export async function sendInfoWebhook(alert: InfoAlert) { - if(alert && typeof alert === 'object') { - console.log(`Sending info webhook ${JSON.stringify(alert.message)}`); - sendWebhook(alert); - } else { - console.error(`skip sending Info webhook, got corrupted message ${alert}`); - } +export declare type Notification = { + message: string, + meta: InfoMeta | AnalyticMeta } -export async function sendWebhook(payload: any) { +export async function sendNotification(notification: Notification) { if(HASTIC_WEBHOOK_URL === null) { - throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); + throw new Error(`Can't send notification, HASTIC_WEBHOOK_URL is undefined`); } - payload.instanceName = HASTIC_INSTANCE_NAME; + notification.message += `\nInstance: ${HASTIC_INSTANCE_NAME}`; let data; if(HASTIC_WEBHOOK_TYPE === ContentType.JSON) { - data = JSON.stringify(payload); + data = JSON.stringify(notification); } else if(HASTIC_WEBHOOK_TYPE === ContentType.URLENCODED) { - data = querystring.stringify(payload); + data = querystring.stringify(notification); } else { throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`); } @@ -81,6 +68,6 @@ export async function sendWebhook(payload: any) { try { await axios(options); } catch(err) { - console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); + console.error(`Can't send notification to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); } }