From 006adbfeb3fc13a4a894acae3ca9388e3c95c9f0 Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Mon, 25 Feb 2019 04:02:51 +0300 Subject: [PATCH] Futures for detect and recieve #264 (#433) --- analytics/analytics/analytic_unit_worker.py | 25 +++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 02f7cff..3ede390 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -17,6 +17,8 @@ class AnalyticUnitWorker: self._detector = detector self._executor: Executor = executor self._training_feature: asyncio.Future = None + self._detection_feature: asyncio.Future = None + self._recieve_feature: asyncio.Future = None async def do_train( self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] @@ -31,11 +33,30 @@ class AnalyticUnitWorker: return cache async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: - return self._detector.detect(data, cache) + self._detection_feature = asyncio.get_event_loop().run_in_executor( + self._executor, self._detector.detect, data, cache + ) + try: + detect_results = await self._detection_feature + return detect_results + except CancelledError as e: + return {} + def cancel(self): if self._training_feature is not None: self._training_feature.cancel() + if self._detection_feature is not None: + self._detection_feature.cancel() + if self._recieve_feature is not None: + self._recieve_feature.cancel() async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): - return self._detector.recieve_data(data, cache) + self._recieve_feature = asyncio.get_event_loop().run_in_executor( + self._executor, self._detector.recieve_data, data, cache + ) + try: + detect_results = await self._recieve_feature + return detect_results + except CancelledError as e: + return None