From 1f062ce5a0fa253886d7550e0c4d4b952fd558bb Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Thu, 14 Feb 2019 21:24:15 +0300 Subject: [PATCH] System webhooks #386 (#420) --- docker-compose.yml | 4 +++ server/src/services/alert_service.ts | 17 +++++++---- server/src/services/analytics_service.ts | 6 +++- server/src/services/notification_service.ts | 33 ++++++++++++++------- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 851c658..acf20c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,10 @@ services: image: hastic/analytics:latest build: analytics restart: always + command: ${HASTIC_ANALYTICS_COMMAND} + #set HASTIC_ANALYTICS_COMMAND="python -um ptvsd --host 0.0.0.0 --port 5678 bin/server" for remote debug + ports: + - 5678:5678 volumes: data-volume: diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts index bb08c3b..5a5cb69 100644 --- a/server/src/services/alert_service.ts +++ b/server/src/services/alert_service.ts @@ -1,4 +1,4 @@ -import { sendWebhook } from './notification_service'; +import { sendAnalyticWebhook, sendInfoWebhook } from './notification_service'; import * as _ from 'lodash'; import * as AnalyticUnit from '../models/analytic_unit_model'; @@ -10,7 +10,7 @@ export class Alert { constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {}; public receive(segment: Segment) { if(this.enabled) { - sendWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.analyticUnit.name, segment); } }; } @@ -23,7 +23,7 @@ class PatternAlert extends Alert { if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) { this.lastSentSegment = segment; if(this.enabled) { - sendWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.analyticUnit.name, segment); } } } @@ -38,14 +38,14 @@ class ThresholdAlert extends Alert { if(this.lastOccurence === 0) { this.lastOccurence = segment.from; if(this.enabled) { - sendWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.analyticUnit.name, segment); } } else { if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { if(this.enabled) { console.log(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`); - sendWebhook(this.analyticUnit.name, segment); + sendAnalyticWebhook(this.analyticUnit.name, segment); } } @@ -78,6 +78,13 @@ export class AlertService { this._alerts[id].receive(segment); }; + public onStateChange(message: string, optionalInfo = {}) { + let message_payload = { + message + }; + sendInfoWebhook(Object.assign(message_payload, optionalInfo)); + } + public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { let detector = AnalyticUnit.getDetectorByType(analyticUnit.type); let alertsType = {}; diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index 1d9d5dc..6a5d580 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,6 +1,7 @@ import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'; import * as config from '../config'; +import { AlertService } from './alert_service'; import * as zmq from 'zeromq'; @@ -11,6 +12,7 @@ import * as path from 'path'; export class AnalyticsService { + private _alertService = new AlertService(); private _requester: any; private _ready: boolean = false; private _pingResponded = false; @@ -167,7 +169,9 @@ export class AnalyticsService { } private async _onAnalyticsDown() { - console.log('Analytics is down'); + let msg = 'Analytics is down'; + console.log(msg); + this._alertService.onStateChange(msg); if(this._productionMode && !this._inDocker) { await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); } diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 0ac6fd4..c0123a1 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -4,9 +4,13 @@ import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET } from ' import axios from 'axios'; import * as querystring from 'querystring'; +enum ContentType { + JSON = 'application/json', + URLENCODED ='application/x-www-form-urlencoded' +} // TODO: send webhook with payload without dep to AnalyticUnit -export async function sendWebhook(analyticUnitName: string, segment: Segment) { +export async function sendAnalyticWebhook(analyticUnitName: string, segment: Segment) { const alert = { analyticUnitName, from: segment.from, @@ -17,25 +21,36 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) { const toTime = new Date(alert.to).toLocaleTimeString(); console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`); - if(HASTIC_WEBHOOK_URL === null) { - throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); - } - let payload; - if(HASTIC_WEBHOOK_TYPE === 'application/json') { + if(HASTIC_WEBHOOK_TYPE === ContentType.JSON) { payload = JSON.stringify(alert); - } else if(HASTIC_WEBHOOK_TYPE === 'application/x-www-form-urlencoded') { + } else if(HASTIC_WEBHOOK_TYPE === ContentType.URLENCODED) { payload = querystring.stringify(alert); } else { throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`); } + sendWebhook(payload); +} + +export async function sendInfoWebhook(message: any) { + if(message && typeof message === 'object') { + sendWebhook(message, ContentType.JSON); + } else { + console.error(`skip sending Info webhook, got corrupted message ${message}`); + } +} + +export async function sendWebhook(payload: any, contentType = HASTIC_WEBHOOK_TYPE) { + if(HASTIC_WEBHOOK_URL === null) { + throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); + } // TODO: use HASTIC_WEBHOOK_SECRET const options = { method: 'POST', url: HASTIC_WEBHOOK_URL, data: payload, - headers: { 'Content-Type': HASTIC_WEBHOOK_TYPE } + headers: { 'Content-Type': contentType } }; try { @@ -43,6 +58,4 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) { } catch(err) { console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); } - } -