diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 02f7cff..3ede390 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -17,6 +17,8 @@ class AnalyticUnitWorker: self._detector = detector self._executor: Executor = executor self._training_feature: asyncio.Future = None + self._detection_feature: asyncio.Future = None + self._recieve_feature: asyncio.Future = None async def do_train( self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] @@ -31,11 +33,30 @@ class AnalyticUnitWorker: return cache async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: - return self._detector.detect(data, cache) + self._detection_feature = asyncio.get_event_loop().run_in_executor( + self._executor, self._detector.detect, data, cache + ) + try: + detect_results = await self._detection_feature + return detect_results + except CancelledError as e: + return {} + def cancel(self): if self._training_feature is not None: self._training_feature.cancel() + if self._detection_feature is not None: + self._detection_feature.cancel() + if self._recieve_feature is not None: + self._recieve_feature.cancel() async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): - return self._detector.recieve_data(data, cache) + self._recieve_feature = asyncio.get_event_loop().run_in_executor( + self._executor, self._detector.recieve_data, data, cache + ) + try: + detect_results = await self._recieve_feature + return detect_results + except CancelledError as e: + return None