diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index f6218c3..cc7866a 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -110,24 +110,35 @@ async function runTask(task: AnalyticsTask): Promise { }); } -async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: AnalyticUnit.DetectorType, range?: any) { - if(range === undefined) { - if(detector === AnalyticUnit.DetectorType.PATTERN) { - // TODO: find labeled OR deleted segments to generate timerange - const segments = await Segment.findMany(analyticUnit.id, { labeled: true }); - if(segments.length === 0) { - throw new Error('Need at least 1 labeled segment'); - } - - range = getQueryRangeForLearningBySegments(segments); - } else if(detector === AnalyticUnit.DetectorType.THRESHOLD) { - const now = Date.now(); - range = { - from: now - 5 * SECONDS_IN_MINUTE * 1000, - to: now - }; +async function getQueryRange( + analyticUnitId: AnalyticUnit.AnalyticUnitId, + detectorType: AnalyticUnit.DetectorType +): Promise<{ from: number, to: number }> { + if(detectorType === AnalyticUnit.DetectorType.PATTERN) { + // TODO: find labeled OR deleted segments to generate timerange + const segments = await Segment.findMany(analyticUnitId, { labeled: true }); + if(segments.length === 0) { + throw new Error('Need at least 1 labeled segment'); } + + return getQueryRangeForLearningBySegments(segments); } + + if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) { + const now = Date.now(); + return { + from: now - 5 * SECONDS_IN_MINUTE * 1000, + to: now + }; + } + + throw new Error(`Cannot get query range for detector type ${detectorType}`); +} + +async function query( + analyticUnit: AnalyticUnit.AnalyticUnit, + range: { from: number, to: number } +) { console.log(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`); const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl); @@ -219,7 +230,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { taskPayload.threshold = threshold; } - taskPayload.data = await query(analyticUnit, detector); + const range = await getQueryRange(id, detector); + taskPayload.data = await query(analyticUnit, range); let task = new AnalyticsTask( id, AnalyticsTaskType.LEARN, taskPayload @@ -251,8 +263,10 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number, let range; if(from !== undefined && to !== undefined) { range = { from, to }; + } else { + range = await getQueryRange(id, detector); } - const data = await query(unit, detector, range); + const data = await query(unit, range); let oldCache = await AnalyticUnitCache.findById(id); if(oldCache !== null) { @@ -268,31 +282,30 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number, console.log(`run task, id:${id}`); // TODO: status: detection await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); - let result = await runTask(task); - if(range !== undefined) { - if( - result.status === AnalyticUnit.AnalyticUnitStatus.SUCCESS || - result.status === AnalyticUnit.AnalyticUnitStatus.READY - ) { - await Detection.insertSpan( - new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.READY) - ); - } else if (result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { - await Detection.insertSpan( - new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED) - ); - } - } + const result = await runTask(task); + if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { + await Detection.insertSpan( + new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED) + ); throw new Error(result.error); } - let payload = await processDetectionResult(id, result.payload); + const payload = await processDetectionResult(id, result.payload); + const cache = AnalyticUnitCache.AnalyticUnitCache.fromObject({ _id: id, data: payload.cache }); + const intersection = cache.getIntersection(); + await Detection.insertSpan( + new Detection.DetectionSpan( + id, + range.from + intersection, + range.to - intersection, + Detection.DetectionStatus.READY + ) + ); // TODO: uncomment it // It clears segments when redetecting on another timerange // await deleteNonDetectedSegments(id, payload); - await Promise.all([ Segment.insertSegments(payload.segments), AnalyticUnitCache.setData(id, payload.cache), @@ -476,7 +489,7 @@ export async function getDetectionSpans( if(_.isEmpty(readySpans)) { const span = await runDetectionOnExtendedSpan(analyticUnitId, from, to, analyticUnitCache); - + if(span === null) { return []; } else { @@ -524,13 +537,13 @@ async function runDetectionOnExtendedSpan( const intersection = analyticUnitCache.getIntersection(); const intersectedFrom = Math.max(from - intersection, 0); - const intersectedTo = to + intersection + const intersectedTo = to + intersection; runDetect(analyticUnitId, intersectedFrom, intersectedTo); const detection = new Detection.DetectionSpan( analyticUnitId, - intersectedFrom, - intersectedTo, + from, + to, Detection.DetectionStatus.RUNNING ); await Detection.insertSpan(detection);