Browse Source

Synchronize data with server #455 (#461)

Synchronize data with server #455 (#461)
pull/1/head
Evgeny Smyshlyaev 5 years ago committed by GitHub
parent
commit
015f819958
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      server/src/config.ts
  2. 13
      server/src/controllers/analytics_controller.ts
  3. 5
      server/src/index.ts
  4. 35
      server/src/migrations/0.3.2-beta.ts
  5. 54
      server/src/models/analytic_unit_model.ts
  6. 67
      server/src/models/panel_model.ts
  7. 43
      server/src/routes/analytic_units_router.ts
  8. 43
      server/src/routes/panel_router.ts
  9. 2
      server/src/services/alert_service.ts
  10. 1
      server/src/services/analytics_service.ts
  11. 8
      server/src/services/data_puller.ts
  12. 5
      server/src/services/data_service.ts
  13. 2
      server/src/services/notification_service.ts

3
server/src/config.ts

@ -14,9 +14,8 @@ export const ANALYTICS_PATH = path.join(__dirname, '../../analytics');
export const DATA_PATH = path.join(__dirname, '../../data'); export const DATA_PATH = path.join(__dirname, '../../data');
export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db');
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db'); export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db');
export const PANELS_DATABASE_PATH = path.join(DATA_PATH, 'panels.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db'); export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db');

13
server/src/controllers/analytics_controller.ts

