Browse Source

asyncio.wait_for training_future

pull/1/head
Coin de Gamma 5 years ago
parent
commit
4dd870ce2b
  1. 18
      analytics/analytics/analytic_unit_worker.py

18
analytics/analytics/analytic_unit_worker.py

@ -4,32 +4,34 @@ import logging
import pandas as pd import pandas as pd
from typing import Optional, Union from typing import Optional, Union
from models import ModelCache from models import ModelCache
from concurrent.futures import Executor, CancelledError, TimeoutError import concurrent.futures
import asyncio import asyncio
logger = logging.getLogger('AnalyticUnitWorker') logger = logging.getLogger('AnalyticUnitWorker')
class AnalyticUnitWorker: class AnalyticUnitWorker:
def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor): def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor):
self.analytic_unit_id = analytic_unit_id self.analytic_unit_id = analytic_unit_id
self._detector = detector self._detector = detector
self._executor: Executor = executor self._executor: concurrent.futures.Executor = executor
self._training_future: asyncio.Future = None self._training_future: asyncio.Future = None
async def do_train( async def do_train(
self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache]
) -> ModelCache: ) -> ModelCache:
self._training_future = self._executor.submit( cfuture: concurrent.futures.Future = self._executor.submit(
self._detector.train, data, payload, cache self._detector.train, data, payload, cache
) )
self._training_future = asyncio.wrap_future(cfuture)
try: try:
new_cache: ModelCache = self._training_future.result(timeout = config.LEARNING_TIMEOUT) new_cache: ModelCache = await asyncio.wait_for(self._training_future, timeout = config.LEARNING_TIMEOUT)
return new_cache return new_cache
except CancelledError: except asyncio.CancelledError:
return cache return None
except TimeoutError: except asyncio.TimeoutError:
raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:

Loading…
Cancel
Save