Browse Source

Cancel learning on analytic unit deletion #266 (#269)

* basic cancelation in analytics

* cancelation task on node

* basic cancelation in analytics
pull/1/head
Alexey Velikiy 6 years ago committed by GitHub
parent
commit
ce8523fafa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      analytics/analytics/analytic_unit_manager.py
  2. 24
      analytics/analytics/analytic_unit_worker.py
  3. 6
      server/src/controllers/analytics_controller.ts
  4. 3
      server/src/models/analytics_task_model.ts
  5. 3
      server/src/routes/analytic_units_router.ts

15
analytics/analytics/analytic_unit_manager.py

@ -39,7 +39,7 @@ class AnalyticUnitManager:
self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS) self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS)
def __ensure_worker(self, analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: def __ensure_worker(self, analytic_unit_id: AnalyticUnitId, analytic_unit_type) -> AnalyticUnitWorker:
if analytic_unit_id in self.analytic_workers: if analytic_unit_id in self.analytic_workers:
# TODO: check that type is the same # TODO: check that type is the same
return self.analytic_workers[analytic_unit_id] return self.analytic_workers[analytic_unit_id]
@ -50,12 +50,21 @@ class AnalyticUnitManager:
async def handle_analytic_task(self, task): async def handle_analytic_task(self, task):
try: try:
analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
if task['type'] == 'CANCEL':
if analytic_unit_id in self.analytic_workers:
self.analytic_workers[analytic_unit_id].cancel()
return {
'status': 'SUCCESS'
}
payload = task['payload'] payload = task['payload']
worker = self.__ensure_worker(task['analyticUnitId'], payload['pattern']) worker = self.__ensure_worker(analytic_unit_id, payload['pattern'])
data = prepare_data(payload['data']) data = prepare_data(payload['data'])
result_payload = {} result_payload = {}
if task['type'] == 'LEARN': if task['type'] == 'LEARN':
result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) result_payload = await worker.do_train(payload['segments'], data, payload['cache'])
elif task['type'] == 'PREDICT': elif task['type'] == 'PREDICT':
result_payload = await worker.do_predict(data, payload['cache']) result_payload = await worker.do_predict(data, payload['cache'])
else: else:

24
analytics/analytics/analytic_unit_worker.py

@ -4,7 +4,7 @@ import logging
import pandas as pd import pandas as pd
from typing import Optional from typing import Optional
from models import AnalyticUnitCache from models import AnalyticUnitCache
from concurrent.futures import Executor from concurrent.futures import Executor, CancelledError
import asyncio import asyncio
logger = logging.getLogger('AnalyticUnitWorker') logger = logging.getLogger('AnalyticUnitWorker')
@ -14,16 +14,26 @@ class AnalyticUnitWorker:
def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor): def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor):
self.analytic_unit_id = analytic_unit_id self.analytic_unit_id = analytic_unit_id
self.detector = detector self._detector = detector
self.executor: Executor = executor self._executor: Executor = executor
self._training_feature: asyncio.Future = None
async def do_learn( async def do_train(
self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache] self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]
) -> AnalyticUnitCache: ) -> AnalyticUnitCache:
new_cache: AnalyticUnitCache = await asyncio.get_event_loop().run_in_executor( self._training_feature = asyncio.get_event_loop().run_in_executor(
self.executor, self.detector.train, data, segments, cache self._executor, self._detector.train, data, segments, cache
) )
try:
new_cache: AnalyticUnitCache = await self._training_feature
return new_cache return new_cache
except CancelledError as e:
return cache
async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
return self.detector.predict(data, cache) return self._detector.predict(data, cache)
def cancel(self):
if self._training_feature is not None:
self._training_feature.cancel()

6
server/src/controllers/analytics_controller.ts

@ -210,6 +210,12 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
} }
} }
export async function remove(id: AnalyticUnit.AnalyticUnitId) {
let task = new AnalyticsTask(id, AnalyticsTaskType.CANCEL);
await runTask(task);
await AnalyticUnit.remove(id);
}
export async function deleteNonpredictedSegments(id, payload) { export async function deleteNonpredictedSegments(id, payload) {
let lastPredictedSegments = await Segment.findMany(id, { labeled: false, deleted: false }); let lastPredictedSegments = await Segment.findMany(id, { labeled: false, deleted: false });
let segmentsToRemove: Segment.Segment[]; let segmentsToRemove: Segment.Segment[];

3
server/src/models/analytics_task_model.ts

@ -8,7 +8,8 @@ const UID_LENGTH = 16;
export type AnalyticsTaskId = string; export type AnalyticsTaskId = string;
export enum AnalyticsTaskType { export enum AnalyticsTaskType {
LEARN = 'LEARN', LEARN = 'LEARN',
PREDICT = 'PREDICT' PREDICT = 'PREDICT',
CANCEL = 'CANCEL'
}; };
export class AnalyticsTask { export class AnalyticsTask {

3
server/src/routes/analytic_units_router.ts

@ -1,3 +1,4 @@
import * as AnalyticsController from '../controllers/analytics_controller';
import * as AnalyticUnit from '../models/analytic_unit_model'; import * as AnalyticUnit from '../models/analytic_unit_model';
import { createAnalyticUnitFromObject } from '../controllers/analytics_controller'; import { createAnalyticUnitFromObject } from '../controllers/analytics_controller';
@ -82,7 +83,7 @@ async function deleteUnit(ctx: Router.IRouterContext) {
if(analyticUnitId === undefined) { if(analyticUnitId === undefined) {
throw new Error('Cannot delete undefined id'); throw new Error('Cannot delete undefined id');
} }
await AnalyticUnit.remove(analyticUnitId); await AnalyticsController.remove(analyticUnitId);
ctx.response.body = { ctx.response.body = {
code: 200, code: 200,
message: 'Success' message: 'Success'

Loading…
Cancel
Save