From bfeb0df9c2d8ef850c79e63b0d3bb2979a693e57 Mon Sep 17 00:00:00 2001 From: rozetko Date: Thu, 6 Dec 2018 01:10:58 +0300 Subject: [PATCH] Restore hooks API #149 (#287) * Add webhook settings to config * Update notification service * Remove alerts controller * Remove alerts router * Add alert field to analytic unit * Add endpoints: - GET analyticUnits/units - PATCH analyticUnits/alert * Rename sendNotification -> sendWebhook * Change webhook payload * Send webhook on detection --- config.example.json | 5 +- server/src/config.ts | 3 + server/src/controllers/alerts_controller.ts | 72 ------------------- .../src/controllers/analytics_controller.ts | 28 +++++--- server/src/models/analytic_unit_model.ts | 27 +++++++ server/src/routes/alerts_router.ts | 43 ----------- server/src/routes/analytic_units_router.ts | 53 ++++++++++++++ server/src/services/notification_service.ts | 56 ++++++++------- 8 files changed, 136 insertions(+), 151 deletions(-) delete mode 100644 server/src/controllers/alerts_controller.ts delete mode 100644 server/src/routes/alerts_router.ts diff --git a/config.example.json b/config.example.json index a27c12f..7866455 100644 --- a/config.example.json +++ b/config.example.json @@ -1,4 +1,7 @@ { "HASTIC_PORT": 8000, - "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9" + "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9", + "HASTIC_WEBHOOK_URL": "http://localhost:8080", + "HASTIC_WEBHOOK_TYPE": "application/x-www-form-urlencoded", + "HASTIC_WEBHOOK_SECRET": "mysecret" } diff --git a/server/src/config.ts b/server/src/config.ts index a9cfa2e..8d58102 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -22,6 +22,9 @@ export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join(os.tmpdir() export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); export const ZMQ_HOST = getConfigField('ZMQ_HOST', '127.0.0.1'); export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY'); +export const HASTIC_WEBHOOK_URL = getConfigField('HASTIC_WEBHOOK_URL', null); +export const HASTIC_WEBHOOK_TYPE = getConfigField('HASTIC_WEBHOOK_TYPE', 'application/x-www-form-urlencoded'); +export const HASTIC_WEBHOOK_SECRET = getConfigField('HASTIC_WEBHOOK_SECRET', null); export const ANLYTICS_PING_INTERVAL = 500; // ms export const PACKAGE_VERSION = getPackageVersion(); export const GIT_INFO = getGitInfo(); diff --git a/server/src/controllers/alerts_controller.ts b/server/src/controllers/alerts_controller.ts deleted file mode 100644 index aeda4a0..0000000 --- a/server/src/controllers/alerts_controller.ts +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Alarting is not supported yet - */ - -throw new Error('not supported'); - - -// import { runDetect } from './analytics_controller'; -// import { getLabeledSegments } from './segments_controller'; -// import { AnalyticUnitId } from '../models/analytic_unit'; -// import { sendNotification } from '../services/notification_service'; -// import { getJsonDataSync, writeJsonDataSync } from '../services/json_service'; -// import { ANALYTIC_UNITS_PATH } from '../config'; - -// import * as path from 'path'; -// import * as fs from 'fs'; - - -// const ALERT_TIMEOUT = 60000; // ms -// const ALERTS_DB_PATH = path.join(ANALYTIC_UNITS_PATH, `alerts_anomalies.json`); - - -// export function getAlertsAnomalies(): AnalyticUnitId[] { -// if(!fs.existsSync(ALERTS_DB_PATH)) { -// saveAlertsAnomalies([]); -// } -// return getJsonDataSync(ALERTS_DB_PATH); -// } - -// export function saveAlertsAnomalies(units: AnalyticUnitId[]) { -// return writeJsonDataSync(ALERTS_DB_PATH, units); -// } - -// function processAlerts(id: AnalyticUnitId) { -// let segments = getLabeledSegments(id); - -// const currentTime = new Date().getTime(); -// const activeAlert = activeAlerts.has(id); -// let newActiveAlert = false; - -// if(segments.length > 0) { -// let lastSegment = segments[segments.length - 1]; -// if(lastSegment.finish >= currentTime - ALERT_TIMEOUT) { -// newActiveAlert = true; -// } -// } - -// if(!activeAlert && newActiveAlert) { -// activeAlerts.add(id); -// sendNotification(id, true); -// } else if(activeAlert && !newActiveAlert) { -// activeAlerts.delete(id); -// sendNotification(id, false); -// } -// } - -// async function alertsTick() { -// let alertsAnomalies = getAlertsAnomalies(); -// for (let detectorId of alertsAnomalies) { -// try { -// await runDetect(detectorId); -// processAlerts(detectorId); -// } catch (e) { -// console.error(e); -// } -// } -// setTimeout(alertsTick, 5000); -// } - - -// const activeAlerts = new Set(); -// setTimeout(alertsTick, 5000); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 271680b..ac3f890 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -4,6 +4,7 @@ import * as AnalyticUnitCache from '../models/analytic_unit_cache_model'; import * as Segment from '../models/segment_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; +import { sendWebhook } from '../services/notification_service'; import { HASTIC_API_KEY } from '../config' import { queryByMetric } from 'grafana-datasource-kit'; @@ -192,7 +193,7 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { return []; } - let payload = processDetectionResult(id, result.payload); + let payload = await processDetectionResult(id, result.payload); // TODO: implement segments merging without removing labeled // if(segments.length > 0 && payload.segments.length > 0) { @@ -233,25 +234,30 @@ export async function deleteNonDetectedSegments(id, payload) { Segment.removeSegments(segmentsToRemove.map(s => s.id)); } -function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult): { - lastDetectionTime: number, - segments: Segment.Segment[], - cache: any -} { - - if (detectionResult.segments === undefined || !Array.isArray(detectionResult.segments)) { +async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult): + Promise<{ + lastDetectionTime: number, + segments: Segment.Segment[], + cache: any + }> { + if(detectionResult.segments === undefined || !Array.isArray(detectionResult.segments)) { throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(detectionResult)}`); } - if (detectionResult.lastDetectionTime === undefined || isNaN(+detectionResult.lastDetectionTime)) { + if(detectionResult.lastDetectionTime === undefined || isNaN(+detectionResult.lastDetectionTime)) { throw new Error( `Missing lastDetectionTime in result or it is corrupted: ${JSON.stringify(detectionResult)}` ); } - let segments = detectionResult.segments.map( + const segments = detectionResult.segments.map( segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false) ); - + const analyticUnit = await AnalyticUnit.findById(analyticUnitId); + if(analyticUnit.alert) { + if(!_.isEmpty(segments)) { + sendWebhook(analyticUnit.name, _.last(segments)); + } + } return { lastDetectionTime: detectionResult.lastDetectionTime, segments: segments, diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index e236b96..0fe89b4 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -15,12 +15,25 @@ export enum AnalyticUnitStatus { FAILED = 'FAILED' } +export type FindManyQuery = { + name?: string, + panelUrl?: string, + type?: string, + metric?: Metric, + alert?: boolean, + id?: AnalyticUnitId, + lastDetectionTime?: number, + status?: AnalyticUnitStatus, + error?: string +}; + export class AnalyticUnit { constructor( public name: string, public panelUrl: string, public type: string, public metric: Metric, + public alert?: boolean, public id?: AnalyticUnitId, public lastDetectionTime?: number, public status?: AnalyticUnitStatus, @@ -47,6 +60,7 @@ export class AnalyticUnit { panelUrl: this.panelUrl, type: this.type, metric: this.metric.toObject(), + alert: this.alert, lastDetectionTime: this.lastDetectionTime, status: this.status, error: this.error @@ -62,6 +76,7 @@ export class AnalyticUnit { obj.panelUrl, obj.type, Metric.fromObject(obj.metric), + obj.alert, obj._id, obj.lastDetectionTime, obj.status as AnalyticUnitStatus, @@ -80,6 +95,14 @@ export async function findById(id: AnalyticUnitId): Promise { return AnalyticUnit.fromObject(obj); } +export async function findMany(query: FindManyQuery): Promise { + let analyticUnits = await db.findMany(query); + if(analyticUnits === null) { + return []; + } + return analyticUnits.map(AnalyticUnit.fromObject); +} + /** * Creates and updates new unit.id @@ -109,3 +132,7 @@ export async function setStatus(id: AnalyticUnitId, status: string, error?: stri export async function setDetectionTime(id: AnalyticUnitId, lastDetectionTime: number) { return db.updateOne(id, { lastDetectionTime }); } + +export async function setAlert(id: AnalyticUnitId, alert: boolean) { + return db.updateOne(id, { alert }); +} diff --git a/server/src/routes/alerts_router.ts b/server/src/routes/alerts_router.ts deleted file mode 100644 index 68211e6..0000000 --- a/server/src/routes/alerts_router.ts +++ /dev/null @@ -1,43 +0,0 @@ -// /** -// * Alarting is not supported yet -// */ - -// throw new console.error("Not supported"); - - -// import * as AnalyticUnit from '../models/analytic_unit'; -// import { getAlertsAnomalies, saveAlertsAnomalies } from '../controllers/alerts_controller'; - -// import * as Router from 'koa-router'; - - -// function getAlert(ctx: Router.IRouterContext) { -// let id: AnalyticUnit.AnalyticUnitId = ctx.request.query.id; - -// let alertsAnomalies = getAlertsAnomalies(); -// let pos = alertsAnomalies.indexOf(id); - -// let enabled: boolean = (pos !== -1); -// ctx.response.body = { enabled }; -// } - -// function setAlertEnabled(ctx: Router.IRouterContext) { -// let id: AnalyticUnit.AnalyticUnitId = ctx.request.body.id; -// let enabled: boolean = ctx.request.body.enabled; - -// let alertsAnomalies = getAlertsAnomalies(); -// let pos: number = alertsAnomalies.indexOf(id); -// if(enabled && pos == -1) { -// alertsAnomalies.push(id); -// saveAlertsAnomalies(alertsAnomalies); -// } else if(!enabled && pos > -1) { -// alertsAnomalies.splice(pos, 1); -// saveAlertsAnomalies(alertsAnomalies); -// } -// ctx.response.body = { status: 'OK' }; -// } - -// export const router = new Router(); - -// router.get('/', getAlert); -// router.post('/', setAlertEnabled); diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index 7f0c233..252cf3f 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -63,6 +63,31 @@ async function getUnit(ctx: Router.IRouterContext) { } } +async function getUnits(ctx: Router.IRouterContext) { + try { + const panelUrl = ctx.request.query.panelUrl; + if(panelUrl === undefined) { + throw new Error('Cannot get alerts of undefined panelUrl'); + } + + let analyticUnits = await AnalyticUnit.findMany({ panelUrl }); + if(analyticUnits === null) { + analyticUnits = []; + } + + ctx.response.body = { + analyticUnits + }; + } catch(e) { + console.error(e); + ctx.response.status = 404; + ctx.response.body = { + code: 404, + message: `GET /analyticUnits/units error: ${e.message}` + }; + } +} + async function createUnit(ctx: Router.IRouterContext) { try { let id = await createAnalyticUnitFromObject(ctx.request.body); @@ -74,7 +99,33 @@ async function createUnit(ctx: Router.IRouterContext) { message: `POST /analyticUnits error: ${e.message}` }; } +} +async function setAlert(ctx: Router.IRouterContext) { + try { + const { analyticUnitId, alert } = ctx.request.query as { + analyticUnitId: AnalyticUnit.AnalyticUnitId, alert: boolean + }; + if(analyticUnitId === undefined) { + throw new Error('Cannot update undefined id'); + } + if(alert === undefined) { + throw new Error('Cannot set undefined alert status'); + } + + await AnalyticUnit.setAlert(analyticUnitId, alert); + + ctx.response.body = { + code: 200, + message: 'Success' + }; + } catch(e) { + ctx.response.status = 500; + ctx.response.body = { + code: 500, + message: `PATCH /analyticUnits/alert error: ${e.message}` + }; + } } async function deleteUnit(ctx: Router.IRouterContext) { @@ -102,6 +153,8 @@ async function deleteUnit(ctx: Router.IRouterContext) { export var router = new Router(); router.get('/', getUnit); +router.get('/units', getUnits); router.get('/status', getStatus); +router.patch('/alert', setAlert); router.post('/', createUnit); router.delete('/', deleteUnit); diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 6160ef9..dc94061 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -1,39 +1,47 @@ -import { findById, AnalyticUnitId } from '../models/analytic_unit_model'; +import { Segment } from '../models/segment_model'; +import { HASTIC_WEBHOOK_URL, HASTIC_WEBHOOK_TYPE, HASTIC_WEBHOOK_SECRET } from '../config'; import axios from 'axios'; +import * as querystring from 'querystring'; -// TODO: send notification with payload without dep to AnalyticUnit -export async function sendNotification(id: AnalyticUnitId, active: boolean) { - let anomalyName = (await findById(id)).name - console.log('Notification ' + anomalyName); +// TODO: send webhook with payload without dep to AnalyticUnit +export async function sendWebhook(analyticUnitName: string, segment: Segment) { + if(HASTIC_WEBHOOK_URL === null) { + throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); + } - let notification = { - anomaly: anomalyName, - status: '' + const alert = { + analyticUnitName, + from: segment.from, + to: segment.to }; - if(active) { - notification.status = 'alert'; + + console.log(`Sending alert: ${JSON.stringify(alert)}`); + + let payload; + if(HASTIC_WEBHOOK_TYPE === 'application/json') { + payload = JSON.stringify(alert); + } else if(HASTIC_WEBHOOK_TYPE === 'application/x-www-form-urlencoded') { + payload = querystring.stringify(alert); } else { - notification.status = 'OK'; + throw new Error(`Unknown webhook type: ${HASTIC_WEBHOOK_TYPE}`); } - // TODO: more to config - let endpoint = process.env.HASTIC_ALERT_ENDPOINT; - if(endpoint === undefined) { - console.error(`Can't send alert, env HASTIC_ALERT_ENDPOINT is undefined`); - return; - } + // TODO: use HASTIC_WEBHOOK_SECRET + const options = { + method: 'POST', + url: HASTIC_WEBHOOK_URL, + data: payload, + headers: { 'Content-Type': HASTIC_WEBHOOK_TYPE } + }; try { - var data = await axios.post(endpoint, { - method: 'POST', - body: JSON.stringify(notification) - }) - console.log(data); + const response = await axios(options); + console.log(response); } catch(err) { - console.error(`Can't send alert to ${endpoint}. Error: ${err}`); + console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); } - + }