diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 3501334..decc4b1 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -57,7 +57,14 @@ class AnalyticUnitWorker: 'lastDetectionTime': None } - for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size): + chunks = [] + # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size) + if self._detector.is_detection_intersected(): + chunks = get_intersected_chunks(data, chunk_intersection, chunk_size) + else: + chunks = get_chunks(data, chunk_size) + + for chunk in chunks: await asyncio.sleep(0) chunk_dataframe = prepare_data(chunk) detected = self._detector.detect(chunk_dataframe, cache) diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index 06887c1..c248001 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -25,24 +25,28 @@ class AnomalyDetector(Detector): def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: data = dataframe['value'] - last_values = None + alpha = cache['alpha'] + confidence = cache['confidence'] + + last_value = None if cache is not None: - last_values = cache['last_values'] + last_value = cache.get('lastValue') - #TODO detection code here - smoth_data = utils.exponential_smoothing(data, cache['alpha']) - upper_bound = utils.exponential_smoothing(data + cache['confidence'], cache['alpha']) - lower_bound = utils.exponential_smoothing(data - cache['confidence'], cache['alpha']) + smooth_data = utils.exponential_smoothing(data, alpha, last_value) + upper_bound = utils.exponential_smoothing(data + confidence, alpha, last_value) + lower_bound = utils.exponential_smoothing(data - confidence, alpha, last_value) - segemnts = [] + segments = [] for idx, val in enumerate(data.values): if val > upper_bound[idx] or val < lower_bound[idx]: - segemnts.append(idx) + segments.append(idx) last_detection_time = dataframe['timestamp'][-1] + cache['lastValue'] = smooth_data[-1] + return { 'cache': cache, - 'segments': segemnts, + 'segments': segments, 'lastDetectionTime': last_detection_time } @@ -76,3 +80,7 @@ class AnomalyDetector(Detector): #TODO: calculate value based on `alpha` value from cache return 1 + + + def is_detection_intersected(self) -> bool: + return False \ No newline at end of file diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 6b44161..c0dc63e 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -24,3 +24,6 @@ class Detector(ABC): @abstractmethod def get_window_size(self, cache: Optional[ModelCache]) -> int: pass + + def is_detection_intersected(self) -> bool: + return True