|
|
|
@ -110,24 +110,35 @@ async function runTask(task: AnalyticsTask): Promise<TaskResult> {
|
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: AnalyticUnit.DetectorType, range?: any) { |
|
|
|
|
if(range === undefined) { |
|
|
|
|
if(detector === AnalyticUnit.DetectorType.PATTERN) { |
|
|
|
|
// TODO: find labeled OR deleted segments to generate timerange
|
|
|
|
|
const segments = await Segment.findMany(analyticUnit.id, { labeled: true }); |
|
|
|
|
if(segments.length === 0) { |
|
|
|
|
throw new Error('Need at least 1 labeled segment'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
range = getQueryRangeForLearningBySegments(segments); |
|
|
|
|
} else if(detector === AnalyticUnit.DetectorType.THRESHOLD) { |
|
|
|
|
const now = Date.now(); |
|
|
|
|
range = { |
|
|
|
|
from: now - 5 * SECONDS_IN_MINUTE * 1000, |
|
|
|
|
to: now |
|
|
|
|
}; |
|
|
|
|
async function getQueryRange( |
|
|
|
|
analyticUnitId: AnalyticUnit.AnalyticUnitId, |
|
|
|
|
detectorType: AnalyticUnit.DetectorType |
|
|
|
|
): Promise<{ from: number, to: number }> { |
|
|
|
|
if(detectorType === AnalyticUnit.DetectorType.PATTERN) { |
|
|
|
|
// TODO: find labeled OR deleted segments to generate timerange
|
|
|
|
|
const segments = await Segment.findMany(analyticUnitId, { labeled: true }); |
|
|
|
|
if(segments.length === 0) { |
|
|
|
|
throw new Error('Need at least 1 labeled segment'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return getQueryRangeForLearningBySegments(segments); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) { |
|
|
|
|
const now = Date.now(); |
|
|
|
|
return { |
|
|
|
|
from: now - 5 * SECONDS_IN_MINUTE * 1000, |
|
|
|
|
to: now |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
throw new Error(`Cannot get query range for detector type ${detectorType}`); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async function query( |
|
|
|
|
analyticUnit: AnalyticUnit.AnalyticUnit, |
|
|
|
|
range: { from: number, to: number } |
|
|
|
|
) { |
|
|
|
|
console.log(`query time range: from ${new Date(range.from)} to ${new Date(range.to)}`); |
|
|
|
|
|
|
|
|
|
const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl); |
|
|
|
@ -219,7 +230,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
taskPayload.threshold = threshold; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
taskPayload.data = await query(analyticUnit, detector); |
|
|
|
|
const range = await getQueryRange(id, detector); |
|
|
|
|
taskPayload.data = await query(analyticUnit, range); |
|
|
|
|
|
|
|
|
|
let task = new AnalyticsTask( |
|
|
|
|
id, AnalyticsTaskType.LEARN, taskPayload |
|
|
|
@ -251,8 +263,10 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
|
|
|
|
|
let range; |
|
|
|
|
if(from !== undefined && to !== undefined) { |
|
|
|
|
range = { from, to }; |
|
|
|
|
} else { |
|
|
|
|
range = await getQueryRange(id, detector); |
|
|
|
|
} |
|
|
|
|
const data = await query(unit, detector, range); |
|
|
|
|
const data = await query(unit, range); |
|
|
|
|
|
|
|
|
|
let oldCache = await AnalyticUnitCache.findById(id); |
|
|
|
|
if(oldCache !== null) { |
|
|
|
@ -268,31 +282,30 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
|
|
|
|
|
console.log(`run task, id:${id}`); |
|
|
|
|
// TODO: status: detection
|
|
|
|
|
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); |
|
|
|
|
let result = await runTask(task); |
|
|
|
|
if(range !== undefined) { |
|
|
|
|
if( |
|
|
|
|
result.status === AnalyticUnit.AnalyticUnitStatus.SUCCESS || |
|
|
|
|
result.status === AnalyticUnit.AnalyticUnitStatus.READY |
|
|
|
|
) { |
|
|
|
|
await Detection.insertSpan( |
|
|
|
|
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.READY) |
|
|
|
|
); |
|
|
|
|
} else if (result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { |
|
|
|
|
await Detection.insertSpan( |
|
|
|
|
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
const result = await runTask(task); |
|
|
|
|
|
|
|
|
|
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { |
|
|
|
|
await Detection.insertSpan( |
|
|
|
|
new Detection.DetectionSpan(id, range.from, range.to, Detection.DetectionStatus.FAILED) |
|
|
|
|
); |
|
|
|
|
throw new Error(result.error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let payload = await processDetectionResult(id, result.payload); |
|
|
|
|
const payload = await processDetectionResult(id, result.payload); |
|
|
|
|
const cache = AnalyticUnitCache.AnalyticUnitCache.fromObject({ _id: id, data: payload.cache }); |
|
|
|
|
const intersection = cache.getIntersection(); |
|
|
|
|
await Detection.insertSpan( |
|
|
|
|
new Detection.DetectionSpan( |
|
|
|
|
id, |
|
|
|
|
range.from + intersection, |
|
|
|
|
range.to - intersection, |
|
|
|
|
Detection.DetectionStatus.READY |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
// TODO: uncomment it
|
|
|
|
|
// It clears segments when redetecting on another timerange
|
|
|
|
|
// await deleteNonDetectedSegments(id, payload);
|
|
|
|
|
|
|
|
|
|
await Promise.all([ |
|
|
|
|
Segment.insertSegments(payload.segments), |
|
|
|
|
AnalyticUnitCache.setData(id, payload.cache), |
|
|
|
@ -476,7 +489,7 @@ export async function getDetectionSpans(
|
|
|
|
|
|
|
|
|
|
if(_.isEmpty(readySpans)) { |
|
|
|
|
const span = await runDetectionOnExtendedSpan(analyticUnitId, from, to, analyticUnitCache); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(span === null) { |
|
|
|
|
return []; |
|
|
|
|
} else { |
|
|
|
@ -524,13 +537,13 @@ async function runDetectionOnExtendedSpan(
|
|
|
|
|
const intersection = analyticUnitCache.getIntersection(); |
|
|
|
|
|
|
|
|
|
const intersectedFrom = Math.max(from - intersection, 0); |
|
|
|
|
const intersectedTo = to + intersection |
|
|
|
|
const intersectedTo = to + intersection; |
|
|
|
|
runDetect(analyticUnitId, intersectedFrom, intersectedTo); |
|
|
|
|
|
|
|
|
|
const detection = new Detection.DetectionSpan( |
|
|
|
|
analyticUnitId, |
|
|
|
|
intersectedFrom, |
|
|
|
|
intersectedTo, |
|
|
|
|
from, |
|
|
|
|
to, |
|
|
|
|
Detection.DetectionStatus.RUNNING |
|
|
|
|
); |
|
|
|
|
await Detection.insertSpan(detection); |
|
|
|
|