Browse Source

Fix threshold detector (#331)

pull/1/head
rozetko 5 years ago committed by Alexey Velikiy
parent
commit
d300362b49
  1. 4
      analytics/analytics/analytic_unit_manager.py
  2. 9
      analytics/analytics/detectors/threshold_detector.py
  3. 2
      server/src/controllers/analytics_controller.ts
  4. 3
      server/src/models/threshold_model.ts
  5. 5
      server/src/routes/threshold_router.ts
  6. 4
      server/src/services/data_puller.ts

4
analytics/analytics/analytic_unit_manager.py

@ -74,7 +74,9 @@ class AnalyticUnitManager:
worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
data = prepare_data(payload['data'])
if task['type'] == 'PUSH':
return await worker.recieve_data(data, payload['cache'])
# TODO: do it a better way
res = await worker.recieve_data(data, payload['cache'])
return res.update({ 'analyticUnitId': analytic_unit_id })
elif task['type'] == 'LEARN':
if 'segments' in payload:
return await worker.do_train(payload['segments'], data, payload['cache'])

9
analytics/analytics/detectors/threshold_detector.py

@ -27,12 +27,15 @@ class ThresholdDetector(Detector):
value = cache['value']
condition = cache['condition']
last_entry = dataframe.iloc[-1]
dataframe_without_nans = dataframe.dropna()
if len(dataframe_without_nans) == 0:
return dict()
last_entry = dataframe_without_nans.iloc[-1]
last_value = last_entry['value']
# TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
last_time = last_entry['timestamp'].value / 1000000
segment = (last_time, last_time)
segment = ({ 'from': last_time, 'to': last_time })
segments = []
if condition == '>':
if last_value > value:
@ -57,4 +60,4 @@ class ThresholdDetector(Detector):
}
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
return self.detect(self.bucket.data, cache)
return self.detect(data, cache)

2
server/src/controllers/analytics_controller.ts

@ -101,7 +101,7 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic
} else if(detector === AnalyticUnit.DetectorType.THRESHOLD) {
const now = Date.now();
range = {
from: now - 5 * SECONDS_IN_MINUTE,
from: now - 5 * SECONDS_IN_MINUTE * 1000,
to: now
};
}

3
server/src/models/threshold_model.ts

@ -59,7 +59,8 @@ export async function findOne(id: AnalyticUnitId): Promise<Threshold | null> {
}
export async function updateThreshold(id: AnalyticUnitId, value: number, condition: Condition) {
if(findOne(id) === null) {
const threshold = await db.findOne(id);
if(threshold === null) {
return db.insertOne({ id, value, condition });
}
return db.updateOne({ id }, { value, condition });

5
server/src/routes/threshold_router.ts

@ -9,8 +9,9 @@ import * as _ from 'lodash';
async function getThresholds(ctx: Router.IRouterContext) {
try {
const ids: AnalyticUnitId = ctx.request.query.ids;
if(ids === undefined || _.isEmpty(ids)) {
const ids: AnalyticUnitId[] = ctx.request.query.ids.split(',');
if(ids === undefined) {
throw new Error('analyticUnitIds (ids) are missing');
}

4
server/src/services/data_puller.ts

@ -94,11 +94,13 @@ export class DataPuller {
if(cache !== null) {
cache = cache.data
}
const detector = AnalyticUnit.getDetectorByType(analyticUnit.type);
let payload = {
data: payloadValues,
from: time,
to: now,
pattern: analyticUnit.type,
analyticUnitType: analyticUnit.type,
detector,
cache
};
this._unitTimes[analyticUnit.id] = now;

Loading…
Cancel
Save