Browse Source

Set ready status for span after inserting segments (#648)

pull/1/head
Evgeny Smyshlyaev 5 years ago committed by rozetko
parent
commit
fc11f1d012
  1. 50
      server/src/controllers/analytics_controller.ts

50
server/src/controllers/analytics_controller.ts

@ -277,6 +277,16 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number, to?: number) {
let previousLastDetectionTime: number = undefined;
let range: TimeRange;
let intersection = 0;
let oldCache = await AnalyticUnitCache.findById(id);
if(oldCache !== null) {
intersection = oldCache.getIntersection();
oldCache = oldCache.data;
} else {
await AnalyticUnitCache.create(id);
}
try {
let unit = await AnalyticUnit.findById(id);
@ -284,7 +294,6 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
let analyticUnitType = unit.type;
const detector = unit.detectorType;
let range: TimeRange;
if(from !== undefined && to !== undefined) {
range = { from, to };
} else {
@ -292,12 +301,6 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
}
const data = await query(unit, range);
let oldCache = await AnalyticUnitCache.findById(id);
if(oldCache !== null) {
oldCache = oldCache.data;
} else {
await AnalyticUnitCache.create(id);
}
let task = new AnalyticsTask(
id,
AnalyticsTaskType.DETECT,
@ -309,23 +312,11 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
const result = await runTask(task);
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
await Detection.insertSpan(
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED)
);
throw new Error(result.error);
}
const payload = await processDetectionResult(id, result.payload);
const cache = AnalyticUnitCache.AnalyticUnitCache.fromObject({ _id: id, data: payload.cache });
const intersection = cache.getIntersection();
await Detection.insertSpan(
new Detection.DetectionSpan(
id,
range.from + intersection,
range.to - intersection,
Detection.DetectionStatus.READY
)
);
// TODO: uncomment it
// It clears segments when redetecting on another timerange
@ -336,12 +327,29 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
await Detection.insertSpan(
new Detection.DetectionSpan(
id,
range.from + intersection,
range.to - intersection,
Detection.DetectionStatus.READY
)
);
} catch(err) {
let message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
// TODO: maybe we don't need to update detectionTime with previous value?
if(previousLastDetectionTime !== undefined) {
await AnalyticUnit.setDetectionTime(id, previousLastDetectionTime);
}
let message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
await Detection.insertSpan(
new Detection.DetectionSpan(
id,
range.from + intersection,
range.to - intersection,
Detection.DetectionStatus.FAILED
)
);
}
}

Loading…
Cancel
Save