Browse Source

Analytic unit worker bucket #273 (#297)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
ec3a3a093a
  1. 9
      analytics/analytics/analytic_unit_manager.py
  2. 3
      analytics/analytics/analytic_unit_worker.py
  3. 1
      analytics/analytics/buckets/__init__.py
  4. 13
      analytics/analytics/buckets/data_bucket.py
  5. 4
      analytics/analytics/detectors/detector.py
  6. 16
      analytics/analytics/detectors/pattern_detector.py
  7. 10
      analytics/bin/server
  8. 3
      server/src/models/analytics_message_model.ts
  9. 6
      server/src/services/analytics_service.ts
  10. 5
      server/src/services/data_puller.ts

9
analytics/analytics/analytic_unit_manager.py

@ -48,14 +48,10 @@ class AnalyticUnitManager:
self.analytic_workers[analytic_unit_id] = worker self.analytic_workers[analytic_unit_id] = worker
return worker return worker
async def __handle_analytic_task(self, task) -> dict: async def __handle_analytic_task(self, task) -> dict:
""" """
returns payload or None returns payload or None
""" """
if task['type'] == 'PUSH':
# TODO: implement PUSH message handling
return
analytic_unit_id: AnalyticUnitId = task['analyticUnitId'] analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
if task['type'] == 'CANCEL': if task['type'] == 'CANCEL':
@ -66,14 +62,15 @@ class AnalyticUnitManager:
payload = task['payload'] payload = task['payload']
worker = self.__ensure_worker(analytic_unit_id, payload['pattern']) worker = self.__ensure_worker(analytic_unit_id, payload['pattern'])
data = prepare_data(payload['data']) data = prepare_data(payload['data'])
if task['type'] == 'LEARN': if task['type'] == 'PUSH':
return await worker.recieve_data(data)
elif task['type'] == 'LEARN':
return await worker.do_train(payload['segments'], data, payload['cache']) return await worker.do_train(payload['segments'], data, payload['cache'])
elif task['type'] == 'DETECT': elif task['type'] == 'DETECT':
return await worker.do_detect(data, payload['cache']) return await worker.do_detect(data, payload['cache'])
raise ValueError('Unknown task type "%s"' % task['type']) raise ValueError('Unknown task type "%s"' % task['type'])
async def handle_analytic_task(self, task): async def handle_analytic_task(self, task):
try: try:
result_payload = await self.__handle_analytic_task(task) result_payload = await self.__handle_analytic_task(task)

3
analytics/analytics/analytic_unit_worker.py

@ -36,3 +36,6 @@ class AnalyticUnitWorker:
def cancel(self): def cancel(self):
if self._training_feature is not None: if self._training_feature is not None:
self._training_feature.cancel() self._training_feature.cancel()
async def recieve_data(self, data: pd.DataFrame):
return self._detector.recieve_data(data)

1
analytics/analytics/buckets/__init__.py

@ -0,0 +1 @@
from buckets.data_bucket import DataBucket

13
analytics/analytics/buckets/data_bucket.py

@ -0,0 +1,13 @@
import pandas as pd
class DataBucket(object):
data: pd.DataFrame
def __init__(self):
self.data = pd.DataFrame([], columns=['timestamp', 'value'])
def receive_data(self, data: pd.DataFrame):
self.data = self.data.append(data, ignore_index=True)
def drop_data(self, count: int):
self.data = self.data.iloc[count:]

4
analytics/analytics/detectors/detector.py

@ -16,3 +16,7 @@ class Detector(ABC):
@abstractmethod @abstractmethod
def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
pass pass
@abstractmethod
def recieve_data(self, data: DataFrame) -> Optional[dict]:
pass

16
analytics/analytics/detectors/pattern_detector.py

