Browse Source

System webhooks #386 (#420)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
1f062ce5a0
  1. 4
      docker-compose.yml
  2. 17
      server/src/services/alert_service.ts
  3. 6
      server/src/services/analytics_service.ts
  4. 33
      server/src/services/notification_service.ts

4
docker-compose.yml

@ -21,6 +21,10 @@ services:
image: hastic/analytics:latest image: hastic/analytics:latest
build: analytics build: analytics
restart: always restart: always
command: ${HASTIC_ANALYTICS_COMMAND}
#set HASTIC_ANALYTICS_COMMAND="python -um ptvsd --host 0.0.0.0 --port 5678 bin/server" for remote debug
ports:
- 5678:5678
volumes: volumes:
data-volume: data-volume:

17
server/src/services/alert_service.ts

@ -1,4 +1,4 @@
import { sendWebhook } from './notification_service'; import { sendAnalyticWebhook, sendInfoWebhook } from './notification_service';
import * as _ from 'lodash'; import * as _ from 'lodash';
import * as AnalyticUnit from '../models/analytic_unit_model'; import * as AnalyticUnit from '../models/analytic_unit_model';
@ -10,7 +10,7 @@ export class Alert {
constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {}; constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {};
public receive(segment: Segment) { public receive(segment: Segment) {
if(this.enabled) { if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment); sendAnalyticWebhook(this.analyticUnit.name, segment);
} }
}; };
} }
@ -23,7 +23,7 @@ class PatternAlert extends Alert {
if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) { if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) {
this.lastSentSegment = segment; this.lastSentSegment = segment;
if(this.enabled) { if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment); sendAnalyticWebhook(this.analyticUnit.name, segment);
} }
} }
} }
@ -38,14 +38,14 @@ class ThresholdAlert extends Alert {
if(this.lastOccurence === 0) { if(this.lastOccurence === 0) {
this.lastOccurence = segment.from; this.lastOccurence = segment.from;
if(this.enabled) { if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment); sendAnalyticWebhook(this.analyticUnit.name, segment);
} }
} else { } else {
if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) {
if(this.enabled) { if(this.enabled) {
console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`); console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`);
sendWebhook(this.analyticUnit.name, segment); sendAnalyticWebhook(this.analyticUnit.name, segment);
} }
} }
@ -78,6 +78,13 @@ export class AlertService {
this._alerts[id].receive(segment); this._alerts[id].receive(segment);
}; };
public onStateChange(message: string, optionalInfo = {}) {
let message_payload = {
message
};
sendInfoWebhook(Object.assign(message_payload, optionalInfo));
}
public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
let detector = AnalyticUnit.getDetectorByType(analyticUnit.type); let detector = AnalyticUnit.getDetectorByType(analyticUnit.type);
let alertsType = {}; let alertsType = {};

6
server/src/services/analytics_service.ts

@ -1,6 +1,7 @@
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model';
import * as config from '../config'; import * as config from '../config';
import { AlertService } from './alert_service';
import * as zmq from 'zeromq'; import * as zmq from 'zeromq';
@ -11,6 +12,7 @@ import * as path from 'path';
export class AnalyticsService { export class AnalyticsService {
private _alertService = new AlertService();
private _requester: any; private _requester: any;
private _ready: boolean = false; private _ready: boolean = false;
private _pingResponded = false; private _pingResponded = false;
@ -167,7 +169,9 @@ export class AnalyticsService {
} }
private async _onAnalyticsDown() { private async _onAnalyticsDown() {
console.log('Analytics is down'); let msg = 'Analytics is down';
console.log(msg);
this._alertService.onStateChange(msg);
if(this._productionMode && !this._inDocker) { if(this._productionMode && !this._inDocker) {
await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
} }

33
server/src/services/notification_service.ts

@ -4,9 +4,13 @@ import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET } from '
import axios from 'axios'; import axios from 'axios';
import * as querystring from 'querystring'; import * as querystring from 'querystring';
enum ContentType {
JSON = 'application/json',
URLENCODED ='application/x-www-form-urlencoded'
}
// TODO: send webhook with payload without dep to AnalyticUnit // TODO: send webhook with payload without dep to AnalyticUnit
export async function sendWebhook(analyticUnitName: string, segment: Segment) { export async function sendAnalyticWebhook(analyticUnitName: string, segment: Segment) {
const alert = { const alert = {
analyticUnitName, analyticUnitName,
from: segment.from, from: segment.from,
@ -17,25 +21,36 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) {
const toTime = new Date(alert.to).toLocaleTimeString(); const toTime = new Date(alert.to).toLocaleTimeString();
console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`); console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`);
if(HASTIC_WEBHOOK_URL === null) {
throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`);
}
let payload; let payload;
if(HASTIC_WEBHOOK_TYPE === 'application/json') { if(HASTIC_WEBHOOK_TYPE === ContentType.JSON) {
payload = JSON.stringify(alert); payload = JSON.stringify(alert);
} else if(HASTIC_WEBHOOK_TYPE === 'application/x-www-form-urlencoded') { } else if(HASTIC_WEBHOOK_TYPE === ContentType.URLENCODED) {
payload = querystring.stringify(alert); payload = querystring.stringify(alert);
} else { } else {
throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`); throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`);
} }
sendWebhook(payload);
}
export async function sendInfoWebhook(message: any) {
if(message && typeof message === 'object') {
sendWebhook(message, ContentType.JSON);
} else {
console.error(`skip sending Info webhook, got corrupted message ${message}`);
}
}
export async function sendWebhook(payload: any, contentType = HASTIC_WEBHOOK_TYPE) {
if(HASTIC_WEBHOOK_URL === null) {
throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`);
}
// TODO: use HASTIC_WEBHOOK_SECRET // TODO: use HASTIC_WEBHOOK_SECRET
const options = { const options = {
method: 'POST', method: 'POST',
url: HASTIC_WEBHOOK_URL, url: HASTIC_WEBHOOK_URL,
data: payload, data: payload,
headers: { 'Content-Type': HASTIC_WEBHOOK_TYPE } headers: { 'Content-Type': contentType }
}; };
try { try {
@ -43,6 +58,4 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) {
} catch(err) { } catch(err) {
console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`);
} }
} }

Loading…
Cancel
Save