diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 6935d68..2f0938d 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -6,6 +6,7 @@ from concurrent.futures import Executor, ThreadPoolExecutor import detectors from analytic_unit_worker import AnalyticUnitWorker +from models import ModelCache logger = logging.getLogger('AnalyticUnitManager') @@ -63,7 +64,7 @@ class AnalyticUnitManager: worker = self.__ensure_worker(analytic_unit_id, payload['pattern']) data = prepare_data(payload['data']) if task['type'] == 'PUSH': - return await worker.recieve_data(data) + return await worker.recieve_data(data, payload['cache']) elif task['type'] == 'LEARN': return await worker.do_train(payload['segments'], data, payload['cache']) elif task['type'] == 'DETECT': diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 3b7f694..4fa3d75 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -3,7 +3,7 @@ import detectors import logging import pandas as pd from typing import Optional -from models import AnalyticUnitCache +from models import ModelCache from concurrent.futures import Executor, CancelledError import asyncio @@ -19,23 +19,23 @@ class AnalyticUnitWorker: self._training_feature: asyncio.Future = None async def do_train( - self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache] - ) -> AnalyticUnitCache: + self, segments: list, data: pd.DataFrame, cache: Optional[ModelCache] + ) -> ModelCache: self._training_feature = asyncio.get_event_loop().run_in_executor( self._executor, self._detector.train, data, segments, cache ) try: - new_cache: AnalyticUnitCache = await self._training_feature + new_cache: ModelCache = await self._training_feature return new_cache except CancelledError as e: return cache - async def do_detect(self, data: 
pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: + async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: return self._detector.detect(data, cache) def cancel(self): if self._training_feature is not None: self._training_feature.cancel() - async def recieve_data(self, data: pd.DataFrame): - return self._detector.recieve_data(data) + async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): + return self._detector.recieve_data(data, cache) diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 8171ab2..063f6a7 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -1,4 +1,4 @@ -from models import AnalyticUnitCache +from models import ModelCache from abc import ABC, abstractmethod from pandas import DataFrame from typing import Optional @@ -7,16 +7,16 @@ from typing import Optional class Detector(ABC): @abstractmethod - def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + def train(self, dataframe: DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache: """ Should be thread-safe to other detectors' train method """ pass @abstractmethod - def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: + def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict: pass @abstractmethod - def recieve_data(self, data: DataFrame) -> Optional[dict]: + def recieve_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: pass diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 47ba0c9..564c22b 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -8,6 +8,7 @@ from typing import Optional from detectors import Detector from buckets import DataBucket +from 
models import ModelCache logger = logging.getLogger('PATTERN_DETECTOR') @@ -37,14 +38,14 @@ class PatternDetector(Detector): self.window_size = 100 self.bucket = DataBucket() - def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: + def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache: # TODO: pass only part of dataframe that has segments new_cache = self.model.fit(dataframe, segments, cache) return { 'cache': new_cache } - def detect(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict: + def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) detected = self.model.detect(dataframe, cache) @@ -59,11 +60,13 @@ class PatternDetector(Detector): 'lastDetectionTime': last_detection_time } - def recieve_data(self, data: pd.DataFrame) -> Optional[dict]: - self.bucket.receive_data(data) + def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + self.bucket.receive_data(data.dropna()) + if cache != None: + self.window_size = cache['WINDOW_SIZE'] - if len(self.bucket.data) >= self.window_size: - res = self.detect(self.bucket.data) + if len(self.bucket.data) >= self.window_size and cache != None: + res = self.detect(self.bucket.data, cache) excess_data = len(self.bucket.data) - self.window_size self.bucket.drop_data(excess_data) diff --git a/analytics/analytics/models/__init__.py b/analytics/analytics/models/__init__.py index 595b9a0..105a449 100644 --- a/analytics/analytics/models/__init__.py +++ b/analytics/analytics/models/__init__.py @@ -1,4 +1,4 @@ -from models.model import Model, AnalyticUnitCache +from models.model import Model, ModelCache from models.drop_model import DropModel from models.peak_model import PeakModel from 
models.jump_model import JumpModel diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py index 3edffb3..36dbc88 100644 --- a/analytics/analytics/models/model.py +++ b/analytics/analytics/models/model.py @@ -5,20 +5,21 @@ from typing import Optional import pandas as pd import math -AnalyticUnitCache = dict +ModelCache = dict + class Model(ABC): @abstractmethod - def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> None: + def do_fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> None: pass @abstractmethod def do_detect(self, dataframe: pd.DataFrame) -> list: pass - def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: - if type(cache) is AnalyticUnitCache: + def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache: + if type(cache) is ModelCache: self.state = cache self.segments = segments @@ -34,8 +35,8 @@ class Model(ABC): self.do_fit(dataframe, segments) return self.state - def detect(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: - if type(cache) is AnalyticUnitCache: + def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: + if type(cache) is ModelCache: self.state = cache result = self.do_detect(dataframe) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 2398ffa..3cf6f28 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -55,7 +55,7 @@ async function onMessage(message: AnalyticsMessage) { } if(message.method === AnalyticsMessageMethod.DETECT) { - onDetect(message.payload); + onDetect(message.payload.payload); methodResolved = true; } diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 107abad..e19c641 100644 --- 
a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -1,5 +1,6 @@ import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; +import * as AnalyticUnitCache from '../models/analytic_unit_cache_model'; import { AnalyticsService } from './analytics_service'; import { HASTIC_API_KEY } from '../config'; @@ -32,6 +33,7 @@ export class DataPuller { if(unit === undefined) { throw Error(`puller: can't pull undefined unit`); } + return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); } @@ -59,6 +61,7 @@ export class DataPuller { } private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { + // TODO: lastDetectionTime can be in ns const time = analyticUnit.lastDetectionTime + 1 || Date.now(); this._unitTimes[analyticUnit.id] = time; @@ -77,7 +80,17 @@ export class DataPuller { const now = Date.now(); let payloadValues = data.values; - let payload = { data: payloadValues, from: time, to: now, pattern: analyticUnit.type }; + let cache = await AnalyticUnitCache.findById(analyticUnit.id); + if(cache !== null) { + cache = cache.data + } + let payload = { + data: payloadValues, + from: time, + to: now, + pattern: analyticUnit.type, + cache + }; this._unitTimes[analyticUnit.id] = now; this.pushData(analyticUnit, payload); }