diff --git a/server/spec/utils/spans.jest.ts b/server/spec/utils/spans.jest.ts new file mode 100644 index 0000000..efc2cdb --- /dev/null +++ b/server/spec/utils/spans.jest.ts @@ -0,0 +1,35 @@ +import { getNonIntersectedSpans } from '../../src/utils/spans'; + +import 'jest'; + +describe('getNonIntersectedSpans', function() { + + let spanBorders = [3, 5, 6, 8, 10, 20]; + + it('functional test', function() { + expect(getNonIntersectedSpans(4, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(5, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(4, 10, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(5, 10, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(4, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(4, 21, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}, {from: 20, to: 21}]); + expect(getNonIntersectedSpans(2, 20, spanBorders)).toEqual([{from: 2, to: 3}, {from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(2, 21, spanBorders)).toEqual([{from: 2, to: 3}, {from: 5, to: 6}, {from: 8, to: 10}, {from: 20, to: 21}]); + expect(getNonIntersectedSpans(3, 11, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(3, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(3, 20, spanBorders)).toEqual([{from: 5, to: 6}, {from: 8, to: 10}]); + expect(getNonIntersectedSpans(4, 7, [3, 5, 6, 8])).toEqual([{from: 5, to: 6}]); + }); + + it('empty borders list', function() { + expect(getNonIntersectedSpans(4, 10, [])).toEqual([]); + }); + + it('all in span', function() { + expect(getNonIntersectedSpans(4, 10, [1, 20])).toEqual([]); + expect(getNonIntersectedSpans(4, 10, [1, 10])).toEqual([]); + expect(getNonIntersectedSpans(4, 10, [4, 20])).toEqual([]); + expect(getNonIntersectedSpans(4, 10, [4, 10])).toEqual([]); + }); + +}); diff --git a/server/src/config.ts b/server/src/config.ts index 07b26ec..4616142 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -17,6 +17,7 @@ export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const THRESHOLD_DATABASE_PATH = path.join(DATA_PATH, 'treshold.db'); +export const DETECTION_SPANS_DATABASE_PATH = path.join(DATA_PATH, 'detection_spans.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index e00bd38..682df96 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -4,11 +4,13 @@ import * as AnalyticUnitCache from '../models/analytic_unit_cache_model'; import * as Segment from '../models/segment_model'; import * as Threshold from '../models/threshold_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; +import * as Detection from '../models/detection_model'; import { AnalyticsService } from '../services/analytics_service'; import { AlertService } from '../services/alert_service'; import { HASTIC_API_KEY } from '../config'; import { DataPuller } from '../services/data_puller'; import { getGrafanaUrl } from '../utils/grafana'; +import { getNonIntersectedSpans } from '../utils/spans'; import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit'; @@ -264,14 +266,32 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number, { detector, analyticUnitType, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache } ); console.log(`run task, id:${id}`); + // TODO: status: detection + await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); let result = await runTask(task); + if(range !== undefined) { + if( + result.status === AnalyticUnit.AnalyticUnitStatus.SUCCESS || + result.status === AnalyticUnit.AnalyticUnitStatus.READY + ) { + await Detection.insertSpan( + new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.READY) + ); + } else if (result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { + await Detection.insertSpan( + new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED) + ); + } + } if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { throw new Error(result.error); } let payload = await processDetectionResult(id, result.payload); - await deleteNonDetectedSegments(id, payload); + // TODO: uncomment it + // It clears segments when redetecting on another timerange + // await deleteNonDetectedSegments(id, payload); await Promise.all([ Segment.insertSegments(payload.segments), @@ -442,3 +462,76 @@ export async function runLearningWithDetection(id: AnalyticUnit.AnalyticUnitId) .then(() => runDetect(id)) .catch(err => console.error(err)); } + +export async function getDetectionSpans( + analyticUnitId: AnalyticUnit.AnalyticUnitId, + from: number, + to: number +): Promise { + const readySpans = await Detection.getIntersectedSpans(analyticUnitId, from, to, Detection.DetectionStatus.READY); + const alreadyRunningSpans = await Detection.getIntersectedSpans(analyticUnitId, from, to, Detection.DetectionStatus.RUNNING); + + const analyticUnitCache = await AnalyticUnitCache.findById(analyticUnitId); + + if(_.isEmpty(readySpans)) { + const span = await runDetectionOnExtendedSpan(analyticUnitId, from, to, analyticUnitCache); + + if(span === null) { + return []; + } else { + return [span]; + } + } + + const spanBorders = Detection.getSpanBorders(readySpans); + + let newDetectionSpans = getNonIntersectedSpans(from, to, spanBorders); + if(newDetectionSpans.length === 0) { + return [ new Detection.DetectionSpan(analyticUnitId, from, to, Detection.DetectionStatus.READY) ]; + } + + let runningSpansPromises = []; + let newRunningSpans: Detection.DetectionSpan[] = []; + runningSpansPromises = newDetectionSpans.map(async span => { + const insideRunning = await Detection.findMany(analyticUnitId, { + status: Detection.DetectionStatus.RUNNING, + timeFromLTE: span.from, + timeToGTE: span.to + }); + + if(_.isEmpty(insideRunning)) { + const runningSpan = await runDetectionOnExtendedSpan(analyticUnitId, span.from, span.to, analyticUnitCache); + newRunningSpans.push(runningSpan); + } + }); + + await Promise.all(runningSpansPromises); + + return _.concat(readySpans, alreadyRunningSpans, newRunningSpans.filter(span => span !== null)); +} + +async function runDetectionOnExtendedSpan( + analyticUnitId: AnalyticUnit.AnalyticUnitId, + from: number, + to: number, + analyticUnitCache: AnalyticUnitCache.AnalyticUnitCache +): Promise { + if(analyticUnitCache === null) { + return null; + } + + const intersection = analyticUnitCache.getIntersection(); + + const intersectedFrom = Math.max(from - intersection, 0); + const intersectedTo = to + intersection + runDetect(analyticUnitId, intersectedFrom, intersectedTo); + + const detection = new Detection.DetectionSpan( + analyticUnitId, + intersectedFrom, + intersectedTo, + Detection.DetectionStatus.RUNNING + ); + await Detection.insertSpan(detection); + return detection; +} diff --git a/server/src/models/analytic_unit_cache_model.ts b/server/src/models/analytic_unit_cache_model.ts index 3957bd8..0c3fa56 100644 --- a/server/src/models/analytic_unit_cache_model.ts +++ b/server/src/models/analytic_unit_cache_model.ts @@ -2,8 +2,9 @@ import { AnalyticUnitId } from "./analytic_unit_model"; import { Collection, makeDBQ } from '../services/data_service'; -let db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES); - +const db = makeDBQ(Collection.ANALYTIC_UNIT_CACHES); +// TODO: count milliseconds in index from dataset +const MILLISECONDS_IN_INDEX = 60000; export class AnalyticUnitCache { public constructor( @@ -28,6 +29,16 @@ export class AnalyticUnitCache { obj.data, ); } + + public getIntersection(): number { + if(this.data !== undefined && this.data !== null) { + //TODO: return one window size after resolving https://github.com/hastic/hastic-server/issues/508 + return this.data.windowSize * 2 * MILLISECONDS_IN_INDEX; + } + // TODO: default window size + return 3 * MILLISECONDS_IN_INDEX; + } + } export async function findById(id: AnalyticUnitId): Promise { diff --git a/server/src/models/detection_model.ts b/server/src/models/detection_model.ts new file mode 100644 index 0000000..27f786a --- /dev/null +++ b/server/src/models/detection_model.ts @@ -0,0 +1,140 @@ +import { AnalyticUnitId } from './analytic_unit_model'; +import { Collection, makeDBQ } from '../services/data_service'; + +import * as _ from 'lodash'; + +let db = makeDBQ(Collection.DETECTION_SPANS); + +export enum DetectionStatus { + READY = 'READY', + RUNNING = 'RUNNING', + FAILED = 'FAILED' +} + +export type DetectionId = string; + +export class DetectionSpan { + constructor( + public analyticUnitId: AnalyticUnitId, + public from: number, + public to: number, + public status: DetectionStatus, + public id?: DetectionId, + ) { + if(analyticUnitId === undefined) { + throw new Error('AnalyticUnitId is undefined'); + } + if(from === undefined) { + throw new Error('from is undefined'); + } + if(isNaN(from)) { + throw new Error('from is NaN'); + } + if(to === undefined) { + throw new Error('to is undefined'); + } + if(isNaN(to)) { + throw new Error('to is NaN'); + } + if(status === undefined) { + throw new Error('status is undefined'); + } + } + + public toObject() { + return { + _id: this.id, + analyticUnitId: this.analyticUnitId, + from: this.from, + to: this.to, + status: this.status + }; + } + + static fromObject(obj: any): DetectionSpan { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new DetectionSpan( + obj.analyticUnitId, + +obj.from, +obj.to, + obj.status, + obj._id + ); + } +} + +export type FindManyQuery = { + status?: DetectionStatus, + timeFromLTE?: number, + timeToGTE?: number, + timeFromGTE?: number, + timeToLTE?: number, +} + +export async function findMany(id: AnalyticUnitId, query?: FindManyQuery): Promise { + let dbQuery: any = { analyticUnitId: id }; + if(query.status !== undefined) { + dbQuery.status = query.status; + } + if(query.timeFromLTE !== undefined) { + dbQuery.from = { $lte: query.timeFromLTE }; + } + if(query.timeToGTE !== undefined) { + dbQuery.to = { $gte: query.timeToGTE }; + } + if(query.timeFromGTE !== undefined) { + dbQuery.from = { $gte: query.timeFromGTE }; + } + if(query.timeToLTE !== undefined) { + dbQuery.to = { $lte: query.timeToLTE }; + } + + const spans = await db.findMany(dbQuery); + if(spans === null) { + return []; + } + return spans.map(DetectionSpan.fromObject); +} + +export async function getIntersectedSpans( + analyticUnitId: AnalyticUnitId, + from: number, + to: number, + status?: DetectionStatus +): Promise { + return findMany(analyticUnitId, { status, timeFromLTE: to, timeToGTE: from }); +} + +export async function insertSpan(span: DetectionSpan) { + let spanToInsert = span.toObject(); + + const intersections = await getIntersectedSpans(span.analyticUnitId, span.from, span.to, span.status); + if(!_.isEmpty(intersections) && span.status === DetectionStatus.READY) { + let minFrom: number = _.minBy(intersections, 'from').from; + minFrom = Math.min(span.from, minFrom); + + let maxTo: number = _.maxBy(intersections, 'to').to; + maxTo = Math.max(span.to, maxTo); + + const spansInside = await findMany(span.analyticUnitId, { timeFromGTE: minFrom, timeToLTE: maxTo }); + const toRemove = _.concat(intersections.map(span => span.id), spansInside.map(span => span.id)); + + await db.removeMany(toRemove); + + spanToInsert = new DetectionSpan(span.analyticUnitId, minFrom, maxTo, span.status).toObject(); + } + return db.insertOne(spanToInsert); +} + +export function getSpanBorders(spans: DetectionSpan[]): number[] { + let spanBorders: number[] = []; + + _.sortBy(spans.map(span => span.toObject()), 'from') + .forEach(span => { + spanBorders.push(span.from); + spanBorders.push(span.to); + }); + + return spanBorders; +} diff --git a/server/src/routes/detections_router.ts b/server/src/routes/detections_router.ts index 9680077..9dd3811 100644 --- a/server/src/routes/detections_router.ts +++ b/server/src/routes/detections_router.ts @@ -1,32 +1,14 @@ import * as AnalyticsController from '../controllers/analytics_controller'; import { AnalyticUnitId } from '../models/analytic_unit_model'; +import { DetectionSpan } from '../models/detection_model'; import * as Router from 'koa-router'; -import * as _ from 'lodash'; -// TODO: move this to analytics_controller -export enum DetectionStatus { - READY = 'READY', - RUNNING = 'RUNNING', - FAILED = 'FAILED' -} - -// TODO: move this to analytics_controller -declare type DetectionSpan = { - id: AnalyticUnitId, - from: number, - to: number, - status: DetectionStatus -} - declare type DetectionSpansResponse = { spans: DetectionSpan[] } -// TODO: move this to analytics_controller -let runningDetectionSpans: DetectionSpan[] = []; - export async function getDetectionSpans(ctx: Router.IRouterContext) { let id: AnalyticUnitId = ctx.request.query.id; if(id === undefined || id === '') { @@ -42,31 +24,9 @@ export async function getDetectionSpans(ctx: Router.IRouterContext) { throw new Error(`to is missing or corrupted (got ${ctx.request.query.to})`); } - let response: DetectionSpansResponse = { spans: [] } - - // TODO: move this to analytics_controller - // TODO: what if we are inside a running span? - // TODO: this find will not find anything because of status field, use an id instead - const previousRun = _.find(runningDetectionSpans, { id, from, to }); - if(previousRun !== undefined) { - response.spans.push(previousRun); - } - - const currentRun: DetectionSpan = { id, from, to, status: DetectionStatus.RUNNING }; - runningDetectionSpans.push(currentRun); - - // TODO: move this to analytics_controller - AnalyticsController.runDetect(id, from, to) - .then(() => { - // TODO: this find will not find anything because of status field, use an id instead - _.find(runningDetectionSpans, { id, from, to }).status = DetectionStatus.READY - }) - .catch(err => { - console.error(err); - // TODO: this find will not find anything because of status field, use an id instead - _.find(runningDetectionSpans, { id, from, to }).status = DetectionStatus.FAILED; - }); - + let response: DetectionSpansResponse = { spans: [] }; + // TODO: invalidate + response.spans = await AnalyticsController.getDetectionSpans(id, from, to); ctx.response.body = response; } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 197d273..ddb2c34 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -4,7 +4,13 @@ import * as nedb from 'nedb'; import * as fs from 'fs'; -export enum Collection { ANALYTIC_UNITS, ANALYTIC_UNIT_CACHES, SEGMENTS, THRESHOLD }; +export enum Collection { + ANALYTIC_UNITS, + ANALYTIC_UNIT_CACHES, + SEGMENTS, + THRESHOLD, + DETECTION_SPANS +}; export enum SortingOrder { ASCENDING = 1, DESCENDING = -1 }; @@ -213,3 +219,4 @@ db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DAT db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true })); db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true })); db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true })); +db.set(Collection.DETECTION_SPANS, new nedb({ filename: config.DETECTION_SPANS_DATABASE_PATH, autoload: true })); diff --git a/server/src/utils/spans.ts b/server/src/utils/spans.ts new file mode 100644 index 0000000..606f1fe --- /dev/null +++ b/server/src/utils/spans.ts @@ -0,0 +1,53 @@ +//TODO: move this code to span model + +import * as _ from 'lodash'; + +export declare type Span = { + from: number, + to: number +} + +export function getNonIntersectedSpans(from: number, to: number, spanBorders: number[]): Span[] { + // spanBorders array must be sorted ascending + let isFromProcessed = false; + let alreadyDetected = false; + let startDetectionRange = null; + let result: Span[] = []; + + for(var border of spanBorders) { + if(!isFromProcessed && border >= from) { + isFromProcessed = true; + if(border === from) { + if(alreadyDetected) { + startDetectionRange = from; + } + } else { + if(!alreadyDetected) { + startDetectionRange = from; + } + } + } + + if(border >= to) { + if(!alreadyDetected) { + result.push({ from: startDetectionRange, to }); + } + break; + } + + if(alreadyDetected) { //end of already detected region, start point for new detection + startDetectionRange = border; + } else { //end of new detection region + if(startDetectionRange !== null) { + result.push({ from: startDetectionRange, to: border}); + } + } + alreadyDetected = !alreadyDetected; + } + + if(border < to) { + result.push({ from: startDetectionRange, to }); + } + + return result; +}