Browse Source

Merge segments before sending a webhook #742 (#743)

pull/1/head
rozetko 6 years ago committed by Alexey Velikiy
parent
commit
0c00dafd0a
  1. 67
      server/spec/analytic_controller.jest.ts
  2. 62
      server/spec/segments.jest.ts
  3. 15
      server/spec/utils_for_tests/segments.ts
  4. 48
      server/src/controllers/analytics_controller.ts
  5. 121
      server/src/models/segment_model.ts

67
server/spec/analytic_controller.jest.ts

@ -7,9 +7,11 @@ jest.mock('grafana-datasource-kit', () => (
}
));
import { saveAnalyticUnitFromObject, runDetect } from '../src/controllers/analytics_controller';
import { saveAnalyticUnitFromObject, runDetect, onDetect } from '../src/controllers/analytics_controller';
import * as AnalyticUnit from '../src/models/analytic_units';
import * as AnalyticUnitCache from '../src/models/analytic_unit_cache_model';
import * as Segment from '../src/models/segment_model';
import { buildSegments, clearDB, TEST_ANALYTIC_UNIT_ID } from './utils_for_tests/segments';
import { HASTIC_API_KEY } from '../src/config';
@ -88,3 +90,66 @@ describe('Check detection range', function() {
expect(queryByMetric).toBeCalledWith(analyticUnitObj.metric, undefined, expectedFrom, to, HASTIC_API_KEY);
});
});
describe('onDetect', () => {
const INITIAL_SEGMENTS = buildSegments([[0, 1], [2, 3], [4, 5]]);
beforeAll(async () => {
clearDB();
await AnalyticUnit.create(
AnalyticUnit.createAnalyticUnitFromObject({
_id: TEST_ANALYTIC_UNIT_ID,
name: 'name',
grafanaUrl: 'grafanaUrl',
panelId: 'panelId',
type: 'type',
detectorType: AnalyticUnit.DetectorType.ANOMALY
})
);
await AnalyticUnitCache.create(TEST_ANALYTIC_UNIT_ID);
await AnalyticUnitCache.setData(TEST_ANALYTIC_UNIT_ID, { timeStep: 1 });
});
beforeEach(async () => {
await Segment.mergeAndInsertSegments(INITIAL_SEGMENTS);
});
afterEach(async () => {
clearDB();
});
it('should not send a webhook after merging', async () => {
const detectedSegmentIds = await onDetect({
analyticUnitId: TEST_ANALYTIC_UNIT_ID,
segments: buildSegments([[5, 6]]),
lastDetectionTime: 0,
cache: {
data: {
timeStep: 1
}
}
});
let detectedSegments = await Promise.all(
detectedSegmentIds.map(id => Segment.findOne(id))
);
expect(
detectedSegments.map(segment => [segment.from, segment.to])
).toEqual([]);
});
it('should send a webhook when there was no merging', async () => {
const detectedSegmentIds = await onDetect({
analyticUnitId: TEST_ANALYTIC_UNIT_ID,
segments: buildSegments([[7, 8]]),
lastDetectionTime: 0
});
let detectedSegments = await Promise.all(
detectedSegmentIds.map(id => Segment.findOne(id))
);
expect(
detectedSegments.map(segment => [segment.from, segment.to])
).toEqual([[7, 8]]);
});
});

62
server/spec/segments.jest.ts

