Browse Source

serments / metrics models refactoring

pull/1/head
Alexey Velikiy 6 years ago
parent
commit
dc6c120261
  1. 139
      server/src/controllers/analytics_controller.ts
  2. 21
      server/src/models/analytic_unit_model.ts
  3. 35
      server/src/models/metric_model.ts
  4. 76
      server/src/models/segment_model.ts
  5. 3
      server/src/models/task_model.ts
  6. 11
      server/src/routes/analytic_units_router.ts
  7. 26
      server/src/routes/segments_router.ts
  8. 18
      server/src/services/data_service.ts

139
server/src/controllers/analytics_controller.ts

@ -30,15 +30,6 @@ async function onMessage(message: AnalyticsMessage) {
resolvedMethod = true;
}
if(message.method === 'FILE_SAVE') {
responsePayload = await onFileSave(message.payload);
resolvedMethod = true;
}
if(message.method === 'FILE_LOAD') {
responsePayload = await onFileLoad(message.payload);
resolvedMethod = true;
}
if(!resolvedMethod) {
throw new TypeError('Unknown method ' + message.method);
}
@ -59,72 +50,72 @@ export function terminate() {
}
async function runTask(task): Promise<any> {
let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId);
task.metric = {
datasource: anomaly.metric.datasource,
targets: anomaly.metric.targets.map(getTarget)
};
task._taskId = nextTaskId++;
await analyticsService.sendTask(task);
return new Promise<void>(resolve => {
taskMap[task._taskId] = resolve;
})
// let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId);
// task.metric = {
// datasource: anomaly.metric.datasource,
// targets: anomaly.metric.targets.map(getTarget)
// };
// task._taskId = nextTaskId++;
// await analyticsService.sendTask(task);
// return new Promise<void>(resolve => {
// taskMap[task._taskId] = resolve;
// })
}
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let segments = getLabeledSegments(id);
AnalyticUnit.setStatus(id, 'LEARNING');
let unit = await AnalyticUnit.findById(id);
let pattern = unit.type;
let task = {
analyticUnitId: id,
type: 'LEARN',
pattern,
segments: segments
};
let result = await runTask(task);
if (result.status === 'SUCCESS') {
AnalyticUnit.setStatus(id, 'READY');
insertSegments(id, result.segments, false);
AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
} else {
AnalyticUnit.setStatus(id, 'FAILED', result.error);
}
// let segments = getLabeledSegments(id);
// AnalyticUnit.setStatus(id, 'LEARNING');
// let unit = await AnalyticUnit.findById(id);
// let pattern = unit.type;
// let task = {
// analyticUnitId: id,
// type: 'LEARN',
// pattern,
// segments: segments
// };
// let result = await runTask(task);
// if (result.status === 'SUCCESS') {
// AnalyticUnit.setStatus(id, 'READY');
// insertSegments(id, result.segments, false);
// AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
// } else {
// AnalyticUnit.setStatus(id, 'FAILED', result.error);
// }
}
export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
let unit = await AnalyticUnit.findById(id);
let pattern = unit.type;
let task = {
type: 'PREDICT',
analyticUnitId: id,
pattern,
lastPredictionTime: unit.lastPredictionTime
};
let result = await runTask(task);
if(result.status === 'FAILED') {
return [];
}
// Merging segments
let segments = getLabeledSegments(id);
if(segments.length > 0 && result.segments.length > 0) {
let lastOldSegment = segments[segments.length - 1];
let firstNewSegment = result.segments[0];
if(firstNewSegment.start <= lastOldSegment.finish) {
result.segments[0].start = lastOldSegment.start;
removeSegments(id, [lastOldSegment.id]);
}
}
insertSegments(id, result.segments, false);
AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
return result.segments;
// let unit = await AnalyticUnit.findById(id);
// let pattern = unit.type;
// let task = {
// type: 'PREDICT',
// analyticUnitId: id,
// pattern,
// lastPredictionTime: unit.lastPredictionTime
// };
// let result = await runTask(task);
// if(result.status === 'FAILED') {
// return [];
// }
// // Merging segments
// let segments = getLabeledSegments(id);
// if(segments.length > 0 && result.segments.length > 0) {
// let lastOldSegment = segments[segments.length - 1];
// let firstNewSegment = result.segments[0];
// if(firstNewSegment.start <= lastOldSegment.finish) {
// result.segments[0].start = lastOldSegment.start;
// removeSegments(id, [lastOldSegment.id]);
// }
// }
// insertSegments(id, result.segments, false);
// AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
// return result.segments;
}
export function isAnalyticReady(): boolean {
@ -132,12 +123,8 @@ export function isAnalyticReady(): boolean {
}
export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUnit.AnalyticUnitId> {
runLearning(newId);
let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj);
AnalyticUnit.create(unit);
// runLearning(unit);
return
}

