Browse Source

Error: detect missing cache #299 (#302)

* Pass cache to detect

* rename AnalyticUnitCache to ModelCache

* Send .data from cache

* Drop nans from bucket && set window size from cache && check cache None

* Read proper payload on DETECT message
pull/1/head
rozetko 6 years ago committed by Alexey Velikiy
parent
commit
77fbde6fa3
  1. 3
      analytics/analytics/analytic_unit_manager.py
  2. 14
      analytics/analytics/analytic_unit_worker.py
  3. 8
      analytics/analytics/detectors/detector.py
  4. 15
      analytics/analytics/detectors/pattern_detector.py
  5. 2
      analytics/analytics/models/__init__.py
  6. 13
      analytics/analytics/models/model.py
  7. 2
      server/src/controllers/analytics_controller.ts
  8. 15
      server/src/services/data_puller.ts

3
analytics/analytics/analytic_unit_manager.py

@@ -6,6 +6,7 @@ from concurrent.futures import Executor, ThreadPoolExecutor
import detectors import detectors
from analytic_unit_worker import AnalyticUnitWorker from analytic_unit_worker import AnalyticUnitWorker
from models import ModelCache
logger = logging.getLogger('AnalyticUnitManager') logger = logging.getLogger('AnalyticUnitManager')
@@ -63,7 +64,7 @@ class AnalyticUnitManager:
worker = self.__ensure_worker(analytic_unit_id, payload['pattern']) worker = self.__ensure_worker(analytic_unit_id, payload['pattern'])
data = prepare_data(payload['data']) data = prepare_data(payload['data'])
if task['type'] == 'PUSH': if task['type'] == 'PUSH':
return await worker.recieve_data(data) return await worker.recieve_data(data, payload['cache'])
elif task['type'] == 'LEARN': elif task['type'] == 'LEARN':
return await worker.do_train(payload['segments'], data, payload['cache']) return await worker.do_train(payload['segments'], data, payload['cache'])
elif task['type'] == 'DETECT': elif task['type'] == 'DETECT':

14
analytics/analytics/analytic_unit_worker.py

@@ -3,7 +3,7 @@ import detectors
import logging import logging
import pandas as pd import pandas as pd
from typing import Optional from typing import Optional
from models import AnalyticUnitCache from models import ModelCache
from concurrent.futures import Executor, CancelledError from concurrent.futures import Executor, CancelledError
import asyncio import asyncio
@@ -19,23 +19,23 @@ class AnalyticUnitWorker:
self._training_feature: asyncio.Future = None self._training_feature: asyncio.Future = None
async def do_train( async def do_train(
self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache] self, segments: list, data: pd.DataFrame, cache: Optional[ModelCache]
) -> AnalyticUnitCache: ) -> ModelCache:
self._training_feature = asyncio.get_event_loop().run_in_executor( self._training_feature = asyncio.get_event_loop().run_in_executor(
self._executor, self._detector.train, data, segments, cache self._executor, self._detector.train, data, segments, cache
) )
try: try:
new_cache: AnalyticUnitCache = await self._training_feature new_cache: ModelCache = await self._training_feature
return new_cache return new_cache
except CancelledError as e: except CancelledError as e:
return cache return cache
async def do_detect(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
return self._detector.detect(data, cache) return self._detector.detect(data, cache)
def cancel(self): def cancel(self):
if self._training_feature is not None: if self._training_feature is not None:
self._training_feature.cancel() self._training_feature.cancel()
async def recieve_data(self, data: pd.DataFrame): async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]):
return self._detector.recieve_data(data) return self._detector.recieve_data(data, cache)

8
analytics/analytics/detectors/detector.py

