Browse Source

"[PATTERN DETECTED]" in webhooks message #560 (#696)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
3e9eb3daa7
  1. 2
      analytics/analytics/server.py
  2. 2
      server/src/config.ts
  3. 55
      server/src/controllers/analytics_controller.ts
  4. 1
      server/src/models/analytics_message_model.ts
  5. 1
      server/src/models/analytics_task_model.ts
  6. 136
      server/src/services/alert_service.ts
  7. 34
      server/src/services/data_puller.ts
  8. 37
      server/src/services/notification_service.ts

2
analytics/analytics/server.py

@ -53,7 +53,7 @@ async def handle_data(task: object):
if res['status'] == 'SUCCESS' and res['payload'] is not None:
res['_id'] = task['_id']
message = services.server_service.ServerMessage('DETECT', res)
message = services.server_service.ServerMessage('PUSH_DETECT', res)
await server_service.send_message_to_server(message)
async def handle_message(message: services.ServerMessage):

2
server/src/config.ts

@ -26,6 +26,8 @@ export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002');
export const ZMQ_HOST = getConfigField('ZMQ_HOST', '127.0.0.1');
export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY');
export const GRAFANA_URL = normalizeUrl(getConfigField('GRAFANA_URL', null));
// TODO: save orgId in analytic_units.db
export const ORG_ID = getConfigField('ORG_ID', 1);
export const HASTIC_WEBHOOK_URL = getConfigField('HASTIC_WEBHOOK_URL', null);
export const HASTIC_WEBHOOK_TYPE = getConfigField('HASTIC_WEBHOOK_TYPE', 'application/x-www-form-urlencoded');
export const HASTIC_WEBHOOK_SECRET = getConfigField('HASTIC_WEBHOOK_SECRET', null);

55
server/src/controllers/analytics_controller.ts

