You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
123 lines
4.4 KiB
123 lines
4.4 KiB
5 years ago
|
import sys
|
||
|
ANALYTICS_PATH = '../analytics'
|
||
|
TESTS_PATH = '../tests'
|
||
|
sys.path.extend([ANALYTICS_PATH, TESTS_PATH])
|
||
|
|
||
|
import pandas as pd
|
||
|
import numpy as np
|
||
|
import utils
|
||
|
import test_dataset
|
||
|
from analytic_types.segment import Segment
|
||
|
from detectors import pattern_detector, threshold_detector, anomaly_detector
|
||
|
|
||
|
# TODO: get_dataset
|
||
|
# TODO: get_segment
|
||
|
PEAK_DATASETS = []
|
||
|
# dataset with 3 peaks
|
||
|
TEST_DATA = test_dataset.create_dataframe([0, 0, 3, 5, 7, 5, 3, 0, 0, 1, 0, 1, 4, 6, 8, 6, 4, 1, 0, 0, 0, 1, 0, 3, 5, 7, 5, 3, 0, 1, 1])
|
||
|
# TODO: more convenient way to specify labeled segments
|
||
|
POSITIVE_SEGMENTS = [{'from': 1523889000001, 'to': 1523889000007}, {'from': 1523889000022, 'to': 1523889000028}]
|
||
|
NEGATIVE_SEGMENTS = [{'from': 1523889000011, 'to': 1523889000017}]
|
||
|
|
||
|
class TesterSegment():
|
||
|
|
||
|
def __init__(self, start: int, end: int, labeled: bool):
|
||
|
self.start = start
|
||
|
self.end = end
|
||
|
self.labeled = labeled
|
||
|
|
||
|
def get_segment(self):
|
||
|
return {
|
||
|
'_id': 'q',
|
||
|
'analyticUnitId': 'q',
|
||
|
'from': self.start,
|
||
|
'to': self.end,
|
||
|
'labeled': self.labeled,
|
||
|
'deleted': not self.labeled
|
||
|
}
|
||
|
|
||
|
class Metric():
|
||
|
|
||
|
def __init__(self, expected_result, detector_result):
|
||
|
self.expected_result = expected_result
|
||
|
self.detector_result = detector_result['segments']
|
||
|
|
||
|
def get_amount(self):
|
||
|
return len(self.detector_result) / len(self.expected_result)
|
||
|
|
||
|
def get_accuracy(self):
|
||
|
correct_segment = 0
|
||
|
invalid_segment = 0
|
||
|
for segment in self.detector_result:
|
||
|
current_cs = correct_segment
|
||
|
for pattern in self.expected_result:
|
||
|
if pattern['from'] <= segment['from'] and pattern['to'] >= segment['to']:
|
||
|
correct_segment += 1
|
||
|
break
|
||
|
if correct_segment == current_cs:
|
||
|
invalid_segment += 1
|
||
|
non_detected = len(self.expected_result) - correct_segment
|
||
|
return (correct_segment, invalid_segment, non_detected)
|
||
|
|
||
|
class ModelData():
|
||
|
|
||
|
def __init__(self, frame: pd.DataFrame, positive_segments, negative_segments, model_type: str):
|
||
|
self.frame = frame
|
||
|
self.positive_segments = positive_segments
|
||
|
self.negative_segments = negative_segments
|
||
|
self.model_type = model_type
|
||
|
|
||
|
def get_segments_for_detection(self, positive_amount, negative_amount):
|
||
|
segments = []
|
||
|
for idx, bounds in enumerate(self.positive_segments):
|
||
|
if idx >= positive_amount:
|
||
|
break
|
||
|
segments.append(TesterSegment(bounds['from'], bounds['to'], True).get_segment())
|
||
|
|
||
|
for idx, bounds in enumerate(self.negative_segments):
|
||
|
if idx >= negative_amount:
|
||
|
break
|
||
|
segments.append(TesterSegment(bounds['from'], bounds['to'], False).get_segment())
|
||
|
|
||
|
return segments
|
||
|
|
||
|
def get_all_correct_segments(self):
|
||
|
return self.positive_segments
|
||
|
|
||
|
PEAK_DATA_1 = ModelData(TEST_DATA, POSITIVE_SEGMENTS, NEGATIVE_SEGMENTS, 'peak')
|
||
|
PEAK_DATASETS.append(PEAK_DATA_1)
|
||
|
|
||
|
def main(model_type: str) -> None:
|
||
|
table_metric = []
|
||
|
if model_type == 'peak':
|
||
|
for data in PEAK_DATASETS:
|
||
|
dataset = data.frame
|
||
|
segments = data.get_segments_for_detection(1, 0)
|
||
|
segments = [Segment.from_json(segment) for segment in segments]
|
||
|
detector = pattern_detector.PatternDetector('PEAK', 'test_id')
|
||
|
training_result = detector.train(dataset, segments, {})
|
||
|
cache = training_result['cache']
|
||
|
detect_result = detector.detect(dataset, cache)
|
||
|
detect_result = detect_result.to_json()
|
||
|
peak_metric = Metric(data.get_all_correct_segments(), detect_result)
|
||
|
table_metric.append((peak_metric.get_amount(), peak_metric.get_accuracy()))
|
||
|
return table_metric
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
'''
|
||
|
This tool applies the model on datasets and verifies that the detection result corresponds to the correct values.
|
||
|
sys.argv[1] expects one of the models name -> see correct_name
|
||
|
'''
|
||
|
# TODO: use enum
|
||
|
correct_name = ['peak', 'trough', 'jump', 'drop', 'general']
|
||
|
if len(sys.argv) < 2:
|
||
|
print('Enter one of models name: {}'.format(correct_name))
|
||
|
sys.exit(1)
|
||
|
model_type = str(sys.argv[1]).lower()
|
||
|
if model_type in correct_name:
|
||
|
print(main(model_type))
|
||
|
else:
|
||
|
print('Enter one of models name: {}'.format(correct_name))
|
||
|
|
||
|
|