From ce8523fafa67b76ba774104a258ec58648ecab61 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 26 Nov 2018 12:28:48 +0300 Subject: [PATCH] Cancel learning on analytic unit deletion #266 (#269) * basic cancelation in analytics * cancelation task on node * basic cancelation in analytics --- analytics/analytics/analytic_unit_manager.py | 15 ++++++++--- analytics/analytics/analytic_unit_worker.py | 26 +++++++++++++------ .../src/controllers/analytics_controller.ts | 6 +++++ server/src/models/analytics_task_model.ts | 3 ++- server/src/routes/analytic_units_router.ts | 3 ++- 5 files changed, 40 insertions(+), 13 deletions(-) diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index f009bfb..ec65053 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -39,7 +39,7 @@ class AnalyticUnitManager: self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS) - def __ensure_worker(self, analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: + def __ensure_worker(self, analytic_unit_id: AnalyticUnitId, analytic_unit_type) -> AnalyticUnitWorker: if analytic_unit_id in self.analytic_workers: # TODO: check that type is the same return self.analytic_workers[analytic_unit_id] @@ -50,12 +50,21 @@ class AnalyticUnitManager: async def handle_analytic_task(self, task): try: + analytic_unit_id: AnalyticUnitId = task['analyticUnitId'] + + if task['type'] == 'CANCEL': + if analytic_unit_id in self.analytic_workers: + self.analytic_workers[analytic_unit_id].cancel() + return { + 'status': 'SUCCESS' + } + payload = task['payload'] - worker = self.__ensure_worker(task['analyticUnitId'], payload['pattern']) + worker = self.__ensure_worker(analytic_unit_id, payload['pattern']) data = prepare_data(payload['data']) result_payload = {} if task['type'] == 'LEARN': - result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) + result_payload = await worker.do_train(payload['segments'], data, payload['cache']) elif task['type'] == 'PREDICT': result_payload = await worker.do_predict(data, payload['cache']) else: diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index cc26efc..05fcc4b 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -4,7 +4,7 @@ import logging import pandas as pd from typing import Optional from models import AnalyticUnitCache -from concurrent.futures import Executor +from concurrent.futures import Executor, CancelledError import asyncio logger = logging.getLogger('AnalyticUnitWorker') @@ -14,16 +14,26 @@ class AnalyticUnitWorker: def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor): self.analytic_unit_id = analytic_unit_id - self.detector = detector - self.executor: Executor = executor + self._detector = detector + self._executor: Executor = executor + self._training_feature: asyncio.Future = None - async def do_learn( + async def do_train( self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache] ) -> AnalyticUnitCache: - new_cache: AnalyticUnitCache = await asyncio.get_event_loop().run_in_executor( - self.executor, self.detector.train, data, segments, cache + self._training_feature = asyncio.get_event_loop().run_in_executor( + self._executor, self._detector.train, data, segments, cache ) - return new_cache + try: + new_cache: AnalyticUnitCache = await self._training_feature + return new_cache + except CancelledError as e: + return cache + async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: - return self.detector.predict(data, cache) + return self._detector.predict(data, cache) + + def cancel(self): + if self._training_feature is not None: + self._training_feature.cancel() diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 7a2e49e..f363d37 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -210,6 +210,12 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { } } +export async function remove(id: AnalyticUnit.AnalyticUnitId) { + let task = new AnalyticsTask(id, AnalyticsTaskType.CANCEL); + await runTask(task); + await AnalyticUnit.remove(id); +} + export async function deleteNonpredictedSegments(id, payload) { let lastPredictedSegments = await Segment.findMany(id, { labeled: false, deleted: false }); let segmentsToRemove: Segment.Segment[]; diff --git a/server/src/models/analytics_task_model.ts b/server/src/models/analytics_task_model.ts index 8f0f84f..c40e74d 100644 --- a/server/src/models/analytics_task_model.ts +++ b/server/src/models/analytics_task_model.ts @@ -8,7 +8,8 @@ const UID_LENGTH = 16; export type AnalyticsTaskId = string; export enum AnalyticsTaskType { LEARN = 'LEARN', - PREDICT = 'PREDICT' + PREDICT = 'PREDICT', + CANCEL = 'CANCEL' }; export class AnalyticsTask { diff --git a/server/src/routes/analytic_units_router.ts b/server/src/routes/analytic_units_router.ts index 7fbf283..7f0c233 100644 --- a/server/src/routes/analytic_units_router.ts +++ b/server/src/routes/analytic_units_router.ts @@ -1,3 +1,4 @@ +import * as AnalyticsController from '../controllers/analytics_controller'; import * as AnalyticUnit from '../models/analytic_unit_model'; import { createAnalyticUnitFromObject } from '../controllers/analytics_controller'; @@ -82,7 +83,7 @@ async function deleteUnit(ctx: Router.IRouterContext) { if(analyticUnitId === undefined) { throw new Error('Cannot delete undefined id'); } - await AnalyticUnit.remove(analyticUnitId); + await AnalyticsController.remove(analyticUnitId); ctx.response.body = { code: 200, message: 'Success'