diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index d4478d7..e81ad08 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -44,7 +44,7 @@ class AnalyticUnitManager:
         self.analytic_workers[analytic_unit_id] = worker
         return worker

-    async def __handle_analytic_task(self, task) -> dict:
+    async def __handle_analytic_task(self, task: object) -> dict:
         """
         returns payload or None
         """
@@ -80,7 +80,7 @@ class AnalyticUnitManager:

         raise ValueError('Unknown task type "%s"' % task['type'])

-    async def handle_analytic_task(self, task):
+    async def handle_analytic_task(self, task: object):
         try:
             log.debug('Start handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
             result_payload = await self.__handle_analytic_task(task)
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 7ed0aa1..c94961c 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -29,7 +29,7 @@ class AnalyticUnitWorker:
         self._training_future: asyncio.Future = None

     async def do_train(
-        self, payload: Union[list, dict], data: list, cache: Optional[ModelCache]
+        self, payload: Union[list, dict], data: TimeSeries, cache: Optional[ModelCache]
     ) -> Optional[ModelCache]:

         dataframe = prepare_data(data)
@@ -94,7 +94,7 @@ class AnalyticUnitWorker:
         detection_result = self._detector.concat_detection_results(detections)
         return detection_result.to_json()

-    async def process_data(self, data: list, cache: ModelCache) -> dict:
+    async def process_data(self, data: TimeSeries, cache: ModelCache) -> dict:
         assert isinstance(self._detector, detectors.ProcessingDetector), \
             f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
         assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'
diff --git a/analytics/analytics/config.py b/analytics/analytics/config.py
index a9ec31f..293b0a6 100644
--- a/analytics/analytics/config.py
+++ b/analytics/analytics/config.py
@@ -14,7 +14,7 @@ else:
     print('Config file %s doesn`t exist, using defaults' % CONFIG_FILE)


-def get_config_field(field, default_val = None):
+def get_config_field(field: str, default_val = None):
     if field in os.environ:
         return os.environ[field]

diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 3a146b8..f7ae2ca 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
 from pandas import DataFrame
 from typing import Optional, Union, List

-from analytic_types import ModelCache
+from analytic_types import ModelCache, TimeSeries
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment

@@ -43,7 +43,7 @@ class Detector(ABC):
 class ProcessingDetector(Detector):

     @abstractmethod
-    def process_data(self, data, cache: Optional[ModelCache]) -> ProcessingResult:
+    def process_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> ProcessingResult:
         pass

     def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index 74ffc29..7052294 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -116,7 +116,7 @@ class PatternDetector(Detector):
             excess_data = bucket_len - bucket_size
             self.bucket.drop_data(excess_data)

-        logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res))
+        logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, str(res.to_json())))

         if res:
             return res
diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py
index bba07ec..e945243 100644
--- a/analytics/analytics/models/model.py
+++ b/analytics/analytics/models/model.py
@@ -68,16 +68,16 @@ class ModelState():

     def __init__(
         self,
-        pattern_center: List[int] = [],
-        pattern_model: List[float] = [],
+        pattern_center: List[int] = None,
+        pattern_model: List[float] = None,
         convolve_max: float = 0,
         convolve_min: float = 0,
         window_size: int = 0,
         conv_del_min: float = 0,
         conv_del_max: float = 0
     ):
-        self.pattern_center = pattern_center
-        self.pattern_model = pattern_model
+        self.pattern_center = pattern_center if pattern_center is not None else []
+        self.pattern_model = pattern_model if pattern_model is not None else []
         self.convolve_max = convolve_max
         self.convolve_min = convolve_min
         self.window_size = window_size
diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py
index 7191b9a..6aee340 100644
--- a/analytics/analytics/utils/common.py
+++ b/analytics/analytics/utils/common.py
@@ -36,16 +36,6 @@ def exponential_smoothing(series: pd.Series, alpha: float, last_smoothed_value:
         result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
     return pd.Series(result, index = series.index)

-def segments_box(segments):
-    max_time = 0
-    min_time = float("inf")
-    for segment in segments:
-        min_time = min(min_time, segment['from'])
-        max_time = max(max_time, segment['to'])
-    min_time = pd.to_datetime(min_time, unit='ms')
-    max_time = pd.to_datetime(max_time, unit='ms')
-    return min_time, max_time
-
 def find_pattern(data: pd.Series, height: float, length: int, pattern_type: str) -> list:
     pattern_list = []
     right_bound = len(data) - length - 1
@@ -59,7 +49,7 @@ def find_pattern(data: pd.Series, height: float, length: int, pattern_type: str)
             pattern_list.append(i)
     return pattern_list

-def find_jump(data, height, lenght) -> List[int]:
+def find_jump(data, height: float, lenght: int) -> List[int]:
     '''
     Find jump indexes
     '''
@@ -70,7 +60,7 @@ def find_jump(data, height: float, lenght: int) -> List[int]:
             j_list.append(i)
     return(j_list)

-def find_drop(data, height, length) -> List[int]:
+def find_drop(data, height: float, length: int) -> List[int]:
     '''
     Find drop indexes
     '''
@@ -81,7 +71,7 @@ def find_drop(data, height: float, length: int) -> List[int]:
             d_list.append(i)
     return(d_list)

-def timestamp_to_index(dataframe, timestamp):
+def timestamp_to_index(dataframe: pd.DataFrame, timestamp: int):
     data = dataframe['timestamp']
     idx, = np.where(data >= timestamp)
     if len(idx) > 0:
@@ -100,16 +90,16 @@ def find_peaks(data: Generator[float, None, None], size: int) -> Generator[float
         window.append(v)
         window.popleft()

-def ar_mean(numbers):
+def ar_mean(numbers: List[float]):
     return float(sum(numbers)) / max(len(numbers), 1)

-def get_av_model(patterns_list):
+def get_av_model(patterns_list: list):
     if not patterns_list:
         return []
     patterns_list = get_same_length(patterns_list)
     value_list = list(map(list, zip(*patterns_list)))
     return list(map(ar_mean, value_list))

-def get_same_length(patterns_list):
+def get_same_length(patterns_list: list):
     for index in range(len(patterns_list)):
         if type(patterns_list[index]) == pd.Series:
             patterns_list[index] = patterns_list[index].tolist()
@@ -223,7 +213,7 @@ def find_confidence(segment: pd.Series) -> (float, float):
     else:
         return (0, 0)

-def find_width(pattern: pd.Series, selector) -> int:
+def find_width(pattern: pd.Series, selector: bool) -> int:
     pattern = pattern.values
     center = utils.find_extremum_index(pattern, selector)
     pattern_left = pattern[:center]
@@ -458,6 +448,6 @@ def cut_dataframe(data: pd.DataFrame) -> pd.DataFrame:
     data['value'] = data['value'] - data_min
     return data

-def get_min_max(array, default):
+def get_min_max(array: list, default):
     return float(min(array, default=default)), float(max(array, default=default))
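
The data: TimeSeries annotations in analytic_unit_worker.py and detector.py rely on a TimeSeries alias exported by analytic_types; its definition is not shown in this diff. Judging by how the worker passes data straight into prepare_data() to build a timestamp/value dataframe, a plausible shape would be a list of (timestamp, value) pairs — an assumption, sketched here:

    from typing import List, Tuple

    # Assumed definition: the diff only shows the import, not the alias itself.
    TimeSeries = List[Tuple[int, float]]   # (timestamp in ms, value) pairs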
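One caveat on the pattern_detector.py hunk: the if res: guard just below the new logging call implies res can be falsy at that point, in which case res.to_json() would raise AttributeError. A defensive variant of the same call (a sketch, reusing only the names visible in the hunk):

    logging.debug('End consume_data for analytic unit: {} with res: {}'.format(
        self.analytic_unit_id, str(res.to_json()) if res else 'None'))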
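Note on the models/model.py hunk: a default like pattern_center: List[int] = [] is evaluated once, when __init__ is defined, so every ModelState constructed without arguments would share the same list object. The None-sentinel rewrite above sidesteps that. A minimal standalone sketch of the pitfall and the fix (illustrative class names, not the real ModelState):

    from typing import List, Optional

    class Broken:
        def __init__(self, items: List[int] = []):
            self.items = items          # every call reuses one list object

    class Fixed:
        def __init__(self, items: Optional[List[int]] = None):
            self.items = items if items is not None else []   # fresh list per call

    a, b = Broken(), Broken()
    a.items.append(1)
    assert b.items == [1]               # the bug: b sees a's append

    c, d = Fixed(), Fixed()
    c.items.append(1)
    assert d.items == []                # fixed: independent lists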
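For the get_av_model / ar_mean helpers annotated in utils/common.py: zip(*patterns_list) transposes the list of equal-length patterns so each resulting tuple holds one position across all patterns, and ar_mean's max(len(numbers), 1) guards the division against an empty column. A quick worked example of the averaging:

    patterns = [[1.0, 2.0, 3.0],
                [3.0, 4.0, 5.0]]
    columns = list(map(list, zip(*patterns)))    # [[1.0, 3.0], [2.0, 4.0], [3.0, 5.0]]
    average = [float(sum(c)) / max(len(c), 1) for c in columns]
    assert average == [2.0, 3.0, 4.0]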