|
|
|
@ -13,7 +13,7 @@ import * as _ from 'lodash';
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type TaskResult = any; |
|
|
|
|
type PredictionResult = any; |
|
|
|
|
type DetectionResult = any; |
|
|
|
|
export type TaskResolver = (taskResult: TaskResult) => void; |
|
|
|
|
|
|
|
|
|
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>(); |
|
|
|
@ -38,8 +38,8 @@ function onTaskResult(taskResult: TaskResult) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function onPredict(predictionResult: PredictionResult) { |
|
|
|
|
processPredictionResult(predictionResult.analyticUnitId, predictionResult); |
|
|
|
|
function onDetect(detectionResult: DetectionResult) { |
|
|
|
|
processDetectionResult(detectionResult.analyticUnitId, detectionResult); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async function onMessage(message: AnalyticsMessage) { |
|
|
|
@ -52,7 +52,7 @@ async function onMessage(message: AnalyticsMessage) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if(message.method === AnalyticsMessageMethod.DETECT) { |
|
|
|
|
onPredict(message.payload); |
|
|
|
|
onDetect(message.payload); |
|
|
|
|
methodResolved = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -154,12 +154,12 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { |
|
|
|
|
let previousLastPredictionTime: number = undefined; |
|
|
|
|
export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { |
|
|
|
|
let previousLastDetectionTime: number = undefined; |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
let unit = await AnalyticUnit.findById(id); |
|
|
|
|
previousLastPredictionTime = unit.lastDetectionTime; |
|
|
|
|
previousLastDetectionTime = unit.lastDetectionTime; |
|
|
|
|
let pattern = unit.type; |
|
|
|
|
|
|
|
|
|
let segments = await Segment.findMany(id, { labeled: true }); |
|
|
|
@ -172,7 +172,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
let queryResult = await queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); |
|
|
|
|
let data = queryResult.values; |
|
|
|
|
if(data.length === 0) { |
|
|
|
|
throw new Error('Empty data to predict on'); |
|
|
|
|
throw new Error('Empty data to detect on'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let oldCache = await AnalyticUnitCache.findById(id); |
|
|
|
@ -184,7 +184,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
let task = new AnalyticsTask( |
|
|
|
|
id, |
|
|
|
|
AnalyticsTaskType.DETECT, |
|
|
|
|
{ pattern, lastPredictionTime: unit.lastDetectionTime, data, cache: oldCache } |
|
|
|
|
{ pattern, lastDetectionTime: unit.lastDetectionTime, data, cache: oldCache } |
|
|
|
|
); |
|
|
|
|
console.debug(`run task, id:${id}`); |
|
|
|
|
let result = await runTask(task); |
|
|
|
@ -192,7 +192,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
return []; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let payload = processPredictionResult(id, result.payload); |
|
|
|
|
let payload = processDetectionResult(id, result.payload); |
|
|
|
|
|
|
|
|
|
// TODO: implement segments merging without removing labeled
|
|
|
|
|
// if(segments.length > 0 && payload.segments.length > 0) {
|
|
|
|
@ -205,17 +205,17 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
await deleteNonpredictedSegments(id, payload); |
|
|
|
|
await deleteNonDetectedSegments(id, payload); |
|
|
|
|
|
|
|
|
|
Segment.insertSegments(payload.segments); |
|
|
|
|
AnalyticUnitCache.setData(id, payload.cache); |
|
|
|
|
AnalyticUnit.setDetectionTime(id, payload.lastPredictionTime); |
|
|
|
|
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime); |
|
|
|
|
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); |
|
|
|
|
} catch(err) { |
|
|
|
|
let message = err.message || JSON.stringify(err); |
|
|
|
|
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message); |
|
|
|
|
if(previousLastPredictionTime !== undefined) { |
|
|
|
|
await AnalyticUnit.setDetectionTime(id, previousLastPredictionTime); |
|
|
|
|
if(previousLastDetectionTime !== undefined) { |
|
|
|
|
await AnalyticUnit.setDetectionTime(id, previousLastDetectionTime); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -226,36 +226,36 @@ export async function remove(id: AnalyticUnit.AnalyticUnitId) {
|
|
|
|
|
await AnalyticUnit.remove(id); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
export async function deleteNonpredictedSegments(id, payload) { |
|
|
|
|
let lastPredictedSegments = await Segment.findMany(id, { labeled: false, deleted: false }); |
|
|
|
|
export async function deleteNonDetectedSegments(id, payload) { |
|
|
|
|
let lastDetectedSegments = await Segment.findMany(id, { labeled: false, deleted: false }); |
|
|
|
|
let segmentsToRemove: Segment.Segment[]; |
|
|
|
|
segmentsToRemove = _.differenceWith(lastPredictedSegments, payload.segments, (a, b: Segment.Segment) => a.equals(b)); |
|
|
|
|
segmentsToRemove = _.differenceWith(lastDetectedSegments, payload.segments, (a, b: Segment.Segment) => a.equals(b)); |
|
|
|
|
Segment.removeSegments(segmentsToRemove.map(s => s.id)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, predictionResult: PredictionResult): { |
|
|
|
|
lastPredictionTime: number, |
|
|
|
|
function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult): { |
|
|
|
|
lastDetectionTime: number, |
|
|
|
|
segments: Segment.Segment[], |
|
|
|
|
cache: any |
|
|
|
|
} { |
|
|
|
|
|
|
|
|
|
if (predictionResult.segments === undefined || !Array.isArray(predictionResult.segments)) { |
|
|
|
|
throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(predictionResult)}`); |
|
|
|
|
if (detectionResult.segments === undefined || !Array.isArray(detectionResult.segments)) { |
|
|
|
|
throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(detectionResult)}`); |
|
|
|
|
} |
|
|
|
|
if (predictionResult.lastPredictionTime === undefined || isNaN(+predictionResult.lastPredictionTime)) { |
|
|
|
|
if (detectionResult.lastDetectionTime === undefined || isNaN(+detectionResult.lastDetectionTime)) { |
|
|
|
|
throw new Error( |
|
|
|
|
`Missing lastPredictionTime is result or it is corrupted: ${JSON.stringify(predictionResult)}` |
|
|
|
|
`Missing lastDetectionTime in result or it is corrupted: ${JSON.stringify(detectionResult)}` |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let segments = predictionResult.segments.map( |
|
|
|
|
let segments = detectionResult.segments.map( |
|
|
|
|
segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
return { |
|
|
|
|
lastPredictionTime: predictionResult.lastPredictionTime, |
|
|
|
|
lastDetectionTime: detectionResult.lastDetectionTime, |
|
|
|
|
segments: segments, |
|
|
|
|
cache: predictionResult.cache |
|
|
|
|
cache: detectionResult.cache |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} |
|
|
|
@ -287,6 +287,6 @@ export async function updateSegments(
|
|
|
|
|
|
|
|
|
|
// TODO: move setting status somehow "inside" learning
|
|
|
|
|
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING); |
|
|
|
|
runLearning(id).then(() => runPredict(id)); |
|
|
|
|
runLearning(id).then(() => runDetect(id)); |
|
|
|
|
return { addedIds, removed }; |
|
|
|
|
} |
|
|
|
|