Browse Source

Sort segments in process detection result #354 (#355)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by GitHub
parent
commit
0e2efa8e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      server/src/controllers/analytics_controller.ts
  2. 53
      server/src/services/alert_service.ts
  3. 8
      server/src/services/data_puller.ts
  4. 4
      server/src/services/notification_service.ts

7
server/src/controllers/analytics_controller.ts

@ -305,15 +305,16 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI
`Missing lastDetectionTime in result or it is corrupted: ${JSON.stringify(detectionResult)}` `Missing lastDetectionTime in result or it is corrupted: ${JSON.stringify(detectionResult)}`
); );
} }
console.debug(`got detection result with ${detectionResult.segments.length} segments`); console.debug(`got detection result for ${analyticUnitId} with ${detectionResult.segments.length} segments`);
const segments = detectionResult.segments.map( const sortedSegments: {from, to}[] = _.sortBy(detectionResult.segments, 'from');
const segments = sortedSegments.map(
segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false) segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false)
); );
const analyticUnit = await AnalyticUnit.findById(analyticUnitId); const analyticUnit = await AnalyticUnit.findById(analyticUnitId);
if (!_.isEmpty(segments) && analyticUnit.alert) { if (!_.isEmpty(segments) && analyticUnit.alert) {
try { try {
alertService.recieveAlert(analyticUnit, _.last(segments)); alertService.receiveAlert(analyticUnit, _.last(segments));
} catch(err) { } catch(err) {
console.error(`error while sending webhook: ${err.message}`); console.error(`error while sending webhook: ${err.message}`);
} }

53
server/src/services/alert_service.ts

@ -6,36 +6,52 @@ import { Segment } from '../models/segment_model';
export class Alert { export class Alert {
constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit, protected sender) {}; public enabled = true;
public recieve(segment: Segment) { constructor(protected analyticUnit: AnalyticUnit.AnalyticUnit) {};
this.sender(this.analyticUnit, segment); public receive(segment: Segment) {
if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment);
}
}; };
public update(now: number) {};
} }
class PatternAlert extends Alert {}; class PatternAlert extends Alert {
private lastSentSegment: Segment;
public receive(segment: Segment) {
if(this.lastSentSegment === undefined || !segment.equals(this.lastSentSegment) ) {
this.lastSentSegment = segment;
if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment);
}
}
}
};
class ThresholdAlert extends Alert { class ThresholdAlert extends Alert {
EXPIRE_PERIOD_MS = 60000; EXPIRE_PERIOD_MS = 60000;
lastOccurence = 0; lastOccurence = 0;
public recieve(segment: Segment) { public receive(segment: Segment) {
if(this.lastOccurence === 0) { if(this.lastOccurence === 0) {
this.lastOccurence = segment.from; this.lastOccurence = segment.from;
this.sender(this.analyticUnit, segment); if(this.enabled) {
sendWebhook(this.analyticUnit.name, segment);
}
} else { } else {
if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) { if(segment.from - this.lastOccurence > this.EXPIRE_PERIOD_MS) {
console.debug(`difference detween threshold occurences ${segment.from - this.lastOccurence}, send alert`); if(this.enabled) {
this.sender(this.analyticUnit, segment); console.debug(`time between threshold occurences ${segment.from - this.lastOccurence}ms, send alert`);
sendWebhook(this.analyticUnit.name, segment);
}
} }
this.lastOccurence = segment.from; this.lastOccurence = segment.from;
} }
} }
public update(now: number) {}
} }
@ -43,26 +59,23 @@ export class AlertService {
private _alerts: { [id: string]: Alert; }; private _alerts: { [id: string]: Alert; };
private _alertingEnable: boolean; private _alertingEnable: boolean;
private _sender: any;
constructor() { constructor() {
this._alerts = {} this._alerts = {}
this._alertingEnable = false;
this._sender = (analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) => {
if(this._alertingEnable) {
sendWebhook(analyticUnit.name, segment);
}
} }
public receiveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) {
if(!this._alertingEnable) {
return;
} }
public recieveAlert(analyticUnit: AnalyticUnit.AnalyticUnit, segment: Segment) {
let id = analyticUnit.id; let id = analyticUnit.id;
if(!_.has(this._alerts, id)) { if(!_.has(this._alerts, id)) {
this.addAnalyticUnit(analyticUnit); this.addAnalyticUnit(analyticUnit);
} }
this._alerts[id].recieve(segment); this._alerts[id].receive(segment);
}; };
public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { public addAnalyticUnit(analyticUnit: AnalyticUnit.AnalyticUnit) {
@ -72,7 +85,7 @@ export class AlertService {
alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert; alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert;
alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert; alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert;
this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit, this._sender); this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit);
} }
public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { public removeAnalyticUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) {

8
server/src/services/data_puller.ts

@ -56,7 +56,9 @@ export class DataPuller {
try { try {
this.analyticsService.sendTask(task); this.analyticsService.sendTask(task);
console.log(`data puller successfuly pushed ${data.data.length} points for unit id: ${unit.id}`); let fromTime = new Date(data.from).toLocaleTimeString();
let toTime = new Date(data.to).toLocaleTimeString();
console.log(`pushed ${data.data.length} points to unit: ${unit.id} ${fromTime}-${toTime}`);
} catch(e) { } catch(e) {
console.log(`data puller got error while push data ${e.message}`); console.log(`data puller got error while push data ${e.message}`);
} }
@ -109,14 +111,14 @@ export class DataPuller {
const detector = AnalyticUnit.getDetectorByType(analyticUnit.type); const detector = AnalyticUnit.getDetectorByType(analyticUnit.type);
let payload = { let payload = {
data: payloadValues, data: payloadValues,
from: time, from: this._unitTimes[analyticUnit.id],
to: now, to: now,
analyticUnitType: analyticUnit.type, analyticUnitType: analyticUnit.type,
detector, detector,
cache cache
}; };
this._unitTimes[analyticUnit.id] = now;
this.pushData(analyticUnit, payload); this.pushData(analyticUnit, payload);
this._unitTimes[analyticUnit.id] = now;
} }
} }

4
server/src/services/notification_service.ts

@ -13,7 +13,9 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) {
to: segment.to to: segment.to
}; };
console.log(`Sending alert name:${alert.analyticUnitName} from:${new Date(alert.from)} to:${new Date(alert.to)}`); const fromTime = new Date(alert.from).toLocaleTimeString();
const toTime = new Date(alert.to).toLocaleTimeString();
console.log(`Sending alert unit:${alert.analyticUnitName} from: ${fromTime} to: ${toTime}`);
if(HASTIC_WEBHOOK_URL === null) { if(HASTIC_WEBHOOK_URL === null) {
throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`);

Loading…
Cancel
Save