@ -123,11 +123,11 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic
} }
console.log(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`); console.log(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`);
let panelUrl; let grafanaUrl;
if(GRAFANA_URL !== null) { if(GRAFANA_URL !== null) {
panelUrl = GRAFANA_URL; grafanaUrl = GRAFANA_URL;
} else { } else {
panelUrl = analyticUnit.panelUrl; grafanaUrl = analyticUnit.grafanaUrl;
} }
let data; let data;
@ -135,7 +135,7 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic
try { try {
const queryResult = await queryByMetric( const queryResult = await queryByMetric(
analyticUnit.metric, analyticUnit.metric,
panelUrl, grafanaUrl,
range.from, range.from,
range.to, range.to,
HASTIC_API_KEY HASTIC_API_KEY
@ -284,7 +284,10 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
} }
export async function remove(analyticUnitId: AnalyticUnit.AnalyticUnitId) { export async function remove(analyticUnitId: AnalyticUnit.AnalyticUnitId) {
await cancelAnalyticsTask(analyticUnitId); // We don't await for analytics task cancellation here
// If we add await, the rest function will be executed only when analytics becomes up
cancelAnalyticsTask(analyticUnitId);
if(dataPuller !== undefined) { if(dataPuller !== undefined) {
dataPuller.deleteUnit(analyticUnitId); dataPuller.deleteUnit(analyticUnitId);

5
server/src/index.ts

@ -1,6 +1,5 @@
import { router as analyticUnitsRouter } from './routes/analytic_units_router'; import { router as analyticUnitsRouter } from './routes/analytic_units_router';
import { router as segmentsRouter } from './routes/segments_router'; import { router as segmentsRouter } from './routes/segments_router';
import { router as panelRouter } from './routes/panel_router';
import { router as thresholdRouter } from './routes/threshold_router'; import { router as thresholdRouter } from './routes/threshold_router';
import * as AnalyticsController from './controllers/analytics_controller'; import * as AnalyticsController from './controllers/analytics_controller';
@ -9,6 +8,8 @@ import * as ProcessService from './services/process_service';
import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, ZMQ_CONNECTION_STRING } from './config'; import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, ZMQ_CONNECTION_STRING } from './config';
import { convertPanelUrlToPanelId } from './migrations/0.3.2-beta';
import * as Koa from 'koa'; import * as Koa from 'koa';
import * as Router from 'koa-router'; import * as Router from 'koa-router';
import * as bodyParser from 'koa-bodyparser'; import * as bodyParser from 'koa-bodyparser';
@ -16,6 +17,7 @@ import * as bodyParser from 'koa-bodyparser';
AnalyticsController.init(); AnalyticsController.init();
ProcessService.registerExitHandler(AnalyticsController.terminate); ProcessService.registerExitHandler(AnalyticsController.terminate);
convertPanelUrlToPanelId();
var app = new Koa(); var app = new Koa();
@ -50,7 +52,6 @@ app.use(async function(ctx, next) {
var rootRouter = new Router(); var rootRouter = new Router();
rootRouter.use('/analyticUnits', analyticUnitsRouter.routes(), analyticUnitsRouter.allowedMethods()); rootRouter.use('/analyticUnits', analyticUnitsRouter.routes(), analyticUnitsRouter.allowedMethods());
rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods()); rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods());
rootRouter.use('/panel', panelRouter.routes(), panelRouter.allowedMethods());
rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMethods()); rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMethods());
rootRouter.get('/', async (ctx) => { rootRouter.get('/', async (ctx) => {

35
server/src/migrations/0.3.2-beta.ts

@ -0,0 +1,35 @@
import { Collection, makeDBQ } from '../services/data_service';
const db = makeDBQ(Collection.ANALYTIC_UNITS);
export async function convertPanelUrlToPanelId() {
const analyticUnits = await db.findMany({ panelUrl: { $exists: true } });
console.log(`Found ${analyticUnits.length} analytic units with panelUrl field`);
if(analyticUnits.length === 0) {
console.log('Nothing to migrate');
return;
}
const panelUrlRegex = /^(.+)\/d\/(\w+)\/.+panelId=(\d+)/;
const newPanelUrlRegex = /^(.+)\/dashboard\/(\w+).+panelId=(\d+)/;
const updatedAnalyticUnits = analyticUnits.map(analyticUnit => {
const parsedPanelUrl = analyticUnit.panelUrl.match(panelUrlRegex) || analyticUnit.panelUrl.match(newPanelUrlRegex);
const grafanaUrl = parsedPanelUrl[1];
const dashboardId = parsedPanelUrl[2];
const oldPanelId = parsedPanelUrl[3];
const panelId = `${dashboardId}/${oldPanelId}`;
return {
_id: analyticUnit._id,
grafanaUrl,
panelId
};
});
console.log(updatedAnalyticUnits);
await updatedAnalyticUnits.forEach(analyticUnit => db.updateOne(analyticUnit._id, {
panelUrl: undefined,
...analyticUnit
}));
}

54
server/src/models/analytic_unit_model.ts

@ -54,35 +54,45 @@ export enum AnalyticUnitStatus {
export type FindManyQuery = { export type FindManyQuery = {
name?: string, name?: string,
panelUrl?: string, grafanaUrl?: string,
panelId?: string,
type?: string, type?: string,
metric?: Metric, metric?: Metric,
alert?: boolean, alert?: boolean,
id?: AnalyticUnitId, id?: AnalyticUnitId,
lastDetectionTime?: number, lastDetectionTime?: number,
status?: AnalyticUnitStatus, status?: AnalyticUnitStatus,
error?: string error?: string,
labeledColor?: string,
deletedColor?: string,
detectorType?: DetectorType,
visible?: boolean
}; };
export class AnalyticUnit { export class AnalyticUnit {
constructor( constructor(
public name: string, public name: string,
public panelUrl: string, public grafanaUrl: string,
public panelId: string,
public type: string, public type: string,
public metric?: Metric, public metric?: Metric,
public alert?: boolean, public alert?: boolean,
public id?: AnalyticUnitId, public id?: AnalyticUnitId,
public lastDetectionTime?: number, public lastDetectionTime?: number,
public status?: AnalyticUnitStatus, public status?: AnalyticUnitStatus,
public error?: string public error?: string,
public labeledColor?: string,
public deletedColor?: string,
public detectorType?: DetectorType,
public visible?: boolean
) { ) {
if(name === undefined) { if(name === undefined) {
throw new Error(`Missing field "name"`); throw new Error(`Missing field "name"`);
} }
if(panelUrl === undefined) { if(grafanaUrl === undefined) {
throw new Error(`Missing field "panelUrl"`); throw new Error(`Missing field "grafanaUrl"`);
} }
if(type === undefined) { if(type === undefined) {
throw new Error(`Missing field "type"`); throw new Error(`Missing field "type"`);
@ -98,7 +108,8 @@ export class AnalyticUnit {
return { return {
_id: this.id, _id: this.id,
name: this.name, name: this.name,
panelUrl: this.panelUrl, grafanaUrl: this.grafanaUrl,
panelId: this.panelId,
type: this.type, type: this.type,
metric, metric,
alert: this.alert, alert: this.alert,
@ -108,6 +119,19 @@ export class AnalyticUnit {
}; };
} }
public toPanelObject() {
return {
id: this.id,
name: this.name,
type: this.type,
alert: this.alert,
labeledColor: this.labeledColor,
deletedColor: this.deletedColor,
detectorType: this.detectorType,
visible: this.visible
};
}
static fromObject(obj: any): AnalyticUnit { static fromObject(obj: any): AnalyticUnit {
if(obj === undefined) { if(obj === undefined) {
throw new Error('obj is undefined'); throw new Error('obj is undefined');
@ -118,7 +142,8 @@ export class AnalyticUnit {
} }
return new AnalyticUnit( return new AnalyticUnit(
obj.name, obj.name,
obj.panelUrl, obj.grafanaUrl,
obj.panelId,
obj.type, obj.type,
metric, metric,
obj.alert, obj.alert,
@ -126,6 +151,10 @@ export class AnalyticUnit {
obj.lastDetectionTime, obj.lastDetectionTime,
obj.status as AnalyticUnitStatus, obj.status as AnalyticUnitStatus,
obj.error, obj.error,
obj.labeledColor,
obj.deletedColor,
obj.detectorType,
obj.visible
); );
} }
@ -167,7 +196,14 @@ export async function remove(id: AnalyticUnitId): Promise<void> {
} }
export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { export async function update(id: AnalyticUnitId, unit: AnalyticUnit) {
return db.updateOne(id, unit); const updateObj = {
name: unit.name,
labeledColor: unit.labeledColor,
deletedColor: unit.deletedColor,
visible: unit.visible
};
return db.updateOne(id, updateObj);
} }
export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { export async function setStatus(id: AnalyticUnitId, status: string, error?: string) {

67
server/src/models/panel_model.ts

@ -1,67 +0,0 @@
import { AnalyticUnitId } from './analytic_unit_model';
import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.PANELS);
export type PanelId = string;
export class Panel {
constructor(
public panelUrl: string,
public analyticUnits: AnalyticUnitId[],
public id?: PanelId
) {
if(this.panelUrl === undefined) {
throw new Error('panelUrl is undefined');
}
}
public toObject() {
return {
_id: this.id,
panelUrl: this.panelUrl,
analyticUnits: this.analyticUnits
};
}
static fromObject(obj: any): Panel {
if(obj === undefined) {
throw new Error('obj is undefined');
}
return new Panel(
obj.panelUrl,
obj.analyticUnits,
obj._id
);
}
}
export type FindOneQuery = {
panelUrl: string
}
export async function findOne(query: FindOneQuery): Promise<Panel> {
let panel = await db.findOne(query);
if(panel === null) {
return null;
}
return Panel.fromObject(panel);
}
export async function insertAnalyticUnit(panelUrl: string, analyticUnitId: AnalyticUnitId) {
const panel = await db.findOne({ panelUrl });
return db.updateOne({ panelUrl }, {
analyticUnits: panel.analyticUnits.concat(analyticUnitId)
});
}
export async function removeAnalyticUnit(panelUrl: string, analyticUnitId: AnalyticUnitId) {
const panel = await db.findOne({ panelUrl });
return db.updateOne({ panelUrl }, {
analyticUnits: panel.analyticUnits.filter(analyticUnit => analyticUnit !== analyticUnitId)
});
}

43
server/src/routes/analytic_units_router.ts

@ -26,36 +26,22 @@ async function getStatus(ctx: Router.IRouterContext) {
} }
} }
async function getUnit(ctx: Router.IRouterContext) {
let analyticUnitId = ctx.request.query.id;
if(analyticUnitId === undefined) {
throw new Error('No id param in query');
}
let analyticUnit = await AnalyticUnit.findById(analyticUnitId);
if(analyticUnit === null) {
throw new Error(`Cannot find analytic unit with id ${analyticUnitId}`);
}
ctx.response.body = {
name: analyticUnit.name,
metric: analyticUnit.metric,
status: analyticUnit.status
};
}
async function getUnits(ctx: Router.IRouterContext) { async function getUnits(ctx: Router.IRouterContext) {
const panelUrl = ctx.request.query.panelUrl; const panelId = ctx.request.query.panelId;
if(panelUrl === undefined) { if(panelId === undefined) {
throw new Error('Cannot get alerts of undefined panelUrl'); throw new Error('Cannot get units of undefined panelId');
} }
let analyticUnits = await AnalyticUnit.findMany({ panelUrl }); let analyticUnits = await AnalyticUnit.findMany({ panelId });
if(analyticUnits === null) { if(analyticUnits === null) {
analyticUnits = []; analyticUnits = [];
} }
ctx.response.body = { analyticUnits }; const analyticUnitObjects = analyticUnits.map(analyticUnit => analyticUnit.toPanelObject());
ctx.response.body = {
analyticUnits: analyticUnitObjects
};
} }
function getTypes(ctx: Router.IRouterContext) { function getTypes(ctx: Router.IRouterContext) {
@ -63,18 +49,18 @@ function getTypes(ctx: Router.IRouterContext) {
} }
async function createUnit(ctx: Router.IRouterContext) { async function createUnit(ctx: Router.IRouterContext) {
let id = await createAnalyticUnitFromObject(ctx.request.body); const id = await createAnalyticUnitFromObject(ctx.request.body);
ctx.response.body = { id }; ctx.response.body = { id };
} }
async function updateUnit(ctx: Router.IRouterContext) { async function updateUnit(ctx: Router.IRouterContext) {
const unit = ctx.request.body as AnalyticUnit.AnalyticUnit; const analyticUnit = ctx.request.body as AnalyticUnit.AnalyticUnit;
if(unit.id === undefined) { if(analyticUnit.id === undefined) {
throw new Error('Cannot update undefined id'); throw new Error('Cannot update undefined id');
} }
// TODO: we can't allow to update everything AnalyticUnit.update(analyticUnit.id, analyticUnit);
AnalyticUnit.update(unit.id, unit);
ctx.response.body = { ctx.response.body = {
code: 200, code: 200,
message: 'Success' message: 'Success'
@ -147,7 +133,6 @@ async function runDetect(ctx: Router.IRouterContext) {
export var router = new Router(); export var router = new Router();
router.get('/', getUnit);
router.get('/units', getUnits); router.get('/units', getUnits);
router.get('/status', getStatus); router.get('/status', getStatus);
router.get('/types', getTypes); router.get('/types', getTypes);

43
server/src/routes/panel_router.ts

@ -1,43 +0,0 @@
import { AnalyticUnitId } from '../models/analytic_unit_model';
import * as Panel from '../models/panel_model';
import * as Router from 'koa-router';
async function getAnalyticUnits(ctx: Router.IRouterContext) {
let panelUrl: string = ctx.request.query.panelUrl;
if(panelUrl === undefined || panelUrl === '') {
throw new Error('panelUrl is missing');
}
const analyticUnits = await Panel.findOne({ panelUrl });
ctx.response.body = { analyticUnits };
}
async function addAnalyticUnit(ctx: Router.IRouterContext) {
let { panelUrl, analyticUnitId } = ctx.request.body as {
panelUrl: string, analyticUnitId: AnalyticUnitId
};
await Panel.insertAnalyticUnit(panelUrl, analyticUnitId);
ctx.response.body = {
code: 200,
message: 'Success'
};
}
async function deleteAnalyticUnit(ctx: Router.IRouterContext) {
let { panelUrl, analyticUnitId } = ctx.request.body as {
panelUrl: string, analyticUnitId: AnalyticUnitId
};
// TODO: stop task when analytic unit is removed
await Panel.removeAnalyticUnit(panelUrl, analyticUnitId);
ctx.response.body = {
code: 200,
message: 'Success'
};
}
export const router = new Router();
router.get('/', getAnalyticUnits);
router.post('/', addAnalyticUnit);
router.delete('/', deleteAnalyticUnit);

2
server/src/services/alert_service.ts

@ -22,7 +22,7 @@ export class Alert {
analyticUnitType: this.analyticUnit.type, analyticUnitType: this.analyticUnit.type,
analyticUnitName: this.analyticUnit.name, analyticUnitName: this.analyticUnit.name,
analyticUnitId: this.analyticUnit.id, analyticUnitId: this.analyticUnit.id,
panelUrl: this.analyticUnit.panelUrl, grafanaUrl: this.analyticUnit.grafanaUrl,
from: segment.from, from: segment.from,
to: segment.to to: segment.to
}; };

1
server/src/services/analytics_service.ts

@ -178,6 +178,7 @@ export class AnalyticsService {
private _onAnalyticsUp() { private _onAnalyticsUp() {
const msg = 'Analytics is up'; const msg = 'Analytics is up';
for(let i in _.range(this._queue.length)) { for(let i in _.range(this._queue.length)) {
// TODO: check if task is done before removing it from the queue
this.sendTask(this._queue.shift(), true); this.sendTask(this._queue.shift(), true);
} }
console.log(msg); console.log(msg);

8
server/src/services/data_puller.ts

@ -65,14 +65,14 @@ export class DataPuller {
throw Error(`data puller: can't pull undefined unit`); throw Error(`data puller: can't pull undefined unit`);
} }
let panelUrl; let grafanaUrl;
if(GRAFANA_URL !== null) { if(GRAFANA_URL !== null) {
panelUrl = GRAFANA_URL; grafanaUrl = GRAFANA_URL;
} else { } else {
panelUrl = unit.panelUrl; grafanaUrl = unit.grafanaUrl;
} }
let data = queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY); let data = queryByMetric(unit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
return data; return data;
} }

5
server/src/services/data_service.ts

@ -4,7 +4,7 @@ import * as nedb from 'nedb';
import * as fs from 'fs'; import * as fs from 'fs';
export enum Collection { ANALYTIC_UNITS, SEGMENTS, ANALYTIC_UNIT_CACHES, PANELS, THRESHOLD }; export enum Collection { ANALYTIC_UNITS, ANALYTIC_UNIT_CACHES, SEGMENTS, THRESHOLD };
/** /**
@ -209,7 +209,6 @@ checkDataFolders();
// TODO: it's better if models request db which we create if it`s needed // TODO: it's better if models request db which we create if it`s needed
db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true })); db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }));
db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }));
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true })); db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true }));
db.set(Collection.PANELS, new nedb({ filename: config.PANELS_DATABASE_PATH, autoload: true })); db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }));
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true })); db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true }));

2
server/src/services/notification_service.ts

@ -22,7 +22,7 @@ export declare type AnalyticAlert = {
analyticUnitType: string, analyticUnitType: string,
analyticUnitName: string, analyticUnitName: string,
analyticUnitId: AnalyticUnit.AnalyticUnitId, analyticUnitId: AnalyticUnit.AnalyticUnitId,
panelUrl: string, grafanaUrl: string,
from: number, from: number,
to: number to: number
params?: any, params?: any,

Loading…
Cancel
Save