@ -34,7 +34,6 @@ const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined;
let alertService: AlertService = undefined;
let grafanaAvailableWebhok: Function = undefined;
let dataPuller: DataPuller;
let detectionsCount: number = 0;
@ -68,6 +67,27 @@ async function onDetect(detectionResult: DetectionResult) {
]);
}
async function onPushDetect(detectionResult: DetectionResult) {
const analyticUnit = await AnalyticUnit.findById(detectionResult.analyticUnitId);
if (!_.isEmpty(detectionResult.segments) && analyticUnit.alert) {
try {
alertService.receiveAlert(analyticUnit, _.last(detectionResult.segments));
} catch(err) {
console.error(`error while sending webhook: ${err.message}`);
}
} else {
let reasons = [];
if(!analyticUnit.alert) {
reasons.push('alerting disabled');
}
if(_.isEmpty(detectionResult.segments)) {
reasons.push('segments empty');
}
console.log(`skip sending webhook for ${analyticUnit.id}, ${reasons.join(', ')}`);
}
await onDetect(detectionResult);
}
async function onMessage(message: AnalyticsMessage) {
let responsePayload = null;
let methodResolved = false;
@ -82,6 +102,11 @@ async function onMessage(message: AnalyticsMessage) {
methodResolved = true;
}
if(message.method === AnalyticsMessageMethod.PUSH_DETECT) {
await onPushDetect(message.payload.payload);
methodResolved = true;
}
if(!methodResolved) {
throw new TypeError('Unknown method ' + message.method);
}
@ -97,10 +122,9 @@ export function init() {
analyticsService = new AnalyticsService(onMessage);
alertService = new AlertService();
grafanaAvailableWebhok = alertService.getGrafanaAvailableReporter();
alertService.startAlerting();
dataPuller = new DataPuller(analyticsService);
dataPuller = new DataPuller(analyticsService, alertService);
dataPuller.runPuller();
}
@ -172,15 +196,16 @@ async function query(
HASTIC_API_KEY
);
data = queryResult.values;
grafanaAvailableWebhok(true);
alertService.sendGrafanaAvailableWebhook();
alertService.sendDatasourceAvailableWebhook(analyticUnit.metric.datasource.url);
} catch(e) {
if(e instanceof GrafanaUnavailable) {
const msg = `Can't connect Grafana: ${e.message}, check GRAFANA_URL`;
grafanaAvailableWebhok(false);
alertService.sendGrafanaUnavailableWebhook();
throw new Error(msg);
}
if(e instanceof DatasourceUnavailable) {
alertService.sendMsg(e.message, WebhookType.FAILURE);
alertService.sendDatasourceUnavailableWebhook(analyticUnit.metric.datasource.url);
throw new Error(e.message);
}
throw e;
@ -429,23 +454,7 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI
const segments = sortedSegments.map(
segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false)
);
const analyticUnit = await AnalyticUnit.findById(analyticUnitId);
if (!_.isEmpty(segments) && analyticUnit.alert) {
try {
alertService.receiveAlert(analyticUnit, _.last(segments));
} catch(err) {
console.error(`error while sending webhook: ${err.message}`);
}
} else {
let reasons = [];
if(!analyticUnit.alert) {
reasons.push('alerting disabled');
}
if(_.isEmpty(segments)) {
reasons.push('segments empty');
}
console.log(`skip sending webhook for ${analyticUnit.id}, ${reasons.join(', ')}`);
}
return {
lastDetectionTime: detectionResult.lastDetectionTime,
segments,

1
server/src/models/analytics_message_model.ts

@ -2,6 +2,7 @@ export enum AnalyticsMessageMethod {
TASK = 'TASK',
TASK_RESULT = 'TASK_RESULT',
DETECT = 'DETECT',
PUSH_DETECT = 'PUSH_DETECT',
DATA = 'DATA'
}

1
server/src/models/analytics_task_model.ts

@ -9,6 +9,7 @@ export type AnalyticsTaskId = string;
export enum AnalyticsTaskType {
LEARN = 'LEARN',
DETECT = 'DETECT',
PUSH_DETECT = 'PUSH_DETECT',
CANCEL = 'CANCEL',
PUSH = 'PUSH',
PROCESS = 'PROCESS'

136
server/src/services/alert_service.ts

@ -1,9 +1,10 @@
import { sendAnalyticWebhook, sendInfoWebhook, InfoAlert, AnalyticAlert, WebhookType } from './notification_service';
import { sendNotification, InfoMeta, AnalyticMeta, WebhookType, Notification } from './notification_service';
import * as _ from 'lodash';
import * as AnalyticUnit from '../models/analytic_units';
import { Segment } from '../models/segment_model';
import { availableReporter } from '../utils/reporter';
import { ORG_ID } from '../config';
export class Alert {
@ -11,24 +12,43 @@ export class Alert {
constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {};
public receive(segment: Segment) {
if(this.enabled) {
const alert = this.makeAlert(segment);
sendAnalyticWebhook(alert);
sendNotification(this.makeNotification(segment));
}
};
protected makeAlert(segment): AnalyticAlert {
const alert: AnalyticAlert = {
protected makeNotification(segment: Segment): Notification {
const meta = this.makeMeta(segment);
const message = this.makeMessage(meta);
return { meta, message };
}
protected makeMeta(segment: Segment): AnalyticMeta {
const datshdoardId = this.analyticUnit.panelId.split('/')[0];
const panelId = this.analyticUnit.panelId.split('/')[1];
const grafanaUrl = `${this.analyticUnit.grafanaUrl}/d/${datshdoardId}?panelId=${panelId}&edit=true&fullscreen=true?orgId=${ORG_ID}`;
const alert: AnalyticMeta = {
type: WebhookType.DETECT,
analyticUnitType: this.analyticUnit.type,
analyticUnitName: this.analyticUnit.name,
analyticUnitId: this.analyticUnit.id,
grafanaUrl: this.analyticUnit.grafanaUrl,
grafanaUrl,
from: segment.from,
to: segment.to
};
return alert;
}
protected makeMessage(meta: AnalyticMeta): string {
return [
`[${meta.analyticUnitType.toUpperCase()} ALERTING] ${meta.analyticUnitName}`,
`URL: ${meta.grafanaUrl}`,
``,
`From: ${new Date(meta.from)}`,
`To: ${new Date(meta.to)}`,
`ID: ${meta.analyticUnitId}`
].join('\n');
}
}
class PatternAlert extends Alert {
@ -39,14 +59,27 @@ class PatternAlert extends Alert {
if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) {
this.lastSentSegment = segment;
if(this.enabled) {
sendAnalyticWebhook(this.makeAlert(segment));
sendNotification(this.makeNotification(segment));
}
}
}
protected makeMessage(meta: AnalyticMeta): string {
return [
`[PATTERN DETECTED] ${meta.analyticUnitName}`,
`URL: ${meta.grafanaUrl}`,
``,
`From: ${new Date(meta.from)}`,
`To: ${new Date(meta.to)}`,
`ID: ${meta.analyticUnitId}`
].join('\n');
}
};
class ThresholdAlert extends Alert {
// TODO: configure threshold timing in panel like Grafana's alerts (`evaluate` time, `for` time)
// TODO: make events for start and end of threshold
EXPIRE_PERIOD_MS = 60000;
lastOccurence = 0;
@ -54,31 +87,60 @@ class ThresholdAlert extends Alert {
if(this.lastOccurence === 0) {
this.lastOccurence = segment.from;
if(this.enabled) {
sendAnalyticWebhook(this.makeAlert(segment));
sendNotification(this.makeNotification(segment));
}
} else {
if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) {
if(this.enabled) {
console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`);
sendAnalyticWebhook(this.makeAlert(segment));
sendNotification(this.makeNotification(segment));
}
}
this.lastOccurence = segment.from;
}
}
protected makeMessage(meta: AnalyticMeta): string {
let message = [
`[THRESHOLD ALERTING] ${meta.analyticUnitName}`,
`URL: ${meta.grafanaUrl}`,
``,
`Starts at: ${new Date(meta.from)}`,
`ID: ${meta.analyticUnitId}`
].join('\n');
if(meta.params !== undefined) {
const metrics = `
Metrics:
${this.analyticUnit.metric.targets[0].expr}: ${meta.params.value}
`;
message += metrics;
}
return message;
}
}
export class AlertService {
private _alerts: { [id: string]: Alert; };
// TODO: object -> Map
private _alerts: { [id: string]: Alert };
private _alertingEnable: boolean;
private _grafanaAvailableReporter: Function;
private _datasourceAvailableReporters: Map<string, Function>;
constructor() {
this._alerts = {}
this._alerts = {};
this._datasourceAvailableReporters = new Map();
this._grafanaAvailableReporter = availableReporter(
['[OK] Grafana available', WebhookType.RECOVERY],
['[FAILURE] Grafana unavailable for pulling data', WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
}
public receiveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) {
@ -97,46 +159,41 @@ export class AlertService {
public sendMsg(message: string, type: WebhookType, optionalInfo = {}) {
const now = Date.now();
const infoAlert: InfoAlert = {
message,
const infoAlert: InfoMeta = {
params: optionalInfo,
type,
from: now,
to: now
}
sendInfoWebhook(infoAlert);
sendNotification({ message, meta: infoAlert });
}
public getGrafanaAvailableReporter() {
if(!this._grafanaAvailableReporter) {
this._grafanaAvailableReporter = availableReporter(
['Grafana available', WebhookType.RECOVERY],
['Grafana unavailable for pulling data', WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
}
return this._grafanaAvailableReporter;
public sendGrafanaAvailableWebhook() {
this._grafanaAvailableReporter(true);
}
public getAvailableWebhook(recoveryMsg: string, failureMsg: string) {
return availableReporter(
[recoveryMsg, WebhookType.RECOVERY],
[failureMsg, WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
public sendGrafanaUnavailableWebhook() {
this._grafanaAvailableReporter(false);
}
public sendDatasourceAvailableWebhook(url: string) {
const reporter = this._getDatasourceAvailableReporter(url);
reporter(true);
}
public sendDatasourceUnavailableWebhook(url: string) {
const reporter = this._getDatasourceAvailableReporter(url);
reporter(false);
}
public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
const detector = analyticUnit.detectorType;
let alertsType = {};
alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert;
alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert;
alertsType[AnalyticUnit.DetectorType.ANOMALY] = Alert;
this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit);
this._alerts[analyticUnit.id] = new alertsType[analyticUnit.detectorType](analyticUnit);
}
public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) {
@ -145,10 +202,21 @@ export class AlertService {
public stopAlerting() {
this._alertingEnable = false;
this._alerts = {};
}
public startAlerting() {
this._alertingEnable = true;
}
private _getDatasourceAvailableReporter(url: string) {
if(!_.has(this._datasourceAvailableReporters, url)) {
this._datasourceAvailableReporters[url] = availableReporter(
[`[OK] Datasource ${url} available`, WebhookType.RECOVERY],
[`[FAILURE] Datasource ${url} unavailable`, WebhookType.FAILURE],
this.sendMsg,
this.sendMsg
);
}
return this._datasourceAvailableReporters[url];
}
}

34
server/src/services/data_puller.ts

@ -30,33 +30,17 @@ export class DataPuller {
);
private _unitTimes: { [analyticUnitId: string]: number } = {};
private _alertService: AlertService;
private _grafanaAvailableWebhook: Function;
private _datasourceAvailableWebhook: { [analyticUnitId: string]: Function } = {};
constructor(private analyticsService: AnalyticsService) {
this._alertService = new AlertService();
this._grafanaAvailableWebhook = this._alertService.getGrafanaAvailableReporter();
};
private _makeDatasourceAvailableWebhook(analyticUnit: AnalyticUnit.AnalyticUnit) {
const datasourceInfo = `${analyticUnit.metric.datasource.url} (${analyticUnit.metric.datasource.type})`;
return this._alertService.getAvailableWebhook(
`datasource ${datasourceInfo} available`,
`datasource ${datasourceInfo} unavailable`
);
}
constructor(private analyticsService: AnalyticsService, private alertService: AlertService) {};
public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
console.log(`start pulling analytic unit ${analyticUnit.id}`);
this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit);
this._runAnalyticUnitPuller(analyticUnit);
}
public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) {
if(_.has(this._unitTimes, analyticUnitId)) {
delete this._unitTimes[analyticUnitId];
delete this._datasourceAvailableWebhook[analyticUnitId];
console.log(`analytic unit ${analyticUnitId} deleted from data puller`);
}
}
@ -97,7 +81,6 @@ export class DataPuller {
console.log(`starting data puller with ${JSON.stringify(analyticUnits.map(u => u.id))} analytic units`);
_.each(analyticUnits, analyticUnit => {
this._datasourceAvailableWebhook[analyticUnit.id] = this._makeDatasourceAvailableWebhook(analyticUnit);
this._runAnalyticUnitPuller(analyticUnit);
});
@ -169,24 +152,21 @@ export class DataPuller {
const now = Date.now();
const res = await this.pullData(analyticUnit, time, now);
this._grafanaAvailableConsoleReporter(true);
this._grafanaAvailableWebhook(true);
this._datasourceAvailableWebhook[analyticUnit.id](true);
this.alertService.sendGrafanaAvailableWebhook();
this.alertService.sendDatasourceAvailableWebhook(analyticUnit.metric.datasource.url);
return res;
} catch(err) {
let errorResolved = false;
if(err instanceof GrafanaUnavailable) {
errorResolved = true;
this._grafanaAvailableConsoleReporter(false);
this._grafanaAvailableWebhook(false);
this.alertService.sendGrafanaUnavailableWebhook();
} else {
this._grafanaAvailableWebhook(true);
this.alertService.sendGrafanaAvailableWebhook();
}
if(err instanceof DatasourceUnavailable) {
errorResolved = true;
if(_.has(this._datasourceAvailableWebhook, analyticUnit.id)) {
this._datasourceAvailableWebhook[analyticUnit.id](false);
}
this.alertService.sendDatasourceUnavailableWebhook(analyticUnit.metric.datasource.url);
}
if(!errorResolved) {

37
server/src/services/notification_service.ts

@ -3,6 +3,7 @@ import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET, HASTIC_
import axios from 'axios';
import * as querystring from 'querystring';
import * as _ from 'lodash';
enum ContentType {
JSON = 'application/json',
@ -16,7 +17,7 @@ export enum WebhookType {
MESSAGE = 'MESSAGE'
}
export declare type AnalyticAlert = {
export declare type AnalyticMeta = {
type: WebhookType,
analyticUnitType: string,
analyticUnitName: string,
@ -28,44 +29,30 @@ export declare type AnalyticAlert = {
regionImage?: any
}
export declare type InfoAlert = {
export declare type InfoMeta = {
type: WebhookType,
message: string,
from: number,
to: number,
params?: any
}
// TODO: send webhook with payload without dep to AnalyticUnit
export async function sendAnalyticWebhook(alert: AnalyticAlert) {
const fromTime = new Date(alert.from).toLocaleTimeString();
const toTime = new Date(alert.to).toLocaleTimeString();
console.log(`Sending alert unit: ${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`);
sendWebhook(alert);
}
export async function sendInfoWebhook(alert: InfoAlert) {
if(alert && typeof alert === 'object') {
console.log(`Sending info webhook ${JSON.stringify(alert.message)}`);
sendWebhook(alert);
} else {
console.error(`skip sending Info webhook, got corrupted message ${alert}`);
}
export declare type Notification = {
message: string,
meta: InfoMeta | AnalyticMeta
}
export async function sendWebhook(payload: any) {
export async function sendNotification(notification: Notification) {
if(HASTIC_WEBHOOK_URL === null) {
throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`);
throw new Error(`Can't send notification, HASTIC_WEBHOOK_URL is undefined`);
}
payload.instanceName = HASTIC_INSTANCE_NAME;
notification.message += `\nInstance: ${HASTIC_INSTANCE_NAME}`;
let data;
if(HASTIC_WEBHOOK_TYPE === ContentType.JSON) {
data = JSON.stringify(payload);
data = JSON.stringify(notification);
} else if(HASTIC_WEBHOOK_TYPE === ContentType.URLENCODED) {
data = querystring.stringify(payload);
data = querystring.stringify(notification);
} else {
throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`);
}
@ -81,6 +68,6 @@ export async function sendWebhook(payload: any) {
try {
await axios(options);
} catch(err) {
console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`);
console.error(`Can't send notification to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`);
}
}

Loading…
Cancel
Save