Browse Source

More info in webhooks #382 && Webhook when datasource unavailable #441 (#436)

pull/1/head
Evgeny Smyshlyaev 5 years ago committed by rozetko
parent
commit
05390790b6
  1. 4
      analytics/analytics/detectors/threshold_detector.py
  2. 2
      server/package.json
  3. 9
      server/src/controllers/analytics_controller.ts
  4. 3
      server/src/models/segment_model.ts
  5. 55
      server/src/services/alert_service.ts
  6. 7
      server/src/services/analytics_service.ts
  7. 37
      server/src/services/data_puller.ts
  8. 45
      server/src/services/notification_service.ts
  9. 18
      server/src/utils/reporter.ts

4
analytics/analytics/detectors/threshold_detector.py

@ -37,14 +37,14 @@ class ThresholdDetector(Detector):
dataframe_without_nans = dataframe.dropna()
if len(dataframe_without_nans) == 0:
if condition == 'NO_DATA':
segments.append({ 'from': now, 'to': now })
segments.append({ 'from': now, 'to': now , 'params': { value: 'NO_DATA' } })
else:
return None
else:
last_entry = dataframe_without_nans.iloc[-1]
last_time = convert_pd_timestamp_to_ms(last_entry['timestamp'])
last_value = last_entry['value']
segment = { 'from': last_time, 'to': last_time }
segment = { 'from': last_time, 'to': last_time, 'params': { value: last_value } }
if condition == '>':
if last_value > value:

2
server/package.json

@ -38,7 +38,7 @@
"es6-promise": "^4.2.4",
"event-stream": "3.3.4",
"file-loader": "^1.1.11",
"grafana-datasource-kit": "0.1.11",
"grafana-datasource-kit": "0.1.12",
"jest": "^23.1.1",
"koa": "^2.0.46",
"koa-bodyparser": "^4.2.0",

9
server/src/controllers/analytics_controller.ts