@ -1,18 +1,18 @@
import { deleteNonDetectedSegments } from '../src/controllers/analytics_controller';
import { buildSegments, clearDB, TEST_ANALYTIC_UNIT_ID } from './utils_for_tests/segments';
import * as AnalyticUnit from '../src/models/analytic_units';
import * as Segment from '../src/models/segment_model';
import * as AnalyticUnitCache from '../src/models/analytic_unit_cache_model';
import * as _ from 'lodash';
const TEST_ID: AnalyticUnit.AnalyticUnitId = 'testid';
const INITIAL_SEGMENTS = segmentBuilder([[0, 1], [2, 3], [4, 5]]);
const INITIAL_SEGMENTS = buildSegments([[0, 1], [2, 3], [4, 5]]);
beforeAll(async () => {
clearDB();
await AnalyticUnit.create(
AnalyticUnit.createAnalyticUnitFromObject({
_id: TEST_ID,
_id: TEST_ANALYTIC_UNIT_ID,
name: 'name',
grafanaUrl: 'grafanaUrl',
panelId: 'panelId',
@ -20,59 +20,27 @@ beforeAll(async () => {
detectorType: AnalyticUnit.DetectorType.ANOMALY
})
);
await AnalyticUnitCache.create(TEST_ID);
await AnalyticUnitCache.setData(TEST_ID, { timeStep: 1 });
await AnalyticUnitCache.create(TEST_ANALYTIC_UNIT_ID);
await AnalyticUnitCache.setData(TEST_ANALYTIC_UNIT_ID, { timeStep: 1 });
});
beforeEach(async () => {
await Segment.insertSegments(INITIAL_SEGMENTS);
await Segment.mergeAndInsertSegments(INITIAL_SEGMENTS);
});
afterEach(async () => {
clearDB();
});
describe('Check deleted segments', function() {
let payload = {
lastDetectionTime: 0,
segments: [],
cache: null
};
it('previous segments not found', async function() {
payload.segments = segmentBuilder([[0, 1], [4, 5]]);
expect(await getDeletedSegments(TEST_ID, payload)).toEqual(segmentBuilder([[2, 3]]));
});
describe('mergeAndInsertSegments', function() {
it('Should be merged before insertion', async function() {
const segmentsToInsert = buildSegments([[1, 2]]);
await Segment.mergeAndInsertSegments(segmentsToInsert);
it('all previous segments found', async function() {
payload.segments = segmentBuilder([[0, 1], [2, 3], [4, 5]]);
expect(await getDeletedSegments(TEST_ID, payload)).toEqual([]);
let actualSegments = await Segment.findMany(TEST_ANALYTIC_UNIT_ID, {});
actualSegments.forEach(s => { s.id = undefined });
actualSegments = _.sortBy(actualSegments, s => s.from);
expect(actualSegments).toEqual(buildSegments([[0, 3], [4, 5]]));
});
});
async function getDeletedSegments(TEST_ID, payload): Promise<Segment.Segment[]> {
const preSegments = await Segment.findMany(TEST_ID, { labeled: false, deleted: false });
await deleteNonDetectedSegments(TEST_ID, payload);
const postSegments = await Segment.findMany(TEST_ID, { labeled: false, deleted: false });
const deleted = setDifference(preSegments, postSegments);
return deleted.map(s => {
s.id = undefined;
return s;
});
}
function setDifference(a, b: Segment.Segment[]): Segment.Segment[] {
return _.differenceWith(a, b, (x, y: Segment.Segment) => x.equals(y));
}
function segmentBuilder(times) {
return times.map(t => {
return new Segment.Segment(TEST_ID, t[0], t[1], false, false, undefined);
});
}
async function clearDB() {
const segments = await Segment.findMany(TEST_ID, { labeled: false, deleted: false });
await Segment.removeSegments(segments.map(s => s.id));
}

15
server/spec/utils_for_tests/segments.ts

@ -0,0 +1,15 @@
import * as AnalyticUnit from '../../src/models/analytic_units';
import * as Segment from '../../src/models/segment_model';
export const TEST_ANALYTIC_UNIT_ID: AnalyticUnit.AnalyticUnitId = 'testid';
export function buildSegments(times: number[][]): Segment.Segment[] {
return times.map(t => {
return new Segment.Segment(TEST_ANALYTIC_UNIT_ID, t[0], t[1], false, false, undefined);
});
}
export async function clearDB(): Promise<void> {
const segments = await Segment.findMany(TEST_ANALYTIC_UNIT_ID, { labeled: false, deleted: false });
await Segment.removeSegments(segments.map(s => s.id));
}

48
server/src/controllers/analytics_controller.ts

@ -59,27 +59,42 @@ function onTaskResult(taskResult: TaskResult) {
}
}
async function onDetect(detectionResult: DetectionResult) {
/**
* Processes detection result from analytics` DETECT message
* @returns IDs of segments inserted into DB if there was no merging
*/
export async function onDetect(detectionResult: DetectionResult): Promise<Segment.SegmentId[]> {
detectionsCount++;
let id = detectionResult.analyticUnitId;
let payload = await processDetectionResult(id, detectionResult);
const insertionResult = await Segment.mergeAndInsertSegments(payload.segments)
await Promise.all([
Segment.insertSegments(payload.segments),
AnalyticUnitCache.setData(id, payload.cache),
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
]);
// removedIds.length > 0 means that there was at least 1 merge
if(insertionResult.removedIds.length > 0) {
return [];
}
return insertionResult.addedIds;
}
async function onPushDetect(detectionResult: DetectionResult) {
/**
* Processes detection result from analytics` PUSH_DETECT message
* Sends a webhook if it's needed
*/
async function onPushDetect(detectionResult: DetectionResult): Promise<void> {
const analyticUnit = await AnalyticUnit.findById(detectionResult.analyticUnitId);
if (!_.isEmpty(detectionResult.segments) && analyticUnit.alert) {
const segments = await onDetect(detectionResult);
if(!_.isEmpty(segments) && analyticUnit.alert) {
try {
alertService.receiveAlert(analyticUnit, _.last(detectionResult.segments));
const segment = await Segment.findOne(_.last(segments))
alertService.receiveAlert(analyticUnit, segment);
} catch(err) {
console.error(`error while sending webhook: ${err.message}`);
}
}
await onDetect(detectionResult);
}
async function onMessage(message: AnalyticsMessage) {
@ -166,7 +181,7 @@ async function getQueryRange(
else {
return getQueryRangeForLearningBySegments(segments);
}
default:
throw new Error(`Cannot get query range for detector type ${detectorType}`);
}
@ -368,13 +383,9 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number,
}
const payload = await processDetectionResult(id, result.payload);
const cache = AnalyticUnitCache.AnalyticUnitCache.fromObject({ _id: id, data: payload.cache });
// TODO: uncomment it
// It clears segments when redetecting on another timerange
// await deleteNonDetectedSegments(id, payload);
await Segment.mergeAndInsertSegments(payload.segments);
await Promise.all([
Segment.insertSegments(payload.segments),
AnalyticUnitCache.setData(id, payload.cache),
AnalyticUnit.setDetectionTime(id, range.to - intersection),
]);
@ -427,13 +438,6 @@ async function cancelAnalyticsTask(analyticUnitId: AnalyticUnit.AnalyticUnitId)
}
}
export async function deleteNonDetectedSegments(id, payload) {
let lastDetectedSegments = await Segment.findMany(id, { labeled: false, deleted: false });
let segmentsToRemove: Segment.Segment[];
segmentsToRemove = _.differenceWith(lastDetectedSegments, payload.segments, (a, b: Segment.Segment) => a.equals(b));
Segment.removeSegments(segmentsToRemove.map(s => s.id));
}
async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, detectionResult: DetectionResult):
Promise<{
lastDetectionTime: number,
@ -521,11 +525,11 @@ export async function updateSegments(
id: AnalyticUnit.AnalyticUnitId,
segmentsToInsert: Segment.Segment[],
removedIds: Segment.SegmentId[]
) {
): Promise<{ addedIds: Segment.SegmentId[] }> {
await Segment.removeSegments(removedIds);
const addedIds = await Segment.insertSegments(segmentsToInsert);
const insertionResult = await Segment.mergeAndInsertSegments(segmentsToInsert);
return { addedIds };
return { addedIds: insertionResult.addedIds };
}
export async function runLearningWithDetection(

121
server/src/models/segment_model.ts

@ -78,6 +78,10 @@ export type FindManyQuery = {
deleted?: boolean
}
export async function findOne(segmentId: SegmentId): Promise<Segment> {
return db.findOne({ _id: segmentId });
}
export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promise<Segment[]> {
var dbQuery: any = { analyticUnitId: id };
if(query.labeled !== undefined) {
@ -86,6 +90,15 @@ export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promis
if(query.deleted !== undefined) {
dbQuery.deleted = query.deleted;
}
if(query.from !== undefined) {
dbQuery.from = query.from;
}
if(query.to !== undefined) {
dbQuery.to = query.to;
}
if(query.$or !== undefined) {
dbQuery.$or = query.$or;
}
let segs = await db.findMany(dbQuery);
if(segs === null) {
return [];
@ -93,81 +106,59 @@ export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promis
return segs.map(Segment.fromObject);
}
export async function insertSegments(segments: Segment[]) {
/**
* Merges an array of segments with ones existing in the DB
* Inserts resulting segments into DB
* @param segments segments to be inserted
* @returns IDs of added and removed segments
*/
export async function mergeAndInsertSegments(segments: Segment[]): Promise<{
addedIds: SegmentId[],
removedIds: SegmentId[]
}> {
if(_.isEmpty(segments)) {
return [];
return { addedIds: [], removedIds: [] };
}
const analyticUnitId: AnalyticUnitId = segments[0].analyticUnitId;
const learningSegments = await findMany(analyticUnitId, {
labeled: true,
deleted: false
});
let segmentIdsToRemove: SegmentId[] = [];
let segmentsToInsert: Segment[] = [];
for(let segment of segments) {
const intersectedLearning = learningSegments.filter(s => {
return segment.from <= s.to && segment.to >= s.from;
});
if(intersectedLearning.length > 0) {
if(await isIntersectedWithExistingLabeled(segment)) {
continue;
}
if(!segment.deleted && !segment.labeled) {
const intersectedWithDeletedSegments = await findMany(analyticUnitId, {
to: { $gte: segment.from },
from: { $lte: segment.to },
labeled: false,
deleted: true
});
if(intersectedWithDeletedSegments.length > 0) {
if(await isIntersectedWithExistingDeleted(segment)) {
continue;
}
}
let cache = await AnalyticUnitCache.findById(analyticUnitId);
const timeStep = cache.getTimeStep();
let unit = await AnalyticUnit.findById(analyticUnitId);
const detector = unit.detectorType;
if(detector !== AnalyticUnit.DetectorType.PATTERN) {
const intersectedWithLeftBound = await findMany(analyticUnitId, {
to: { $gte: segment.from - timeStep, $lte: segment.from },
labeled: false,
deleted: false
let intersectedSegments: Segment[];
if(detector === AnalyticUnit.DetectorType.PATTERN) {
intersectedSegments = await findMany(analyticUnitId, {
to: { $gte: segment.from },
from: { $lte: segment.to },
labeled: segment.labeled,
deleted: segment.deleted
});
if(intersectedWithLeftBound.length > 0) {
const leftSegment = _.minBy(intersectedWithLeftBound, s => s.from);
segment.from = leftSegment.from;
segmentIdsToRemove.push(leftSegment.id);
}
const intersectedWithRightBound = await findMany(analyticUnitId, {
from: { $gte: segment.to, $lte: segment.to + timeStep },
labeled: false,
deleted: false
} else {
const timeStep = cache.getTimeStep();
intersectedSegments = await findMany(analyticUnitId, {
to: { $gte: segment.from - timeStep },
from: { $lte: segment.to + timeStep },
labeled: segment.labeled,
deleted: segment.deleted
});
if(intersectedWithRightBound.length > 0) {
const rightSegment = _.maxBy(intersectedWithRightBound, s => s.to);
segment.to = rightSegment.to;
segmentIdsToRemove.push(rightSegment.id);
}
}
const intersectedSegments = await findMany(analyticUnitId, {
to: { $gte: segment.from },
from: { $lte: segment.to },
labeled: segment.labeled,
deleted: segment.deleted
});
if(intersectedSegments.length > 0) {
let from = _.minBy(intersectedSegments, 'from').from;
let to = _.maxBy(intersectedSegments, 'to').to;
let from = _.minBy(intersectedSegments.concat(segment), s => s.from).from;
let to = _.maxBy(intersectedSegments.concat(segment), s => s.to).to;
let newSegment = Segment.fromObject(segment.toObject());
newSegment.from = from;
newSegment.to = to;
@ -179,7 +170,11 @@ export async function insertSegments(segments: Segment[]) {
}
await db.removeMany(segmentIdsToRemove);
return db.insertMany(segmentsToInsert.map(s => s.toObject()));
const addedIds = await db.insertMany(segmentsToInsert.map(s => s.toObject()));
return {
addedIds,
removedIds: segmentIdsToRemove
};
}
export async function setSegmentsDeleted(ids: SegmentId[]) {
@ -189,3 +184,25 @@ export async function setSegmentsDeleted(ids: SegmentId[]) {
export function removeSegments(idsToRemove: SegmentId[]) {
return db.removeMany(idsToRemove);
}
async function isIntersectedWithExistingLabeled(segment: Segment): Promise<boolean> {
const intersected = await findMany(segment.analyticUnitId, {
labeled: true,
deleted: false,
from: { $lte: segment.to },
to: { $gte: segment.from }
});
return intersected.length > 0;
}
async function isIntersectedWithExistingDeleted(segment: Segment): Promise<boolean> {
const intersected = await findMany(segment.analyticUnitId, {
labeled: false,
deleted: true,
from: { $lte: segment.to },
to: { $gte: segment.from }
});
return intersected.length > 0;
}

Loading…
Cancel
Save