Browse Source

big models refactoring

pull/1/head
Alexey Velikiy 6 years ago
parent
commit
8a49e2dba2
  1. 1
      server/src/config.ts
  2. 34
      server/src/controllers/analytics_controller.ts
  3. 30
      server/src/controllers/metrics_controler.ts
  4. 2
      server/src/index.ts
  5. 84
      server/src/models/analytic_unit.ts
  6. 84
      server/src/models/analytic_unit_model.ts
  7. 49
      server/src/models/metric_model.ts
  8. 19
      server/src/models/segment_model.ts
  9. 56
      server/src/routes/analytic_units_router.ts
  10. 4
      server/src/routes/segments_router.ts
  11. 106
      server/src/services/data_service.ts
  12. 4
      server/src/services/notification_service.ts

1
server/src/config.ts

@ -16,7 +16,6 @@ export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db');
export const DATASETS_PATH = path.join(DATA_PATH, 'datasets');
//export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units');
export const MODELS_PATH = path.join(DATA_PATH, 'models');
export const METRICS_PATH = path.join(DATA_PATH, 'metrics');
export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments');

34
server/src/controllers/analytics_controller.ts

@ -1,7 +1,5 @@
import * as DataService from '../services/data_service';
import { getTarget } from './metrics_controler';
import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller';
import * as AnalyticUnit from '../models/analytic_unit'
import * as SegmentsController from '../models/segment_model';
import * as AnalyticUnit from '../models/analytic_unit_model'
import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service';
@ -23,14 +21,6 @@ function onTaskResult(taskResult: any) {
}
}
async function onFileSave(payload: any): Promise<any> {
return DataService.saveFile(payload.filename, payload.content);
}
async function onFileLoad(payload: any): Promise<any> {
return DataService.loadFile(payload.filename);
}
async function onMessage(message: AnalyticsMessage) {
let responsePayload = null;
let resolvedMethod = false;
@ -69,7 +59,7 @@ export function terminate() {
}
async function runTask(task): Promise<any> {
let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId);
let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId);
task.metric = {
datasource: anomaly.metric.datasource,
targets: anomaly.metric.targets.map(getTarget)
@ -86,7 +76,7 @@ async function runTask(task): Promise<any> {
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let segments = getLabeledSegments(id);
AnalyticUnit.setStatus(id, 'LEARNING');
let unit = AnalyticUnit.findById(id);
let unit = await AnalyticUnit.findById(id);
let pattern = unit.type;
let task = {
analyticUnitId: id,
@ -106,12 +96,11 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
}
}
export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
let unit = AnalyticUnit.findById(id);
let unit = await AnalyticUnit.findById(id);
let pattern = unit.type;
let task = {
type: 'predict',
type: 'PREDICT',
analyticUnitId: id,
pattern,
lastPredictionTime: unit.lastPredictionTime
@ -141,3 +130,14 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
export function isAnalyticReady(): boolean {
return analyticsService.ready;
}
export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUnit.AnalyticUnitId> {
runLearning(newId);
return
}

30
server/src/controllers/metrics_controler.ts

@ -1,30 +0,0 @@
import { getJsonDataSync, writeJsonDataSync } from '../services/json_service';
import { METRICS_PATH } from '../config';
import * as crypto from 'crypto';
import * as path from 'path';
function saveTargets(targets) {
let metrics = [];
for (let target of targets) {
metrics.push(saveTarget(target));
}
return metrics;
}
function saveTarget(target) {
//const md5 = crypto.createHash('md5')
const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex');
let filename = path.join(METRICS_PATH, `${targetId}.json`);
writeJsonDataSync(filename, target);
return targetId;
}
function getTarget(targetId) {
let filename = path.join(METRICS_PATH, `${targetId}.json`);
return getJsonDataSync(filename);
}
export { saveTargets, getTarget }

2
server/src/index.ts

@ -4,7 +4,6 @@ import { router as segmentsRouter } from './routes/segments_router';
import * as AnalyticsController from './controllers/analytics_controller';
import * as Data from './services/data_service';
import * as ProcessService from './services/process_service';
import { HASTIC_PORT } from './config';
@ -14,7 +13,6 @@ import * as Router from 'koa-router';
import * as bodyParser from 'koa-bodyparser';
Data.checkDataFolders();
AnalyticsController.init();
ProcessService.registerExitHandler(AnalyticsController.terminate);

84
server/src/models/analytic_unit.ts

@ -1,84 +0,0 @@
import { loadFile, saveFile } from '../services/data_service';
import * as crypto from 'crypto';
import * as path from 'path'
import * as fs from 'fs'
export type Datasource = {
method: string,
data: Object,
params: Object,
type: string,
url: string
}
export type Metric = {
datasource: string,
targets: string[]
}
export type AnalyticUnitId = string;
export type AnalyticUnit = {
id?: AnalyticUnitId,
name: string,
panelUrl: string,
type: string,
metric: Metric,
datasource: Datasource
status: string,
error?: string,
lastPredictionTime: number,
nextId: number
}
export function createItem(item: AnalyticUnit): AnalyticUnitId {
const hashString = item.name + (new Date()).toString();
const newId: AnalyticUnitId = crypto.createHash('md5').update(hashString).digest('hex');
let filename = `${newId}.json`;
if(fs.existsSync(filename)) {
throw new Error(`Can't create item with id ${newId}`);
}
save(newId, item);
item.id = newId;
return newId;
}
export function remove(id: AnalyticUnitId) {
let filename = `${id}.json`;
fs.unlinkSync(filename);
}
export function save(id: AnalyticUnitId, unit: AnalyticUnit) {
let filename = `${id}.json`;
return saveFile(filename, JSON.stringify(unit));
}
// TODO: make async
export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> {
let filename = `${id}.json`;
if(!fs.existsSync(filename)) {
throw new Error(`Can't find Analytic Unit with id ${id}`);
}
let result = await loadFile(filename);
return JSON.parse(result);
}
export async function setStatus(predictorId: AnalyticUnitId, status: string, error?: string) {
let info = await findById(predictorId);
info.status = status;
if(error !== undefined) {
info.error = error;
} else {
info.error = '';
}
save(predictorId, info);
}
export async function setPredictionTime(id: AnalyticUnitId, time: number) {
let info = await findById(id);
info.lastPredictionTime = time;
save(id, info);
}

84
server/src/models/analytic_unit_model.ts

@ -0,0 +1,84 @@
import { Metric, metricFromObj } from './metric_model';
import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.ANALYTIC_UNITS);
export type AnalyticUnitId = string;
export type AnalyticUnit = {
id?: AnalyticUnitId,
name: string,
panelUrl: string,
type: string,
metric: Metric
status: string,
error?: string,
lastPredictionTime: number,
nextId: number
}
export function analyticUnitFromObj(obj: any): AnalyticUnit {
if(obj === undefined) {
throw new Error('obj is undefined');
}
if(obj.type === undefined) {
throw new Error(`Missing field "type"`);
}
if(obj.name === undefined) {
throw new Error(`Missing field "name"`);
}
if(obj.panelUrl === undefined) {
throw new Error(`Missing field "panelUrl"`);
}
if(obj.metric === undefined) {
throw new Error(`Missing field "datasource"`);
}
if(obj.metric.datasource === undefined) {
throw new Error(`Missing field "metric.datasource"`);
}
if(obj.metric.targets === undefined) {
throw new Error(`Missing field "metric.targets"`);
}
const unit: AnalyticUnit = {
name: obj.name,
panelUrl: obj.panelUrl,
type: obj.type,
datasource: obj.datasource,
metric: metric,
status: 'LEARNING',
lastPredictionTime: 0,
nextId: 0
};
return unit;
}
export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> {
return db.findOne(id);
}
export async function create(unit: AnalyticUnit): Promise<AnalyticUnitId> {
return db.insert(unit);
}
export async function remove(id: AnalyticUnitId): Promise<void> {
await db.remove(id);
return;
}
export async function update(id: AnalyticUnitId, unit: AnalyticUnit) {
return db.update(id, unit);
}
export async function setStatus(id: AnalyticUnitId, status: string, error?: string) {
return db.update(id, { status, error });
}
export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) {
return db.update(id, { lastPredictionTime });
}

