Browse Source

Revert "Merge branch 'concatinate-chunks-for-anomaly-detector-#614'"

This reverts commit c6eb1bd4d2, reversing
changes made to 74d45bf4f4.
pull/1/head
amper43 6 years ago
parent
commit
75e8081182
  1. 11
      analytics/analytics/analytic_unit_worker.py
  2. 31
      analytics/analytics/detectors/anomaly_detector.py
  3. 3
      analytics/analytics/detectors/detector.py

11
analytics/analytics/analytic_unit_worker.py

@@ -57,14 +57,7 @@ class AnalyticUnitWorker:
                 'lastDetectionTime': None
             }
-            chunks = []
-            # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size)
-            if self._detector.is_detection_intersected():
-                chunks = get_intersected_chunks(data, chunk_intersection, chunk_size)
-            else:
-                chunks = get_chunks(data, chunk_size)
-            for chunk in chunks:
+            for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
                 await asyncio.sleep(0)
                 chunk_dataframe = prepare_data(chunk)
                 detected = self._detector.detect(chunk_dataframe, cache)
@@ -93,7 +86,7 @@ class AnalyticUnitWorker:
             self.__append_detection_result(detection_result, detected)
         detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
         if detection_result['lastDetectionTime'] is None:
             return None
         else:

31
analytics/analytics/detectors/anomaly_detector.py

@@ -7,7 +7,6 @@ from analytic_types.data_bucket import DataBucket
 from detectors import Detector
 from models import ModelCache
 import utils
-from analytic_types import AnalyticUnitId

 MAX_DEPENDENCY_LEVEL = 100
 MIN_DEPENDENCY_FACTOR = 0.1
@@ -16,8 +15,7 @@ logger = logging.getLogger('ANOMALY_DETECTOR')

 class AnomalyDetector(Detector):

-    def __init__(self, analytic_unit_id: AnalyticUnitId):
-        self.analytic_unit_id = analytic_unit_id
+    def __init__(self, *args, **kwargs):
         self.bucket = DataBucket()

     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
@@ -30,10 +28,7 @@ class AnomalyDetector(Detector):

     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
         data = dataframe['value']
-        alpha = cache['alpha']
-        confidence = cache['confidence']
-        last_value = None
+        last_values = None

         if cache is not None:
             last_values = cache.get('last_values')
@@ -60,23 +55,14 @@ class AnomalyDetector(Detector):
         }

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
-        if cache is None:
-            msg = f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}'
-            logging.debug(msg)
-            raise ValueError(msg)
-
-        data_without_nan = data.dropna()
-
-        if len(data_without_nan) == 0:
-            return None
-
-        self.bucket.receive_data(data_without_nan)
-
-        if len(self.bucket.data) >= self.get_window_size(cache):
-            self.detect(self.bucket, cache)
+        self.detect(data, cache)

     def get_window_size(self, cache: Optional[ModelCache]) -> int:
-        '''
-        get the number of values that will affect the next value
-        '''
         if cache is None:
             raise ValueError('anomaly detector got None cache')
@@ -90,6 +76,3 @@ class AnomalyDetector(Detector):
         segments = utils.merge_intersecting_intervals(segments)
         segments = [{'from': segment[0], 'to': segment[1]} for segment in segments]
         return segments
-
-    def is_detection_intersected(self) -> bool:
-        return False

3
analytics/analytics/detectors/detector.py

@@ -28,6 +28,3 @@ class Detector(ABC):
     @abstractmethod
     def get_intersections(self, segments: List[dict]) -> List[dict]:
         pass
-
-    def is_detection_intersected(self) -> bool:
-        return True

Loading…
Cancel
Save