From dc6c120261f1c78ac245a3ef4eb0ae2c10b4fb35 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 5 Aug 2018 18:09:42 +0300 Subject: [PATCH] serments / metrics models refactoring --- .../src/controllers/analytics_controller.ts | 139 ++++++++---------- server/src/models/analytic_unit_model.ts | 21 +-- server/src/models/metric_model.ts | 35 ++--- server/src/models/segment_model.ts | 78 +++------- server/src/models/task_model.ts | 3 + server/src/routes/analytic_units_router.ts | 11 +- server/src/routes/segments_router.ts | 26 ++-- server/src/services/data_service.ts | 18 ++- 8 files changed, 142 insertions(+), 189 deletions(-) create mode 100644 server/src/models/task_model.ts diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index bc140ad..f707ef0 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -30,15 +30,6 @@ async function onMessage(message: AnalyticsMessage) { resolvedMethod = true; } - if(message.method === 'FILE_SAVE') { - responsePayload = await onFileSave(message.payload); - resolvedMethod = true; - } - if(message.method === 'FILE_LOAD') { - responsePayload = await onFileLoad(message.payload); - resolvedMethod = true; - } - if(!resolvedMethod) { throw new TypeError('Unknown method ' + message.method); } @@ -59,72 +50,72 @@ export function terminate() { } async function runTask(task): Promise { - let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); - task.metric = { - datasource: anomaly.metric.datasource, - targets: anomaly.metric.targets.map(getTarget) - }; - - task._taskId = nextTaskId++; - await analyticsService.sendTask(task); - - return new Promise(resolve => { - taskMap[task._taskId] = resolve; - }) + // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); + // task.metric = { + // datasource: anomaly.metric.datasource, + // targets: anomaly.metric.targets.map(getTarget) + // }; + + // task._taskId = nextTaskId++; + // await analyticsService.sendTask(task); + + // return new Promise(resolve => { + // taskMap[task._taskId] = resolve; + // }) } export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { - let segments = getLabeledSegments(id); - AnalyticUnit.setStatus(id, 'LEARNING'); - let unit = await AnalyticUnit.findById(id); - let pattern = unit.type; - let task = { - analyticUnitId: id, - type: 'LEARN', - pattern, - segments: segments - }; - - let result = await runTask(task); - - if (result.status === 'SUCCESS') { - AnalyticUnit.setStatus(id, 'READY'); - insertSegments(id, result.segments, false); - AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); - } else { - AnalyticUnit.setStatus(id, 'FAILED', result.error); - } + // let segments = getLabeledSegments(id); + // AnalyticUnit.setStatus(id, 'LEARNING'); + // let unit = await AnalyticUnit.findById(id); + // let pattern = unit.type; + // let task = { + // analyticUnitId: id, + // type: 'LEARN', + // pattern, + // segments: segments + // }; + + // let result = await runTask(task); + + // if (result.status === 'SUCCESS') { + // AnalyticUnit.setStatus(id, 'READY'); + // insertSegments(id, result.segments, false); + // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); + // } else { + // AnalyticUnit.setStatus(id, 'FAILED', result.error); + // } } export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { - let unit = await AnalyticUnit.findById(id); - let pattern = unit.type; - let task = { - type: 'PREDICT', - analyticUnitId: id, - pattern, - lastPredictionTime: unit.lastPredictionTime - }; - let result = await runTask(task); - - if(result.status === 'FAILED') { - return []; - } - // Merging segments - let segments = getLabeledSegments(id); - if(segments.length > 0 && result.segments.length > 0) { - let lastOldSegment = segments[segments.length - 1]; - let firstNewSegment = result.segments[0]; - - if(firstNewSegment.start <= lastOldSegment.finish) { - result.segments[0].start = lastOldSegment.start; - removeSegments(id, [lastOldSegment.id]); - } - } - - insertSegments(id, result.segments, false); - AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); - return result.segments; + // let unit = await AnalyticUnit.findById(id); + // let pattern = unit.type; + // let task = { + // type: 'PREDICT', + // analyticUnitId: id, + // pattern, + // lastPredictionTime: unit.lastPredictionTime + // }; + // let result = await runTask(task); + + // if(result.status === 'FAILED') { + // return []; + // } + // // Merging segments + // let segments = getLabeledSegments(id); + // if(segments.length > 0 && result.segments.length > 0) { + // let lastOldSegment = segments[segments.length - 1]; + // let firstNewSegment = result.segments[0]; + + // if(firstNewSegment.start <= lastOldSegment.finish) { + // result.segments[0].start = lastOldSegment.start; + // removeSegments(id, [lastOldSegment.id]); + // } + // } + + // insertSegments(id, result.segments, false); + // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); + // return result.segments; } export function isAnalyticReady(): boolean { @@ -132,12 +123,8 @@ export function isAnalyticReady(): boolean { } export async function createAnalyticUnitFromObject(obj: any): Promise { - - - - - - runLearning(newId); - + let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj); + AnalyticUnit.create(unit); + // runLearning(unit); return } \ No newline at end of file diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 6ba8619..a4e6162 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -1,11 +1,10 @@ -import { Metric, metricFromObj } from './metric_model'; +import { Metric } from './metric_model'; import { Collection, makeDBQ } from '../services/data_service'; let db = makeDBQ(Collection.ANALYTIC_UNITS); - export type AnalyticUnitId = string; export type AnalyticUnit = { @@ -16,11 +15,9 @@ export type AnalyticUnit = { metric: Metric status: string, error?: string, - lastPredictionTime: number, - nextId: number + lastPredictionTime: number } - export function analyticUnitFromObj(obj: any): AnalyticUnit { if(obj === undefined) { throw new Error('obj is undefined'); @@ -48,11 +45,9 @@ export function analyticUnitFromObj(obj: any): AnalyticUnit { name: obj.name, panelUrl: obj.panelUrl, type: obj.type, - datasource: obj.datasource, - metric: metric, + metric: obj.metric, status: 'LEARNING', - lastPredictionTime: 0, - nextId: 0 + lastPredictionTime: 0 }; return unit; @@ -62,8 +57,14 @@ export async function findById(id: AnalyticUnitId): Promise { return db.findOne(id); } +/** + * Creates and updates new unit.id + * + * @param unit to create + * @returns unit.id + */ export async function create(unit: AnalyticUnit): Promise { - return db.insert(unit); + return unit.id = await db.insert(unit); } export async function remove(id: AnalyticUnitId): Promise { diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts index ca1ad16..b4062d3 100644 --- a/server/src/models/metric_model.ts +++ b/server/src/models/metric_model.ts @@ -12,38 +12,35 @@ export type Datasource = { url: string } +export type MetricId = string; + export type Metric = { + id?: MetricId, datasource: Datasource, targets: string[] } export function metricFromObj(obj: any): Metric { - const metric: Metric = { + return { datasource: obj.datasource, - targets: obj.targets; + targets: obj.targets }; - return metric; } -export async function saveTargets(targets: string[]) { - let metrics = []; - for (let target of targets) { - metrics.push(saveTarget(target)); - } - return metrics; -} +// export async function saveTargets(targets: string[]) { +// let metrics = []; +// for (let target of targets) { +// metrics.push(create(target)); +// } +// return metrics; +// } -export async function saveTarget(target: string) { - //const md5 = crypto.createHash('md5') - const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex'); - let filename = path.join(METRICS_PATH, `${targetId}.json`); - writeJsonDataSync(filename, target); - return targetId; +export async function create(metric: Metric): Promise { + return metric.id = await db.insert(metric); } -export async function getTarget(targetId) { - let filename = path.join(METRICS_PATH, `${targetId}.json`); - return getJsonDataSync(filename); +export async function findMetric(id: MetricId): Promise { + return db.findOne(id); } diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 7183b30..28b0867 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -1,76 +1,36 @@ -import * as AnalyticUnit from './analytic_unit_model'; +import { AnalyticUnitId } from './analytic_unit_model'; +import { Collection, makeDBQ } from '../services/data_service'; -import * as _ from 'lodash'; - -import * as path from 'path'; -import * as fs from 'fs'; +import * as _ from 'lodash'; -export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) { - let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); +type SegmentId = string; - if(!fs.existsSync(filename)) { - return []; - } else { - let segments = getJsonDataSync(filename); - for(let segment of segments) { - if(segment.labeled === undefined) { - segment.labeled = false; - } - } - return segments; - } +type Segment = { + id?: SegmentId, + from: number, + to: number, + labeled: boolean } -export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) { - let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`); +let db = makeDBQ(Collection.SEGMENTS); - let jsonData; - try { - jsonData = getJsonDataSync(filename); - } catch(e) { - console.error(e.message); - jsonData = []; - } - return jsonData; +export function getLabeledSegments(id: AnalyticUnitId) { + return } -export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) { - let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`); +export function getPredictedSegments(id: AnalyticUnitId) { - try { - return writeJsonDataSync(filename, _.uniqBy(segments, 'start')); - } catch(e) { - console.error(e.message); - throw new Error('Can`t write to db'); - } } -export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) { - // Set status - let info = await AnalyticUnit.findById(id); - let segments = getLabeledSegments(id); +export function saveSegments(id: AnalyticUnitId, segments: Segment[]) { + +} - let nextId = info.nextId; - let addedIds = [] - for (let segment of addedSegments) { - segment.id = nextId; - segment.labeled = labeled; - addedIds.push(nextId); - nextId++; - segments.push(segment); - } - info.nextId = nextId; - saveSegments(id, segments); - await AnalyticUnit.update(id, info); - return addedIds; +export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) { } -export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) { - let segments = getLabeledSegments(id); - for (let segmentId of removedSegments) { - segments = segments.filter(el => el.id !== segmentId); - } - saveSegments(id, segments); +export function removeSegments(idsToRemove: SegmentId[]) { + } diff --git a/server/src/models/task_model.ts b/server/src/models/task_model.ts new file mode 100644 index 0000000..8992bc1 --- /dev/null +++ b/server/src/models/task_model.ts @@ -0,0 +1,3 @@ +class Task { + +} \ No newline at end of file diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index be59684..5f06072 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -12,7 +12,6 @@ async function sendStatus(ctx: Router.IRouterContext) { throw new Error('Id is undefined'); } let unit = await AnalyticUnit.findById(id); - if(unit.status === undefined) { throw new Error('status is undefined'); } @@ -52,16 +51,8 @@ async function findItem(ctx: Router.IRouterContext) { async function createItem(ctx: Router.IRouterContext) { try { - - let body = ctx.request.body; - - await createAnalyticUnitFromObject(body); - - - let newId = await AnalyticUnit.create(unit); + let newId = await createAnalyticUnitFromObject(ctx.request.body); ctx.response.body = { id: newId }; - - } catch(e) { ctx.response.status = 500; ctx.response.body = { diff --git a/server/src/routes/segments_router.ts b/server/src/routes/segments_router.ts index f63a834..15cda5c 100644 --- a/server/src/routes/segments_router.ts +++ b/server/src/routes/segments_router.ts @@ -17,21 +17,21 @@ async function sendSegments(ctx: Router.IRouterContext) { let timeFrom = ctx.request.query.from; let timeTo = ctx.request.query.to; - let segments = getLabeledSegments(id); + let segments = await getLabeledSegments(id); - // Id filtering - if(lastSegmentId !== undefined) { - segments = segments.filter(el => el.id > lastSegmentId); - } + // // Id filtering + // if(lastSegmentId !== undefined) { + // segments = segments.filter(el => el.id > lastSegmentId); + // } - // Time filtering - if(timeFrom !== undefined) { - segments = segments.filter(el => el.finish > timeFrom); - } + // // Time filtering + // if(timeFrom !== undefined) { + // segments = segments.filter(el => el.finish > timeFrom); + // } - if(timeTo !== undefined) { - segments = segments.filter(el => el.start < timeTo); - } + // if(timeTo !== undefined) { + // segments = segments.filter(el => el.start < timeTo); + // } ctx.response.body = { segments } @@ -42,7 +42,7 @@ async function updateSegments(ctx: Router.IRouterContext) { let segmentsUpdate = ctx.request.body; let id = segmentsUpdate.id; let addedIds = insertSegments(id, segmentsUpdate.addedSegments, true); - removeSegments(id, segmentsUpdate.removedSegments); + // removeSegments(id, segmentsUpdate.removedSegments); ctx.response.body = { addedIds }; runLearning(id); } catch(e) { diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index b91e8e2..f1dea86 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -14,6 +14,7 @@ export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; */ export type DBQ = { insert: (document: object) => string, + insertMany: (documents: object[]) => string[], update: (query: string | object, updateQuery: any) => void, findOne: (query: string | object) => any, remove: (query: string | object) => number @@ -22,6 +23,7 @@ export type DBQ = { export function makeDBQ(collection: Collection): DBQ { return { insert: dbInsert.bind(null, collection), + insertMany: dbInsertMany.bind(null, collection), update: dbUpdate.bind(null, collection), findOne: dbFindOne.bind(null, collection), remove: dbRemove.bind(null, collection) @@ -49,6 +51,18 @@ let dbInsert = (collection: Collection, doc: object) => { }); } +let dbInsertMany = (collection: Collection, docs: object[]) => { + return new Promise((resolve, reject) => { + db[collection].insert(docs, (err, newDocs: any[]) => { + if(err) { + reject(err); + } else { + resolve(newDocs.map(d => d._id)); + } + }); + }); +} + let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { query = wrapIdToQuery(query); return new Promise((resolve, reject) => { @@ -88,7 +102,7 @@ let dbRemove = (collection: Collection, query: string | object) => { }); } -function maybeCreate(path: string): void { +function maybeCreateDir(path: string): void { if(fs.existsSync(path)) { return; } @@ -100,7 +114,7 @@ function checkDataFolders(): void { [ config.DATA_PATH, config.ZMQ_IPC_PATH - ].forEach(maybeCreate); + ].forEach(maybeCreateDir); } checkDataFolders();