@ -7,6 +7,7 @@ import pandas as pd
from typing import Optional from typing import Optional
from detectors import Detector from detectors import Detector
from buckets import DataBucket
logger = logging.getLogger('PATTERN_DETECTOR') logger = logging.getLogger('PATTERN_DETECTOR')
@ -33,7 +34,8 @@ class PatternDetector(Detector):
def __init__(self, pattern_type): def __init__(self, pattern_type):
self.pattern_type = pattern_type self.pattern_type = pattern_type
self.model = resolve_model_by_pattern(self.pattern_type) self.model = resolve_model_by_pattern(self.pattern_type)
window_size = 100 self.window_size = 100
self.bucket = DataBucket()
def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache:
# TODO: pass only part of dataframe that has segments # TODO: pass only part of dataframe that has segments
@ -56,3 +58,15 @@ class PatternDetector(Detector):
'segments': segments, 'segments': segments,
'lastDetectionTime': last_detection_time 'lastDetectionTime': last_detection_time
} }
def recieve_data(self, data: pd.DataFrame) -> Optional[dict]:
self.bucket.receive_data(data)
if len(self.bucket.data) >= self.window_size:
res = self.detect(self.bucket.data)
excess_data = len(self.bucket.data) - self.window_size
self.bucket.drop_data(excess_data)
return res
return None

10
analytics/bin/server

@ -65,10 +65,20 @@ async def handle_task(task: object):
error_text = traceback.format_exc() error_text = traceback.format_exc()
logger.error("handle_task Exception: '%s'" % error_text) logger.error("handle_task Exception: '%s'" % error_text)
async def handle_data(task: object):
res = await analytic_unit_manager.handle_analytic_task(task)
if res['status'] == 'SUCCESS' and res['payload'] is not None:
res['_id'] = task['_id']
message = services.server_service.ServerMessage('DETECT', res)
await server_service.send_message(message)
async def handle_message(message: services.ServerMessage): async def handle_message(message: services.ServerMessage):
payload = None payload = None
if message.method == 'TASK': if message.method == 'TASK':
await handle_task(message.payload) await handle_task(message.payload)
if message.method == 'DATA':
await handle_data(message.payload)
def init_services(): def init_services():
logger.info("Starting services...") logger.info("Starting services...")

3
server/src/models/analytics_message_model.ts

@ -1,7 +1,8 @@
export enum AnalyticsMessageMethod { export enum AnalyticsMessageMethod {
TASK = 'TASK', TASK = 'TASK',
TASK_RESULT = 'TASK_RESULT', TASK_RESULT = 'TASK_RESULT',
DETECT = 'DETECT' DETECT = 'DETECT',
DATA = 'DATA'
} }
export class AnalyticsMessage { export class AnalyticsMessage {

6
server/src/services/analytics_service.ts

@ -1,4 +1,4 @@
import { AnalyticsTask } from '../models/analytics_task_model'; import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model';
import * as config from '../config'; import * as config from '../config';
@ -31,8 +31,10 @@ export class AnalyticsService {
if(!this._ready) { if(!this._ready) {
return Promise.reject("Analytics is not ready"); return Promise.reject("Analytics is not ready");
} }
let method = task.type === AnalyticsTaskType.PUSH ?
AnalyticsMessageMethod.DATA : AnalyticsMessageMethod.TASK
let message = new AnalyticsMessage( let message = new AnalyticsMessage(
AnalyticsMessageMethod.TASK, method,
task.toObject() task.toObject()
); );
return this.sendMessage(message); return this.sendMessage(message);

5
server/src/services/data_puller.ts

@ -59,7 +59,7 @@ export class DataPuller {
} }
private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) {
const time = analyticUnit.lastDetectionTime || Date.now(); const time = analyticUnit.lastDetectionTime + 1 || Date.now();
this._unitTimes[analyticUnit.id] = time; this._unitTimes[analyticUnit.id] = time;
const dataGenerator = this.getDataGenerator( const dataGenerator = this.getDataGenerator(
@ -76,7 +76,8 @@ export class DataPuller {
} }
const now = Date.now(); const now = Date.now();
let payload = { data, from: time, to: now, pattern: analyticUnit.type }; let payloadValues = data.values;
let payload = { data: payloadValues, from: time, to: now, pattern: analyticUnit.type };
this._unitTimes[analyticUnit.id] = now; this._unitTimes[analyticUnit.id] = now;
this.pushData(analyticUnit, payload); this.pushData(analyticUnit, payload);
} }

Loading…
Cancel
Save