
Make class for detection result (#634)

Evgeny Smyshlyaev authored 6 years ago, committed by rozetko
commit d1cb6f0406
Changed files (lines changed in parentheses):

  1. analytics/analytics/analytic_types/__init__.py (1)
  2. analytics/analytics/analytic_types/detector_typing.py (16)
  3. analytics/analytics/analytic_unit_manager.py (3)
  4. analytics/analytics/analytic_unit_worker.py (39)
  5. analytics/analytics/detectors/anomaly_detector.py (14)
  6. analytics/analytics/detectors/detector.py (8)
  7. analytics/analytics/detectors/pattern_detector.py (16)
  8. analytics/analytics/detectors/threshold_detector.py (14)
  9. analytics/analytics/models/__init__.py (2)
  10. analytics/analytics/models/model.py (2)
  11. analytics/tests/test_detectors.py (2)

analytics/analytics/analytic_types/__init__.py (1 changed line)

@@ -14,6 +14,7 @@ from typing import Union, List
 AnalyticUnitId = str
+ModelCache = dict
 """
 Example:

analytics/analytics/analytic_types/detector_typing.py (16 changed lines, new file)

@@ -0,0 +1,16 @@
+import utils.meta
+
+from analytic_types import ModelCache
+
+@utils.meta.JSONClass
+class DetectionResult:
+
+    def __init__(
+        self,
+        cache: ModelCache = ModelCache(),
+        segments: list = [],
+        last_detection_time: int = None
+    ):
+        self.cache = cache
+        self.segments = segments
+        self.last_detection_time = last_detection_time
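For context, a minimal sketch of how the new class is meant to behave. The @utils.meta.JSONClass decorator lives elsewhere in this repo and is not shown in the diff; the stand-in below assumes it adds a to_json() that maps snake_case attributes to camelCase keys, which is what lets the worker keep emitting the old 'lastDetectionTime' wire key. Also worth flagging: the committed defaults (ModelCache() and []) are evaluated once at definition time, so every DetectionResult() shares the same dict and list; the sketch uses the usual None-fallback instead.

# Hypothetical stand-in for @utils.meta.JSONClass (an assumption, not the repo's code).
def json_class(cls):
    def to_json(self):
        def camel(name: str) -> str:
            head, *tail = name.split('_')
            return head + ''.join(part.title() for part in tail)
        # Emit camelCase keys from snake_case attribute names.
        return {camel(key): value for key, value in self.__dict__.items()}
    cls.to_json = to_json
    return cls

@json_class
class DetectionResult:
    def __init__(self, cache: dict = None, segments: list = None, last_detection_time: int = None):
        # None-fallback avoids the shared mutable defaults in the committed version.
        self.cache = cache if cache is not None else {}
        self.segments = segments if segments is not None else []
        self.last_detection_time = last_detection_time

result = DetectionResult(segments=[(100, 110)], last_detection_time=110)
print(result.to_json())  # {'cache': {}, 'segments': [(100, 110)], 'lastDetectionTime': 110}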

analytics/analytics/analytic_unit_manager.py (3 changed lines)

@@ -4,9 +4,8 @@ import traceback
 from concurrent.futures import Executor, ThreadPoolExecutor

 from analytic_unit_worker import AnalyticUnitWorker
-from analytic_types import AnalyticUnitId
+from analytic_types import AnalyticUnitId, ModelCache
 import detectors
-from models import ModelCache

 logger = log.getLogger('AnalyticUnitManager')

analytics/analytics/analytic_unit_worker.py (39 changed lines)

@@ -3,12 +3,13 @@ import detectors
 import logging
 import pandas as pd
 from typing import Optional, Union, Generator, List
-from models import ModelCache
 import concurrent.futures
 import asyncio
 import utils
 from utils import get_intersected_chunks, get_chunks, prepare_data
+from analytic_types import ModelCache
+from analytic_types.detector_typing import DetectionResult

 logger = logging.getLogger('AnalyticUnitWorker')

@@ -45,39 +46,30 @@ class AnalyticUnitWorker:
         except asyncio.TimeoutError:
             raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))

-    async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
+    async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         window_size = self._detector.get_window_size(cache)
         chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR
         chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR

-        detection_result = {
-            'cache': None,
-            'segments': [],
-            'lastDetectionTime': None
-        }
+        detection_result = DetectionResult()

         for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
             await asyncio.sleep(0)
             chunk_dataframe = prepare_data(chunk)
             detected = self._detector.detect(chunk_dataframe, cache)
             self.__append_detection_result(detection_result, detected)

-        detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
+        detection_result.segments = self._detector.get_intersections(detection_result.segments)

-        return detection_result
+        return detection_result.to_json()

     def cancel(self):
         if self._training_future is not None:
             self._training_future.cancel()

-    async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+    async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         window_size = self._detector.get_window_size(cache)

-        #TODO: make class DetectionResult
-        detection_result = {
-            'cache': None,
-            'segments': [],
-            'lastDetectionTime': None
-        }
+        detection_result = DetectionResult()

         for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
             await asyncio.sleep(0)

@@ -85,15 +77,16 @@ class AnalyticUnitWorker:
             detected = self._detector.consume_data(chunk_dataframe, cache)
             self.__append_detection_result(detection_result, detected)

-        detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
+        detection_result.segments = self._detector.get_intersections(detection_result.segments)

-        if detection_result['lastDetectionTime'] is None:
+        if detection_result.last_detection_time is None:
             return None
         else:
-            return detection_result
+            return detection_result.to_json()

-    def __append_detection_result(self, detection_result: dict, new_chunk: dict):
+    # TODO: move result concatenation to Detectors
+    def __append_detection_result(self, detection_result: DetectionResult, new_chunk: dict):
         if new_chunk is not None:
-            detection_result['cache'] = new_chunk['cache']
-            detection_result['lastDetectionTime'] = new_chunk['lastDetectionTime']
-            detection_result['segments'].extend(new_chunk['segments'])
+            detection_result.cache = new_chunk.cache
+            detection_result.last_detection_time = new_chunk.last_detection_time
+            detection_result.segments.extend(new_chunk.segments)
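Why the worker still calls get_intersections after the chunk loop: get_intersected_chunks deliberately overlaps adjacent chunks, so the same segment can be reported by two chunks, and the accumulated detection_result.segments needs de-duplication. A toy illustration follows; merge_intersections is a hypothetical stand-in, since the real self._detector.get_intersections is not part of this diff.

from typing import List, Tuple

Segment = Tuple[int, int]

def merge_intersections(segments: List[Segment]) -> List[Segment]:
    # Union any (start, end) pairs that overlap; duplicates reported by
    # intersecting chunks collapse into a single segment.
    merged: List[Segment] = []
    for start, end in sorted(segments):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

# Two overlapping chunks both saw the segment at 120..130:
chunk_a = [(100, 110), (120, 130)]
chunk_b = [(120, 130), (150, 160)]
print(merge_intersections(chunk_a + chunk_b))  # [(100, 110), (120, 130), (150, 160)]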

analytics/analytics/detectors/anomaly_detector.py (14 changed lines)

@@ -2,10 +2,10 @@ import logging
 import pandas as pd
 from typing import Optional, Union, List, Tuple

-from analytic_types import AnalyticUnitId
+from analytic_types import AnalyticUnitId, ModelCache
+from analytic_types.detector_typing import DetectionResult
 from analytic_types.data_bucket import DataBucket
 from detectors import Detector
-from models import ModelCache
 import utils

 MAX_DEPENDENCY_LEVEL = 100

@@ -26,7 +26,7 @@ class AnomalyDetector(Detector):
             }
         }

-    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
+    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         data = dataframe['value']
         last_values = None
         if cache is not None:

@@ -48,13 +48,9 @@ class AnomalyDetector(Detector):
         ) for segment in segments]

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
-        return {
-            'cache': cache,
-            'segments': segments,
-            'lastDetectionTime': last_detection_time
-        }
+        return DetectionResult(cache, segments, last_detection_time)

-    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         self.detect(data, cache)
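A side observation the new annotation makes visible: AnomalyDetector.consume_data calls self.detect(data, cache) but never returns the result, so it always returns None despite being typed Optional[DetectionResult]; presumably a return statement was intended. That behavior predates this commit.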

analytics/analytics/detectors/detector.py (8 changed lines)

@@ -1,8 +1,10 @@
-from models import ModelCache
 from abc import ABC, abstractmethod
 from pandas import DataFrame
 from typing import Optional, Union, List

+from analytic_types import ModelCache
+from analytic_types.detector_typing import DetectionResult
+

 class Detector(ABC):

@@ -14,11 +16,11 @@ class Detector(ABC):
         pass

     @abstractmethod
-    def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
+    def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         pass

     @abstractmethod
-    def consume_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+    def consume_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         pass

     @abstractmethod

analytics/analytics/detectors/pattern_detector.py (16 changed lines)

@@ -9,9 +9,9 @@ from typing import Optional, Generator, List
 from detectors import Detector
 from analytic_types.data_bucket import DataBucket
-from models import ModelCache
 from utils import convert_pd_timestamp_to_ms
-from analytic_types import AnalyticUnitId
+from analytic_types import AnalyticUnitId, ModelCache
+from analytic_types.detector_typing import DetectionResult

 logger = logging.getLogger('PATTERN_DETECTOR')

@@ -45,7 +45,7 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         self.bucket = DataBucket()

-    def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[models.ModelCache]) -> models.ModelState:
+    def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[ModelCache]) -> ModelCache:
         # TODO: pass only part of dataframe that has segments
         self.model.state = self.model.get_state(cache)
         new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id)

@@ -56,7 +56,7 @@ class PatternDetector(Detector):
             'cache': new_cache
         }

-    def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
+    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)

@@ -82,13 +82,9 @@ class PatternDetector(Detector):
         new_cache = detected['cache'].to_json()
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
-        return {
-            'cache': new_cache,
-            'segments': segments,
-            'lastDetectionTime': last_detection_time
-        }
+        return DetectionResult(new_cache, segments, last_detection_time)

-    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))

         if cache is None:

analytics/analytics/detectors/threshold_detector.py (14 changed lines)

@@ -4,8 +4,9 @@ import pandas as pd
 import numpy as np
 from typing import Optional, List

+from analytic_types import ModelCache
+from analytic_types.detector_typing import DetectionResult
 from detectors import Detector
-from models import ModelCache
 from time import time
 from utils import convert_sec_to_ms, convert_pd_timestamp_to_ms

@@ -28,7 +29,7 @@ class ThresholdDetector(Detector):
             }
         }

-    def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
+    def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> DetectionResult:
         if cache is None or cache == {}:
             raise ValueError('Threshold detector error: cannot detect before learning')
         if len(dataframe) == 0:

@@ -68,13 +69,10 @@ class ThresholdDetector(Detector):
         last_entry = dataframe.iloc[-1]
         last_detection_time = convert_pd_timestamp_to_ms(last_entry['timestamp'])
-        return {
-            'cache': cache,
-            'segments': segments,
-            'lastDetectionTime': last_detection_time
-        }
+        return DetectionResult(cache, segments, last_detection_time)

-    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
+    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         result = self.detect(data, cache)
         return result if result else None
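Small nuance in ThresholdDetector.consume_data: the guard "return result if result else None" only filters falsy values, and a DetectionResult instance, unlike an empty dict, is always truthy, so after this change the guard can never yield None on its own.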

analytics/analytics/models/__init__.py (2 changed lines)

@@ -1,4 +1,4 @@
-from models.model import Model, ModelCache, ModelState
+from models.model import Model, ModelState
 from models.drop_model import DropModel, DropModelState
 from models.peak_model import PeakModel, PeakModelState
 from models.jump_model import JumpModel, JumpModelState

analytics/analytics/models/model.py (2 changed lines)

@@ -10,8 +10,6 @@ from analytic_types import AnalyticUnitId
 import utils.meta

-ModelCache = dict
-

 class Segment(AttrDict):
     def __init__(self, dataframe: pd.DataFrame, segment_map: dict, center_finder = None):

analytics/tests/test_detectors.py (2 changed lines)

@@ -45,4 +45,4 @@ class TestAnomalyDetector(unittest.TestCase):
         detector = anomaly_detector.AnomalyDetector()
         detect_result = detector.detect(dataframe, cache)
         result = [(1523889000005.0, 1523889000005.0)]
-        self.assertEqual(result, detect_result['segments'])
+        self.assertEqual(result, detect_result.segments)
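Since do_detect and consume_data now return detection_result.to_json(), the wire format should match the old dict shape exactly. A test along these lines (hypothetical, not part of this commit) would pin that contract down, assuming to_json() produces the camelCase keys the server expects:

import unittest

from analytic_types.detector_typing import DetectionResult

class TestDetectionResultSerialization(unittest.TestCase):
    def test_to_json_uses_camel_case_keys(self):
        # Assumes @utils.meta.JSONClass maps last_detection_time -> lastDetectionTime.
        result = DetectionResult(cache={}, segments=[], last_detection_time=42)
        self.assertEqual(
            result.to_json(),
            {'cache': {}, 'segments': [], 'lastDetectionTime': 42}
        )

if __name__ == '__main__':
    unittest.main()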