21
server/src/models/analytic_unit_model.ts

@ -1,11 +1,10 @@
import { Metric, metricFromObj } from './metric_model';
import { Metric } from './metric_model';
import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.ANALYTIC_UNITS);
export type AnalyticUnitId = string;
export type AnalyticUnit = {
@ -16,11 +15,9 @@ export type AnalyticUnit = {
metric: Metric
status: string,
error?: string,
lastPredictionTime: number,
nextId: number
lastPredictionTime: number
}
export function analyticUnitFromObj(obj: any): AnalyticUnit {
if(obj === undefined) {
throw new Error('obj is undefined');
@ -48,11 +45,9 @@ export function analyticUnitFromObj(obj: any): AnalyticUnit {
name: obj.name,
panelUrl: obj.panelUrl,
type: obj.type,
datasource: obj.datasource,
metric: metric,
metric: obj.metric,
status: 'LEARNING',
lastPredictionTime: 0,
nextId: 0
lastPredictionTime: 0
};
return unit;
@ -62,8 +57,14 @@ export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> {
return db.findOne(id);
}
/**
* Creates and updates new unit.id
*
* @param unit to create
* @returns unit.id
*/
export async function create(unit: AnalyticUnit): Promise<AnalyticUnitId> {
return db.insert(unit);
return unit.id = await db.insert(unit);
}
export async function remove(id: AnalyticUnitId): Promise<void> {

35
server/src/models/metric_model.ts

@ -12,38 +12,35 @@ export type Datasource = {
url: string
}
export type MetricId = string;
export type Metric = {
id?: MetricId,
datasource: Datasource,
targets: string[]
}
export function metricFromObj(obj: any): Metric {
const metric: Metric = {
return {
datasource: obj.datasource,
targets: obj.targets;
targets: obj.targets
};
return metric;
}
export async function saveTargets(targets: string[]) {
let metrics = [];
for (let target of targets) {
metrics.push(saveTarget(target));
}
return metrics;
}
// export async function saveTargets(targets: string[]) {
// let metrics = [];
// for (let target of targets) {
// metrics.push(create(target));
// }
// return metrics;
// }
export async function saveTarget(target: string) {
//const md5 = crypto.createHash('md5')
const targetId = crypto.createHash('md5').update(JSON.stringify(target)).digest('hex');
let filename = path.join(METRICS_PATH, `${targetId}.json`);
writeJsonDataSync(filename, target);
return targetId;
export async function create(metric: Metric): Promise<MetricId> {
return metric.id = await db.insert(metric);
}
export async function getTarget(targetId) {
let filename = path.join(METRICS_PATH, `${targetId}.json`);
return getJsonDataSync(filename);
export async function findMetric(id: MetricId): Promise<Metric> {
return db.findOne(id);
}

76
server/src/models/segment_model.ts

@ -1,76 +1,36 @@
import * as AnalyticUnit from './analytic_unit_model';
import { AnalyticUnitId } from './analytic_unit_model';
import { Collection, makeDBQ } from '../services/data_service';
import * as _ from 'lodash';
import * as path from 'path';
import * as fs from 'fs';
import * as _ from 'lodash';
export function getLabeledSegments(id: AnalyticUnit.AnalyticUnitId) {
let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`);
type SegmentId = string;
if(!fs.existsSync(filename)) {
return [];
} else {
let segments = getJsonDataSync(filename);
for(let segment of segments) {
if(segment.labeled === undefined) {
segment.labeled = false;
}
}
return segments;
}
type Segment = {
id?: SegmentId,
from: number,
to: number,
labeled: boolean
}
export function getPredictedSegments(id: AnalyticUnit.AnalyticUnitId) {
let filename = path.join(SEGMENTS_PATH, `${id}_segments.json`);
let db = makeDBQ(Collection.SEGMENTS);
let jsonData;
try {
jsonData = getJsonDataSync(filename);
} catch(e) {
console.error(e.message);
jsonData = [];
}
return jsonData;
export function getLabeledSegments(id: AnalyticUnitId) {
return
}
export function saveSegments(id: AnalyticUnit.AnalyticUnitId, segments) {
let filename = path.join(SEGMENTS_PATH, `${id}_labeled.json`);
export function getPredictedSegments(id: AnalyticUnitId) {
try {
return writeJsonDataSync(filename, _.uniqBy(segments, 'start'));
} catch(e) {
console.error(e.message);
throw new Error('Can`t write to db');
}
}
export async function insertSegments(id: AnalyticUnit.AnalyticUnitId, addedSegments, labeled: boolean) {
// Set status
let info = await AnalyticUnit.findById(id);
let segments = getLabeledSegments(id);
export function saveSegments(id: AnalyticUnitId, segments: Segment[]) {
let nextId = info.nextId;
let addedIds = []
for (let segment of addedSegments) {
segment.id = nextId;
segment.labeled = labeled;
addedIds.push(nextId);
nextId++;
segments.push(segment);
}
info.nextId = nextId;
saveSegments(id, segments);
await AnalyticUnit.update(id, info);
return addedIds;
}
export function removeSegments(id: AnalyticUnit.AnalyticUnitId, removedSegments) {
let segments = getLabeledSegments(id);
for (let segmentId of removedSegments) {
segments = segments.filter(el => el.id !== segmentId);
export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) {
}
saveSegments(id, segments);
export function removeSegments(idsToRemove: SegmentId[]) {
}

3
server/src/models/task_model.ts

@ -0,0 +1,3 @@
class Task {
}

11
server/src/routes/analytic_units_router.ts

@ -12,7 +12,6 @@ async function sendStatus(ctx: Router.IRouterContext) {
throw new Error('Id is undefined');
}
let unit = await AnalyticUnit.findById(id);
if(unit.status === undefined) {
throw new Error('status is undefined');
}
@ -52,16 +51,8 @@ async function findItem(ctx: Router.IRouterContext) {
async function createItem(ctx: Router.IRouterContext) {
try {
let body = ctx.request.body;
await createAnalyticUnitFromObject(body);
let newId = await AnalyticUnit.create(unit);
let newId = await createAnalyticUnitFromObject(ctx.request.body);
ctx.response.body = { id: newId };
} catch(e) {
ctx.response.status = 500;
ctx.response.body = {

26
server/src/routes/segments_router.ts

@ -17,21 +17,21 @@ async function sendSegments(ctx: Router.IRouterContext) {
let timeFrom = ctx.request.query.from;
let timeTo = ctx.request.query.to;
let segments = getLabeledSegments(id);
let segments = await getLabeledSegments(id);
// Id filtering
if(lastSegmentId !== undefined) {
segments = segments.filter(el => el.id > lastSegmentId);
}
// // Id filtering
// if(lastSegmentId !== undefined) {
// segments = segments.filter(el => el.id > lastSegmentId);
// }
// Time filtering
if(timeFrom !== undefined) {
segments = segments.filter(el => el.finish > timeFrom);
}
// // Time filtering
// if(timeFrom !== undefined) {
// segments = segments.filter(el => el.finish > timeFrom);
// }
if(timeTo !== undefined) {
segments = segments.filter(el => el.start < timeTo);
}
// if(timeTo !== undefined) {
// segments = segments.filter(el => el.start < timeTo);
// }
ctx.response.body = { segments }
@ -42,7 +42,7 @@ async function updateSegments(ctx: Router.IRouterContext) {
let segmentsUpdate = ctx.request.body;
let id = segmentsUpdate.id;
let addedIds = insertSegments(id, segmentsUpdate.addedSegments, true);
removeSegments(id, segmentsUpdate.removedSegments);
// removeSegments(id, segmentsUpdate.removedSegments);
ctx.response.body = { addedIds };
runLearning(id);
} catch(e) {

18
server/src/services/data_service.ts

@ -14,6 +14,7 @@ export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS };
*/
export type DBQ = {
insert: (document: object) => string,
insertMany: (documents: object[]) => string[],
update: (query: string | object, updateQuery: any) => void,
findOne: (query: string | object) => any,
remove: (query: string | object) => number
@ -22,6 +23,7 @@ export type DBQ = {
export function makeDBQ(collection: Collection): DBQ {
return {
insert: dbInsert.bind(null, collection),
insertMany: dbInsertMany.bind(null, collection),
update: dbUpdate.bind(null, collection),
findOne: dbFindOne.bind(null, collection),
remove: dbRemove.bind(null, collection)
@ -49,6 +51,18 @@ let dbInsert = (collection: Collection, doc: object) => {
});
}
let dbInsertMany = (collection: Collection, docs: object[]) => {
return new Promise<string[]>((resolve, reject) => {
db[collection].insert(docs, (err, newDocs: any[]) => {
if(err) {
reject(err);
} else {
resolve(newDocs.map(d => d._id));
}
});
});
}
let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => {
query = wrapIdToQuery(query);
return new Promise<void>((resolve, reject) => {
@ -88,7 +102,7 @@ let dbRemove = (collection: Collection, query: string | object) => {
});
}
function maybeCreate(path: string): void {
function maybeCreateDir(path: string): void {
if(fs.existsSync(path)) {
return;
}
@ -100,7 +114,7 @@ function checkDataFolders(): void {
[
config.DATA_PATH,
config.ZMQ_IPC_PATH
].forEach(maybeCreate);
].forEach(maybeCreateDir);
}
checkDataFolders();

Loading…
Cancel
Save