diff --git a/analytics/analytic_types/data_bucket.py b/analytics/analytic_types/data_bucket.py index 5eb3809..c1a91df 100755 --- a/analytics/analytic_types/data_bucket.py +++ b/analytics/analytic_types/data_bucket.py @@ -1,14 +1,33 @@ +from typing import Optional + import pandas as pd class DataBucket: - def __init__(self): + def __init__(self, max_size: Optional[int] = None): + self.set_max_size(max_size) self.data = pd.DataFrame([], columns=['timestamp', 'value']) - def receive_data(self, data: pd.DataFrame): + def append_data(self, data: pd.DataFrame) -> None: self.data = self.data.append(data, ignore_index=True) - def drop_data(self, count: int): + if self.max_size == None: + return + + bucket_size = self.get_current_size() + if bucket_size > self.max_size: + extra_data_count = bucket_size - self.max_size + self.drop_data(extra_data_count) + + def drop_data(self, count: int) -> None: if count > 0: self.data = self.data.iloc[count:] + + def set_max_size(self, max_size: Optional[int] = None) -> None: + if max_size is not None and max_size < 0: + raise Exception(f'Can`t set negative max size for bucket: {max_size}') + self.max_size = max_size + + def get_current_size(self) -> int: + return len(self.data) diff --git a/analytics/detectors/anomaly_detector.py b/analytics/detectors/anomaly_detector.py index 7885d01..870bde4 100644 --- a/analytics/detectors/anomaly_detector.py +++ b/analytics/detectors/anomaly_detector.py @@ -17,6 +17,9 @@ import utils MAX_DEPENDENCY_LEVEL = 100 MIN_DEPENDENCY_FACTOR = 0.1 BASIC_ALPHA = 0.5 + +BUCKET_SIZE = MAX_DEPENDENCY_LEVEL + logger = logging.getLogger('ANOMALY_DETECTOR') @@ -105,9 +108,12 @@ class AnomalyDetector(ProcessingDetector): if len(data_without_nan) == 0: return None - self.bucket.receive_data(data_without_nan) + window_size = self.get_window_size(cache) + + self.bucket.set_max_size(BUCKET_SIZE) + self.bucket.append_data(data_without_nan) - if len(self.bucket.data) >= self.get_window_size(cache): + if self.bucket.get_current_size() >= window_size: return self.detect(self.bucket.data, cache) return None @@ -264,7 +270,8 @@ class AnomalyDetector(ProcessingDetector): yield Segment( utils.convert_pd_timestamp_to_ms(segment_start), utils.convert_pd_timestamp_to_ms(segment_end), - message=f'{val} out of {str(bound.value)} bound' + # TODO: configurable decimals number + message=f'{val:.2f} out of {str(bound.value)} bound' ) in_segment = False else: @@ -273,5 +280,5 @@ class AnomalyDetector(ProcessingDetector): return Segment( utils.convert_pd_timestamp_to_ms(segment_start), utils.convert_pd_timestamp_to_ms(segment_end), - message=f'{val} out of {str(bound.value)} bound' + message=f'{val:.2f} out of {str(bound.value)} bound' ) diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 3e3a949..4b1a7c4 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -110,23 +110,20 @@ class PatternDetector(Detector): if len(data_without_nan) == 0: return None - self.bucket.receive_data(data_without_nan) - # TODO: use ModelState window_size = cache['windowSize'] + bucket_max_size = max(window_size * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE) + + self.bucket.set_max_size(bucket_max_size) + self.bucket.append_data(data_without_nan) - bucket_len = len(self.bucket.data) - if bucket_len < window_size * 2: - msg = f'{self.analytic_unit_id} bucket data {bucket_len} less than two window size {window_size * 2}, skip run detection from consume_data' + bucket_size = self.bucket.get_current_size() + if bucket_size < window_size * 2: + msg = f'{self.analytic_unit_id} bucket data {bucket_size} less than two window size {window_size * 2}, skip run detection from consume_data' logger.debug(msg) return None - res = self.detect(self.bucket.data, cache) - - bucket_size = max(window_size * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE) - if bucket_len > bucket_size: - excess_data = bucket_len - bucket_size - self.bucket.drop_data(excess_data) + res = self.detect(self.bucket.data, cache) logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, str(res.to_json()))) diff --git a/tests/test_bucket.py b/tests/test_bucket.py index 8bd138c..203a1d2 100644 --- a/tests/test_bucket.py +++ b/tests/test_bucket.py @@ -13,17 +13,17 @@ class TestBucket(unittest.TestCase): data_val = list(range(6)) timestamp_list = create_list_of_timestamps(len(data_val)) for val in data_val: - bucket.receive_data(get_pd_dataframe([val], [1523889000000 + val])) + bucket.append_data(get_pd_dataframe([val], [1523889000000 + val])) for idx, row in bucket.data.iterrows(): self.assertEqual(data_val[idx], row['value']) self.assertEqual(timestamp_list[idx], row['timestamp']) - def test_drop_data(self): + def test_max_size(self): bucket = DataBucket() data_val = list(range(10)) timestamp_list = create_list_of_timestamps(len(data_val)) - bucket.receive_data(get_pd_dataframe(data_val, timestamp_list)) - bucket.drop_data(5) + bucket.set_max_size(5) + bucket.append_data(get_pd_dataframe(data_val, timestamp_list)) expected_data = data_val[5:] expected_timestamp = timestamp_list[5:] self.assertEqual(expected_data, bucket.data['value'].tolist())