Browse Source

Detection status router merge queried segments #590 (#596)

pull/1/head
Evgeny Smyshlyaev 5 years ago committed by rozetko
parent
commit
9c0e2561e6
  1. 35
      server/spec/utils/spans.jest.ts
  2. 1
      server/src/config.ts
  3. 95
      server/src/controllers/analytics_controller.ts
  4. 15
      server/src/models/analytic_unit_cache_model.ts
  5. 140
      server/src/models/detection_model.ts
  6. 48
      server/src/routes/detections_router.ts
  7. 9
      server/src/services/data_service.ts
  8. 53
      server/src/utils/spans.ts

35
server/spec/utils/spans.jest.ts

@ -0,0 +1,35 @@
import { getNonIntersectedSpans } from '../../src/utils/spans';
import 'jest';
describe('getNonIntersectedSpans', function() {
let spanBorders = [3, 5, 6, 8, 10, 20];
it('functional test', function() {
expect(getNonIntersectedSpans(4, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(5, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(4, 10, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(5, 10, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(4, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(4, 21, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}, {from: 20, to: 21}]);
expect(getNonIntersectedSpans(2, 20, spanBorders)).toEqual([{from: 2, to: 3}, {from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(2, 21, spanBorders)).toEqual([{from: 2, to: 3}, {from: 5, to: 6}, {from: 8, to: 10}, {from: 20, to: 21}]);
expect(getNonIntersectedSpans(3, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(3, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(3, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]);
expect(getNonIntersectedSpans(4, 7, [3, 5, 6, 8])).toEqual([{from: 5, to: 6}]);
});
it('empty borders list', function() {
expect(getNonIntersectedSpans(4, 10, [])).toEqual([]);
});
it('all in span', function() {
expect(getNonIntersectedSpans(4, 10, [1, 20])).toEqual([]);
expect(getNonIntersectedSpans(4, 10, [1, 10])).toEqual([]);
expect(getNonIntersectedSpans(4, 10, [4, 20])).toEqual([]);
expect(getNonIntersectedSpans(4, 10, [4, 10])).toEqual([]);
});
});

1
server/src/config.ts

@ -17,6 +17,7 @@ export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units
export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db');
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db');
export const DETECTION_SPANS_DATABASE_PATH = path.join(DATA_PATH, 'detection_spans.db');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');

95
server/src/controllers/analytics_controller.ts

@ -4,11 +4,13 @@ import * as AnalyticUnitCache from '../models/analytic_unit_cache_model';
import * as Segment from '../models/segment_model';
import * as Threshold from '../models/threshold_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import * as Detection from '../models/detection_model';
import { AnalyticsService } from '../services/analytics_service';
import { AlertService } from '../services/alert_service';
import { HASTIC_API_KEY } from '../config';
import { DataPuller } from '../services/data_puller';
import { getGrafanaUrl } from '../utils/grafana';
import { getNonIntersectedSpans } from '../utils/spans';
import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit';
@ -264,14 +266,32 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
{ detector, analyticUnitType, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache }
);
console.log(`run task, id:${id}`);
// TODO: status: detection
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
let result = await runTask(task);
if(range !== undefined) {
if(
result.status === AnalyticUnit.AnalyticUnitStatus.SUCCESS ||
result.status === AnalyticUnit.AnalyticUnitStatus.READY
) {
await Detection.insertSpan(
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.READY)
);
} else if (result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
await Detection.insertSpan(
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED)
);
}
}
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
throw new Error(result.error);
}
let payload = await processDetectionResult(id, result.payload);
await deleteNonDetectedSegments(id, payload);
// TODO: uncomment it
// It clears segments when redetecting on another timerange
// await deleteNonDetectedSegments(id, payload);
await Promise.all([
Segment.insertSegments(payload.segments),
@ -442,3 +462,76 @@ export async function runLearningWithDetection(id: AnalyticUnit.AnalyticUnitId)
.then(() => runDetect(id))
.catch(err => console.error(err));
}
export async function getDetectionSpans(
analyticUnitId: AnalyticUnit.AnalyticUnitId,
from: number,
to: number
): Promise<Detection.DetectionSpan[]> {
const readySpans = await Detection.getIntersectedSpans(analyticUnitId, from, to, Detection.DetectionStatus.READY);
const alreadyRunningSpans = await Detection.getIntersectedSpans(analyticUnitId, from, to, Detection.DetectionStatus.RUNNING);
const analyticUnitCache = await AnalyticUnitCache.findById(analyticUnitId);
if(_.isEmpty(readySpans)) {
const span = await runDetectionOnExtendedSpan(analyticUnitId, from, to, analyticUnitCache);
if(span === null) {
return [];
} else {
return [span];
}
}
const spanBorders = Detection.getSpanBorders(readySpans);
let newDetectionSpans = getNonIntersectedSpans(from, to, spanBorders);
if(newDetectionSpans.length === 0) {
return [ new Detection.DetectionSpan(analyticUnitId, from, to, Detection.DetectionStatus.READY) ];
}
let runningSpansPromises = [];
let newRunningSpans: Detection.DetectionSpan[] = [];
runningSpansPromises = newDetectionSpans.map(async span => {
const insideRunning = await Detection.findMany(analyticUnitId, {
status: Detection.DetectionStatus.RUNNING,
timeFromLTE: span.from,
timeToGTE: span.to
});
if(_.isEmpty(insideRunning)) {
const runningSpan = await runDetectionOnExtendedSpan(analyticUnitId, span.from, span.to, analyticUnitCache);
newRunningSpans.push(runningSpan);
}
});
await Promise.all(runningSpansPromises);
return _.concat(readySpans, alreadyRunningSpans, newRunningSpans.filter(span => span !== null));
}
async function runDetectionOnExtendedSpan(
analyticUnitId: AnalyticUnit.AnalyticUnitId,
from: number,
to: number,
analyticUnitCache: AnalyticUnitCache.AnalyticUnitCache
): Promise<Detection.DetectionSpan> {
if(analyticUnitCache === null) {
return null;
}
const intersection = analyticUnitCache.getIntersection();
const intersectedFrom = Math.max(from - intersection, 0);
const intersectedTo = to + intersection
runDetect(analyticUnitId, intersectedFrom, intersectedTo);
const detection = new Detection.DetectionSpan(
analyticUnitId,
intersectedFrom,
intersectedTo,
Detection.DetectionStatus.RUNNING
);
await Detection.insertSpan(detection);
return detection;
}

15
server/src/models/analytic_unit_cache_model.ts

@ -2,8 +2,9 @@ import { AnalyticUnitId } from "./analytic_unit_model";
import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES);
const db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES);
// TODO: count milliseconds in index from dataset
const MILLISECONDS_IN_INDEX = 60000;
export class AnalyticUnitCache {
public constructor(
@ -28,6 +29,16 @@ export class AnalyticUnitCache {
obj.data,
);
}
public getIntersection(): number {
if(this.data !== undefined && this.data !== null) {
//TODO: return one window size after resolving https://github.com/hastic/hastic-server/issues/508
return this.data.windowSize * 2 * MILLISECONDS_IN_INDEX;
}
// TODO: default window size
return 3 * MILLISECONDS_IN_INDEX;
}
}
export async function findById(id: AnalyticUnitId): Promise<AnalyticUnitCache> {

140
server/src/models/detection_model.ts

@ -0,0 +1,140 @@
import { AnalyticUnitId } from './analytic_unit_model';
import { Collection, makeDBQ } from '../services/data_service';
import * as _ from 'lodash';
let db = makeDBQ(Collection.DETECTION_SPANS);
export enum DetectionStatus {
READY = 'READY',
RUNNING = 'RUNNING',
FAILED = 'FAILED'
}
export type DetectionId = string;
export class DetectionSpan {
constructor(
public analyticUnitId: AnalyticUnitId,
public from: number,
public to: number,
public status: DetectionStatus,
public id?: DetectionId,
) {
if(analyticUnitId === undefined) {
throw new Error('AnalyticUnitId is undefined');
}
if(from === undefined) {
throw new Error('from is undefined');
}
if(isNaN(from)) {
throw new Error('from is NaN');
}
if(to === undefined) {
throw new Error('to is undefined');
}
if(isNaN(to)) {
throw new Error('to is NaN');
}
if(status === undefined) {
throw new Error('status is undefined');
}
}
public toObject() {
return {
_id: this.id,
analyticUnitId: this.analyticUnitId,
from: this.from,
to: this.to,
status: this.status
};
}
static fromObject(obj: any): DetectionSpan {
if(obj === undefined) {
throw new Error('obj is undefined');
}
return new DetectionSpan(
obj.analyticUnitId,
+obj.from, +obj.to,
obj.status,
obj._id
);
}
}
export type FindManyQuery = {
status?: DetectionStatus,
timeFromLTE?: number,
timeToGTE?: number,
timeFromGTE?: number,
timeToLTE?: number,
}
export async function findMany(id: AnalyticUnitId, query?: FindManyQuery): Promise<DetectionSpan[]> {
let dbQuery: any = { analyticUnitId: id };
if(query.status !== undefined) {
dbQuery.status = query.status;
}
if(query.timeFromLTE !== undefined) {
dbQuery.from = { $lte: query.timeFromLTE };
}
if(query.timeToGTE !== undefined) {
dbQuery.to = { $gte: query.timeToGTE };
}
if(query.timeFromGTE !== undefined) {
dbQuery.from = { $gte: query.timeFromGTE };
}
if(query.timeToLTE !== undefined) {
dbQuery.to = { $lte: query.timeToLTE };
}
const spans = await db.findMany(dbQuery);
if(spans === null) {
return [];
}
return spans.map(DetectionSpan.fromObject);
}
export async function getIntersectedSpans(
analyticUnitId: AnalyticUnitId,
from: number,
to: number,
status?: DetectionStatus
): Promise<DetectionSpan[]> {
return findMany(analyticUnitId, { status, timeFromLTE: to, timeToGTE: from });
}
export async function insertSpan(span: DetectionSpan) {
let spanToInsert = span.toObject();
const intersections = await getIntersectedSpans(span.analyticUnitId, span.from, span.to, span.status);
if(!_.isEmpty(intersections) && span.status === DetectionStatus.READY) {
let minFrom: number = _.minBy(intersections, 'from').from;
minFrom = Math.min(span.from, minFrom);
let maxTo: number = _.maxBy(intersections, 'to').to;
maxTo = Math.max(span.to, maxTo);
const spansInside = await findMany(span.analyticUnitId, { timeFromGTE: minFrom, timeToLTE: maxTo });
const toRemove = _.concat(intersections.map(span => span.id), spansInside.map(span => span.id));
await db.removeMany(toRemove);
spanToInsert = new DetectionSpan(span.analyticUnitId, minFrom, maxTo, span.status).toObject();
}
return db.insertOne(spanToInsert);
}
export function getSpanBorders(spans: DetectionSpan[]): number[] {
let spanBorders: number[] = [];
_.sortBy(spans.map(span => span.toObject()), 'from')
.forEach(span => {
spanBorders.push(span.from);
spanBorders.push(span.to);
});
return spanBorders;
}

48
server/src/routes/detections_router.ts

@ -1,32 +1,14 @@
import * as AnalyticsController from '../controllers/analytics_controller';
import { AnalyticUnitId } from '../models/analytic_unit_model';
import { DetectionSpan } from '../models/detection_model';
import * as Router from 'koa-router';
import * as _ from 'lodash';
// TODO: move this to analytics_controller
export enum DetectionStatus {
READY = 'READY',
RUNNING = 'RUNNING',
FAILED = 'FAILED'
}
// TODO: move this to analytics_controller
declare type DetectionSpan = {
id: AnalyticUnitId,
from: number,
to: number,
status: DetectionStatus
}
declare type DetectionSpansResponse = {
spans: DetectionSpan[]
}
// TODO: move this to analytics_controller
let runningDetectionSpans: DetectionSpan[] = [];
export async function getDetectionSpans(ctx: Router.IRouterContext) {
let id: AnalyticUnitId = ctx.request.query.id;
if(id === undefined || id === '') {
@ -42,31 +24,9 @@ export async function getDetectionSpans(ctx: Router.IRouterContext) {
throw new Error(`to is missing or corrupted (got ${ctx.request.query.to})`);
}
let response: DetectionSpansResponse = { spans: [] }
// TODO: move this to analytics_controller
// TODO: what if we are inside a running span?
// TODO: this find will not find anything because of status field, use an id instead
const previousRun = _.find(runningDetectionSpans, { id, from, to });
if(previousRun !== undefined) {
response.spans.push(previousRun);
}
const currentRun: DetectionSpan = { id, from, to, status: DetectionStatus.RUNNING };
runningDetectionSpans.push(currentRun);
// TODO: move this to analytics_controller
AnalyticsController.runDetect(id, from, to)
.then(() => {
// TODO: this find will not find anything because of status field, use an id instead
_.find(runningDetectionSpans, { id, from, to }).status = DetectionStatus.READY
})
.catch(err => {
console.error(err);
// TODO: this find will not find anything because of status field, use an id instead
_.find(runningDetectionSpans, { id, from, to }).status = DetectionStatus.FAILED;
});
let response: DetectionSpansResponse = { spans: [] };
// TODO: invalidate
response.spans = await AnalyticsController.getDetectionSpans(id, from, to);
ctx.response.body = response;
}

9
server/src/services/data_service.ts

@ -4,7 +4,13 @@ import * as nedb from 'nedb';
import * as fs from 'fs';
export enum Collection { ANALYTIC_UNITS, ANALYTIC_UNIT_CACHES, SEGMENTS, THRESHOLD };
export enum Collection {
ANALYTIC_UNITS,
ANALYTIC_UNIT_CACHES,
SEGMENTS,
THRESHOLD,
DETECTION_SPANS
};
export enum SortingOrder { ASCENDING = 1, DESCENDING = -1 };
@ -213,3 +219,4 @@ db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DAT
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true }));
db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }));
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true }));
db.set(Collection.DETECTION_SPANS, new nedb({ filename: config.DETECTION_SPANS_DATABASE_PATH, autoload: true }));

53
server/src/utils/spans.ts

@ -0,0 +1,53 @@
//TODO: move this code to span model
import * as _ from 'lodash';
export declare type Span = {
from: number,
to: number
}
export function getNonIntersectedSpans(from: number, to: number, spanBorders: number[]): Span[] {
// spanBorders array must be sorted ascending
let isFromProcessed = false;
let alreadyDetected = false;
let startDetectionRange = null;
let result: Span[] = [];
for(var border of spanBorders) {
if(!isFromProcessed && border >= from) {
isFromProcessed = true;
if(border === from) {
if(alreadyDetected) {
startDetectionRange = from;
}
} else {
if(!alreadyDetected) {
startDetectionRange = from;
}
}
}
if(border >= to) {
if(!alreadyDetected) {
result.push({ from: startDetectionRange, to });
}
break;
}
if(alreadyDetected) { //end of already detected region, start point for new detection
startDetectionRange = border;
} else { //end of new detection region
if(startDetectionRange !== null) {
result.push({ from: startDetectionRange, to: border});
}
}
alreadyDetected = !alreadyDetected;
}
if(border < to) {
result.push({ from: startDetectionRange, to });
}
return result;
}
Loading…
Cancel
Save