
Error: Can't find interval length of data #688 (#689)

Evgeny Smyshlyaev authored 6 years ago, committed by GitHub
commit 070f593b7c
6 changed files:

1. analytics/analytics/analytic_types/detector_typing.py (7 lines changed)
2. analytics/analytics/detectors/anomaly_detector.py (17 lines changed)
3. analytics/analytics/detectors/pattern_detector.py (5 lines changed)
4. analytics/analytics/detectors/threshold_detector.py (9 lines changed)
5. analytics/tests/test_detectors.py (1 line changed)
6. server/src/controllers/analytics_controller.ts (4 lines changed)
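
In short: before this commit, detectors derived time_step at detection time, either by calling utils.find_interval(dataframe) or by subtracting the first two timestamps, and carried it around inside DetectionResult. When the dataframe handed to detect() was too short to measure an interval, this produced the "Can't find interval length of data" error (#688). The fix measures the interval once, on the training data, stores it in the detector cache under 'timeStep', and has every later code path read it back from the cache; DetectionResult loses its time_step field entirely.

For reference, a minimal sketch of the helper the new code leans on. The body of utils.find_interval is not part of this diff, so this is an assumption reconstructed from the inline computation the commit deletes (second timestamp minus first, in milliseconds):

import pandas as pd

def find_interval(dataframe: pd.DataFrame) -> int:
    # Assumed behavior: distance between the first two samples, in ms.
    if len(dataframe) < 2:
        # The failure mode behind #688: a detection dataframe can be
        # too short to derive an interval from.
        raise ValueError("Can't find interval length of data")
    delta = dataframe['timestamp'][1] - dataframe['timestamp'][0]
    return int(delta.value / 1_000_000)  # pandas Timedelta stores nanoseconds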

analytics/analytics/analytic_types/detector_typing.py (7 lines changed)

@@ -11,8 +11,7 @@ class DetectionResult:
         self,
         cache: Optional[ModelCache] = None,
         segments: Optional[List[Segment]] = None,
-        last_detection_time: int = None,
-        time_step: int = None
+        last_detection_time: int = None
     ):
         if cache is None:
             cache = {}
@@ -21,15 +20,13 @@ class DetectionResult:
         self.cache = cache
         self.segments = segments
         self.last_detection_time = last_detection_time
-        self.time_step = time_step

     # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects)
     def to_json(self):
         return {
             'cache': self.cache,
             'segments': list(map(lambda segment: segment.to_json(), self.segments)),
-            'lastDetectionTime': self.last_detection_time,
-            'timeStep': self.time_step
+            'lastDetectionTime': self.last_detection_time
         }

 @utils.meta.JSONClass
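
Pieced together from the hunks above, the slimmed-down class now looks roughly like this. ModelCache and Segment are the project's own types, imported elsewhere in this module; the segments-default branch is not visible in the diff context and is an assumption by analogy with the cache default:

from typing import List, Optional

class DetectionResult:
    def __init__(
        self,
        cache: Optional[ModelCache] = None,
        segments: Optional[List[Segment]] = None,
        last_detection_time: int = None
    ):
        if cache is None:
            cache = {}
        # Assumed by analogy with the cache default; not shown in the diff.
        if segments is None:
            segments = []
        self.cache = cache
        self.segments = segments
        self.last_detection_time = last_detection_time

    # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects)
    def to_json(self):
        return {
            'cache': self.cache,
            'segments': list(map(lambda segment: segment.to_json(), self.segments)),
            'lastDetectionTime': self.last_detection_time
        }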

analytics/analytics/detectors/anomaly_detector.py (17 lines changed)

@@ -26,10 +26,12 @@ class AnomalyDetector(ProcessingDetector):
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
         segments = payload.get('segments')
         prepared_segments = []
+        time_step = utils.find_interval(dataframe)

         new_cache = {
             'confidence': payload['confidence'],
-            'alpha': payload['alpha']
+            'alpha': payload['alpha'],
+            'timeStep': time_step
         }

         if segments is not None:
@@ -47,12 +49,8 @@ class AnomalyDetector(ProcessingDetector):
                 segment_data = dataframe[from_index : to_index]
                 prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()})

-            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-            data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
-            time_step = data_second_time - data_start_time
             new_cache['seasonality'] = seasonality
             new_cache['segments'] = prepared_segments
-            new_cache['timeStep'] = time_step

         return {
             'cache': new_cache
@@ -61,9 +59,9 @@ class AnomalyDetector(ProcessingDetector):
     # TODO: ModelCache -> ModelState
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         data = dataframe['value']
+        time_step = cache['timeStep']
         segments = cache.get('segments')
-        time_step = utils.find_interval(dataframe)

         smoothed_data = utils.exponential_smoothing(data, cache['alpha'])

         # TODO: use class for cache to avoid using string literals
@@ -78,7 +76,6 @@ class AnomalyDetector(ProcessingDetector):
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
             data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
-            time_step = data_second_time - data_start_time

             for segment in segments:
                 seasonality_index = seasonality // time_step
@@ -113,7 +110,7 @@ class AnomalyDetector(ProcessingDetector):
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(cache, segments, last_detection_time, time_step)
+        return DetectionResult(cache, segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -155,7 +152,7 @@ class AnomalyDetector(ProcessingDetector):
     def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
-        time_step = detections[0].time_step
+        time_step = detections[0].cache['timeStep']
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
@@ -180,7 +177,7 @@ class AnomalyDetector(ProcessingDetector):
             f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
         data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-        time_step = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
+        time_step = cache['timeStep']

         for segment in segments:
             seasonality_index = seasonality // time_step
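
The net effect in this file: the interval is measured once on the training dataframe, and every later code path, detect(), concat_detection_results(), and the seasonality loop, reads cache['timeStep'] instead of re-deriving it. A hypothetical round-trip under that contract (make_df and the commented calls are illustrative, not from this diff):

import pandas as pd

def make_df(timestamps_ms, values):
    # Toy frame in the shape the detectors consume: 'timestamp' and 'value'.
    return pd.DataFrame({
        'timestamp': pd.to_datetime(timestamps_ms, unit='ms'),
        'value': values
    })

train_df = make_df([0, 1000, 2000, 3000], [1.0, 1.2, 0.9, 1.1])
detect_df = make_df([4000], [5.0])  # one point: find_interval could not work here

# Hypothetical usage; call shapes follow the diff above:
# detector = AnomalyDetector('unit_id')
# cache = detector.train(train_df, {'confidence': 2, 'alpha': 0.1}, None)['cache']
# cache['timeStep'] == 1000          # measured once, on the training data
# detector.detect(detect_df, cache)  # no interval re-derivation needed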

analytics/analytics/detectors/pattern_detector.py (5 lines changed)

@@ -65,8 +65,10 @@ class PatternDetector(Detector):
             msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
             logger.error(msg)
             raise ValueError(msg)
+
+        self.model.state = self.model.get_state(cache)
         window_size = self.model.state.window_size

         if window_size is None:
             message = '{} got cache without window_size for detection'.format(self.analytic_unit_id)
             logger.error(message)
@@ -77,14 +79,13 @@ class PatternDetector(Detector):
             logger.error(message)
             raise ValueError(message)

-        time_step = utils.find_interval(dataframe)
         detected = self.model.detect(dataframe, self.analytic_unit_id)
         segments = [Segment(segment[0], segment[1]) for segment in detected['segments']]

         new_cache = detected['cache'].to_json()
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(new_cache, segments, last_detection_time, time_step)
+        return DetectionResult(new_cache, segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))
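
Besides dropping time_step, this file also restores the model state from the incoming cache before window_size is read, so detection no longer depends on whatever state the model object happened to hold. A minimal illustration with a hypothetical state class (get_state's real body and the 'windowSize' key are assumptions, not from this diff):

from typing import Optional

class ModelState:
    def __init__(self, window_size: Optional[int] = None):
        self.window_size = window_size

def get_state(cache: dict) -> ModelState:
    # Hypothetical: rebuild state from the serialized cache.
    return ModelState(window_size=cache.get('windowSize'))

cache = {'windowSize': 30}
state = get_state(cache)
assert state.window_size == 30  # without the restore, this could still be None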

analytics/analytics/detectors/threshold_detector.py (9 lines changed)

@@ -23,10 +23,12 @@ class ThresholdDetector(Detector):
         pass

     def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
+        time_step = utils.find_interval(dataframe)
         return {
             'cache': {
                 'value': threshold['value'],
-                'condition': threshold['condition']
+                'condition': threshold['condition'],
+                'timeStep': time_step
             }
         }
@@ -38,7 +40,6 @@ class ThresholdDetector(Detector):
         value = cache['value']
         condition = cache['condition']
-        time_step = utils.find_interval(dataframe)

         segments = []
         for index, row in dataframe.iterrows():
@@ -69,7 +70,7 @@ class ThresholdDetector(Detector):
         last_entry = dataframe.iloc[-1]
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp'])
-        return DetectionResult(cache, segments, last_detection_time, time_step)
+        return DetectionResult(cache, segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
@@ -81,7 +82,7 @@ class ThresholdDetector(Detector):
     def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
-        time_step = detections[0].time_step
+        time_step = detections[0].cache['timeStep']
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
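
Both this detector and the anomaly detector now stitch batched results together by reading the step out of the first detection's cache. A toy run of that pattern, reusing the slimmed DetectionResult sketched earlier (what concat does with time_step after the shown lines is outside the diff context):

detections = [
    DetectionResult(cache={'timeStep': 1000}, segments=[], last_detection_time=4000),
    DetectionResult(cache={'timeStep': 1000}, segments=[], last_detection_time=9000),
]

result = DetectionResult()
time_step = detections[0].cache['timeStep']  # was: detections[0].time_step
for detection in detections:
    result.segments.extend(detection.segments)
    result.last_detection_time = detection.last_detection_time
# time_step (1000) is presumably folded into the combined result further down.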

analytics/tests/test_detectors.py (1 line changed)

@@ -41,6 +41,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
         detect_result = detector.detect(dataframe, cache)
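
The fixture gains 'timeStep' because detect() now dereferences cache['timeStep'] unconditionally; with the old fixture the test would die on a KeyError before reaching any assertion. A standalone reproduction of that failure mode:

old_fixture = {'confidence': 2, 'alpha': 0.1}
new_fixture = {'confidence': 2, 'alpha': 0.1, 'timeStep': 1}

for cache in (old_fixture, new_fixture):
    try:
        # The first cache read detect() performs after this commit.
        print('time_step =', cache['timeStep'])
    except KeyError:
        print('KeyError: the pre-change fixture never reaches the detector logic')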

server/src/controllers/analytics_controller.ts (4 lines changed)

@@ -256,6 +256,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
         value: (analyticUnit as ThresholdAnalyticUnit).value,
         condition: (analyticUnit as ThresholdAnalyticUnit).condition
       };
+      taskPayload.data = await getPayloadData(analyticUnit, from, to);
       break;
     case AnalyticUnit.DetectorType.ANOMALY:
       taskPayload.anomaly = {
@@ -263,6 +264,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
         confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
       };

+      taskPayload.data = await getPayloadData(analyticUnit, from, to);
+
       const seasonality = (analyticUnit as AnomalyAnalyticUnit).seasonality;
       if(seasonality > 0) {
         let segments = await Segment.findMany(id, { deleted: true });
@@ -274,7 +277,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
         let segmentObjs = segments.map(s => s.toObject());
         taskPayload.anomaly.segments = segmentObjs;
-        taskPayload.data = await getPayloadData(analyticUnit, from, to);
       }
       break;
     default:
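
The server-side half of the fix: getPayloadData is now awaited for every THRESHOLD and ANOMALY unit, not just for anomaly units with seasonality > 0, presumably so that train() always receives data to measure the interval on. The control-flow change, restated as a Python sketch (names mirror the TypeScript; bodies are elided stand-ins):

def get_payload_data(analytic_unit, from_, to):
    # Stand-in for the server's getPayloadData; returns the queried data.
    return {'from': from_, 'to': to}

def build_task_payload_before(analytic_unit, from_, to):
    task_payload = {'anomaly': {}}  # confidence/alpha fields elided
    if analytic_unit.seasonality > 0:
        task_payload['anomaly']['segments'] = []  # deleted segments, elided
        # data was only attached inside this branch:
        task_payload['data'] = get_payload_data(analytic_unit, from_, to)
    return task_payload

def build_task_payload_after(analytic_unit, from_, to):
    task_payload = {'anomaly': {}}
    # data is now attached unconditionally...
    task_payload['data'] = get_payload_data(analytic_unit, from_, to)
    if analytic_unit.seasonality > 0:
        # ...while only the segments stay behind the seasonality check.
        task_payload['anomaly']['segments'] = []
    return task_payload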
