diff --git a/server/src/config.ts b/server/src/config.ts index b6b3c46..75883f8 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -16,7 +16,6 @@ export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const DATASETS_PATH = path.join(DATA_PATH, 'datasets'); -//export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units'); export const MODELS_PATH = path.join(DATA_PATH, 'models'); export const METRICS_PATH = path.join(DATA_PATH, 'metrics'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 6e1a627..bc140ad 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,7 +1,5 @@ -import * as DataService from '../services/data_service'; -import { getTarget } from './metrics_controler'; -import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller'; -import * as AnalyticUnit from '../models/analytic_unit' +import * as SegmentsController from '../models/segment_model'; +import * as AnalyticUnit from '../models/analytic_unit_model' import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; @@ -23,14 +21,6 @@ function onTaskResult(taskResult: any) { } } -async function onFileSave(payload: any): Promise { - return DataService.saveFile(payload.filename, payload.content); -} - -async function onFileLoad(payload: any): Promise { - return DataService.loadFile(payload.filename); -} - async function onMessage(message: AnalyticsMessage) { let responsePayload = null; let resolvedMethod = false; @@ -69,7 +59,7 @@ export function terminate() { } async function runTask(task): Promise { - let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId); + let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); task.metric = { datasource: anomaly.metric.datasource, targets: anomaly.metric.targets.map(getTarget) @@ -86,7 +76,7 @@ async function runTask(task): Promise { export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { let segments = getLabeledSegments(id); AnalyticUnit.setStatus(id, 'LEARNING'); - let unit = AnalyticUnit.findById(id); + let unit = await AnalyticUnit.findById(id); let pattern = unit.type; let task = { analyticUnitId: id, @@ -106,12 +96,11 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { } } - export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { - let unit = AnalyticUnit.findById(id); + let unit = await AnalyticUnit.findById(id); let pattern = unit.type; let task = { - type: 'predict', + type: 'PREDICT', analyticUnitId: id, pattern, lastPredictionTime: unit.lastPredictionTime @@ -141,3 +130,14 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { export function isAnalyticReady(): boolean { return analyticsService.ready; } + +export async function createAnalyticUnitFromObject(obj: any): Promise { + + + + + + runLearning(newId); + + return +} \ No newline at end of file diff --git a/server/src/controllers/metrics_controler.ts b/server/src/controllers/metrics_controler.ts deleted file mode 100644 index 7cf5d45..0000000 --- a/server/src/controllers/metrics_controler.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { getJsonDataSync, writeJsonDataSync } from '../services/json_service'; -import { METRICS_PATH } from '../config'; - -import * as crypto from 'crypto'; - -import * as path from 'path'; - - -function saveTargets(targets) { - let metrics = []; - for (let target of targets) { - metrics.push(saveTarget(target)); - } - return metrics; -} - -function saveTarget(target) { - //const md5 = crypto.createHash('md5') - const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); - let filename = path.join(METRICS_PATH, `${targetId}.json`); - writeJsonDataSync(filename, target); - return targetId; -} - -function getTarget(targetId) { - let filename = path.join(METRICS_PATH, `${targetId}.json`); - return getJsonDataSync(filename); -} - -export { saveTargets, getTarget } diff --git a/server/src/index.ts b/server/src/index.ts index 31ca62d..ce04d94 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -4,7 +4,6 @@ import { router as segmentsRouter } from './routes/segments_router'; import * as AnalyticsController from './controllers/analytics_controller'; -import * as Data from './services/data_service'; import * as ProcessService from './services/process_service'; import { HASTIC_PORT } from './config'; @@ -14,7 +13,6 @@ import * as Router from 'koa-router'; import * as bodyParser from 'koa-bodyparser'; -Data.checkDataFolders(); AnalyticsController.init(); ProcessService.registerExitHandler(AnalyticsController.terminate); diff --git a/server/src/models/analytic_unit.ts b/server/src/models/analytic_unit.ts deleted file mode 100644 index a9682b0..0000000 --- a/server/src/models/analytic_unit.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { loadFile, saveFile } from '../services/data_service'; - -import * as crypto from 'crypto'; - -import * as path from 'path' -import * as fs from 'fs' - - -export type Datasource = { - method: string, - data: Object, - params: Object, - type: string, - url: string -} - -export type Metric = { - datasource: string, - targets: string[] -} - -export type AnalyticUnitId = string; - -export type AnalyticUnit = { - id?: AnalyticUnitId, - name: string, - panelUrl: string, - type: string, - metric: Metric, - datasource: Datasource - status: string, - error?: string, - lastPredictionTime: number, - nextId: number -} - -export function createItem(item: AnalyticUnit): AnalyticUnitId { - const hashString = item.name + (new Date()).toString(); - const newId: AnalyticUnitId = crypto.createHash('md5').update(hashString).digest('hex'); - let filename = `${newId}.json`; - if(fs.existsSync(filename)) { - throw new Error(`Can't create item with id ${newId}`); - } - save(newId, item); - item.id = newId; - return newId; -} - -export function remove(id: AnalyticUnitId) { - let filename = `${id}.json`; - fs.unlinkSync(filename); -} - -export function save(id: AnalyticUnitId, unit: AnalyticUnit) { - let filename = `${id}.json`; - return saveFile(filename, JSON.stringify(unit)); -} - -// TODO: make async -export async function findById(id: AnalyticUnitId): Promise { - let filename = `${id}.json`; - if(!fs.existsSync(filename)) { - throw new Error(`Can't find Analytic Unit with id ${id}`); - } - let result = await loadFile(filename); - return JSON.parse(result); -} - -export async function setStatus(predictorId: AnalyticUnitId, status: string, error?: string) { - let info = await findById(predictorId); - info.status = status; - if(error !== undefined) { - info.error = error; - } else { - info.error = ''; - } - save(predictorId, info); -} - -export async function setPredictionTime(id: AnalyticUnitId, time: number) { - let info = await findById(id); - info.lastPredictionTime = time; - save(id, info); -} diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts new file mode 100644 index 0000000..6ba8619 --- /dev/null +++ b/server/src/models/analytic_unit_model.ts @@ -0,0 +1,84 @@ +import { Metric, metricFromObj } from './metric_model'; +import { Collection, makeDBQ } from '../services/data_service'; + + +let db = makeDBQ(Collection.ANALYTIC_UNITS); + + + +export type AnalyticUnitId = string; + +export type AnalyticUnit = { + id?: AnalyticUnitId, + name: string, + panelUrl: string, + type: string, + metric: Metric + status: string, + error?: string, + lastPredictionTime: number, + nextId: number +} + + +export function analyticUnitFromObj(obj: any): AnalyticUnit { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + if(obj.type === undefined) { + throw new Error(`Missing field "type"`); + } + if(obj.name === undefined) { + throw new Error(`Missing field "name"`); + } + if(obj.panelUrl === undefined) { + throw new Error(`Missing field "panelUrl"`); + } + if(obj.metric === undefined) { + throw new Error(`Missing field "datasource"`); + } + if(obj.metric.datasource === undefined) { + throw new Error(`Missing field "metric.datasource"`); + } + if(obj.metric.targets === undefined) { + throw new Error(`Missing field "metric.targets"`); + } + + const unit: AnalyticUnit = { + name: obj.name, + panelUrl: obj.panelUrl, + type: obj.type, + datasource: obj.datasource, + metric: metric, + status: 'LEARNING', + lastPredictionTime: 0, + nextId: 0 + }; + + return unit; +} + +export async function findById(id: AnalyticUnitId): Promise { + return db.findOne(id); +} + +export async function create(unit: AnalyticUnit): Promise { + return db.insert(unit); +} + +export async function remove(id: AnalyticUnitId): Promise { + await db.remove(id); + return; +} + +export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { + return db.update(id, unit); +} + +export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { + return db.update(id, { status, error }); +} + +export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) { + return db.update(id, { lastPredictionTime }); +} diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts new file mode 100644 index 0000000..ca1ad16 --- /dev/null +++ b/server/src/models/metric_model.ts @@ -0,0 +1,49 @@ +import { Collection, makeDBQ } from '../services/data_service'; + + +let db = makeDBQ(Collection.METRICS); + + +export type Datasource = { + method: string, + data: Object, + params: Object, + type: string, + url: string +} + +export type Metric = { + datasource: Datasource, + targets: string[] +} + +export function metricFromObj(obj: any): Metric { + const metric: Metric = { + datasource: obj.datasource, + targets: obj.targets; + }; + return metric; +} + +export async function saveTargets(targets: string[]) { + let metrics = []; + for (let target of targets) { + metrics.push(saveTarget(target)); + } + return metrics; +} + +export async function saveTarget(target: string) { + //const md5 = crypto.createHash('md5') + const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); + let filename = path.join(METRICS_PATH, `${targetId}.json`); + writeJsonDataSync(filename, target); + return targetId; +} + +export async function getTarget(targetId) { + let filename = path.join(METRICS_PATH, `${targetId}.json`); + return getJsonDataSync(filename); +} + + diff --git a/server/src/controllers/segments_controller.ts b/server/src/models/segment_model.ts similarity index 70% rename from server/src/controllers/segments_controller.ts rename to server/src/models/segment_model.ts index 358b0b7..7183b30 100644 --- a/server/src/controllers/segments_controller.ts +++ b/server/src/models/segment_model.ts @@ -1,6 +1,5 @@ -import { getJsonDataSync, writeJsonDataSync } from '../services/json_service'; -import { AnalyticUnitId, findById, save } from '../models/analytic_unit'; -import { SEGMENTS_PATH } from '../config'; +import * as AnalyticUnit from './analytic_unit_model'; + import * as _ from 'lodash'; @@ -8,7 +7,7 @@ import * as path from 'path'; import * as fs from 'fs'; -export function getLabeledSegments(id: AnalyticUnitId) { +export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) { let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); if(!fs.existsSync(filename)) { @@ -24,7 +23,7 @@ export function getLabeledSegments(id: AnalyticUnitId) { } } -export function getPredictedSegments(id: AnalyticUnitId) { +export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) { let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`); let jsonData; @@ -37,7 +36,7 @@ export function getPredictedSegments(id: AnalyticUnitId) { return jsonData; } -export function saveSegments(id: AnalyticUnitId, segments) { +export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) { let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); try { @@ -48,9 +47,9 @@ export function saveSegments(id: AnalyticUnitId, segments) { } } -export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boolean) { +export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) { // Set status - let info = findById(id); + let info = await AnalyticUnit.findById(id); let segments = getLabeledSegments(id); let nextId = info.nextId; @@ -64,11 +63,11 @@ export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boole } info.nextId = nextId; saveSegments(id, segments); - save(id, info); + await AnalyticUnit.update(id, info); return addedIds; } -export function removeSegments(id: AnalyticUnitId, removedSegments) { +export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) { let segments = getLabeledSegments(id); for (let segmentId of removedSegments) { segments = segments.filter(el => el.id !== segmentId); diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index dee28bb..be59684 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -1,9 +1,9 @@ import * as Router from 'koa-router'; -import * as AnalyticUnit from '../models/analytic_unit'; +import * as AnalyticUnit from '../models/analytic_unit_model'; + +import { createAnalyticUnitFromObject } from '../controllers/analytics_controller' -import { runLearning } from '../controllers/analytics_controller' -import { saveTargets } from '../controllers/metrics_controler'; async function sendStatus(ctx: Router.IRouterContext) { try { @@ -11,7 +11,7 @@ async function sendStatus(ctx: Router.IRouterContext) { if(id === undefined) { throw new Error('Id is undefined'); } - let unit = AnalyticUnit.findById(id); + let unit = await AnalyticUnit.findById(id); if(unit.status === undefined) { throw new Error('status is undefined'); @@ -34,7 +34,7 @@ async function findItem(ctx: Router.IRouterContext) { throw new Error('No id param in query'); } - let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(id); + let unit: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(id); ctx.response.body = { name: unit.name, @@ -55,53 +55,13 @@ async function createItem(ctx: Router.IRouterContext) { let body = ctx.request.body; - if(body.type === undefined) { - throw new Error(`Missing field "type"`); - } - if(body.name === undefined) { - throw new Error(`Missing field "name"`); - } - if(body.panelUrl === undefined) { - throw new Error(`Missing field "panelUrl"`); - } - if(body.metric === undefined) { - throw new Error(`Missing field "datasource"`); - } - if(body.metric.datasource === undefined) { - throw new Error(`Missing field "metric.datasource"`); - } - if(body.metric.targets === undefined) { - throw new Error(`Missing field "metric.targets"`); - } - - const metric: AnalyticUnit.Metric = { - datasource: body.metric.datasource, - targets: saveTargets(body.metric.targets) - }; - - const unit: AnalyticUnit.AnalyticUnit = { - name: body.name, - panelUrl: body.panelUrl, - type: body.type, - datasource: body.datasource, - metric: metric, - status: 'learning', - lastPredictionTime: 0, - nextId: 0 - }; + await createAnalyticUnitFromObject(body); - let newId = AnalyticUnit.createItem(unit); - if(newId === null) { - ctx.response.status = 403; - ctx.response.body = { - code: 403, - message: 'Item exists' - }; - } + let newId = await AnalyticUnit.create(unit); ctx.response.body = { id: newId }; - runLearning(newId); + } catch(e) { ctx.response.status = 500; ctx.response.body = { diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 7315171..f63a834 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -1,12 +1,12 @@ import * as Router from 'koa-router'; -import { AnalyticUnitId } from '../models/analytic_unit'; +import { AnalyticUnitId } from '../models/analytic_unit_model'; import { getLabeledSegments, insertSegments, removeSegments, -} from '../controllers/segments_controller'; +} from '../models/segment_model'; import { runLearning } from '../controllers/analytics_controller'; diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 3f4ce99..b91e8e2 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -3,74 +3,108 @@ import * as config from '../config'; import * as nedb from 'nedb'; import * as fs from 'fs'; -export const db = { - analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }), - metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }), - segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }), - files: new nedb({ filename: config.FILES_DATABASE_PATH, autoload: true }) -}; +export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; -let dbUpsertFile = (query: any, updateQuery: any) => { + +/** + * Class which helps to make queries to your collection + * + * @param { string | object } query: a key as a string or mongodb-style query + */ +export type DBQ = { + insert: (document: object) => string, + update: (query: string | object, updateQuery: any) => void, + findOne: (query: string | object) => any, + remove: (query: string | object) => number +} + +export function makeDBQ(collection: Collection): DBQ { + return { + insert: dbInsert.bind(null, collection), + update: dbUpdate.bind(null, collection), + findOne: dbFindOne.bind(null, collection), + remove: dbRemove.bind(null, collection) + } +} + +function wrapIdToQuery(query: string | object) { + if(typeof query === 'string') { + return { _id: query }; + } + return query; +} + +const db = new Map(); + +let dbInsert = (collection: Collection, doc: object) => { + return new Promise((resolve, reject) => { + db[collection].insert(doc, (err, newDoc) => { + if(err) { + reject(err); + } else { + resolve(newDoc._id); + } + }); + }); +} + +let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { + query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db.files.update(query, updateQuery, { upsert: true }, (err: Error) => { + db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { if(err) { reject(err); } else { - console.log('saved shit with query '); - console.log(query); - console.log('saved shit with updateQuery '); - console.log(updateQuery); resolve(); } }); }); } -let dbLoadFile = (query: any) => { +let dbFindOne = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db.files.findOne(query, (err, doc) => { + db[collection].findOne(query, (err, doc) => { if(err) { reject(err); } else { - console.log('got shit with query'); - console.log(query); - console.log('doc'); - console.log(doc); resolve(doc); } }); }); } +let dbRemove = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].remove(query, (err, numRemoved) => { + if(err) { + reject(err); + } else { + resolve(numRemoved); + } + }); + }); +} + function maybeCreate(path: string): void { if(fs.existsSync(path)) { return; } console.log('mkdir: ' + path); fs.mkdirSync(path); - console.log('exists: ' + fs.existsSync(path)); -} - -export async function saveFile(filename: string, content: string): Promise { - return dbUpsertFile({ filename } , { filename, content }); } -export async function loadFile(filename: string): Promise { - let doc = await dbLoadFile({ filename }); - if(doc === null) { - return null; - } - return doc.content; -} - -export function checkDataFolders(): void { +function checkDataFolders(): void { [ config.DATA_PATH, - config.DATASETS_PATH, - config.MODELS_PATH, - config.METRICS_PATH, - config.SEGMENTS_PATH, config.ZMQ_IPC_PATH ].forEach(maybeCreate); } +checkDataFolders(); + +// TODO: it's better if models request db which we create if it`s needed +db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.METRICS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 4b39a6b..126ca10 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -1,11 +1,11 @@ -import { findById } from '../models/analytic_unit'; +import { findById } from '../models/analytic_unit_model'; import axios from 'axios'; // TODO: send notification with payload without dep to AnalyticUnit export async function sendNotification(predictorId, active) { - let anomalyName = findById(predictorId).name; + let anomalyName = (await findById(predictorId)).name console.log('Notification ' + anomalyName); let notification = {