@ -9,10 +9,11 @@ import { AlertService } from '../services/alert_service';
import { HASTIC_API_KEY, GRAFANA_URL } from '../config';
import { DataPuller } from '../services/data_puller';
import { queryByMetric, ConnectionRefused } from 'grafana-datasource-kit';
import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit';
import * as _ from 'lodash';
import { WebhookType } from '../services/notification_service';
const SECONDS_IN_MINUTE = 60;
@ -142,11 +143,15 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic
data = queryResult.values;
grafanaAvailableWebhok(true);
} catch(e) {
if(e instanceof ConnectionRefused) {
if(e instanceof GrafanaUnavailable) {
const msg = `Can't connect Grafana: ${e.message}, check GRAFANA_URL`;
grafanaAvailableWebhok(false);
throw new Error(msg);
}
if(e instanceof DatasourceUnavailable) {
alertService.sendMsg(e.message, WebhookType.FAILURE);
throw new Error(e.message);
}
throw e;
}

3
server/src/models/segment_model.ts

@ -16,7 +16,8 @@ export class Segment {
public to: number,
public labeled: boolean = false,
public deleted: boolean = false,
public id?: SegmentId
public id?: SegmentId,
public params?: any
) {
if(analyticUnitId === undefined) {
throw new Error('AnalyticUnitId is undefined');

55
server/src/services/alert_service.ts

@ -1,4 +1,4 @@
import { sendAnalyticWebhook, sendInfoWebhook } from './notification_service';
import { sendAnalyticWebhook, sendInfoWebhook, InfoAlert, AnalyticAlert, WebhookType } from './notification_service';
import * as _ from 'lodash';
import * as AnalyticUnit from '../models/analytic_unit_model';
@ -11,9 +11,26 @@ export class Alert {
constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {};
public receive(segment: Segment) {
if(this.enabled) {
sendAnalyticWebhook(this.analyticUnit.name, segment);
const alert = this.makeAlert(segment);
sendAnalyticWebhook(alert);
}
};
protected makeAlert(segment): AnalyticAlert {
const alert: AnalyticAlert = {
type: WebhookType.DETECT,
analyticUnitType: this.analyticUnit.type,
analyticUnitName: this.analyticUnit.name,
analyticUnitId: this.analyticUnit.id,
panelUrl: this.analyticUnit.panelUrl,
from: segment.from,
to: segment.to
};
if(segment.params) {
alert.params = segment.params;
}
return alert;
}
}
class PatternAlert extends Alert {
@ -24,7 +41,7 @@ class PatternAlert extends Alert {
if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) {
this.lastSentSegment = segment;
if(this.enabled) {
sendAnalyticWebhook(this.analyticUnit.name, segment);
sendAnalyticWebhook(this.makeAlert(segment));
}
}
}
@ -39,14 +56,14 @@ class ThresholdAlert extends Alert {
if(this.lastOccurence === 0) {
this.lastOccurence = segment.from;
if(this.enabled) {
sendAnalyticWebhook(this.analyticUnit.name, segment);
sendAnalyticWebhook(this.makeAlert(segment));
}
} else {
if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) {
if(this.enabled) {
console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`);
sendAnalyticWebhook(this.analyticUnit.name, segment);
sendAnalyticWebhook(this.makeAlert(segment));
}
}
@ -80,18 +97,23 @@ export class AlertService {
this._alerts[id].receive(segment);
};
public sendMsg(message: string, optionalInfo = {}) {
let message_payload = {
message
};
sendInfoWebhook(Object.assign(message_payload, optionalInfo));
public sendMsg(message: string, type: WebhookType, optionalInfo = {}) {
const now = Date.now();
const infoAlert: InfoAlert = {
message,
params: optionalInfo,
type,
from: now,
to: now
}
sendInfoWebhook(infoAlert);
}
public getGrafanaAvailableReporter() {
if(!this._grafanaAvailableReporter) {
this._grafanaAvailableReporter = availableReporter(
'Grafana available',
'Grafana unavailable for pulling data',
['Grafana available', WebhookType.RECOVERY],
['Grafana unavailable for pulling data', WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
@ -99,6 +121,15 @@ export class AlertService {
return this._grafanaAvailableReporter;
}
public getAvailableWebhook(recoveryMsg: string, failureMsg: string) {
return availableReporter(
[recoveryMsg, WebhookType.RECOVERY],
[failureMsg, WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
}
public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
let detector = AnalyticUnit.getDetectorByType(analyticUnit.type);
let alertsType = {};

7
server/src/services/analytics_service.ts

@ -1,5 +1,6 @@
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model';
import { WebhookType } from '../services/notification_service';
import * as config from '../config';
import { AlertService } from './alert_service';
@ -165,11 +166,13 @@ export class AnalyticsService {
}
private _onAnalyticsUp() {
console.log('Analytics is up');
const msg = 'Analytics is up';
console.log(msg);
//this._alertService.sendMsg(msg, WebhookType.RECOVERY);
}
private async _onAnalyticsDown() {
let msg = 'Analytics is down';
const msg = 'Analytics is down';
console.log(msg);
// TODO: enable analytics down webhooks when it stops bouncing
// this._alertService.sendMsg(msg);

37
server/src/services/data_puller.ts

@ -6,9 +6,10 @@ import { HASTIC_API_KEY, GRAFANA_URL } from '../config';
import { availableReporter } from '../utils/reporter';
import { AlertService } from './alert_service';
import { queryByMetric, ConnectionRefused } from 'grafana-datasource-kit';
import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit';
import * as _ from 'lodash';
import { WebhookType } from './notification_service';
type MetricDataChunk = { values: [number, number][], columns: string[] };
@ -28,21 +29,33 @@ export class DataPuller {
);
private _unitTimes: { [analyticUnitId: string]: number } = {};
private _alertService: AlertService;
private _grafanaAvailableWebhook: Function;
private _datasourceAvailableWebhook: { [analyticUnitId: string]: Function } = {};
constructor(private analyticsService: AnalyticsService) {
const _alertService = new AlertService();
this._grafanaAvailableWebhook = _alertService.getGrafanaAvailableReporter();
this._alertService = new AlertService();
this._grafanaAvailableWebhook = this._alertService.getGrafanaAvailableReporter();
};
private _makeDatasourceAvailableWebhook(analyticUnit: AnalyticUnit.AnalyticUnit) {
const datasourceInfo = `${analyticUnit.metric.datasource.url} (${analyticUnit.metric.datasource.type})`;
return this._alertService.getAvailableWebhook(
`datasource ${datasourceInfo} available`,
`datasource ${datasourceInfo} unavailable`
);
}
public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
console.log(`start pulling analytic unit ${analyticUnit.id}`);
this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit);
this._runAnalyticUnitPuller(analyticUnit);
}
public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) {
if(_.has(this._unitTimes, analyticUnitId)) {
delete this._unitTimes[analyticUnitId];
delete this._datasourceAvailableWebhook[analyticUnitId];
console.log(`analytic unit ${analyticUnitId} deleted from data puller`);
}
}
@ -87,6 +100,7 @@ export class DataPuller {
console.log(`starting data puller with ${JSON.stringify(analyticUnits.map(u => u.id))} analytic units`);
_.each(analyticUnits, analyticUnit => {
this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit);
this._runAnalyticUnitPuller(analyticUnit);
});
@ -156,13 +170,26 @@ export class DataPuller {
const res = await this.pullData(analyticUnit, time, now);
this._grafanaAvailableConsoleReporter(true);
this._grafanaAvailableWebhook(true);
this._datasourceAvailableWebhook[analyticUnit.id](true);
return res;
} catch(err) {
if(err instanceof ConnectionRefused) {
let errorResolved = false;
if(err instanceof GrafanaUnavailable) {
errorResolved = true;
this._grafanaAvailableConsoleReporter(false);
this._grafanaAvailableWebhook(false);
} else {
this._grafanaAvailableWebhook(true);
}
if(err instanceof DatasourceUnavailable) {
errorResolved = true;
if(_.has(this._datasourceAvailableWebhook, analyticUnit.id)) {
this._datasourceAvailableWebhook[analyticUnit.id](false);
}
}
if(!errorResolved) {
console.error(`error while pulling data: ${err.message}`);
}

45
server/src/services/notification_service.ts

@ -1,4 +1,5 @@
import { Segment } from '../models/segment_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET } from '../config';
import axios from 'axios';
@ -9,14 +10,35 @@ enum ContentType {
URLENCODED ='application/x-www-form-urlencoded'
}
// TODO: send webhook with payload without dep to AnalyticUnit
export async function sendAnalyticWebhook(analyticUnitName: string, segment: Segment) {
const alert = {
analyticUnitName,
from: segment.from,
to: segment.to
};
export enum WebhookType {
DETECT = 'DETECT',
FAILURE = 'FAILURE',
RECOVERY = 'RECOVERY',
MESSAGE = 'MESSAGE'
}
export declare type AnalyticAlert = {
type: WebhookType,
analyticUnitType: string,
analyticUnitName: string,
analyticUnitId: AnalyticUnit.AnalyticUnitId,
panelUrl: string,
from: number,
to: number
params?: any,
regionImage?: any
}
export declare type InfoAlert = {
type: WebhookType,
message: string,
from: number,
to: number,
params?: any
}
// TODO: send webhook with payload without dep to AnalyticUnit
export async function sendAnalyticWebhook(alert: AnalyticAlert) {
const fromTime = new Date(alert.from).toLocaleTimeString();
const toTime = new Date(alert.to).toLocaleTimeString();
console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`);
@ -32,11 +54,12 @@ export async function sendAnalyticWebhook(analyticUnitName: string, segment: Seg
sendWebhook(payload);
}
export async function sendInfoWebhook(message: any) {
if(message && typeof message === 'object') {
sendWebhook(message, ContentType.JSON);
export async function sendInfoWebhook(alert: InfoAlert) {
if(alert && typeof alert === 'object') {
console.log(`Sending info webhook ${JSON.stringify(alert.message)}`);
sendWebhook(alert, ContentType.JSON);
} else {
console.error(`skip sending Info webhook, got corrupted message ${message}`);
console.error(`skip sending Info webhook, got corrupted message ${alert}`);
}
}

18
server/src/utils/reporter.ts

@ -1,6 +1,6 @@
export function availableReporter(
positiveMsg: string|null,
negativeMsg: string|null,
positiveArgs: any|null,
negativeArgs: any|null,
positiveAction = console.log,
negativeAction = console.error,
) {
@ -8,15 +8,21 @@ export function availableReporter(
return available => {
if(available && reported) {
reported = false;
if(positiveMsg) {
positiveAction(positiveMsg);
if(positiveArgs) {
if(!(positiveArgs instanceof Array)) {
positiveArgs = [ positiveArgs ];
}
positiveAction.apply(null, positiveArgs);
}
}
if(!available && !reported) {
reported = true;
if(negativeMsg) {
negativeAction(negativeMsg);
if(negativeArgs) {
if(!(negativeArgs instanceof Array)) {
negativeArgs = [ negativeArgs ];
}
negativeAction.apply(null, negativeArgs);
}
}
}

Loading…
Cancel
Save