From 05390790b69c08ba94cb77e2fcb399bdc7b34f71 Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Tue, 26 Feb 2019 18:04:35 +0300 Subject: [PATCH] More info in webhooks #382 && Webhook when datasource unavailable #441 (#436) --- .../analytics/detectors/threshold_detector.py | 4 +- server/package.json | 2 +- .../src/controllers/analytics_controller.ts | 9 ++- server/src/models/segment_model.ts | 3 +- server/src/services/alert_service.ts | 55 +++++++++++++++---- server/src/services/analytics_service.ts | 7 ++- server/src/services/data_puller.ts | 37 +++++++++++-- server/src/services/notification_service.ts | 45 +++++++++++---- server/src/utils/reporter.ts | 18 ++++-- 9 files changed, 138 insertions(+), 42 deletions(-) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 8f552c1..4c71cc8 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -37,14 +37,14 @@ class ThresholdDetector(Detector): dataframe_without_nans = dataframe.dropna() if len(dataframe_without_nans) == 0: if condition == 'NO_DATA': - segments.append({ 'from': now, 'to': now }) + segments.append({ 'from': now, 'to': now , 'params': { value: 'NO_DATA' } }) else: return None else: last_entry = dataframe_without_nans.iloc[-1] last_time = convert_pd_timestamp_to_ms(last_entry['timestamp']) last_value = last_entry['value'] - segment = { 'from': last_time, 'to': last_time } + segment = { 'from': last_time, 'to': last_time, 'params': { value: last_value } } if condition == '>': if last_value > value: diff --git a/server/package.json b/server/package.json index 4af46bd..02a222a 100644 --- a/server/package.json +++ b/server/package.json @@ -38,7 +38,7 @@ "es6-promise": "^4.2.4", "event-stream": "3.3.4", "file-loader": "^1.1.11", - "grafana-datasource-kit": "0.1.11", + "grafana-datasource-kit": "0.1.12", "jest": "^23.1.1", "koa": "^2.0.46", "koa-bodyparser": "^4.2.0", diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 8e1f9ee..cbe5584 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -9,10 +9,11 @@ import { AlertService } from '../services/alert_service'; import { HASTIC_API_KEY, GRAFANA_URL } from '../config'; import { DataPuller } from '../services/data_puller'; -import { queryByMetric, ConnectionRefused } from 'grafana-datasource-kit'; +import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit'; import * as _ from 'lodash'; +import { WebhookType } from '../services/notification_service'; const SECONDS_IN_MINUTE = 60; @@ -142,11 +143,15 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic data = queryResult.values; grafanaAvailableWebhok(true); } catch(e) { - if(e instanceof ConnectionRefused) { + if(e instanceof GrafanaUnavailable) { const msg = `Can't connect Grafana: ${e.message}, check GRAFANA_URL`; grafanaAvailableWebhok(false); throw new Error(msg); } + if(e instanceof DatasourceUnavailable) { + alertService.sendMsg(e.message, WebhookType.FAILURE); + throw new Error(e.message); + } throw e; } diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index d7f8b36..9f5d8c5 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -16,7 +16,8 @@ export class Segment { public to: number, public labeled: boolean = false, public deleted: boolean = false, - public id?: SegmentId + public id?: SegmentId, + public params?: any ) { if(analyticUnitId === undefined) { throw new Error('AnalyticUnitId is undefined'); diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts index f2c05ba..5873a78 100644 --- a/server/src/services/alert_service.ts +++ b/server/src/services/alert_service.ts @@ -1,4 +1,4 @@ -import { sendAnalyticWebhook, sendInfoWebhook } from './notification_service'; +import { sendAnalyticWebhook, sendInfoWebhook, InfoAlert, AnalyticAlert, WebhookType } from './notification_service'; import * as _ from 'lodash'; import * as AnalyticUnit from '../models/analytic_unit_model'; @@ -11,9 +11,26 @@ export class Alert { constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {}; public receive(segment: Segment) { if(this.enabled) { - sendAnalyticWebhook(this.analyticUnit.name, segment); + const alert = this.makeAlert(segment); + sendAnalyticWebhook(alert); } }; + + protected makeAlert(segment): AnalyticAlert { + const alert: AnalyticAlert = { + type: WebhookType.DETECT, + analyticUnitType: this.analyticUnit.type, + analyticUnitName: this.analyticUnit.name, + analyticUnitId: this.analyticUnit.id, + panelUrl: this.analyticUnit.panelUrl, + from: segment.from, + to: segment.to + }; + if(segment.params) { + alert.params = segment.params; + } + return alert; + } } class PatternAlert extends Alert { @@ -24,7 +41,7 @@ class PatternAlert extends Alert { if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) { this.lastSentSegment = segment; if(this.enabled) { - sendAnalyticWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.makeAlert(segment)); } } } @@ -39,14 +56,14 @@ class ThresholdAlert extends Alert { if(this.lastOccurence === 0) { this.lastOccurence = segment.from; if(this.enabled) { - sendAnalyticWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.makeAlert(segment)); } } else { if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { if(this.enabled) { console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`); - sendAnalyticWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.makeAlert(segment)); } } @@ -80,18 +97,23 @@ export class AlertService { this._alerts[id].receive(segment); }; - public sendMsg(message: string, optionalInfo = {}) { - let message_payload = { - message - }; - sendInfoWebhook(Object.assign(message_payload, optionalInfo)); + public sendMsg(message: string, type: WebhookType, optionalInfo = {}) { + const now = Date.now(); + const infoAlert: InfoAlert = { + message, + params: optionalInfo, + type, + from: now, + to: now + } + sendInfoWebhook(infoAlert); } public getGrafanaAvailableReporter() { if(!this._grafanaAvailableReporter) { this._grafanaAvailableReporter = availableReporter( - 'Grafana available', - 'Grafana unavailable for pulling data', + ['Grafana available', WebhookType.RECOVERY], + ['Grafana unavailable for pulling data', WebhookType.FAILURE], this.sendMsg, this.sendMsg ); @@ -99,6 +121,15 @@ export class AlertService { return this._grafanaAvailableReporter; } + public getAvailableWebhook(recoveryMsg: string, failureMsg: string) { + return availableReporter( + [recoveryMsg, WebhookType.RECOVERY], + [failureMsg, WebhookType.FAILURE], + this.sendMsg, + this.sendMsg + ); + } + public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { let detector = AnalyticUnit.getDetectorByType(analyticUnit.type); let alertsType = {}; diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index cbec6fa..8ce5b53 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,5 +1,6 @@ import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'; +import { WebhookType } from '../services/notification_service'; import * as config from '../config'; import { AlertService } from './alert_service'; @@ -165,11 +166,13 @@ export class AnalyticsService { } private _onAnalyticsUp() { - console.log('Analytics is up'); + const msg = 'Analytics is up'; + console.log(msg); + //this._alertService.sendMsg(msg, WebhookType.RECOVERY); } private async _onAnalyticsDown() { - let msg = 'Analytics is down'; + const msg = 'Analytics is down'; console.log(msg); // TODO: enable analytics down webhooks when it stops bouncing // this._alertService.sendMsg(msg); diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 203762b..4ab4209 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -6,9 +6,10 @@ import { HASTIC_API_KEY, GRAFANA_URL } from '../config'; import { availableReporter } from '../utils/reporter'; import { AlertService } from './alert_service'; -import { queryByMetric, ConnectionRefused } from 'grafana-datasource-kit'; +import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit'; import * as _ from 'lodash'; +import { WebhookType } from './notification_service'; type MetricDataChunk = { values: [number, number][], columns: string[] }; @@ -28,21 +29,33 @@ export class DataPuller { ); private _unitTimes: { [analyticUnitId: string]: number } = {}; + private _alertService: AlertService; private _grafanaAvailableWebhook: Function; + private _datasourceAvailableWebhook: { [analyticUnitId: string]: Function } = {}; constructor(private analyticsService: AnalyticsService) { - const _alertService = new AlertService(); - this._grafanaAvailableWebhook = _alertService.getGrafanaAvailableReporter(); + this._alertService = new AlertService(); + this._grafanaAvailableWebhook = this._alertService.getGrafanaAvailableReporter(); }; + private _makeDatasourceAvailableWebhook(analyticUnit: AnalyticUnit.AnalyticUnit) { + const datasourceInfo = `${analyticUnit.metric.datasource.url} (${analyticUnit.metric.datasource.type})`; + return this._alertService.getAvailableWebhook( + `datasource ${datasourceInfo} available`, + `datasource ${datasourceInfo} unavailable` + ); + } + public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { console.log(`start pulling analytic unit ${analyticUnit.id}`); + this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit); this._runAnalyticUnitPuller(analyticUnit); } public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { if(_.has(this._unitTimes, analyticUnitId)) { delete this._unitTimes[analyticUnitId]; + delete this._datasourceAvailableWebhook[analyticUnitId]; console.log(`analytic unit ${analyticUnitId} deleted from data puller`); } } @@ -87,6 +100,7 @@ export class DataPuller { console.log(`starting data puller with ${JSON.stringify(analyticUnits.map(u => u.id))} analytic units`); _.each(analyticUnits, analyticUnit => { + this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit); this._runAnalyticUnitPuller(analyticUnit); }); @@ -156,13 +170,26 @@ export class DataPuller { const res = await this.pullData(analyticUnit, time, now); this._grafanaAvailableConsoleReporter(true); this._grafanaAvailableWebhook(true); + this._datasourceAvailableWebhook[analyticUnit.id](true); return res; } catch(err) { - - if(err instanceof ConnectionRefused) { + let errorResolved = false; + if(err instanceof GrafanaUnavailable) { + errorResolved = true; this._grafanaAvailableConsoleReporter(false); this._grafanaAvailableWebhook(false); } else { + this._grafanaAvailableWebhook(true); + } + + if(err instanceof DatasourceUnavailable) { + errorResolved = true; + if(_.has(this._datasourceAvailableWebhook, analyticUnit.id)) { + this._datasourceAvailableWebhook[analyticUnit.id](false); + } + } + + if(!errorResolved) { console.error(`error while pulling data: ${err.message}`); } diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index c0123a1..9f0133f 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -1,4 +1,5 @@ import { Segment } from '../models/segment_model'; +import * as AnalyticUnit from '../models/analytic_unit_model'; import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET } from '../config'; import axios from 'axios'; @@ -9,14 +10,35 @@ enum ContentType { URLENCODED ='application/x-www-form-urlencoded' } -// TODO: send webhook with payload without dep to AnalyticUnit -export async function sendAnalyticWebhook(analyticUnitName: string, segment: Segment) { - const alert = { - analyticUnitName, - from: segment.from, - to: segment.to - }; +export enum WebhookType { + DETECT = 'DETECT', + FAILURE = 'FAILURE', + RECOVERY = 'RECOVERY', + MESSAGE = 'MESSAGE' +} + +export declare type AnalyticAlert = { + type: WebhookType, + analyticUnitType: string, + analyticUnitName: string, + analyticUnitId: AnalyticUnit.AnalyticUnitId, + panelUrl: string, + from: number, + to: number + params?: any, + regionImage?: any +} + +export declare type InfoAlert = { + type: WebhookType, + message: string, + from: number, + to: number, + params?: any +} +// TODO: send webhook with payload without dep to AnalyticUnit +export async function sendAnalyticWebhook(alert: AnalyticAlert) { const fromTime = new Date(alert.from).toLocaleTimeString(); const toTime = new Date(alert.to).toLocaleTimeString(); console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`); @@ -32,11 +54,12 @@ export async function sendAnalyticWebhook(analyticUnitName: string, segment: Seg sendWebhook(payload); } -export async function sendInfoWebhook(message: any) { - if(message && typeof message === 'object') { - sendWebhook(message, ContentType.JSON); +export async function sendInfoWebhook(alert: InfoAlert) { + if(alert && typeof alert === 'object') { + console.log(`Sending info webhook ${JSON.stringify(alert.message)}`); + sendWebhook(alert, ContentType.JSON); } else { - console.error(`skip sending Info webhook, got corrupted message ${message}`); + console.error(`skip sending Info webhook, got corrupted message ${alert}`); } } diff --git a/server/src/utils/reporter.ts b/server/src/utils/reporter.ts index 3a9195a..4ba4aba 100644 --- a/server/src/utils/reporter.ts +++ b/server/src/utils/reporter.ts @@ -1,6 +1,6 @@ export function availableReporter( - positiveMsg: string|null, - negativeMsg: string|null, + positiveArgs: any|null, + negativeArgs: any|null, positiveAction = console.log, negativeAction = console.error, ) { @@ -8,15 +8,21 @@ export function availableReporter( return available => { if(available && reported) { reported = false; - if(positiveMsg) { - positiveAction(positiveMsg); + if(positiveArgs) { + if(!(positiveArgs instanceof Array)) { + positiveArgs = [ positiveArgs ]; + } + positiveAction.apply(null, positiveArgs); } } if(!available && !reported) { reported = true; - if(negativeMsg) { - negativeAction(negativeMsg); + if(negativeArgs) { + if(!(negativeArgs instanceof Array)) { + negativeArgs = [ negativeArgs ]; + } + negativeAction.apply(null, negativeArgs); } } }