49
server/src/models/metric_model.ts

@ -0,0 +1,49 @@
import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.METRICS);
export type Datasource = {
method: string,
data: Object,
params: Object,
type: string,
url: string
}
export type Metric = {
datasource: Datasource,
targets: string[]
}
export function metricFromObj(obj: any): Metric {
const metric: Metric = {
datasource: obj.datasource,
targets: obj.targets;
};
return metric;
}
export async function saveTargets(targets: string[]) {
let metrics = [];
for (let target of targets) {
metrics.push(saveTarget(target));
}
return metrics;
}
export async function saveTarget(target: string) {
//const md5 = crypto.createHash('md5')
const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex');
let filename = path.join(METRICS_PATH, `${targetId}.json`);
writeJsonDataSync(filename, target);
return targetId;
}
export async function getTarget(targetId) {
let filename = path.join(METRICS_PATH, `${targetId}.json`);
return getJsonDataSync(filename);
}

19
server/src/controllers/segments_controller.ts → server/src/models/segment_model.ts

@ -1,6 +1,5 @@
import { getJsonDataSync, writeJsonDataSync } from '../services/json_service';
import { AnalyticUnitId, findById, save } from '../models/analytic_unit';
import { SEGMENTS_PATH } from '../config';
import * as AnalyticUnit from './analytic_unit_model';
import * as _ from 'lodash';
@ -8,7 +7,7 @@ import * as path from 'path';
import * as fs from 'fs';
export function getLabeledSegments(id: AnalyticUnitId) {
export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) {
let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`);
if(!fs.existsSync(filename)) {
@ -24,7 +23,7 @@ export function getLabeledSegments(id: AnalyticUnitId) {
}
}
export function getPredictedSegments(id: AnalyticUnitId) {
export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) {
let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`);
let jsonData;
@ -37,7 +36,7 @@ export function getPredictedSegments(id: AnalyticUnitId) {
return jsonData;
}
export function saveSegments(id: AnalyticUnitId, segments) {
export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) {
let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`);
try {
@ -48,9 +47,9 @@ export function saveSegments(id: AnalyticUnitId, segments) {
}
}
export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boolean) {
export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) {
// Set status
let info = findById(id);
let info = await AnalyticUnit.findById(id);
let segments = getLabeledSegments(id);
let nextId = info.nextId;
@ -64,11 +63,11 @@ export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boole
}
info.nextId = nextId;
saveSegments(id, segments);
save(id, info);
await AnalyticUnit.update(id, info);
return addedIds;
}
export function removeSegments(id: AnalyticUnitId, removedSegments) {
export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) {
let segments = getLabeledSegments(id);
for (let segmentId of removedSegments) {
segments = segments.filter(el => el.id !== segmentId);

56
server/src/routes/analytic_units_router.ts

@ -1,9 +1,9 @@
import * as Router from 'koa-router';
import * as AnalyticUnit from '../models/analytic_unit';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { createAnalyticUnitFromObject } from '../controllers/analytics_controller'
import { runLearning } from '../controllers/analytics_controller'
import { saveTargets } from '../controllers/metrics_controler';
async function sendStatus(ctx: Router.IRouterContext) {
try {
@ -11,7 +11,7 @@ async function sendStatus(ctx: Router.IRouterContext) {
if(id === undefined) {
throw new Error('Id is undefined');
}
let unit = AnalyticUnit.findById(id);
let unit = await AnalyticUnit.findById(id);
if(unit.status === undefined) {
throw new Error('status is undefined');
@ -34,7 +34,7 @@ async function findItem(ctx: Router.IRouterContext) {
throw new Error('No id param in query');
}
let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(id);
let unit: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(id);
ctx.response.body = {
name: unit.name,
@ -55,53 +55,13 @@ async function createItem(ctx: Router.IRouterContext) {
let body = ctx.request.body;
if(body.type === undefined) {
throw new Error(`Missing field "type"`);
}
if(body.name === undefined) {
throw new Error(`Missing field "name"`);
}
if(body.panelUrl === undefined) {
throw new Error(`Missing field "panelUrl"`);
}
if(body.metric === undefined) {
throw new Error(`Missing field "datasource"`);
}
if(body.metric.datasource === undefined) {
throw new Error(`Missing field "metric.datasource"`);
}
if(body.metric.targets === undefined) {
throw new Error(`Missing field "metric.targets"`);
}
await createAnalyticUnitFromObject(body);
const metric: AnalyticUnit.Metric = {
datasource: body.metric.datasource,
targets: saveTargets(body.metric.targets)
};
const unit: AnalyticUnit.AnalyticUnit = {
name: body.name,
panelUrl: body.panelUrl,
type: body.type,
datasource: body.datasource,
metric: metric,
status: 'learning',
lastPredictionTime: 0,
nextId: 0
};
let newId = AnalyticUnit.createItem(unit);
if(newId === null) {
ctx.response.status = 403;
ctx.response.body = {
code: 403,
message: 'Item exists'
};
}
let newId = await AnalyticUnit.create(unit);
ctx.response.body = { id: newId };
runLearning(newId);
} catch(e) {
ctx.response.status = 500;
ctx.response.body = {

4
server/src/routes/segments_router.ts

@ -1,12 +1,12 @@
import * as Router from 'koa-router';
import { AnalyticUnitId } from '../models/analytic_unit';
import { AnalyticUnitId } from '../models/analytic_unit_model';
import {
getLabeledSegments,
insertSegments,
removeSegments,
} from '../controllers/segments_controller';
} from '../models/segment_model';
import { runLearning } from '../controllers/analytics_controller';

106
server/src/services/data_service.ts

@ -3,74 +3,108 @@ import * as config from '../config';
import * as nedb from 'nedb';
import * as fs from 'fs';
export const db = {
analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }),
metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }),
segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }),
files: new nedb({ filename: config.FILES_DATABASE_PATH, autoload: true })
};
export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS };
let dbUpsertFile = (query: any, updateQuery: any) => {
/**
* Class which helps to make queries to your collection
*
* @param { string | object } query: a key as a string or mongodb-style query
*/
export type DBQ = {
insert: (document: object) => string,
update: (query: string | object, updateQuery: any) => void,
findOne: (query: string | object) => any,
remove: (query: string | object) => number
}
export function makeDBQ(collection: Collection): DBQ {
return {
insert: dbInsert.bind(null, collection),
update: dbUpdate.bind(null, collection),
findOne: dbFindOne.bind(null, collection),
remove: dbRemove.bind(null, collection)
}
}
function wrapIdToQuery(query: string | object) {
if(typeof query === 'string') {
return { _id: query };
}
return query;
}
const db = new Map<Collection, nedb>();
let dbInsert = (collection: Collection, doc: object) => {
return new Promise<string>((resolve, reject) => {
db[collection].insert(doc, (err, newDoc) => {
if(err) {
reject(err);
} else {
resolve(newDoc._id);
}
});
});
}
let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => {
query = wrapIdToQuery(query);
return new Promise<void>((resolve, reject) => {
db.files.update(query, updateQuery, { upsert: true }, (err: Error) => {
db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => {
if(err) {
reject(err);
} else {
console.log('saved shit with query ');
console.log(query);
console.log('saved shit with updateQuery ');
console.log(updateQuery);
resolve();
}
});
});
}
let dbLoadFile = (query: any) => {
let dbFindOne = (collection: Collection, query: string | object) => {
query = wrapIdToQuery(query);
return new Promise<any>((resolve, reject) => {
db.files.findOne(query, (err, doc) => {
db[collection].findOne(query, (err, doc) => {
if(err) {
reject(err);
} else {
console.log('got shit with query');
console.log(query);
console.log('doc');
console.log(doc);
resolve(doc);
}
});
});
}
let dbRemove = (collection: Collection, query: string | object) => {
query = wrapIdToQuery(query);
return new Promise<number>((resolve, reject) => {
db[collection].remove(query, (err, numRemoved) => {
if(err) {
reject(err);
} else {
resolve(numRemoved);
}
});
});
}
function maybeCreate(path: string): void {
if(fs.existsSync(path)) {
return;
}
console.log('mkdir: ' + path);
fs.mkdirSync(path);
console.log('exists: ' + fs.existsSync(path));
}
export async function saveFile(filename: string, content: string): Promise<void> {
return dbUpsertFile({ filename } , { filename, content });
}
export async function loadFile(filename: string): Promise<string> {
let doc = await dbLoadFile({ filename });
if(doc === null) {
return null;
}
return doc.content;
}
export function checkDataFolders(): void {
function checkDataFolders(): void {
[
config.DATA_PATH,
config.DATASETS_PATH,
config.MODELS_PATH,
config.METRICS_PATH,
config.SEGMENTS_PATH,
config.ZMQ_IPC_PATH
].forEach(maybeCreate);
}
checkDataFolders();
// TODO: it's better if models request db which we create if it`s needed
db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true });
db[Collection.METRICS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true });
db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true });

4
server/src/services/notification_service.ts

@ -1,11 +1,11 @@
import { findById } from '../models/analytic_unit';
import { findById } from '../models/analytic_unit_model';
import axios from 'axios';
// TODO: send notification with payload without dep to AnalyticUnit
export async function sendNotification(predictorId, active) {
let anomalyName = findById(predictorId).name;
let anomalyName = (await findById(predictorId)).name
console.log('Notification ' + anomalyName);
let notification = {

Loading…
Cancel
Save