From fb0431575569aa4bcee8e766e227c3c5737b8549 Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 3 Aug 2018 19:57:29 +0300 Subject: [PATCH 01/12] nedb using begin --- server/src/models/analytic_unit.ts | 26 +++++++++++++------------- server/src/services/data_service.ts | 1 - 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/server/src/models/analytic_unit.ts b/server/src/models/analytic_unit.ts index d4d3ef5..a9682b0 100644 --- a/server/src/models/analytic_unit.ts +++ b/server/src/models/analytic_unit.ts @@ -1,5 +1,4 @@ -import { getJsonDataSync, writeJsonDataSync } from '../services/json_service' -import { ANALYTIC_UNITS_PATH } from '../config' +import { loadFile, saveFile } from '../services/data_service'; import * as crypto from 'crypto'; @@ -38,7 +37,7 @@ export type AnalyticUnit = { export function createItem(item: AnalyticUnit): AnalyticUnitId { const hashString = item.name + (new Date()).toString(); const newId: AnalyticUnitId = crypto.createHash('md5').update(hashString).digest('hex'); - let filename = path.join(ANALYTIC_UNITS_PATH, `${newId}.json`); + let filename = `${newId}.json`; if(fs.existsSync(filename)) { throw new Error(`Can't create item with id ${newId}`); } @@ -48,26 +47,27 @@ export function createItem(item: AnalyticUnit): AnalyticUnitId { } export function remove(id: AnalyticUnitId) { - let filename = path.join(ANALYTIC_UNITS_PATH, `${id}.json`); + let filename = `${id}.json`; fs.unlinkSync(filename); } export function save(id: AnalyticUnitId, unit: AnalyticUnit) { - let filename = path.join(ANALYTIC_UNITS_PATH, `${id}.json`); - return writeJsonDataSync(filename, unit); + let filename = `${id}.json`; + return saveFile(filename, JSON.stringify(unit)); } // TODO: make async -export function findById(id: AnalyticUnitId): AnalyticUnit { - let filename = path.join(ANALYTIC_UNITS_PATH, `${id}.json`); +export async function findById(id: AnalyticUnitId): Promise { + let filename = `${id}.json`; if(!fs.existsSync(filename)) { throw new Error(`Can't find Analytic Unit with id ${id}`); } - return getJsonDataSync(filename); + let result = await loadFile(filename); + return JSON.parse(result); } -export function setStatus(predictorId: AnalyticUnitId, status: string, error?: string) { - let info = findById(predictorId); +export async function setStatus(predictorId: AnalyticUnitId, status: string, error?: string) { + let info = await findById(predictorId); info.status = status; if(error !== undefined) { info.error = error; @@ -77,8 +77,8 @@ export function setStatus(predictorId: AnalyticUnitId, status: string, error?: s save(predictorId, info); } -export function setPredictionTime(id: AnalyticUnitId, time: number) { - let info = findById(id); +export async function setPredictionTime(id: AnalyticUnitId, time: number) { + let info = await findById(id); info.lastPredictionTime = time; save(id, info); } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index a6005ea..3f4ce99 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -68,7 +68,6 @@ export function checkDataFolders(): void { [ config.DATA_PATH, config.DATASETS_PATH, - config.ANALYTIC_UNITS_PATH, config.MODELS_PATH, config.METRICS_PATH, config.SEGMENTS_PATH, From 8a49e2dba25bc7549c30e056e4c3297d50314dbb Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 00:52:53 +0300 Subject: [PATCH 02/12] big models refactoring --- server/src/config.ts | 1 - .../src/controllers/analytics_controller.ts | 34 +++--- 
server/src/controllers/metrics_controler.ts | 30 ----- server/src/index.ts | 2 - server/src/models/analytic_unit.ts | 84 -------------- server/src/models/analytic_unit_model.ts | 84 ++++++++++++++ server/src/models/metric_model.ts | 49 ++++++++ .../segment_model.ts} | 19 ++-- server/src/routes/analytic_units_router.ts | 56 ++------- server/src/routes/segments_router.ts | 4 +- server/src/services/data_service.ts | 106 ++++++++++++------ server/src/services/notification_service.ts | 4 +- 12 files changed, 241 insertions(+), 232 deletions(-) delete mode 100644 server/src/controllers/metrics_controler.ts delete mode 100644 server/src/models/analytic_unit.ts create mode 100644 server/src/models/analytic_unit_model.ts create mode 100644 server/src/models/metric_model.ts rename server/src/{controllers/segments_controller.ts => models/segment_model.ts} (70%) diff --git a/server/src/config.ts b/server/src/config.ts index b6b3c46..75883f8 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -16,7 +16,6 @@ export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const DATASETS_PATH = path.join(DATA_PATH, 'datasets'); -//export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units'); export const MODELS_PATH = path.join(DATA_PATH, 'models'); export const METRICS_PATH = path.join(DATA_PATH, 'metrics'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 6e1a627..bc140ad 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,7 +1,5 @@ -import * as DataService from '../services/data_service'; -import { getTarget } from './metrics_controler'; -import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller'; -import * as AnalyticUnit from '../models/analytic_unit' +import * as SegmentsController from '../models/segment_model'; +import * as AnalyticUnit from '../models/analytic_unit_model' import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; @@ -23,14 +21,6 @@ function onTaskResult(taskResult: any) { } } -async function onFileSave(payload: any): Promise { - return DataService.saveFile(payload.filename, payload.content); -} - -async function onFileLoad(payload: any): Promise { - return DataService.loadFile(payload.filename); -} - async function onMessage(message: AnalyticsMessage) { let responsePayload = null; let resolvedMethod = false; @@ -69,7 +59,7 @@ export function terminate() { } async function runTask(task): Promise { - let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId); + let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); task.metric = { datasource: anomaly.metric.datasource, targets: anomaly.metric.targets.map(getTarget) @@ -86,7 +76,7 @@ async function runTask(task): Promise { export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { let segments = getLabeledSegments(id); AnalyticUnit.setStatus(id, 'LEARNING'); - let unit = AnalyticUnit.findById(id); + let unit = await AnalyticUnit.findById(id); let pattern = unit.type; let task = { analyticUnitId: id, @@ -106,12 +96,11 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { } } - export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { - let unit = AnalyticUnit.findById(id); + let 
unit = await AnalyticUnit.findById(id); let pattern = unit.type; let task = { - type: 'predict', + type: 'PREDICT', analyticUnitId: id, pattern, lastPredictionTime: unit.lastPredictionTime @@ -141,3 +130,14 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { export function isAnalyticReady(): boolean { return analyticsService.ready; } + +export async function createAnalyticUnitFromObject(obj: any): Promise { + + + + + + runLearning(newId); + + return +} \ No newline at end of file diff --git a/server/src/controllers/metrics_controler.ts b/server/src/controllers/metrics_controler.ts deleted file mode 100644 index 7cf5d45..0000000 --- a/server/src/controllers/metrics_controler.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { getJsonDataSync, writeJsonDataSync } from '../services/json_service'; -import { METRICS_PATH } from '../config'; - -import * as crypto from 'crypto'; - -import * as path from 'path'; - - -function saveTargets(targets) { - let metrics = []; - for (let target of targets) { - metrics.push(saveTarget(target)); - } - return metrics; -} - -function saveTarget(target) { - //const md5 = crypto.createHash('md5') - const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); - let filename = path.join(METRICS_PATH, `${targetId}.json`); - writeJsonDataSync(filename, target); - return targetId; -} - -function getTarget(targetId) { - let filename = path.join(METRICS_PATH, `${targetId}.json`); - return getJsonDataSync(filename); -} - -export { saveTargets, getTarget } diff --git a/server/src/index.ts b/server/src/index.ts index 31ca62d..ce04d94 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -4,7 +4,6 @@ import { router as segmentsRouter } from './routes/segments_router'; import * as AnalyticsController from './controllers/analytics_controller'; -import * as Data from './services/data_service'; import * as ProcessService from './services/process_service'; import { HASTIC_PORT } from './config'; @@ -14,7 +13,6 @@ import * as Router from 'koa-router'; import * as bodyParser from 'koa-bodyparser'; -Data.checkDataFolders(); AnalyticsController.init(); ProcessService.registerExitHandler(AnalyticsController.terminate); diff --git a/server/src/models/analytic_unit.ts b/server/src/models/analytic_unit.ts deleted file mode 100644 index a9682b0..0000000 --- a/server/src/models/analytic_unit.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { loadFile, saveFile } from '../services/data_service'; - -import * as crypto from 'crypto'; - -import * as path from 'path' -import * as fs from 'fs' - - -export type Datasource = { - method: string, - data: Object, - params: Object, - type: string, - url: string -} - -export type Metric = { - datasource: string, - targets: string[] -} - -export type AnalyticUnitId = string; - -export type AnalyticUnit = { - id?: AnalyticUnitId, - name: string, - panelUrl: string, - type: string, - metric: Metric, - datasource: Datasource - status: string, - error?: string, - lastPredictionTime: number, - nextId: number -} - -export function createItem(item: AnalyticUnit): AnalyticUnitId { - const hashString = item.name + (new Date()).toString(); - const newId: AnalyticUnitId = crypto.createHash('md5').update(hashString).digest('hex'); - let filename = `${newId}.json`; - if(fs.existsSync(filename)) { - throw new Error(`Can't create item with id ${newId}`); - } - save(newId, item); - item.id = newId; - return newId; -} - -export function remove(id: AnalyticUnitId) { - let filename = `${id}.json`; - fs.unlinkSync(filename); -} - 
-export function save(id: AnalyticUnitId, unit: AnalyticUnit) { - let filename = `${id}.json`; - return saveFile(filename, JSON.stringify(unit)); -} - -// TODO: make async -export async function findById(id: AnalyticUnitId): Promise { - let filename = `${id}.json`; - if(!fs.existsSync(filename)) { - throw new Error(`Can't find Analytic Unit with id ${id}`); - } - let result = await loadFile(filename); - return JSON.parse(result); -} - -export async function setStatus(predictorId: AnalyticUnitId, status: string, error?: string) { - let info = await findById(predictorId); - info.status = status; - if(error !== undefined) { - info.error = error; - } else { - info.error = ''; - } - save(predictorId, info); -} - -export async function setPredictionTime(id: AnalyticUnitId, time: number) { - let info = await findById(id); - info.lastPredictionTime = time; - save(id, info); -} diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts new file mode 100644 index 0000000..6ba8619 --- /dev/null +++ b/server/src/models/analytic_unit_model.ts @@ -0,0 +1,84 @@ +import { Metric, metricFromObj } from './metric_model'; +import { Collection, makeDBQ } from '../services/data_service'; + + +let db = makeDBQ(Collection.ANALYTIC_UNITS); + + + +export type AnalyticUnitId = string; + +export type AnalyticUnit = { + id?: AnalyticUnitId, + name: string, + panelUrl: string, + type: string, + metric: Metric + status: string, + error?: string, + lastPredictionTime: number, + nextId: number +} + + +export function analyticUnitFromObj(obj: any): AnalyticUnit { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + if(obj.type === undefined) { + throw new Error(`Missing field "type"`); + } + if(obj.name === undefined) { + throw new Error(`Missing field "name"`); + } + if(obj.panelUrl === undefined) { + throw new Error(`Missing field "panelUrl"`); + } + if(obj.metric === undefined) { + throw new Error(`Missing field "datasource"`); + } + if(obj.metric.datasource === undefined) { + throw new Error(`Missing field "metric.datasource"`); + } + if(obj.metric.targets === undefined) { + throw new Error(`Missing field "metric.targets"`); + } + + const unit: AnalyticUnit = { + name: obj.name, + panelUrl: obj.panelUrl, + type: obj.type, + datasource: obj.datasource, + metric: metric, + status: 'LEARNING', + lastPredictionTime: 0, + nextId: 0 + }; + + return unit; +} + +export async function findById(id: AnalyticUnitId): Promise { + return db.findOne(id); +} + +export async function create(unit: AnalyticUnit): Promise { + return db.insert(unit); +} + +export async function remove(id: AnalyticUnitId): Promise { + await db.remove(id); + return; +} + +export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { + return db.update(id, unit); +} + +export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { + return db.update(id, { status, error }); +} + +export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) { + return db.update(id, { lastPredictionTime }); +} diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts new file mode 100644 index 0000000..ca1ad16 --- /dev/null +++ b/server/src/models/metric_model.ts @@ -0,0 +1,49 @@ +import { Collection, makeDBQ } from '../services/data_service'; + + +let db = makeDBQ(Collection.METRICS); + + +export type Datasource = { + method: string, + data: Object, + params: Object, + type: string, + url: string +} + +export type Metric = { + datasource: 
Datasource, + targets: string[] +} + +export function metricFromObj(obj: any): Metric { + const metric: Metric = { + datasource: obj.datasource, + targets: obj.targets; + }; + return metric; +} + +export async function saveTargets(targets: string[]) { + let metrics = []; + for (let target of targets) { + metrics.push(saveTarget(target)); + } + return metrics; +} + +export async function saveTarget(target: string) { + //const md5 = crypto.createHash('md5') + const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); + let filename = path.join(METRICS_PATH, `${targetId}.json`); + writeJsonDataSync(filename, target); + return targetId; +} + +export async function getTarget(targetId) { + let filename = path.join(METRICS_PATH, `${targetId}.json`); + return getJsonDataSync(filename); +} + + diff --git a/server/src/controllers/segments_controller.ts b/server/src/models/segment_model.ts similarity index 70% rename from server/src/controllers/segments_controller.ts rename to server/src/models/segment_model.ts index 358b0b7..7183b30 100644 --- a/server/src/controllers/segments_controller.ts +++ b/server/src/models/segment_model.ts @@ -1,6 +1,5 @@ -import { getJsonDataSync, writeJsonDataSync } from '../services/json_service'; -import { AnalyticUnitId, findById, save } from '../models/analytic_unit'; -import { SEGMENTS_PATH } from '../config'; +import * as AnalyticUnit from './analytic_unit_model'; + import * as _ from 'lodash'; @@ -8,7 +7,7 @@ import * as path from 'path'; import * as fs from 'fs'; -export function getLabeledSegments(id: AnalyticUnitId) { +export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) { let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); if(!fs.existsSync(filename)) { @@ -24,7 +23,7 @@ export function getLabeledSegments(id: AnalyticUnitId) { } } -export function getPredictedSegments(id: AnalyticUnitId) { +export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) { let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`); let jsonData; @@ -37,7 +36,7 @@ export function getPredictedSegments(id: AnalyticUnitId) { return jsonData; } -export function saveSegments(id: AnalyticUnitId, segments) { +export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) { let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); try { @@ -48,9 +47,9 @@ export function saveSegments(id: AnalyticUnitId, segments) { } } -export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boolean) { +export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) { // Set status - let info = findById(id); + let info = await AnalyticUnit.findById(id); let segments = getLabeledSegments(id); let nextId = info.nextId; @@ -64,11 +63,11 @@ export function insertSegments(id: AnalyticUnitId, addedSegments, labeled: boole } info.nextId = nextId; saveSegments(id, segments); - save(id, info); + await AnalyticUnit.update(id, info); return addedIds; } -export function removeSegments(id: AnalyticUnitId, removedSegments) { +export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) { let segments = getLabeledSegments(id); for (let segmentId of removedSegments) { segments = segments.filter(el => el.id !== segmentId); diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index dee28bb..be59684 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -1,9 +1,9 @@ 
import * as Router from 'koa-router'; -import * as AnalyticUnit from '../models/analytic_unit'; +import * as AnalyticUnit from '../models/analytic_unit_model'; + +import { createAnalyticUnitFromObject } from '../controllers/analytics_controller' -import { runLearning } from '../controllers/analytics_controller' -import { saveTargets } from '../controllers/metrics_controler'; async function sendStatus(ctx: Router.IRouterContext) { try { @@ -11,7 +11,7 @@ async function sendStatus(ctx: Router.IRouterContext) { if(id === undefined) { throw new Error('Id is undefined'); } - let unit = AnalyticUnit.findById(id); + let unit = await AnalyticUnit.findById(id); if(unit.status === undefined) { throw new Error('status is undefined'); @@ -34,7 +34,7 @@ async function findItem(ctx: Router.IRouterContext) { throw new Error('No id param in query'); } - let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(id); + let unit: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(id); ctx.response.body = { name: unit.name, @@ -55,53 +55,13 @@ async function createItem(ctx: Router.IRouterContext) { let body = ctx.request.body; - if(body.type === undefined) { - throw new Error(`Missing field "type"`); - } - if(body.name === undefined) { - throw new Error(`Missing field "name"`); - } - if(body.panelUrl === undefined) { - throw new Error(`Missing field "panelUrl"`); - } - if(body.metric === undefined) { - throw new Error(`Missing field "datasource"`); - } - if(body.metric.datasource === undefined) { - throw new Error(`Missing field "metric.datasource"`); - } - if(body.metric.targets === undefined) { - throw new Error(`Missing field "metric.targets"`); - } - - const metric: AnalyticUnit.Metric = { - datasource: body.metric.datasource, - targets: saveTargets(body.metric.targets) - }; - - const unit: AnalyticUnit.AnalyticUnit = { - name: body.name, - panelUrl: body.panelUrl, - type: body.type, - datasource: body.datasource, - metric: metric, - status: 'learning', - lastPredictionTime: 0, - nextId: 0 - }; + await createAnalyticUnitFromObject(body); - let newId = AnalyticUnit.createItem(unit); - if(newId === null) { - ctx.response.status = 403; - ctx.response.body = { - code: 403, - message: 'Item exists' - }; - } + let newId = await AnalyticUnit.create(unit); ctx.response.body = { id: newId }; - runLearning(newId); + } catch(e) { ctx.response.status = 500; ctx.response.body = { diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 7315171..f63a834 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -1,12 +1,12 @@ import * as Router from 'koa-router'; -import { AnalyticUnitId } from '../models/analytic_unit'; +import { AnalyticUnitId } from '../models/analytic_unit_model'; import { getLabeledSegments, insertSegments, removeSegments, -} from '../controllers/segments_controller'; +} from '../models/segment_model'; import { runLearning } from '../controllers/analytics_controller'; diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 3f4ce99..b91e8e2 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -3,74 +3,108 @@ import * as config from '../config'; import * as nedb from 'nedb'; import * as fs from 'fs'; -export const db = { - analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }), - metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }), - segments: new nedb({ filename: 
config.SEGMENTS_DATABASE_PATH, autoload: true }), - files: new nedb({ filename: config.FILES_DATABASE_PATH, autoload: true }) -}; +export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; -let dbUpsertFile = (query: any, updateQuery: any) => { + +/** + * Class which helps to make queries to your collection + * + * @param { string | object } query: a key as a string or mongodb-style query + */ +export type DBQ = { + insert: (document: object) => string, + update: (query: string | object, updateQuery: any) => void, + findOne: (query: string | object) => any, + remove: (query: string | object) => number +} + +export function makeDBQ(collection: Collection): DBQ { + return { + insert: dbInsert.bind(null, collection), + update: dbUpdate.bind(null, collection), + findOne: dbFindOne.bind(null, collection), + remove: dbRemove.bind(null, collection) + } +} + +function wrapIdToQuery(query: string | object) { + if(typeof query === 'string') { + return { _id: query }; + } + return query; +} + +const db = new Map(); + +let dbInsert = (collection: Collection, doc: object) => { + return new Promise((resolve, reject) => { + db[collection].insert(doc, (err, newDoc) => { + if(err) { + reject(err); + } else { + resolve(newDoc._id); + } + }); + }); +} + +let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { + query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db.files.update(query, updateQuery, { upsert: true }, (err: Error) => { + db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { if(err) { reject(err); } else { - console.log('saved shit with query '); - console.log(query); - console.log('saved shit with updateQuery '); - console.log(updateQuery); resolve(); } }); }); } -let dbLoadFile = (query: any) => { +let dbFindOne = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db.files.findOne(query, (err, doc) => { + db[collection].findOne(query, (err, doc) => { if(err) { reject(err); } else { - console.log('got shit with query'); - console.log(query); - console.log('doc'); - console.log(doc); resolve(doc); } }); }); } +let dbRemove = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].remove(query, (err, numRemoved) => { + if(err) { + reject(err); + } else { + resolve(numRemoved); + } + }); + }); +} + function maybeCreate(path: string): void { if(fs.existsSync(path)) { return; } console.log('mkdir: ' + path); fs.mkdirSync(path); - console.log('exists: ' + fs.existsSync(path)); -} - -export async function saveFile(filename: string, content: string): Promise { - return dbUpsertFile({ filename } , { filename, content }); } -export async function loadFile(filename: string): Promise { - let doc = await dbLoadFile({ filename }); - if(doc === null) { - return null; - } - return doc.content; -} - -export function checkDataFolders(): void { +function checkDataFolders(): void { [ config.DATA_PATH, - config.DATASETS_PATH, - config.MODELS_PATH, - config.METRICS_PATH, - config.SEGMENTS_PATH, config.ZMQ_IPC_PATH ].forEach(maybeCreate); } +checkDataFolders(); + +// TODO: it's better if models request db which we create if it`s needed +db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.METRICS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); 
+db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 4b39a6b..126ca10 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -1,11 +1,11 @@ -import { findById } from '../models/analytic_unit'; +import { findById } from '../models/analytic_unit_model'; import axios from 'axios'; // TODO: send notification with payload without dep to AnalyticUnit export async function sendNotification(predictorId, active) { - let anomalyName = findById(predictorId).name; + let anomalyName = (await findById(predictorId)).name console.log('Notification ' + anomalyName); let notification = { From dc6c120261f1c78ac245a3ef4eb0ae2c10b4fb35 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 18:09:42 +0300 Subject: [PATCH 03/12] serments / metrics models refactoring --- .../src/controllers/analytics_controller.ts | 139 ++++++++---------- server/src/models/analytic_unit_model.ts | 21 +-- server/src/models/metric_model.ts | 35 ++--- server/src/models/segment_model.ts | 78 +++------- server/src/models/task_model.ts | 3 + server/src/routes/analytic_units_router.ts | 11 +- server/src/routes/segments_router.ts | 26 ++-- server/src/services/data_service.ts | 18 ++- 8 files changed, 142 insertions(+), 189 deletions(-) create mode 100644 server/src/models/task_model.ts diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index bc140ad..f707ef0 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -30,15 +30,6 @@ async function onMessage(message: AnalyticsMessage) { resolvedMethod = true; } - if(message.method === 'FILE_SAVE') { - responsePayload = await onFileSave(message.payload); - resolvedMethod = true; - } - if(message.method === 'FILE_LOAD') { - responsePayload = await onFileLoad(message.payload); - resolvedMethod = true; - } - if(!resolvedMethod) { throw new TypeError('Unknown method ' + message.method); } @@ -59,72 +50,72 @@ export function terminate() { } async function runTask(task): Promise { - let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); - task.metric = { - datasource: anomaly.metric.datasource, - targets: anomaly.metric.targets.map(getTarget) - }; - - task._taskId = nextTaskId++; - await analyticsService.sendTask(task); - - return new Promise(resolve => { - taskMap[task._taskId] = resolve; - }) + // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); + // task.metric = { + // datasource: anomaly.metric.datasource, + // targets: anomaly.metric.targets.map(getTarget) + // }; + + // task._taskId = nextTaskId++; + // await analyticsService.sendTask(task); + + // return new Promise(resolve => { + // taskMap[task._taskId] = resolve; + // }) } export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { - let segments = getLabeledSegments(id); - AnalyticUnit.setStatus(id, 'LEARNING'); - let unit = await AnalyticUnit.findById(id); - let pattern = unit.type; - let task = { - analyticUnitId: id, - type: 'LEARN', - pattern, - segments: segments - }; - - let result = await runTask(task); - - if (result.status === 'SUCCESS') { - AnalyticUnit.setStatus(id, 'READY'); - insertSegments(id, result.segments, false); - AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); - } else { - 
AnalyticUnit.setStatus(id, 'FAILED', result.error); - } + // let segments = getLabeledSegments(id); + // AnalyticUnit.setStatus(id, 'LEARNING'); + // let unit = await AnalyticUnit.findById(id); + // let pattern = unit.type; + // let task = { + // analyticUnitId: id, + // type: 'LEARN', + // pattern, + // segments: segments + // }; + + // let result = await runTask(task); + + // if (result.status === 'SUCCESS') { + // AnalyticUnit.setStatus(id, 'READY'); + // insertSegments(id, result.segments, false); + // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); + // } else { + // AnalyticUnit.setStatus(id, 'FAILED', result.error); + // } } export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { - let unit = await AnalyticUnit.findById(id); - let pattern = unit.type; - let task = { - type: 'PREDICT', - analyticUnitId: id, - pattern, - lastPredictionTime: unit.lastPredictionTime - }; - let result = await runTask(task); - - if(result.status === 'FAILED') { - return []; - } - // Merging segments - let segments = getLabeledSegments(id); - if(segments.length > 0 && result.segments.length > 0) { - let lastOldSegment = segments[segments.length - 1]; - let firstNewSegment = result.segments[0]; - - if(firstNewSegment.start <= lastOldSegment.finish) { - result.segments[0].start = lastOldSegment.start; - removeSegments(id, [lastOldSegment.id]); - } - } - - insertSegments(id, result.segments, false); - AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); - return result.segments; + // let unit = await AnalyticUnit.findById(id); + // let pattern = unit.type; + // let task = { + // type: 'PREDICT', + // analyticUnitId: id, + // pattern, + // lastPredictionTime: unit.lastPredictionTime + // }; + // let result = await runTask(task); + + // if(result.status === 'FAILED') { + // return []; + // } + // // Merging segments + // let segments = getLabeledSegments(id); + // if(segments.length > 0 && result.segments.length > 0) { + // let lastOldSegment = segments[segments.length - 1]; + // let firstNewSegment = result.segments[0]; + + // if(firstNewSegment.start <= lastOldSegment.finish) { + // result.segments[0].start = lastOldSegment.start; + // removeSegments(id, [lastOldSegment.id]); + // } + // } + + // insertSegments(id, result.segments, false); + // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); + // return result.segments; } export function isAnalyticReady(): boolean { @@ -132,12 +123,8 @@ export function isAnalyticReady(): boolean { } export async function createAnalyticUnitFromObject(obj: any): Promise { - - - - - - runLearning(newId); - + let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj); + AnalyticUnit.create(unit); + // runLearning(unit); return } \ No newline at end of file diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 6ba8619..a4e6162 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -1,11 +1,10 @@ -import { Metric, metricFromObj } from './metric_model'; +import { Metric } from './metric_model'; import { Collection, makeDBQ } from '../services/data_service'; let db = makeDBQ(Collection.ANALYTIC_UNITS); - export type AnalyticUnitId = string; export type AnalyticUnit = { @@ -16,11 +15,9 @@ export type AnalyticUnit = { metric: Metric status: string, error?: string, - lastPredictionTime: number, - nextId: number + lastPredictionTime: number } - export function analyticUnitFromObj(obj: any): AnalyticUnit { if(obj === 
undefined) { throw new Error('obj is undefined'); @@ -48,11 +45,9 @@ export function analyticUnitFromObj(obj: any): AnalyticUnit { name: obj.name, panelUrl: obj.panelUrl, type: obj.type, - datasource: obj.datasource, - metric: metric, + metric: obj.metric, status: 'LEARNING', - lastPredictionTime: 0, - nextId: 0 + lastPredictionTime: 0 }; return unit; @@ -62,8 +57,14 @@ export async function findById(id: AnalyticUnitId): Promise { return db.findOne(id); } +/** + * Creates and updates new unit.id + * + * @param unit to create + * @returns unit.id + */ export async function create(unit: AnalyticUnit): Promise { - return db.insert(unit); + return unit.id = await db.insert(unit); } export async function remove(id: AnalyticUnitId): Promise { diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts index ca1ad16..b4062d3 100644 --- a/server/src/models/metric_model.ts +++ b/server/src/models/metric_model.ts @@ -12,38 +12,35 @@ export type Datasource = { url: string } +export type MetricId = string; + export type Metric = { + id?: MetricId, datasource: Datasource, targets: string[] } export function metricFromObj(obj: any): Metric { - const metric: Metric = { + return { datasource: obj.datasource, - targets: obj.targets; + targets: obj.targets }; - return metric; } -export async function saveTargets(targets: string[]) { - let metrics = []; - for (let target of targets) { - metrics.push(saveTarget(target)); - } - return metrics; -} +// export async function saveTargets(targets: string[]) { +// let metrics = []; +// for (let target of targets) { +// metrics.push(create(target)); +// } +// return metrics; +// } -export async function saveTarget(target: string) { - //const md5 = crypto.createHash('md5') - const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); - let filename = path.join(METRICS_PATH, `${targetId}.json`); - writeJsonDataSync(filename, target); - return targetId; +export async function create(metric: Metric): Promise { + return metric.id = await db.insert(metric); } -export async function getTarget(targetId) { - let filename = path.join(METRICS_PATH, `${targetId}.json`); - return getJsonDataSync(filename); +export async function findMetric(id: MetricId): Promise { + return db.findOne(id); } diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 7183b30..28b0867 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -1,76 +1,36 @@ -import * as AnalyticUnit from './analytic_unit_model'; +import { AnalyticUnitId } from './analytic_unit_model'; +import { Collection, makeDBQ } from '../services/data_service'; -import * as _ from 'lodash'; - -import * as path from 'path'; -import * as fs from 'fs'; +import * as _ from 'lodash'; -export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) { - let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); +type SegmentId = string; - if(!fs.existsSync(filename)) { - return []; - } else { - let segments = getJsonDataSync(filename); - for(let segment of segments) { - if(segment.labeled === undefined) { - segment.labeled = false; - } - } - return segments; - } +type Segment = { + id?: SegmentId, + from: number, + to: number, + labeled: boolean } -export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) { - let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`); +let db = makeDBQ(Collection.SEGMENTS); - let jsonData; - try { - jsonData = getJsonDataSync(filename); - } catch(e) { - 
console.error(e.message); - jsonData = []; - } - return jsonData; +export function getLabeledSegments(id: AnalyticUnitId) { + return } -export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) { - let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); +export function getPredictedSegments(id: AnalyticUnitId) { - try { - return writeJsonDataSync(filename, _.uniqBy(segments, 'start')); - } catch(e) { - console.error(e.message); - throw new Error('Can`t write to db'); - } } -export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) { - // Set status - let info = await AnalyticUnit.findById(id); - let segments = getLabeledSegments(id); +export function saveSegments(id: AnalyticUnitId, segments: Segment[]) { + +} - let nextId = info.nextId; - let addedIds = [] - for (let segment of addedSegments) { - segment.id = nextId; - segment.labeled = labeled; - addedIds.push(nextId); - nextId++; - segments.push(segment); - } - info.nextId = nextId; - saveSegments(id, segments); - await AnalyticUnit.update(id, info); - return addedIds; +export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) { } -export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) { - let segments = getLabeledSegments(id); - for (let segmentId of removedSegments) { - segments = segments.filter(el => el.id !== segmentId); - } - saveSegments(id, segments); +export function removeSegments(idsToRemove: SegmentId[]) { + } diff --git a/server/src/models/task_model.ts b/server/src/models/task_model.ts new file mode 100644 index 0000000..8992bc1 --- /dev/null +++ b/server/src/models/task_model.ts @@ -0,0 +1,3 @@ +class Task { + +} \ No newline at end of file diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index be59684..5f06072 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -12,7 +12,6 @@ async function sendStatus(ctx: Router.IRouterContext) { throw new Error('Id is undefined'); } let unit = await AnalyticUnit.findById(id); - if(unit.status === undefined) { throw new Error('status is undefined'); } @@ -52,16 +51,8 @@ async function findItem(ctx: Router.IRouterContext) { async function createItem(ctx: Router.IRouterContext) { try { - - let body = ctx.request.body; - - await createAnalyticUnitFromObject(body); - - - let newId = await AnalyticUnit.create(unit); + let newId = await createAnalyticUnitFromObject(ctx.request.body); ctx.response.body = { id: newId }; - - } catch(e) { ctx.response.status = 500; ctx.response.body = { diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index f63a834..15cda5c 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -17,21 +17,21 @@ async function sendSegments(ctx: Router.IRouterContext) { let timeFrom = ctx.request.query.from; let timeTo = ctx.request.query.to; - let segments = getLabeledSegments(id); + let segments = await getLabeledSegments(id); - // Id filtering - if(lastSegmentId !== undefined) { - segments = segments.filter(el => el.id > lastSegmentId); - } + // // Id filtering + // if(lastSegmentId !== undefined) { + // segments = segments.filter(el => el.id > lastSegmentId); + // } - // Time filtering - if(timeFrom !== undefined) { - segments = segments.filter(el => el.finish > timeFrom); - } + // // Time filtering + // if(timeFrom !== undefined) { + // segments = 
segments.filter(el => el.finish > timeFrom); + // } - if(timeTo !== undefined) { - segments = segments.filter(el => el.start < timeTo); - } + // if(timeTo !== undefined) { + // segments = segments.filter(el => el.start < timeTo); + // } ctx.response.body = { segments } @@ -42,7 +42,7 @@ async function updateSegments(ctx: Router.IRouterContext) { let segmentsUpdate = ctx.request.body; let id = segmentsUpdate.id; let addedIds = insertSegments(id, segmentsUpdate.addedSegments, true); - removeSegments(id, segmentsUpdate.removedSegments); + // removeSegments(id, segmentsUpdate.removedSegments); ctx.response.body = { addedIds }; runLearning(id); } catch(e) { diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index b91e8e2..f1dea86 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -14,6 +14,7 @@ export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; */ export type DBQ = { insert: (document: object) => string, + insertMany: (documents: object[]) => string[], update: (query: string | object, updateQuery: any) => void, findOne: (query: string | object) => any, remove: (query: string | object) => number @@ -22,6 +23,7 @@ export type DBQ = { export function makeDBQ(collection: Collection): DBQ { return { insert: dbInsert.bind(null, collection), + insertMany: dbInsertMany.bind(null, collection), update: dbUpdate.bind(null, collection), findOne: dbFindOne.bind(null, collection), remove: dbRemove.bind(null, collection) @@ -49,6 +51,18 @@ let dbInsert = (collection: Collection, doc: object) => { }); } +let dbInsertMany = (collection: Collection, docs: object[]) => { + return new Promise((resolve, reject) => { + db[collection].insert(docs, (err, newDocs: any[]) => { + if(err) { + reject(err); + } else { + resolve(newDocs.map(d => d._id)); + } + }); + }); +} + let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { query = wrapIdToQuery(query); return new Promise((resolve, reject) => { @@ -88,7 +102,7 @@ let dbRemove = (collection: Collection, query: string | object) => { }); } -function maybeCreate(path: string): void { +function maybeCreateDir(path: string): void { if(fs.existsSync(path)) { return; } @@ -100,7 +114,7 @@ function checkDataFolders(): void { [ config.DATA_PATH, config.ZMQ_IPC_PATH - ].forEach(maybeCreate); + ].forEach(maybeCreateDir); } checkDataFolders(); From 00e09cfe224ce2eb01c82d8eb8428e6a78630e29 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 18:51:44 +0300 Subject: [PATCH 04/12] metrics db path usage fix --- server/src/config.ts | 7 +------ server/src/index.ts | 2 +- server/src/services/data_service.ts | 2 +- server/src/services/process_service.ts | 18 ++++++++++++------ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/server/src/config.ts b/server/src/config.ts index 75883f8..4f4ab34 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -10,16 +10,11 @@ export const ANALYTICS_PATH = path.join(__dirname, '../../analytics'); export const DATA_PATH = path.join(__dirname, '../../data'); -export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analyticUnits.db'); +export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); -export const DATASETS_PATH = path.join(DATA_PATH, 
'datasets'); -export const MODELS_PATH = path.join(DATA_PATH, 'models'); -export const METRICS_PATH = path.join(DATA_PATH, 'metrics'); -export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); - export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join('/tmp', 'hastic')); diff --git a/server/src/index.ts b/server/src/index.ts index ce04d94..2933d8a 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,6 +1,6 @@ import { router as anomaliesRouter } from './routes/analytic_units_router'; import { router as segmentsRouter } from './routes/segments_router'; -//import { router as alertsRouter } from './routes/alerts_router'; + import * as AnalyticsController from './controllers/analytics_controller'; diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index f1dea86..62a0555 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -120,5 +120,5 @@ checkDataFolders(); // TODO: it's better if models request db which we create if it`s needed db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); -db[Collection.METRICS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.METRICS] = new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }); db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); diff --git a/server/src/services/process_service.ts b/server/src/services/process_service.ts index 1d9ce5b..b4bee36 100644 --- a/server/src/services/process_service.ts +++ b/server/src/services/process_service.ts @@ -12,7 +12,7 @@ export function registerExitHandler(callback: () => void) { exitHandlers.push(callback); } -function exitHandler(options, err) { +function exitHandler(options, err?) 
{ if(exitHandled) { return; } @@ -24,15 +24,21 @@ function exitHandler(options, err) { process.exit(); } +function catchException(options, err) { + console.log('Server exception:'); + console.log(err); + exitHandler({ exit: true }); +} + //do something when app is closing -process.on('exit', exitHandler.bind(null,{cleanup:true})); +process.on('exit', exitHandler.bind(null, { cleanup:true })); //catches ctrl+c event -process.on('SIGINT', exitHandler.bind(null, {exit:true})); +process.on('SIGINT', exitHandler.bind(null, { exit:true })); // catches "kill pid" (for example: nodemon restart) -process.on('SIGUSR1', exitHandler.bind(null, {exit:true})); -process.on('SIGUSR2', exitHandler.bind(null, {exit:true})); +process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); +process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); //catches uncaught exceptions -process.on('uncaughtException', exitHandler.bind(null, {exit:true})); \ No newline at end of file +process.on('uncaughtException', catchException.bind(null, { exit:true })); \ No newline at end of file From 7fde0b3d40030c5d722d922e8c34f665f32fb770 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 19:08:57 +0300 Subject: [PATCH 05/12] use os.tmp path --- server/src/config.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/config.ts b/server/src/config.ts index 4f4ab34..2b75778 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -1,5 +1,6 @@ import * as path from 'path'; import * as fs from 'fs'; +import * as os from 'os'; import { getJsonDataSync } from './services/json_service'; @@ -17,7 +18,7 @@ export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); -export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join('/tmp', 'hastic')); +export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join(os.tmpdir(), 'hastic')); export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); export const ANLYTICS_PING_INTERVAL = 500; // ms From 1d3599238466b12a2c3fc4b39d3ba1ff317b827b Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 23:01:23 +0300 Subject: [PATCH 06/12] models++ --- server/src/config.ts | 2 - .../src/controllers/analytics_controller.ts | 6 +- server/src/models/analytic_unit_model.ts | 102 ++-- server/src/models/metric_model.ts | 78 ++- server/src/models/segment_model.ts | 58 ++- server/src/models/task_model.ts | 6 +- server/src/services/analytics_service.ts | 464 +++++++++--------- server/src/services/data_service.ts | 247 +++++----- server/src/services/process_service.ts | 86 ++-- 9 files changed, 535 insertions(+), 514 deletions(-) diff --git a/server/src/config.ts b/server/src/config.ts index 2b75778..41fc9db 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -12,9 +12,7 @@ export const ANALYTICS_PATH = path.join(__dirname, '../../analytics'); export const DATA_PATH = path.join(__dirname, '../../data'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); -export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); -export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); diff 
--git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index f707ef0..00afba3 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -123,8 +123,8 @@ export function isAnalyticReady(): boolean { } export async function createAnalyticUnitFromObject(obj: any): Promise { - let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj); - AnalyticUnit.create(unit); + let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.AnalyticUnit.fromObject(obj); + let id = await AnalyticUnit.create(unit); // runLearning(unit); - return + return id; } \ No newline at end of file diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index a4e6162..8203625 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -7,54 +7,66 @@ let db = makeDBQ(Collection.ANALYTIC_UNITS); export type AnalyticUnitId = string; -export type AnalyticUnit = { - id?: AnalyticUnitId, - name: string, - panelUrl: string, - type: string, - metric: Metric - status: string, - error?: string, - lastPredictionTime: number -} - -export function analyticUnitFromObj(obj: any): AnalyticUnit { - if(obj === undefined) { - throw new Error('obj is undefined'); - } - if(obj.type === undefined) { - throw new Error(`Missing field "type"`); - } - if(obj.name === undefined) { - throw new Error(`Missing field "name"`); - } - if(obj.panelUrl === undefined) { - throw new Error(`Missing field "panelUrl"`); +export class AnalyticUnit { + constructor( + public name: string, + public panelUrl: string, + public type: string, + public metric: Metric, + public id?: AnalyticUnitId, + public lastPredictionTime?: number, + public status?: string, + public error?: string, + ) { + if(name === undefined) { + throw new Error(`Missing field "name"`); + } + if(panelUrl === undefined) { + throw new Error(`Missing field "panelUrl"`); + } + if(type === undefined) { + throw new Error(`Missing field "type"`); + } + if(metric === undefined) { + throw new Error(`Missing field "metric"`); + } } - if(obj.metric === undefined) { - throw new Error(`Missing field "datasource"`); - } - if(obj.metric.datasource === undefined) { - throw new Error(`Missing field "metric.datasource"`); + + public toObject() { + return { + _id: this.id, + name: this.name, + panelUrl: this.panelUrl, + type: this.type, + metric: this.metric.toObject(), + lastPredictionTime: this.lastPredictionTime, + status: this.status, + error: this.error + }; } - if(obj.metric.targets === undefined) { - throw new Error(`Missing field "metric.targets"`); + + static fromObject(obj: any): AnalyticUnit { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new AnalyticUnit( + obj.name, + obj.panelUrl, + obj.type, + Metric.fromObject(obj.metric), + obj.status, + obj.lastPredictionTime, + obj.id || obj._id, + obj.error, + ); } - const unit: AnalyticUnit = { - name: obj.name, - panelUrl: obj.panelUrl, - type: obj.type, - metric: obj.metric, - status: 'LEARNING', - lastPredictionTime: 0 - }; - return unit; } + export async function findById(id: AnalyticUnitId): Promise { - return db.findOne(id); + return AnalyticUnit.fromObject(db.findOne(id)); } /** @@ -64,22 +76,22 @@ export async function findById(id: AnalyticUnitId): Promise { * @returns unit.id */ export async function create(unit: AnalyticUnit): Promise { - return unit.id = await db.insert(unit); + return unit.id = await db.insertOne(unit); } 
export async function remove(id: AnalyticUnitId): Promise { - await db.remove(id); + await db.removeOne(id); return; } export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { - return db.update(id, unit); + return db.updateOne(id, unit); } export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { - return db.update(id, { status, error }); + return db.updateOne(id, { status, error }); } export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) { - return db.update(id, { lastPredictionTime }); + return db.updateOne(id, { lastPredictionTime }); } diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts index b4062d3..2d97b2d 100644 --- a/server/src/models/metric_model.ts +++ b/server/src/models/metric_model.ts @@ -1,46 +1,32 @@ -import { Collection, makeDBQ } from '../services/data_service'; - - -let db = makeDBQ(Collection.METRICS); - - -export type Datasource = { - method: string, - data: Object, - params: Object, - type: string, - url: string -} - -export type MetricId = string; - -export type Metric = { - id?: MetricId, - datasource: Datasource, - targets: string[] -} - -export function metricFromObj(obj: any): Metric { - return { - datasource: obj.datasource, - targets: obj.targets - }; -} - -// export async function saveTargets(targets: string[]) { -// let metrics = []; -// for (let target of targets) { -// metrics.push(create(target)); -// } -// return metrics; -// } - -export async function create(metric: Metric): Promise { - return metric.id = await db.insert(metric); -} - -export async function findMetric(id: MetricId): Promise { - return db.findOne(id); -} - - +export class Metric { + constructor(public datasource: string, public targets: any[]) { + if(datasource === undefined) { + throw new Error('datasource is undefined'); + } + if(targets === undefined) { + throw new Error('targets is undefined'); + } + if(targets.length === 0) { + throw new Error('targets is empty'); + } + } + + public toObject() { + return { + datasource: this.datasource, + targets: this.targets + }; + } + + static fromObject(obj: any): Metric { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Metric( + obj.datasource, + obj.targets + ); + } +} + + diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 28b0867..a2cced2 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -2,35 +2,61 @@ import { AnalyticUnitId } from './analytic_unit_model'; import { Collection, makeDBQ } from '../services/data_service'; +let db = makeDBQ(Collection.SEGMENTS); -import * as _ from 'lodash'; type SegmentId = string; -type Segment = { - id?: SegmentId, - from: number, - to: number, - labeled: boolean +export class Segment { + constructor( + public from: number, + public to: number, + public labeled: boolean, + public id?: SegmentId + ) { + if(from === undefined) { + throw new Error('from is undefined'); + } + if(to === undefined) { + throw new Error('to is undefined'); + } + if(labeled === undefined) { + throw new Error('labeled is undefined'); + } + } + + public toObject() { + return { + _id: this.id, + from: this.from, + to: this.to, + labeled: this.labeled + }; + } + + static fromObject(obj: any): Segment { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Segment( + obj.from, obj.to, + obj.labeled, obj.id || obj._id + ); + } } -let db = makeDBQ(Collection.SEGMENTS); -export function 
getLabeledSegments(id: AnalyticUnitId) { - return +export function getLabeled(id: AnalyticUnitId) { + //return db. } -export function getPredictedSegments(id: AnalyticUnitId) { +export function getPredicted(id: AnalyticUnitId) { } -export function saveSegments(id: AnalyticUnitId, segments: Segment[]) { - -} - -export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) { +export async function create(segments: Segment[]) { } -export function removeSegments(idsToRemove: SegmentId[]) { +export function remove(idsToRemove: SegmentId[]) { } diff --git a/server/src/models/task_model.ts b/server/src/models/task_model.ts index 8992bc1..880519c 100644 --- a/server/src/models/task_model.ts +++ b/server/src/models/task_model.ts @@ -1,3 +1,3 @@ -class Task { - -} \ No newline at end of file +export class Task { + +} diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index eb25bb6..7686135 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,232 +1,232 @@ -import * as config from '../config'; - -const zmq = require('zeromq'); - -import * as childProcess from 'child_process' -import * as fs from 'fs'; -import * as path from 'path'; - - -export class AnalyticsMessage { - public constructor(public method: string, public payload?: string, public requestId?: number) { - - } - - static fromJSON(obj: any): AnalyticsMessage { - if(obj.method === undefined) { - throw new Error('No method in obj:' + obj); - } - return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); - } -} - -function analyticsMessageFromJson(obj: any): AnalyticsMessage { - return new AnalyticsMessage(obj); -} - -export class AnalyticsService { - - private _requester: any; - private _ready: boolean = false; - private _pingResponded = false; - private _zmqConnectionString = null; - private _ipcPath = null; - private _analyticsPinger: NodeJS.Timer = null; - private _isClosed = false; - - constructor(private _onMessage: (message: AnalyticsMessage) => void) { - this._init(); - } - - public async sendTask(taskObj: any): Promise { - if(!this._ready) { - return Promise.reject("Analytics is not ready"); - } - let message = { - method: 'TASK', - payload: taskObj - } - return this.sendMessage(message); - } - - public async sendMessage(message: AnalyticsMessage): Promise { - let strMessage = JSON.stringify(message); - if(message.method === 'PING') { - strMessage = 'PING'; - } - return new Promise((resolve, reject) => { - this._requester.send(strMessage, undefined, (err) => { - if(err) { - reject(err); - } else { - resolve(); - } - }); - }); - } - - public close() { - this._isClosed = true; - console.log('Terminating analytics service...'); - clearInterval(this._analyticsPinger); - if(this._ipcPath !== null) { - console.log('Remove ipc path: ' + this._ipcPath); - fs.unlinkSync(this._ipcPath); - } - this._requester.close(); - console.log('Ok'); - } - - public get ready(): boolean { return this._ready; } - - private async _init() { - this._requester = zmq.socket('pair'); - let productionMode = process.env.NODE_ENV !== 'development'; - - this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode - if(productionMode) { - this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; - if(this._zmqConnectionString === null) { - var createResult = await AnalyticsService.createIPCAddress(); - this._zmqConnectionString = createResult.address; - this._ipcPath = createResult.file; - } - } - - 
console.log("Binding to zmq... %s", this._zmqConnectionString); - this._requester.connect(this._zmqConnectionString); - this._requester.on("message", this._onAnalyticsMessage.bind(this)); - console.log('Ok'); - - if(productionMode) { - console.log('Creating analytics process...'); - try { - var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); - } catch(error) { - console.error('Can`t run analytics process: %s', error); - return; - } - console.log('Ok, pid: %s', cp.pid); - } - - console.log('Start analytics pinger...'); - this._runAlalyticsPinger(); - console.log('Ok'); - - } - - /** - * Spawns analytics process. Reads process stderr and fails if it isn`t empty. - * No need to stop the process later. - * - * @returns Creaded child process - * @throws Process start error or first exception during python start - */ - private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { - let cp: childProcess.ChildProcess; - let cpOptions = { - cwd: config.ANALYTICS_PATH, - env: { - ...process.env, - ZMQ_CONNECTION_STRING: zmqConnectionString - } - }; - - if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { - console.log('dist/worker/worker'); - cp = childProcess.spawn('dist/worker/worker', [], cpOptions); - } else { - console.log('python3 server.py'); - // If compiled analytics script doesn't exist - fallback to regular python - console.log(config.ANALYTICS_PATH); - cp = childProcess.spawn('python3', ['server.py'], cpOptions); - } - - if(cp.pid === undefined) { - return new Promise((resolve, reject) => { - cp.on('error', reject); - }); - } - - return new Promise((resolve, reject) => { - var resolved = false; - - cp.stdout.on('data', () => { - if(resolved) { - return; - } else { - resolved = true; - } - resolve(cp); - }); - - cp.stderr.on('data', function(data) { - if(resolved) { - return; - } else { - resolved = true; - } - reject(data); - }); - - }); - - } - - private _onAnalyticsUp() { - console.log('Analytics is up'); - } - - private async _onAnalyticsDown() { - console.log('Analytics is down'); - if(process.env.NODE_ENV !== 'development') { - await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); - } - } - - private _onAnalyticsMessage(data: any, error) { - if(data.toString() === 'PONG') { - this._pingResponded = true; - if(!this._ready) { - this._ready = true; - this._onAnalyticsUp(); - } - return; - } - - - let text = data.toString(); - let response; - try { - response = JSON.parse(text); - } catch (e) { - console.error("Can`t parse response from analytics as json:"); - console.error(text); - throw new Error('Unexpected response'); - } - this._onMessage(AnalyticsMessage.fromJSON(response)); - } - - private async _runAlalyticsPinger() { - this._analyticsPinger = setInterval(() => { - if(this._isClosed) { - return; - } - if(!this._pingResponded && this._ready) { - this._ready = false; - this._onAnalyticsDown(); - } - this._pingResponded = false; - // TODO: set life limit for this ping - this.sendMessage({ method: 'PING' }); - }, config.ANLYTICS_PING_INTERVAL); - } - - private static async createIPCAddress(): Promise<{ address: string, file: string }> { - let filename = `${process.pid}.ipc` - let p = path.join(config.ZMQ_IPC_PATH, filename); - fs.writeFileSync(p, ''); - return Promise.resolve({ address: 'ipc://' + p, file: p }); - } - -} +import * as config from '../config'; + +const zmq = require('zeromq'); + +import * as childProcess from 'child_process' +import * as fs from 'fs'; +import * as path from 
'path'; + + +export class AnalyticsMessage { + public constructor(public method: string, public payload?: string, public requestId?: number) { + + } + + static fromJSON(obj: any): AnalyticsMessage { + if(obj.method === undefined) { + throw new Error('No method in obj:' + obj); + } + return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); + } +} + +function analyticsMessageFromJson(obj: any): AnalyticsMessage { + return new AnalyticsMessage(obj); +} + +export class AnalyticsService { + + private _requester: any; + private _ready: boolean = false; + private _pingResponded = false; + private _zmqConnectionString = null; + private _ipcPath = null; + private _analyticsPinger: NodeJS.Timer = null; + private _isClosed = false; + + constructor(private _onMessage: (message: AnalyticsMessage) => void) { + this._init(); + } + + public async sendTask(taskObj: any): Promise { + if(!this._ready) { + return Promise.reject("Analytics is not ready"); + } + let message = { + method: 'TASK', + payload: taskObj + } + return this.sendMessage(message); + } + + public async sendMessage(message: AnalyticsMessage): Promise { + let strMessage = JSON.stringify(message); + if(message.method === 'PING') { + strMessage = 'PING'; + } + return new Promise((resolve, reject) => { + this._requester.send(strMessage, undefined, (err) => { + if(err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + public close() { + this._isClosed = true; + console.log('Terminating analytics service...'); + clearInterval(this._analyticsPinger); + if(this._ipcPath !== null) { + console.log('Remove ipc path: ' + this._ipcPath); + fs.unlinkSync(this._ipcPath); + } + this._requester.close(); + console.log('Ok'); + } + + public get ready(): boolean { return this._ready; } + + private async _init() { + this._requester = zmq.socket('pair'); + let productionMode = process.env.NODE_ENV !== 'development'; + + this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode + if(productionMode) { + this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; + if(this._zmqConnectionString === null) { + var createResult = await AnalyticsService.createIPCAddress(); + this._zmqConnectionString = createResult.address; + this._ipcPath = createResult.file; + } + } + + console.log("Binding to zmq... %s", this._zmqConnectionString); + this._requester.connect(this._zmqConnectionString); + this._requester.on("message", this._onAnalyticsMessage.bind(this)); + console.log('Ok'); + + if(productionMode) { + console.log('Creating analytics process...'); + try { + var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); + } catch(error) { + console.error('Can`t run analytics process: %s', error); + return; + } + console.log('Ok, pid: %s', cp.pid); + } + + console.log('Start analytics pinger...'); + this._runAlalyticsPinger(); + console.log('Ok'); + + } + + /** + * Spawns analytics process. Reads process stderr and fails if it isn`t empty. + * No need to stop the process later. 
+ * + * @returns Creaded child process + * @throws Process start error or first exception during python start + */ + private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { + let cp: childProcess.ChildProcess; + let cpOptions = { + cwd: config.ANALYTICS_PATH, + env: { + ...process.env, + ZMQ_CONNECTION_STRING: zmqConnectionString + } + }; + + if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { + console.log('dist/worker/worker'); + cp = childProcess.spawn('dist/worker/worker', [], cpOptions); + } else { + console.log('python3 server.py'); + // If compiled analytics script doesn't exist - fallback to regular python + console.log(config.ANALYTICS_PATH); + cp = childProcess.spawn('python3', ['server.py'], cpOptions); + } + + if(cp.pid === undefined) { + return new Promise((resolve, reject) => { + cp.on('error', reject); + }); + } + + return new Promise((resolve, reject) => { + var resolved = false; + + cp.stdout.on('data', () => { + if(resolved) { + return; + } else { + resolved = true; + } + resolve(cp); + }); + + cp.stderr.on('data', function(data) { + if(resolved) { + return; + } else { + resolved = true; + } + reject(data); + }); + + }); + + } + + private _onAnalyticsUp() { + console.log('Analytics is up'); + } + + private async _onAnalyticsDown() { + console.log('Analytics is down'); + if(process.env.NODE_ENV !== 'development') { + await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); + } + } + + private _onAnalyticsMessage(data: any, error) { + if(data.toString() === 'PONG') { + this._pingResponded = true; + if(!this._ready) { + this._ready = true; + this._onAnalyticsUp(); + } + return; + } + + + let text = data.toString(); + let response; + try { + response = JSON.parse(text); + } catch (e) { + console.error("Can`t parse response from analytics as json:"); + console.error(text); + throw new Error('Unexpected response'); + } + this._onMessage(AnalyticsMessage.fromJSON(response)); + } + + private async _runAlalyticsPinger() { + this._analyticsPinger = setInterval(() => { + if(this._isClosed) { + return; + } + if(!this._pingResponded && this._ready) { + this._ready = false; + this._onAnalyticsDown(); + } + this._pingResponded = false; + // TODO: set life limit for this ping + this.sendMessage({ method: 'PING' }); + }, config.ANLYTICS_PING_INTERVAL); + } + + private static async createIPCAddress(): Promise<{ address: string, file: string }> { + let filename = `${process.pid}.ipc` + let p = path.join(config.ZMQ_IPC_PATH, filename); + fs.writeFileSync(p, ''); + return Promise.resolve({ address: 'ipc://' + p, file: p }); + } + +} diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 62a0555..ae0ee09 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -1,124 +1,123 @@ -import * as config from '../config'; - -import * as nedb from 'nedb'; -import * as fs from 'fs'; - - -export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; - - -/** - * Class which helps to make queries to your collection - * - * @param { string | object } query: a key as a string or mongodb-style query - */ -export type DBQ = { - insert: (document: object) => string, - insertMany: (documents: object[]) => string[], - update: (query: string | object, updateQuery: any) => void, - findOne: (query: string | object) => any, - remove: (query: string | object) => number -} - -export function makeDBQ(collection: Collection): DBQ { - return { - insert: dbInsert.bind(null, 
collection), - insertMany: dbInsertMany.bind(null, collection), - update: dbUpdate.bind(null, collection), - findOne: dbFindOne.bind(null, collection), - remove: dbRemove.bind(null, collection) - } -} - -function wrapIdToQuery(query: string | object) { - if(typeof query === 'string') { - return { _id: query }; - } - return query; -} - -const db = new Map(); - -let dbInsert = (collection: Collection, doc: object) => { - return new Promise((resolve, reject) => { - db[collection].insert(doc, (err, newDoc) => { - if(err) { - reject(err); - } else { - resolve(newDoc._id); - } - }); - }); -} - -let dbInsertMany = (collection: Collection, docs: object[]) => { - return new Promise((resolve, reject) => { - db[collection].insert(docs, (err, newDocs: any[]) => { - if(err) { - reject(err); - } else { - resolve(newDocs.map(d => d._id)); - } - }); - }); -} - -let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { - if(err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - -let dbFindOne = (collection: Collection, query: string | object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].findOne(query, (err, doc) => { - if(err) { - reject(err); - } else { - resolve(doc); - } - }); - }); -} - -let dbRemove = (collection: Collection, query: string | object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].remove(query, (err, numRemoved) => { - if(err) { - reject(err); - } else { - resolve(numRemoved); - } - }); - }); -} - -function maybeCreateDir(path: string): void { - if(fs.existsSync(path)) { - return; - } - console.log('mkdir: ' + path); - fs.mkdirSync(path); -} - -function checkDataFolders(): void { - [ - config.DATA_PATH, - config.ZMQ_IPC_PATH - ].forEach(maybeCreateDir); -} -checkDataFolders(); - -// TODO: it's better if models request db which we create if it`s needed -db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); -db[Collection.METRICS] = new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }); -db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); +import * as config from '../config'; + +import * as nedb from 'nedb'; +import * as fs from 'fs'; + + +export enum Collection { ANALYTIC_UNITS, SEGMENTS }; + + +/** + * Class which helps to make queries to your collection + * + * @param { string | object } query: a key as a string or mongodb-style query + */ +export type DBQ = { + insertOne: (document: object) => string, + insertMany: (documents: object[]) => string[], + updateOne: (query: string | object, updateQuery: any) => void, + findOne: (query: string | object) => any, + removeOne: (query: string | object) => number +} + +export function makeDBQ(collection: Collection): DBQ { + return { + insertOne: dbInsert.bind(null, collection), + insertMany: dbInsertMany.bind(null, collection), + updateOne: dbUpdate.bind(null, collection), + findOne: dbFindOne.bind(null, collection), + removeOne: dbRemove.bind(null, collection) + } +} + +function wrapIdToQuery(query: string | object) { + if(typeof query === 'string') { + return { _id: query }; + } + return query; +} + +const db = new Map(); + +let dbInsert = (collection: Collection, doc: object) => { + return new Promise((resolve, reject) => { + 
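// --- Editor's illustrative sketch, not part of the patch ----------------------
// makeDBQ() binds the promisified nedb helpers in this file to one Collection,
// so a model module can query its own datastore without repeating callback
// plumbing. A minimal sketch, assuming the renamed *One helpers from this
// revision and a made-up document:
import { Collection, makeDBQ } from '../services/data_service';

async function dbqExample(): Promise<void> {
  const units = makeDBQ(Collection.ANALYTIC_UNITS);

  const id = await units.insertOne({ name: 'example unit' }); // resolves to the generated nedb _id
  const doc = await units.findOne(id);            // a bare string id is wrapped into { _id: id }
  await units.updateOne(id, { status: 'READY' }); // same shape the models use for status updates
  await units.removeOne(id);
  console.log('round-tripped', doc);
}
// ------------------------------------------------------------------------------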
db[collection].insert(doc, (err, newDoc) => { + if(err) { + reject(err); + } else { + resolve(newDoc._id); + } + }); + }); +} + +let dbInsertMany = (collection: Collection, docs: object[]) => { + return new Promise((resolve, reject) => { + db[collection].insert(docs, (err, newDocs: any[]) => { + if(err) { + reject(err); + } else { + resolve(newDocs.map(d => d._id)); + } + }); + }); +} + +let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { + if(err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +let dbFindOne = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].findOne(query, (err, doc) => { + if(err) { + reject(err); + } else { + resolve(doc); + } + }); + }); +} + +let dbRemove = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].remove(query, (err, numRemoved) => { + if(err) { + reject(err); + } else { + resolve(numRemoved); + } + }); + }); +} + +function maybeCreateDir(path: string): void { + if(fs.existsSync(path)) { + return; + } + console.log('mkdir: ' + path); + fs.mkdirSync(path); +} + +function checkDataFolders(): void { + [ + config.DATA_PATH, + config.ZMQ_IPC_PATH + ].forEach(maybeCreateDir); +} +checkDataFolders(); + +// TODO: it's better if models request db which we create if it`s needed +db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); diff --git a/server/src/services/process_service.ts b/server/src/services/process_service.ts index b4bee36..6d0a237 100644 --- a/server/src/services/process_service.ts +++ b/server/src/services/process_service.ts @@ -1,44 +1,44 @@ - - -var exitHandlers = [] -var exitHandled = false; - -/** - * Add a callback for closing programm bacause of any reason - * - * @param callback a sync function - */ -export function registerExitHandler(callback: () => void) { - exitHandlers.push(callback); -} - -function exitHandler(options, err?) { - if(exitHandled) { - return; - } - exitHandled = true; - for(let i = 0; i < exitHandlers.length; i++) { - exitHandlers[i](); - } - console.log('process exit'); - process.exit(); -} - -function catchException(options, err) { - console.log('Server exception:'); - console.log(err); - exitHandler({ exit: true }); -} - -//do something when app is closing -process.on('exit', exitHandler.bind(null, { cleanup:true })); - -//catches ctrl+c event -process.on('SIGINT', exitHandler.bind(null, { exit:true })); - -// catches "kill pid" (for example: nodemon restart) -process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); -process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); - -//catches uncaught exceptions + + +var exitHandlers = [] +var exitHandled = false; + +/** + * Add a callback for closing programm bacause of any reason + * + * @param callback a sync function + */ +export function registerExitHandler(callback: () => void) { + exitHandlers.push(callback); +} + +function exitHandler(options, err?) 
{ + if(exitHandled) { + return; + } + exitHandled = true; + for(let i = 0; i < exitHandlers.length; i++) { + exitHandlers[i](); + } + console.log('process exit'); + process.exit(); +} + +function catchException(options, err) { + console.log('Server exception:'); + console.log(err); + exitHandler({ exit: true }); +} + +//do something when app is closing +process.on('exit', exitHandler.bind(null, { cleanup:true })); + +//catches ctrl+c event +process.on('SIGINT', exitHandler.bind(null, { exit:true })); + +// catches "kill pid" (for example: nodemon restart) +process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); +process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); + +//catches uncaught exceptions process.on('uncaughtException', catchException.bind(null, { exit:true })); \ No newline at end of file From 7dd4f9bf1852b705e956a24e9524b1a214b6177d Mon Sep 17 00:00:00 2001 From: sanke Date: Mon, 6 Aug 2018 15:25:12 +0300 Subject: [PATCH 07/12] make npm run build work rename functions --- server/src/models/segment_model.ts | 6 +++--- server/src/routes/segments_router.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index a2cced2..b65ee88 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -46,7 +46,7 @@ export class Segment { } -export function getLabeled(id: AnalyticUnitId) { +export function getLabeledSegments(id: AnalyticUnitId) { //return db. } @@ -54,9 +54,9 @@ export function getPredicted(id: AnalyticUnitId) { } -export async function create(segments: Segment[]) { +export async function insertSegments(id: any, segments: Segment[]) { } -export function remove(idsToRemove: SegmentId[]) { +export function removeSegments(idsToRemove: SegmentId[]) { } diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 15cda5c..80dcc80 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -41,7 +41,7 @@ async function updateSegments(ctx: Router.IRouterContext) { try { let segmentsUpdate = ctx.request.body; let id = segmentsUpdate.id; - let addedIds = insertSegments(id, segmentsUpdate.addedSegments, true); + let addedIds = insertSegments(id, segmentsUpdate.addedSegments); // removeSegments(id, segmentsUpdate.removedSegments); ctx.response.body = { addedIds }; runLearning(id); From bacf05da9cf8cc188d6734df52dfe7516a6d1dd3 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 6 Aug 2018 23:21:58 +0300 Subject: [PATCH 08/12] db-many querys & usage in segments --- server/.vscode/launch.json | 3 +- server/src/models/analytic_unit_model.ts | 7 +-- server/src/models/segment_model.ts | 31 +++++++--- server/src/routes/analytic_units_router.ts | 19 +++---- server/src/routes/segments_router.ts | 30 +++------- server/src/services/data_service.ts | 66 ++++++++++++++++++---- 6 files changed, 97 insertions(+), 59 deletions(-) diff --git a/server/.vscode/launch.json b/server/.vscode/launch.json index 47a65b1..94af8a5 100644 --- a/server/.vscode/launch.json +++ b/server/.vscode/launch.json @@ -17,7 +17,8 @@ //"outFiles": [ "/home/alex/Projects/hastic-server/server.js" ], "port": 9229, "restart": true, - "trace": true + "trace": true, + "timeout": 100000 } ] } \ No newline at end of file diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 8203625..73b5d9f 100644 --- a/server/src/models/analytic_unit_model.ts +++ 
b/server/src/models/analytic_unit_model.ts @@ -61,22 +61,21 @@ export class AnalyticUnit { ); } - } export async function findById(id: AnalyticUnitId): Promise { - return AnalyticUnit.fromObject(db.findOne(id)); + return AnalyticUnit.fromObject(await db.findOne(id)); } /** * Creates and updates new unit.id - * + * * @param unit to create * @returns unit.id */ export async function create(unit: AnalyticUnit): Promise { - return unit.id = await db.insertOne(unit); + return unit.id = await db.insertOne(unit.toObject()); } export async function remove(id: AnalyticUnitId): Promise { diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index b65ee88..be91f16 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -9,11 +9,15 @@ type SegmentId = string; export class Segment { constructor( + public auId: AnalyticUnitId, public from: number, public to: number, public labeled: boolean, public id?: SegmentId ) { + if(auId === undefined) { + throw new Error('AnalyticUnitId is undefined'); + } if(from === undefined) { throw new Error('from is undefined'); } @@ -28,6 +32,7 @@ export class Segment { public toObject() { return { _id: this.id, + auId: this.auId, from: this.from, to: this.to, labeled: this.labeled @@ -39,24 +44,34 @@ export class Segment { throw new Error('obj is undefined'); } return new Segment( - obj.from, obj.to, + obj.auId, +obj.from, +obj.to, obj.labeled, obj.id || obj._id ); } } - -export function getLabeledSegments(id: AnalyticUnitId) { - //return db. +export type FindManyQuery = { + timeFromGTE?: number, + timeToLTE?: number, + intexGT?: number } -export function getPredicted(id: AnalyticUnitId) { - +export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promise { + var dbQuery: any = { auId: id }; + if(query.timeFromGTE !== undefined) { + dbQuery.from = { $gte: query.timeFromGTE }; + } + if(query.timeToLTE !== undefined) { + dbQuery.to = { $lte: query.timeToLTE }; + } + let segs = await db.findMany(dbQuery); + return segs.map(Segment.fromObject); } -export async function insertSegments(id: any, segments: Segment[]) { +export async function insertSegments(id: AnalyticUnitId, segments: Segment[]) { + return db.insertMany(segments.map(s => s.toObject())); } export function removeSegments(idsToRemove: SegmentId[]) { - + return db.removeMany(idsToRemove); } diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index 5f06072..763ddaa 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -25,7 +25,7 @@ async function sendStatus(ctx: Router.IRouterContext) { } -async function findItem(ctx: Router.IRouterContext) { +async function getUnit(ctx: Router.IRouterContext) { try { let id = ctx.request.query.id; @@ -49,7 +49,7 @@ async function findItem(ctx: Router.IRouterContext) { } } -async function createItem(ctx: Router.IRouterContext) { +async function createUnit(ctx: Router.IRouterContext) { try { let newId = await createAnalyticUnitFromObject(ctx.request.body); ctx.response.body = { id: newId }; @@ -62,14 +62,9 @@ async function createItem(ctx: Router.IRouterContext) { } } -function deleteItem(ctx: Router.IRouterContext) { +async function deleteUnit(ctx: Router.IRouterContext) { try { - let id = ctx.request.query.id; - - if(id !== undefined) { - AnalyticUnit.remove(id); - } - + await AnalyticUnit.remove(ctx.request.query.id); ctx.response.body = { code: 200, message: 'Success' @@ -86,7 +81,7 @@ function 
deleteItem(ctx: Router.IRouterContext) { export var router = new Router(); +router.get('/', getUnit); router.get('/status', sendStatus); -router.get('/', findItem); -router.post('/', createItem); -router.delete('/', deleteItem); +router.post('/', createUnit); +router.delete('/', deleteUnit); diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 80dcc80..3f9f37a 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -3,35 +3,21 @@ import * as Router from 'koa-router'; import { AnalyticUnitId } from '../models/analytic_unit_model'; import { - getLabeledSegments, + findMany, insertSegments, removeSegments, } from '../models/segment_model'; import { runLearning } from '../controllers/analytics_controller'; -async function sendSegments(ctx: Router.IRouterContext) { +async function getSegments(ctx: Router.IRouterContext) { let id: AnalyticUnitId = ctx.request.query.id; - let lastSegmentId = ctx.request.query.lastSegmentId; - let timeFrom = ctx.request.query.from; - let timeTo = ctx.request.query.to; - - let segments = await getLabeledSegments(id); - - // // Id filtering - // if(lastSegmentId !== undefined) { - // segments = segments.filter(el => el.id > lastSegmentId); - // } - - // // Time filtering - // if(timeFrom !== undefined) { - // segments = segments.filter(el => el.finish > timeFrom); - // } - - // if(timeTo !== undefined) { - // segments = segments.filter(el => el.start < timeTo); - // } + let segments = await findMany(id, { + intexGT: ctx.request.query.lastSegmentId, + timeFromGTE: ctx.request.query.from, + timeToLTE: ctx.request.query.to + }); ctx.response.body = { segments } @@ -56,5 +42,5 @@ async function updateSegments(ctx: Router.IRouterContext) { export const router = new Router(); -router.get('/', sendSegments); +router.get('/', getSegments); router.patch('/', updateSegments); diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index ae0ee09..90cff7a 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -9,37 +9,48 @@ export enum Collection { ANALYTIC_UNITS, SEGMENTS }; /** * Class which helps to make queries to your collection - * + * * @param { string | object } query: a key as a string or mongodb-style query */ export type DBQ = { + findOne: (query: string | object) => any, + findMany: (query: string[] | object) => any[], insertOne: (document: object) => string, insertMany: (documents: object[]) => string[], updateOne: (query: string | object, updateQuery: any) => void, - findOne: (query: string | object) => any, - removeOne: (query: string | object) => number + removeOne: (query: string) => boolean + removeMany: (query: string[] | object) => number } export function makeDBQ(collection: Collection): DBQ { return { - insertOne: dbInsert.bind(null, collection), - insertMany: dbInsertMany.bind(null, collection), - updateOne: dbUpdate.bind(null, collection), findOne: dbFindOne.bind(null, collection), - removeOne: dbRemove.bind(null, collection) + findMany: dbFindMany.bind(null, collection), + insertOne: dbInsertOne.bind(null, collection), + insertMany: dbInsertMany.bind(null, collection), + updateOne: dbUpdateOne.bind(null, collection), + removeOne: dbRemoveOne.bind(null, collection), + removeMany: dbRemoveMany.bind(null, collection) } } -function wrapIdToQuery(query: string | object) { +function wrapIdToQuery(query: string | object): any { if(typeof query === 'string') { return { _id: query }; } return query; } +function 
wrapIdsToQuery(query: string[] | object): any { + if(Array.isArray(query)) { + return { _id: { $in: query } }; + } + return query; +} + const db = new Map(); -let dbInsert = (collection: Collection, doc: object) => { +let dbInsertOne = (collection: Collection, doc: object) => { return new Promise((resolve, reject) => { db[collection].insert(doc, (err, newDoc) => { if(err) { @@ -63,7 +74,7 @@ let dbInsertMany = (collection: Collection, docs: object[]) => { }); } -let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { +let dbUpdateOne = (collection: Collection, query: string | object, updateQuery: object) => { query = wrapIdToQuery(query); return new Promise((resolve, reject) => { db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { @@ -89,8 +100,38 @@ let dbFindOne = (collection: Collection, query: string | object) => { }); } -let dbRemove = (collection: Collection, query: string | object) => { - query = wrapIdToQuery(query); +let dbFindMany = (collection: Collection, query: string[] | object) => { + query = wrapIdsToQuery(query); + return new Promise((resolve, reject) => { + db[collection].findOne(query, (err, docs) => { + if(err) { + reject(err); + } else { + resolve(docs); + } + }); + }); +} + +let dbRemoveOne = (collection: Collection, id: string) => { + let query = { _id: id }; + return new Promise((resolve, reject) => { + db[collection].remove(query, (err, numRemoved) => { + if(err) { + reject(err); + } else { + if(numRemoved > 1) { + throw new Error(`Removed ${numRemoved} elements with id: ${id}. Only one is Ok.`); + } else { + resolve(numRemoved == 1); + } + } + }); + }); +} + +let dbRemoveMany = (collection: Collection, query: string[] | object) => { + query = wrapIdsToQuery(query); return new Promise((resolve, reject) => { db[collection].remove(query, (err, numRemoved) => { if(err) { @@ -102,6 +143,7 @@ let dbRemove = (collection: Collection, query: string | object) => { }); } + function maybeCreateDir(path: string): void { if(fs.existsSync(path)) { return; From 1d047087b4f47533bf54a830cf08c1bc8f8e5d14 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 7 Aug 2018 14:12:31 +0300 Subject: [PATCH 09/12] async bugfxs --- server/src/index.ts | 8 +++++++- server/src/models/analytic_unit_model.ts | 4 +++- server/src/routes/analytic_units_router.ts | 16 ++++------------ server/src/routes/segments_router.ts | 14 +++++++------- server/src/services/data_service.ts | 14 +++++++------- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/server/src/index.ts b/server/src/index.ts index 2933d8a..0676fbc 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -18,13 +18,19 @@ ProcessService.registerExitHandler(AnalyticsController.terminate); var app = new Koa(); +app.on('error', (err, ctx) => { + console.log('got server error:'); + console.log(err); +}); + + app.use(bodyParser()) app.use(async function(ctx, next) { ctx.set('Access-Control-Allow-Origin', '*'); ctx.set('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, PATCH, OPTIONS'); ctx.set('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept'); - next(); + await next(); }); diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 73b5d9f..7115803 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -75,7 +75,9 @@ export async function findById(id: AnalyticUnitId): Promise { * @returns unit.id */ export async function create(unit: 
AnalyticUnit): Promise { - return unit.id = await db.insertOne(unit.toObject()); + var obj = unit.toObject(); + var r = await db.insertOne(obj); + return r; } export async function remove(id: AnalyticUnitId): Promise { diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index 763ddaa..37a85d8 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -5,24 +5,15 @@ import * as AnalyticUnit from '../models/analytic_unit_model'; import { createAnalyticUnitFromObject } from '../controllers/analytics_controller' -async function sendStatus(ctx: Router.IRouterContext) { +async function getStatus(ctx: Router.IRouterContext) { try { - let id = ctx.request.query.id; - if(id === undefined) { - throw new Error('Id is undefined'); - } - let unit = await AnalyticUnit.findById(id); - if(unit.status === undefined) { - throw new Error('status is undefined'); - } - ctx.response.body = { status: unit.status, errorMessage: unit.error }; + ctx.response.body = { status: 'READY', errorMessage: undefined }; } catch(e) { console.error(e); // TODO: better send 404 when we know than isn`t found ctx.response.status = 500; ctx.response.body = { error: 'Can`t return anything' }; } - } async function getUnit(ctx: Router.IRouterContext) { @@ -60,6 +51,7 @@ async function createUnit(ctx: Router.IRouterContext) { message: `Creation error: ${e.message}` }; } + } async function deleteUnit(ctx: Router.IRouterContext) { @@ -82,6 +74,6 @@ async function deleteUnit(ctx: Router.IRouterContext) { export var router = new Router(); router.get('/', getUnit); -router.get('/status', sendStatus); +router.get('/status', getStatus); router.post('/', createUnit); router.delete('/', deleteUnit); diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 3f9f37a..2194fac 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -11,15 +11,15 @@ import { runLearning } from '../controllers/analytics_controller'; async function getSegments(ctx: Router.IRouterContext) { - let id: AnalyticUnitId = ctx.request.query.id; + // let id: AnalyticUnitId = ctx.request.query.id; - let segments = await findMany(id, { - intexGT: ctx.request.query.lastSegmentId, - timeFromGTE: ctx.request.query.from, - timeToLTE: ctx.request.query.to - }); + // let segments = await findMany(id, { + // intexGT: ctx.request.query.lastSegmentId, + // timeFromGTE: ctx.request.query.from, + // timeToLTE: ctx.request.query.to + // }); - ctx.response.body = { segments } + ctx.response.body = { segments: [] }; } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 90cff7a..c39da2f 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -13,13 +13,13 @@ export enum Collection { ANALYTIC_UNITS, SEGMENTS }; * @param { string | object } query: a key as a string or mongodb-style query */ export type DBQ = { - findOne: (query: string | object) => any, - findMany: (query: string[] | object) => any[], - insertOne: (document: object) => string, - insertMany: (documents: object[]) => string[], - updateOne: (query: string | object, updateQuery: any) => void, - removeOne: (query: string) => boolean - removeMany: (query: string[] | object) => number + findOne: (query: string | object) => Promise, + findMany: (query: string[] | object) => Promise, + insertOne: (document: object) => Promise, + insertMany: (documents: object[]) => Promise, + updateOne: 
(query: string | object, updateQuery: any) => Promise, + removeOne: (query: string) => Promise + removeMany: (query: string[] | object) => Promise } export function makeDBQ(collection: Collection): DBQ { From 590f3fdf766a9ea47e32fcc3b969038be4de2934 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 12 Aug 2018 22:31:07 +0300 Subject: [PATCH 10/12] strict any checks --- server/src/config.ts | 2 +- server/src/controllers/alerts_controller.ts | 2 +- server/src/controllers/analytics_controller.ts | 8 ++++---- server/src/models/segment_model.ts | 2 +- server/src/routes/segments_router.ts | 2 +- server/src/services/analytics_service.ts | 8 ++++---- server/src/services/data_service.ts | 18 +++++++++--------- server/src/services/notification_service.ts | 6 +++--- server/src/services/process_service.ts | 6 +++--- server/tsconfig.json | 4 ++-- 10 files changed, 29 insertions(+), 29 deletions(-) diff --git a/server/src/config.ts b/server/src/config.ts index 41fc9db..6a486f5 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -21,7 +21,7 @@ export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); export const ANLYTICS_PING_INTERVAL = 500; // ms -function getConfigField(field, defaultVal?) { +function getConfigField(field: string, defaultVal?: any) { let val = defaultVal; if(process.env[field] !== undefined) { diff --git a/server/src/controllers/alerts_controller.ts b/server/src/controllers/alerts_controller.ts index 0989ae2..0eaba4f 100644 --- a/server/src/controllers/alerts_controller.ts +++ b/server/src/controllers/alerts_controller.ts @@ -2,7 +2,7 @@ * Alarting is not supported yet */ -throw new console.error("Not supported"); +throw new Error('not supported'); // import { runPredict } from './analytics_controller'; diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 00afba3..ad3db56 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -3,7 +3,7 @@ import * as AnalyticUnit from '../models/analytic_unit_model' import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; -const taskMap = {}; +const taskMap = new Map(); let nextTaskId = 0; let analyticsService: AnalyticsService = undefined; @@ -14,9 +14,9 @@ function onTaskResult(taskResult: any) { let status = taskResult.status; if(status === 'SUCCESS' || status === 'FAILED') { if(taskId in taskMap) { - let resolver = taskMap[taskId]; + let resolver: any = taskMap.get(taskId); resolver(taskResult); - delete taskMap[taskId]; + taskMap.delete(taskId); } } } @@ -49,7 +49,7 @@ export function terminate() { analyticsService.close(); } -async function runTask(task): Promise { +async function runTask(task: any): Promise { // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); // task.metric = { // datasource: anomaly.metric.datasource, diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index be91f16..4abb5f2 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -68,7 +68,7 @@ export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promis return segs.map(Segment.fromObject); } -export async function insertSegments(id: AnalyticUnitId, segments: Segment[]) { +export async function insertSegments(segments: Segment[]) { return db.insertMany(segments.map(s => s.toObject())); } diff --git a/server/src/routes/segments_router.ts 
b/server/src/routes/segments_router.ts index 2194fac..64ff66a 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -27,7 +27,7 @@ async function updateSegments(ctx: Router.IRouterContext) { try { let segmentsUpdate = ctx.request.body; let id = segmentsUpdate.id; - let addedIds = insertSegments(id, segmentsUpdate.addedSegments); + let addedIds = insertSegments(segmentsUpdate.addedSegments); // removeSegments(id, segmentsUpdate.removedSegments); ctx.response.body = { addedIds }; runLearning(id); diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index 7686135..940207f 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -29,8 +29,8 @@ export class AnalyticsService { private _requester: any; private _ready: boolean = false; private _pingResponded = false; - private _zmqConnectionString = null; - private _ipcPath = null; + private _zmqConnectionString: string = null; + private _ipcPath: string = null; private _analyticsPinger: NodeJS.Timer = null; private _isClosed = false; @@ -55,7 +55,7 @@ export class AnalyticsService { strMessage = 'PING'; } return new Promise((resolve, reject) => { - this._requester.send(strMessage, undefined, (err) => { + this._requester.send(strMessage, undefined, (err: any) => { if(err) { reject(err); } else { @@ -184,7 +184,7 @@ export class AnalyticsService { } } - private _onAnalyticsMessage(data: any, error) { + private _onAnalyticsMessage(data: any) { if(data.toString() === 'PONG') { this._pingResponded = true; if(!this._ready) { diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index c39da2f..725a69e 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -52,7 +52,7 @@ const db = new Map(); let dbInsertOne = (collection: Collection, doc: object) => { return new Promise((resolve, reject) => { - db[collection].insert(doc, (err, newDoc) => { + db.get(collection).insert(doc, (err, newDoc: any) => { if(err) { reject(err); } else { @@ -64,7 +64,7 @@ let dbInsertOne = (collection: Collection, doc: object) => { let dbInsertMany = (collection: Collection, docs: object[]) => { return new Promise((resolve, reject) => { - db[collection].insert(docs, (err, newDocs: any[]) => { + db.get(collection).insert(docs, (err, newDocs: any[]) => { if(err) { reject(err); } else { @@ -77,7 +77,7 @@ let dbInsertMany = (collection: Collection, docs: object[]) => { let dbUpdateOne = (collection: Collection, query: string | object, updateQuery: object) => { query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { + db.get(collection).update(query, updateQuery, { /* options */ }, (err: Error) => { if(err) { reject(err); } else { @@ -90,7 +90,7 @@ let dbUpdateOne = (collection: Collection, query: string | object, updateQuery: let dbFindOne = (collection: Collection, query: string | object) => { query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db[collection].findOne(query, (err, doc) => { + db.get(collection).findOne(query, (err, doc) => { if(err) { reject(err); } else { @@ -103,7 +103,7 @@ let dbFindOne = (collection: Collection, query: string | object) => { let dbFindMany = (collection: Collection, query: string[] | object) => { query = wrapIdsToQuery(query); return new Promise((resolve, reject) => { - db[collection].findOne(query, (err, docs) => { + 
db.get(collection).findOne(query, (err, docs: any[]) => { if(err) { reject(err); } else { @@ -116,7 +116,7 @@ let dbFindMany = (collection: Collection, query: string[] | object) => { let dbRemoveOne = (collection: Collection, id: string) => { let query = { _id: id }; return new Promise((resolve, reject) => { - db[collection].remove(query, (err, numRemoved) => { + db.get(collection).remove(query, (err, numRemoved) => { if(err) { reject(err); } else { @@ -133,7 +133,7 @@ let dbRemoveOne = (collection: Collection, id: string) => { let dbRemoveMany = (collection: Collection, query: string[] | object) => { query = wrapIdsToQuery(query); return new Promise((resolve, reject) => { - db[collection].remove(query, (err, numRemoved) => { + db.get(collection).remove(query, (err, numRemoved) => { if(err) { reject(err); } else { @@ -161,5 +161,5 @@ function checkDataFolders(): void { checkDataFolders(); // TODO: it's better if models request db which we create if it`s needed -db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); -db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); +db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true })); +db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true })); diff --git a/server/src/services/notification_service.ts b/server/src/services/notification_service.ts index 126ca10..6160ef9 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -1,11 +1,11 @@ -import { findById } from '../models/analytic_unit_model'; +import { findById, AnalyticUnitId } from '../models/analytic_unit_model'; import axios from 'axios'; // TODO: send notification with payload without dep to AnalyticUnit -export async function sendNotification(predictorId, active) { - let anomalyName = (await findById(predictorId)).name +export async function sendNotification(id: AnalyticUnitId, active: boolean) { + let anomalyName = (await findById(id)).name console.log('Notification ' + anomalyName); let notification = { diff --git a/server/src/services/process_service.ts b/server/src/services/process_service.ts index 6d0a237..b3e882e 100644 --- a/server/src/services/process_service.ts +++ b/server/src/services/process_service.ts @@ -1,6 +1,6 @@ -var exitHandlers = [] +var exitHandlers: (() => void)[] = []; var exitHandled = false; /** @@ -12,7 +12,7 @@ export function registerExitHandler(callback: () => void) { exitHandlers.push(callback); } -function exitHandler(options, err?) { +function exitHandler(options: any, err?: any) { if(exitHandled) { return; } @@ -24,7 +24,7 @@ function exitHandler(options, err?) 
{ process.exit(); } -function catchException(options, err) { +function catchException(options: any, err: any) { console.log('Server exception:'); console.log(err); exitHandler({ exit: true }); diff --git a/server/tsconfig.json b/server/tsconfig.json index e28057c..f51b103 100644 --- a/server/tsconfig.json +++ b/server/tsconfig.json @@ -1,9 +1,9 @@ { "compilerOptions": { "sourceMap": true, - "noImplicitAny": false, + "noImplicitAny": true, "module": "commonjs", "target": "es6", - "allowJs": true + "allowJs": true, } } From de68788c8d6193cf2362c39f0774d6a5ec928c9e Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 14 Aug 2018 22:35:54 +0300 Subject: [PATCH 11/12] task model + tsconfig checksfix + segments parse++ --- .../src/controllers/analytics_controller.ts | 5 +-- server/src/models/analytic_unit_model.ts | 2 +- server/src/models/segment_model.ts | 17 +++++++--- server/src/models/task_model.ts | 31 ++++++++++++++++++- server/src/routes/segments_router.ts | 17 +++++----- server/src/services/data_service.ts | 3 ++ server/tsconfig.json | 4 +-- 7 files changed, 59 insertions(+), 20 deletions(-) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index ad3db56..ae65a91 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,5 +1,6 @@ +import { Task } from '../models/task_model'; import * as SegmentsController from '../models/segment_model'; -import * as AnalyticUnit from '../models/analytic_unit_model' +import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; @@ -49,7 +50,7 @@ export function terminate() { analyticsService.close(); } -async function runTask(task: any): Promise { +async function runTask(task: Task): Promise { // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); // task.metric = { // datasource: anomaly.metric.datasource, diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 7115803..26b0278 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -56,7 +56,7 @@ export class AnalyticUnit { Metric.fromObject(obj.metric), obj.status, obj.lastPredictionTime, - obj.id || obj._id, + obj._id, obj.error, ); } diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 4abb5f2..bc46b9c 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -9,21 +9,27 @@ type SegmentId = string; export class Segment { constructor( - public auId: AnalyticUnitId, + public analyticUnitId: AnalyticUnitId, public from: number, public to: number, public labeled: boolean, public id?: SegmentId ) { - if(auId === undefined) { + if(analyticUnitId === undefined) { throw new Error('AnalyticUnitId is undefined'); } if(from === undefined) { throw new Error('from is undefined'); } + if(isNaN(from)) { + throw new Error('from is NaN'); + } if(to === undefined) { throw new Error('to is undefined'); } + if(isNaN(to)) { + throw new Error('to is NaN'); + } if(labeled === undefined) { throw new Error('labeled is undefined'); } @@ -32,7 +38,7 @@ export class Segment { public toObject() { return { _id: this.id, - auId: this.auId, + analyticUnitId: this.analyticUnitId, from: this.from, to: this.to, labeled: this.labeled @@ -44,8 +50,9 @@ export class Segment { throw new Error('obj is undefined'); } return new 
Segment( - obj.auId, +obj.from, +obj.to, - obj.labeled, obj.id || obj._id + obj.analyticUnitId, + +obj.from, +obj.to, + obj.labeled, obj._id ); } } diff --git a/server/src/models/task_model.ts b/server/src/models/task_model.ts index 880519c..c19e3d4 100644 --- a/server/src/models/task_model.ts +++ b/server/src/models/task_model.ts @@ -1,3 +1,32 @@ +import { AnalyticUnitId } from "./analytic_unit_model"; + + +export type TaskId = string; + export class Task { - + constructor( + public analyticUnitId: AnalyticUnitId, + public id?: TaskId + ) { + if(analyticUnitId === undefined) { + throw new Error('analyticUnitId is undefined'); + } + } + + public toObject() { + return { + _id: this.id, + analyticUnitId: this.analyticUnitId + }; + } + + static fromObject(obj: any): Task { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Task( + obj.analyticUnitId, + obj._id, + ); + } } diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index 64ff66a..a03d8c3 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -2,11 +2,7 @@ import * as Router from 'koa-router'; import { AnalyticUnitId } from '../models/analytic_unit_model'; -import { - findMany, - insertSegments, - removeSegments, -} from '../models/segment_model'; +import * as SegmentModel from '../models/segment_model'; import { runLearning } from '../controllers/analytics_controller'; @@ -25,9 +21,14 @@ async function getSegments(ctx: Router.IRouterContext) { async function updateSegments(ctx: Router.IRouterContext) { try { - let segmentsUpdate = ctx.request.body; - let id = segmentsUpdate.id; - let addedIds = insertSegments(segmentsUpdate.addedSegments); + + let { addedSegments, id } = ctx.request.body as { addedSegments: any[], id: AnalyticUnitId }; + + let segmentsToInsert: SegmentModel.Segment[] = addedSegments.map( + s => SegmentModel.Segment.fromObject({ analyticUnitId: id, labeled: true, ...s }) + ); + + let addedIds = await SegmentModel.insertSegments(segmentsToInsert); // removeSegments(id, segmentsUpdate.removedSegments); ctx.response.body = { addedIds }; runLearning(id); diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 725a69e..e892c99 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -63,6 +63,9 @@ let dbInsertOne = (collection: Collection, doc: object) => { } let dbInsertMany = (collection: Collection, docs: object[]) => { + if(docs.length === 0) { + return Promise.resolve([]); + } return new Promise((resolve, reject) => { db.get(collection).insert(docs, (err, newDocs: any[]) => { if(err) { diff --git a/server/tsconfig.json b/server/tsconfig.json index f51b103..d5cef27 100644 --- a/server/tsconfig.json +++ b/server/tsconfig.json @@ -1,9 +1,7 @@ { "compilerOptions": { "sourceMap": true, - "noImplicitAny": true, "module": "commonjs", - "target": "es6", - "allowJs": true, + "target": "es6" } } From 16f34222241416a79c21aa39a29206b5203ccac0 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Thu, 16 Aug 2018 00:35:15 +0300 Subject: [PATCH 12/12] correct saving of segments & removing many --- server/src/models/segment_model.ts | 7 +++-- server/src/routes/segments_router.ts | 43 +++++++++++++++++++--------- server/src/services/data_service.ts | 26 ++++++++++++----- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index bc46b9c..507622a 100644 --- 
a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -5,7 +5,7 @@ import { Collection, makeDBQ } from '../services/data_service'; let db = makeDBQ(Collection.SEGMENTS); -type SegmentId = string; +export type SegmentId = string; export class Segment { constructor( @@ -64,7 +64,7 @@ export type FindManyQuery = { } export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promise { - var dbQuery: any = { auId: id }; + var dbQuery: any = { analyticUnitId: id }; if(query.timeFromGTE !== undefined) { dbQuery.from = { $gte: query.timeFromGTE }; } @@ -72,6 +72,9 @@ export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promis dbQuery.to = { $lte: query.timeToLTE }; } let segs = await db.findMany(dbQuery); + if(segs === null) { + return []; + } return segs.map(Segment.fromObject); } diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index a03d8c3..07bc1fc 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -7,30 +7,47 @@ import { runLearning } from '../controllers/analytics_controller'; async function getSegments(ctx: Router.IRouterContext) { - // let id: AnalyticUnitId = ctx.request.query.id; + let id: AnalyticUnitId = ctx.request.query.id; + if(id === undefined || id === '') { + throw new Error('analyticUnitId (id) is missing'); + } + let query: SegmentModel.FindManyQuery = {}; + + if(!isNaN(+ctx.request.query.lastSegmentId)) { + query.intexGT = +ctx.request.query.lastSegmentId; + } + if(!isNaN(+ctx.request.query.from)) { + query.timeFromGTE = +ctx.request.query.from; + } + if(!isNaN(+ctx.request.query.to)) { + query.timeToLTE = +ctx.request.query.to; + } - // let segments = await findMany(id, { - // intexGT: ctx.request.query.lastSegmentId, - // timeFromGTE: ctx.request.query.from, - // timeToLTE: ctx.request.query.to - // }); + let segments = await SegmentModel.findMany(id, query); - ctx.response.body = { segments: [] }; + ctx.response.body = { segments }; } async function updateSegments(ctx: Router.IRouterContext) { try { - - let { addedSegments, id } = ctx.request.body as { addedSegments: any[], id: AnalyticUnitId }; + + let { + addedSegments, id, removedSegments: removedIds + } = ctx.request.body as { + addedSegments: any[], id: AnalyticUnitId, removedSegments: SegmentModel.SegmentId[] + }; let segmentsToInsert: SegmentModel.Segment[] = addedSegments.map( s => SegmentModel.Segment.fromObject({ analyticUnitId: id, labeled: true, ...s }) ); - - let addedIds = await SegmentModel.insertSegments(segmentsToInsert); - // removeSegments(id, segmentsUpdate.removedSegments); - ctx.response.body = { addedIds }; + + let [addedIds, removed] = await Promise.all([ + SegmentModel.insertSegments(segmentsToInsert), + SegmentModel.removeSegments(removedIds) + ]); + + ctx.response.body = { addedIds, removed }; runLearning(id); } catch(e) { ctx.response.status = 500; diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index e892c99..ec9908c 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -48,6 +48,13 @@ function wrapIdsToQuery(query: string[] | object): any { return query; } +function isEmptyArray(obj: any): boolean { + if(!Array.isArray(obj)) { + return false; + } + return obj.length == 0; +} + const db = new Map(); let dbInsertOne = (collection: Collection, doc: object) => { @@ -104,9 +111,12 @@ let dbFindOne = (collection: Collection, query: string | object) => { } let dbFindMany = 
(collection: Collection, query: string[] | object) => { + if(isEmptyArray(query)) { + return Promise.resolve([]); + } query = wrapIdsToQuery(query); return new Promise((resolve, reject) => { - db.get(collection).findOne(query, (err, docs: any[]) => { + db.get(collection).find(query, (err, docs: any[]) => { if(err) { reject(err); } else { @@ -116,15 +126,15 @@ let dbFindMany = (collection: Collection, query: string[] | object) => { }); } -let dbRemoveOne = (collection: Collection, id: string) => { - let query = { _id: id }; +let dbRemoveOne = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); return new Promise((resolve, reject) => { - db.get(collection).remove(query, (err, numRemoved) => { + db.get(collection).remove(query, { /* options */ }, (err, numRemoved) => { if(err) { reject(err); } else { if(numRemoved > 1) { - throw new Error(`Removed ${numRemoved} elements with id: ${id}. Only one is Ok.`); + throw new Error(`Removed ${numRemoved} elements with query: ${JSON.stringify(query)}. Only one is Ok.`); } else { resolve(numRemoved == 1); } @@ -134,9 +144,12 @@ let dbRemoveOne = (collection: Collection, id: string) => { } let dbRemoveMany = (collection: Collection, query: string[] | object) => { + if(isEmptyArray(query)) { + return Promise.resolve([]); + } query = wrapIdsToQuery(query); return new Promise((resolve, reject) => { - db.get(collection).remove(query, (err, numRemoved) => { + db.get(collection).remove(query, { multi: true }, (err, numRemoved) => { if(err) { reject(err); } else { @@ -146,7 +159,6 @@ let dbRemoveMany = (collection: Collection, query: string[] | object) => { }); } - function maybeCreateDir(path: string): void { if(fs.existsSync(path)) { return;
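// --- Editor's illustrative sketch, not part of the patch ----------------------
// Tying the segment pieces together: the PATCH /segments handler above maps the
// request body onto labeled Segment objects bound to an analytic unit, then
// stores and removes them through the nedb-backed model. A minimal sketch with
// made-up ids and timestamps:
import * as SegmentModel from '../models/segment_model';

async function segmentsExample(analyticUnitId: string): Promise<void> {
  const added = [{ from: 1534000000000, to: 1534000600000 }]; // hypothetical request payload
  const removedIds = ['someSegmentId'];                       // hypothetical

  const toInsert = added.map(s =>
    SegmentModel.Segment.fromObject({ analyticUnitId, labeled: true, ...s })
  );

  const [addedIds, removedCount] = await Promise.all([
    SegmentModel.insertSegments(toInsert),   // resolves to the new nedb _id strings
    SegmentModel.removeSegments(removedIds)  // resolves to the number of removed documents
  ]);
  console.log({ addedIds, removedCount });

  // Reading them back, filtered by time range (all FindManyQuery fields are optional):
  const segments = await SegmentModel.findMany(analyticUnitId, {
    timeFromGTE: 1534000000000,
    timeToLTE: 1534000600000
  });
  console.log(segments.length);
}
// ------------------------------------------------------------------------------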