diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index f3f43f0..6935d68 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -48,14 +48,10 @@ class AnalyticUnitManager:
         self.analytic_workers[analytic_unit_id] = worker
         return worker
 
-
     async def __handle_analytic_task(self, task) -> dict:
         """
        returns payload or None
        """
-        if task['type'] == 'PUSH':
-            # TODO: implement PUSH message handling
-            return
 
        analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
        if task['type'] == 'CANCEL':
@@ -66,14 +62,15 @@ class AnalyticUnitManager:
        payload = task['payload']
        worker = self.__ensure_worker(analytic_unit_id, payload['pattern'])
        data = prepare_data(payload['data'])
-        if task['type'] == 'LEARN':
+        if task['type'] == 'PUSH':
+            return await worker.recieve_data(data)
+        elif task['type'] == 'LEARN':
             return await worker.do_train(payload['segments'], data, payload['cache'])
         elif task['type'] == 'DETECT':
             return await worker.do_detect(data, payload['cache'])
 
        raise ValueError('Unknown task type "%s"' % task['type'])
 
-
     async def handle_analytic_task(self, task):
        try:
            result_payload = await self.__handle_analytic_task(task)
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 6fa841c..3b7f694 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -36,3 +36,6 @@ class AnalyticUnitWorker:
     def cancel(self):
         if self._training_feature is not None:
             self._training_feature.cancel()
+
+    async def recieve_data(self, data: pd.DataFrame):
+        return self._detector.recieve_data(data)
diff --git a/analytics/analytics/buckets/__init__.py b/analytics/analytics/buckets/__init__.py
new file mode 100644
index 0000000..d5481d4
--- /dev/null
+++ b/analytics/analytics/buckets/__init__.py
@@ -0,0 +1 @@
+from buckets.data_bucket import DataBucket
diff --git a/analytics/analytics/buckets/data_bucket.py b/analytics/analytics/buckets/data_bucket.py
new file mode 100644
index 0000000..4dd79a1
--- /dev/null
+++ b/analytics/analytics/buckets/data_bucket.py
@@ -0,0 +1,13 @@
+import pandas as pd
+
+class DataBucket(object):
+
+    data: pd.DataFrame
+
+    def __init__(self):
+        self.data = pd.DataFrame([], columns=['timestamp', 'value'])
+    def receive_data(self, data: pd.DataFrame):
+        self.data = self.data.append(data, ignore_index=True)
+
+    def drop_data(self, count: int):
+        self.data = self.data.iloc[count:]
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 4161b9c..8171ab2 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -16,3 +16,7 @@ class Detector(ABC):
     @abstractmethod
     def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
         pass
+
+    @abstractmethod
+    def recieve_data(self, data: DataFrame) -> Optional[dict]:
+        pass
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index 97b4b3d..47ba0c9 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -7,6 +7,7 @@ import pandas as pd
 from typing import Optional
 
 from detectors import Detector
+from buckets import DataBucket
 
 logger = logging.getLogger('PATTERN_DETECTOR')
 
@@ -33,7 +34,8 @@ class PatternDetector(Detector):
     def __init__(self, pattern_type):
         self.pattern_type = pattern_type
         self.model = resolve_model_by_pattern(self.pattern_type)
-        window_size = 100
+        self.window_size = 100
+        self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache:
         # TODO: pass only part of dataframe that has segments
@@ -56,3 +58,15 @@
             'segments': segments,
             'lastDetectionTime': last_detection_time
         }
+
+    def recieve_data(self, data: pd.DataFrame) -> Optional[dict]:
+        self.bucket.receive_data(data)
+
+        if len(self.bucket.data) >= self.window_size:
+            res = self.detect(self.bucket.data, None)
+
+            excess_data = len(self.bucket.data) - self.window_size
+            self.bucket.drop_data(excess_data)
+            return res
+
+        return None
diff --git a/analytics/bin/server b/analytics/bin/server
index da87cd7..4ef262f 100755
--- a/analytics/bin/server
+++ b/analytics/bin/server
@@ -65,10 +65,20 @@ async def handle_task(task: object):
         error_text = traceback.format_exc()
         logger.error("handle_task Exception: '%s'" % error_text)
 
+async def handle_data(task: object):
+    res = await analytic_unit_manager.handle_analytic_task(task)
+
+    if res['status'] == 'SUCCESS' and res['payload'] is not None:
+        res['_id'] = task['_id']
+        message = services.server_service.ServerMessage('DETECT', res)
+        await server_service.send_message(message)
+
 async def handle_message(message: services.ServerMessage):
     payload = None
     if message.method == 'TASK':
         await handle_task(message.payload)
+    if message.method == 'DATA':
+        await handle_data(message.payload)
 
 def init_services():
     logger.info("Starting services...")
diff --git a/server/src/models/analytics_message_model.ts b/server/src/models/analytics_message_model.ts
index 44ba29a..49ec62a 100644
--- a/server/src/models/analytics_message_model.ts
+++ b/server/src/models/analytics_message_model.ts
@@ -1,7 +1,8 @@
 export enum AnalyticsMessageMethod {
   TASK = 'TASK',
   TASK_RESULT = 'TASK_RESULT',
-  DETECT = 'DETECT'
+  DETECT = 'DETECT',
+  DATA = 'DATA'
 }
 
 export class AnalyticsMessage {
diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts
index b4463c7..1d9d5dc 100644
--- a/server/src/services/analytics_service.ts
+++ b/server/src/services/analytics_service.ts
@@ -1,4 +1,4 @@
-import { AnalyticsTask } from '../models/analytics_task_model';
+import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
 import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model';
 
 import * as config from '../config';
@@ -31,8 +31,10 @@ export class AnalyticsService {
     if(!this._ready) {
       return Promise.reject("Analytics is not ready");
     }
+    let method = task.type === AnalyticsTaskType.PUSH ?
+      AnalyticsMessageMethod.DATA : AnalyticsMessageMethod.TASK
     let message = new AnalyticsMessage(
-      AnalyticsMessageMethod.TASK,
+      method,
       task.toObject()
     );
     return this.sendMessage(message);
diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts
index 65966f2..107abad 100644
--- a/server/src/services/data_puller.ts
+++ b/server/src/services/data_puller.ts
@@ -59,7 +59,7 @@ export class DataPuller {
   }
 
   private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) {
-    const time = analyticUnit.lastDetectionTime || Date.now();
+    const time = analyticUnit.lastDetectionTime + 1 || Date.now();
     this._unitTimes[analyticUnit.id] = time;
 
     const dataGenerator = this.getDataGenerator(
@@ -76,7 +76,8 @@
       }
 
       const now = Date.now();
-      let payload = { data, from: time, to: now, pattern: analyticUnit.type };
+      let payloadValues = data.values;
+      let payload = { data: payloadValues, from: time, to: now, pattern: analyticUnit.type };
       this._unitTimes[analyticUnit.id] = now;
       this.pushData(analyticUnit, payload);
     }
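
Note on the resulting PUSH flow: DataPuller pushes pulled points as PUSH tasks, AnalyticsService maps PUSH to the new DATA message method, bin/server routes DATA messages to handle_data, the manager forwards them to AnalyticUnitWorker.recieve_data, and PatternDetector buffers the points in a DataBucket, running detection only once at least window_size points have accumulated and then dropping the surplus from the front of the buffer. Below is a minimal, self-contained sketch of that buffering logic; Bucket, on_push, run_detection and WINDOW_SIZE are illustrative stand-ins rather than names from this patch, and pd.concat stands in for the DataFrame.append call used above.

import pandas as pd


class Bucket:
    """Stand-in for DataBucket: accumulates incoming rows until a full window is available."""

    def __init__(self):
        self.data = pd.DataFrame([], columns=['timestamp', 'value'])

    def receive_data(self, chunk: pd.DataFrame):
        # The patch uses DataFrame.append(); pd.concat is the equivalent call in current pandas.
        self.data = pd.concat([self.data, chunk], ignore_index=True)

    def drop_data(self, count: int):
        self.data = self.data.iloc[count:]


WINDOW_SIZE = 100  # mirrors PatternDetector.window_size


def run_detection(window: pd.DataFrame) -> dict:
    # Hypothetical detector: report the timestamp of the largest value in the window.
    values = window['value'].astype(float)
    top = window.loc[values.idxmax()]
    return {'segments': [], 'lastDetectionTime': int(top['timestamp'])}


def on_push(bucket: Bucket, chunk: pd.DataFrame):
    """Sketch of the recieve_data() flow: buffer, detect once the window is full, trim the excess."""
    bucket.receive_data(chunk)
    if len(bucket.data) < WINDOW_SIZE:
        return None                          # not enough points yet
    result = run_detection(bucket.data)      # PatternDetector calls self.detect() here
    excess = len(bucket.data) - WINDOW_SIZE
    bucket.drop_data(excess)                 # keep only the newest WINDOW_SIZE rows
    return result


if __name__ == '__main__':
    bucket = Bucket()
    for start in range(0, 120, 30):          # four 30-point chunks, as a puller might send
        chunk = pd.DataFrame({
            'timestamp': range(start, start + 30),
            'value': [i % 7 for i in range(start, start + 30)],
        })
        print(on_push(bucket, chunk))        # None, None, None, then a detection result

Detection results produced this way are reported back under the DETECT method and carry the originating task's _id (see handle_data), so they can be correlated with the task that delivered the data.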