@@ -1,4 +1,4 @@
from models import AnalyticUnitCache from models import ModelCache
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pandas import DataFrame from pandas import DataFrame
from typing import Optional from typing import Optional
@@ -7,16 +7,16 @@ from typing import Optional
class Detector(ABC): class Detector(ABC):
@abstractmethod @abstractmethod
def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: def train(self, dataframe: DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache:
""" """
Should be thread-safe to other detectors' train method Should be thread-safe to other detectors' train method
""" """
pass pass
@abstractmethod @abstractmethod
def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
pass pass
@abstractmethod @abstractmethod
def recieve_data(self, data: DataFrame) -> Optional[dict]: def recieve_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
pass pass

15
analytics/analytics/detectors/pattern_detector.py

@@ -8,6 +8,7 @@ from typing import Optional
from detectors import Detector from detectors import Detector
from buckets import DataBucket from buckets import DataBucket
from models import ModelCache
logger = logging.getLogger('PATTERN_DETECTOR') logger = logging.getLogger('PATTERN_DETECTOR')
@@ -37,14 +38,14 @@ class PatternDetector(Detector):
self.window_size = 100 self.window_size = 100
self.bucket = DataBucket() self.bucket = DataBucket()
def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
# TODO: pass only part of dataframe that has segments # TODO: pass only part of dataframe that has segments
new_cache = self.model.fit(dataframe, segments, cache) new_cache = self.model.fit(dataframe, segments, cache)
return { return {
'cache': new_cache 'cache': new_cache
} }
def detect(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict: def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
# TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
detected = self.model.detect(dataframe, cache) detected = self.model.detect(dataframe, cache)
@@ -59,11 +60,13 @@ class PatternDetector(Detector):
'lastDetectionTime': last_detection_time 'lastDetectionTime': last_detection_time
} }
def recieve_data(self, data: pd.DataFrame) -> Optional[dict]: def recieve_data(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> Optional[dict]:
self.bucket.receive_data(data) self.bucket.receive_data(data.dropna())
if cache != None:
self.window_size = cache['WINDOW_SIZE']
if len(self.bucket.data) >= self.window_size: if len(self.bucket.data) >= self.window_size and cache != None:
res = self.detect(self.bucket.data) res = self.detect(self.bucket.data, cache)
excess_data = len(self.bucket.data) - self.window_size excess_data = len(self.bucket.data) - self.window_size
self.bucket.drop_data(excess_data) self.bucket.drop_data(excess_data)

2
analytics/analytics/models/__init__.py

@@ -1,4 +1,4 @@
from models.model import Model, AnalyticUnitCache from models.model import Model, ModelCache
from models.drop_model import DropModel from models.drop_model import DropModel
from models.peak_model import PeakModel from models.peak_model import PeakModel
from models.jump_model import JumpModel from models.jump_model import JumpModel

13
analytics/analytics/models/model.py

@@ -5,20 +5,21 @@ from typing import Optional
import pandas as pd import pandas as pd
import math import math
AnalyticUnitCache = dict ModelCache = dict
class Model(ABC): class Model(ABC):
@abstractmethod @abstractmethod
def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> None: def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> None:
pass pass
@abstractmethod @abstractmethod
def do_detect(self, dataframe: pd.DataFrame) -> list: def do_detect(self, dataframe: pd.DataFrame) -> list:
pass pass
def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache:
if type(cache) is AnalyticUnitCache: if type(cache) is ModelCache:
self.state = cache self.state = cache
self.segments = segments self.segments = segments
@@ -34,8 +35,8 @@ class Model(ABC):
self.do_fit(dataframe, segments) self.do_fit(dataframe, segments)
return self.state return self.state
def detect(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
if type(cache) is AnalyticUnitCache: if type(cache) is ModelCache:
self.state = cache self.state = cache
result = self.do_detect(dataframe) result = self.do_detect(dataframe)

2
server/src/controllers/analytics_controller.ts

@@ -55,7 +55,7 @@ async function onMessage(message: AnalyticsMessage) {
} }
if(message.method === AnalyticsMessageMethod.DETECT) { if(message.method === AnalyticsMessageMethod.DETECT) {
onDetect(message.payload); onDetect(message.payload.payload);
methodResolved = true; methodResolved = true;
} }

15
server/src/services/data_puller.ts

@@ -1,5 +1,6 @@
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import * as AnalyticUnit from '../models/analytic_unit_model'; import * as AnalyticUnit from '../models/analytic_unit_model';
import * as AnalyticUnitCache from '../models/analytic_unit_cache_model';
import { AnalyticsService } from './analytics_service'; import { AnalyticsService } from './analytics_service';
import { HASTIC_API_KEY } from '../config'; import { HASTIC_API_KEY } from '../config';
@@ -32,6 +33,7 @@ export class DataPuller {
if(unit === undefined) { if(unit === undefined) {
throw Error(`puller: can't pull undefined unit`); throw Error(`puller: can't pull undefined unit`);
} }
return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY);
} }
@@ -59,6 +61,7 @@ export class DataPuller {
} }
private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) {
// TODO: lastDetectionTime can be in ns
const time = analyticUnit.lastDetectionTime + 1 || Date.now(); const time = analyticUnit.lastDetectionTime + 1 || Date.now();
this._unitTimes[analyticUnit.id] = time; this._unitTimes[analyticUnit.id] = time;
@@ -77,7 +80,17 @@ export class DataPuller {
const now = Date.now(); const now = Date.now();
let payloadValues = data.values; let payloadValues = data.values;
let payload = { data: payloadValues, from: time, to: now, pattern: analyticUnit.type }; let cache = await AnalyticUnitCache.findById(analyticUnit.id);
if(cache !== null) {
cache = cache.data
}
let payload = {
data: payloadValues,
from: time,
to: now,
pattern: analyticUnit.type,
cache
};
this._unitTimes[analyticUnit.id] = now; this._unitTimes[analyticUnit.id] = now;
this.pushData(analyticUnit, payload); this.pushData(analyticUnit, payload);
} }

Loading…
Cancel
Save