From fa9673e347c2d49e08691bb6eb76bf2e3b4a01f2 Mon Sep 17 00:00:00 2001 From: Alexandr Velikiy <39257464+VargBurz@users.noreply.github.com> Date: Mon, 21 Oct 2019 20:04:13 +0300 Subject: [PATCH] "Error: Need at least 1 labeled segment" when labeling only negative segment #790 (#791) --- analytics/analytics/analytic_types/segment.py | 25 ++++++++++----- analytics/analytics/analytic_unit_manager.py | 5 ++- .../analytics/detectors/pattern_detector.py | 13 +++++++- analytics/analytics/models/model.py | 30 ++++++++++++------ analytics/analytics/utils/meta.py | 5 +++ analytics/tests/test_dataset.py | 21 +++++++++++++ analytics/tests/test_detectors.py | 31 +++++++++++++++++++ .../src/controllers/analytics_controller.ts | 2 +- server/src/services/data_layer/mongodb.ts | 4 +-- 9 files changed, 114 insertions(+), 22 deletions(-) diff --git a/analytics/analytics/analytic_types/segment.py b/analytics/analytics/analytic_types/segment.py index 4c3760b..e15033a 100644 --- a/analytics/analytics/analytic_types/segment.py +++ b/analytics/analytics/analytic_types/segment.py @@ -1,20 +1,29 @@ from typing import Optional +import utils.meta + +@utils.meta.JSONClass class Segment: ''' Used for segment manipulation instead of { 'from': ..., 'to': ... 
} dict ''' - def __init__(self, from_timestamp: int, to_timestamp: int, message: str = None): + def __init__( + self, + from_timestamp: int, + to_timestamp: int, + _id: Optional[str] = None, + analytic_unit_id: Optional[str] = None, + labeled: Optional[bool] = None, + deleted: Optional[bool] = None, + message: Optional[str] = None + ): if to_timestamp < from_timestamp: raise ValueError(f'Can`t create segment with to < from: {to_timestamp} < {from_timestamp}') self.from_timestamp = from_timestamp self.to_timestamp = to_timestamp + self._id = _id + self.analytic_unit_id = analytic_unit_id + self.labeled = labeled + self.deleted = deleted self.message = message - - def to_json(self): - return { - 'from': self.from_timestamp, - 'to': self.to_timestamp, - 'message': self.message - } diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index bbbffa9..e99fd36 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -5,6 +5,7 @@ from concurrent.futures import Executor, ThreadPoolExecutor from analytic_unit_worker import AnalyticUnitWorker from analytic_types import AnalyticUnitId, ModelCache +from analytic_types.segment import Segment import detectors @@ -66,7 +67,9 @@ class AnalyticUnitManager: return res elif task['type'] == 'LEARN': if 'segments' in payload: - return await worker.do_train(payload['segments'], data, payload['cache']) + segments = payload['segments'] + segments = [Segment.from_json(segment) for segment in segments] + return await worker.do_train(segments, data, payload['cache']) elif 'threshold' in payload: return await worker.do_train(payload['threshold'], data, payload['cache']) elif 'anomaly' in payload: diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index ad3769d..e1f21a5 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ 
b/analytics/analytics/detectors/pattern_detector.py @@ -46,8 +46,14 @@ class PatternDetector(Detector): self.model = resolve_model_by_pattern(self.pattern_type) self.bucket = DataBucket() - def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[ModelCache]) -> ModelCache: + def train(self, dataframe: pd.DataFrame, segments: List[Segment], cache: Optional[ModelCache]) -> ModelCache: # TODO: pass only part of dataframe that has segments + + if self.contains_labeled_segments(segments) == False: + msg = f'{self.analytic_unit_id} has no positive labeled segments. Pattern detector needs at least 1 positive labeled segment' + logger.error(msg) + raise ValueError(msg) + self.model.state: models.ModelState = self.model.get_state(cache) new_cache: models.ModelState = self.model.fit(dataframe, segments, self.analytic_unit_id) @@ -134,3 +140,8 @@ class PatternDetector(Detector): # TODO: windowSize -> window_size return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE) + def contains_labeled_segments(self, segments: List[Segment]) -> bool: + for segment in segments: + if segment.labeled == True: + return True + return False diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py index 8525230..a749585 100644 --- a/analytics/analytics/models/model.py +++ b/analytics/analytics/models/model.py @@ -26,14 +26,23 @@ class AnalyticSegment(Segment): self, from_timestamp: int, to_timestamp: int, + _id: str, + analytic_unit_id: str, labeled: bool, deleted: bool, + message: str, dataframe: pd.DataFrame, center_finder = None ): - super().__init__(from_timestamp, to_timestamp) - self.labeled = labeled - self.deleted = deleted + super().__init__( + from_timestamp, + to_timestamp, + _id, + analytic_unit_id, + labeled, + deleted, + message + ) self.from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(self.from_timestamp, unit='ms')) self.to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(self.to_timestamp, unit='ms')) 
@@ -119,19 +128,22 @@ class Model(ABC): def get_state(self, cache: Optional[ModelCache] = None) -> ModelState: pass - def fit(self, dataframe: pd.DataFrame, segments: List[dict], id: AnalyticUnitId) -> ModelState: + def fit(self, dataframe: pd.DataFrame, segments: List[Segment], id: AnalyticUnitId) -> ModelState: logging.debug('Start method fit for analytic unit {}'.format(id)) data = dataframe['value'] max_length = 0 labeled = [] deleted = [] for segment_map in segments: - if segment_map['labeled'] or segment_map['deleted']: + if segment_map.labeled or segment_map.deleted: segment = AnalyticSegment( - segment_map['from'], - segment_map['to'], - segment_map['labeled'], - segment_map['deleted'], + segment_map.from_timestamp, + segment_map.to_timestamp, + segment_map._id, + segment_map.analytic_unit_id, + segment_map.labeled, + segment_map.deleted, + segment_map.message, dataframe, self.find_segment_center ) diff --git a/analytics/analytics/utils/meta.py b/analytics/analytics/utils/meta.py index 94ffe14..b6365d7 100644 --- a/analytics/analytics/utils/meta.py +++ b/analytics/analytics/utils/meta.py @@ -8,9 +8,14 @@ CAMEL_REGEX = re.compile(r'([A-Z])') UNDERSCORE_REGEX = re.compile(r'_([a-z])') def camel_to_underscore(name): + #TODO: need to rename 'from'/'to' to 'from_timestamp'/'to_timestamp' everywhere(in analytics, server, panel) + if name == 'from' or name == 'to': + name += '_timestamp' return CAMEL_REGEX.sub(lambda x: '_' + x.group(1).lower(), name) def underscore_to_camel(name): + if name == 'from_timestamp' or name == 'to_timestamp': + name = name.replace('_timestamp', '') return UNDERSCORE_REGEX.sub(lambda x: x.group(1).upper(), name) def is_field_private(field_name: str) -> Optional[str]: diff --git a/analytics/tests/test_dataset.py b/analytics/tests/test_dataset.py index f880ab7..4bc7d98 100644 --- a/analytics/tests/test_dataset.py +++ b/analytics/tests/test_dataset.py @@ -6,6 +6,8 @@ import models import random import scipy.signal +from 
analytic_types.segment import Segment + class TestDataset(unittest.TestCase): def test_models_with_corrupted_dataframe(self): @@ -32,6 +34,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.PeakModel() @@ -46,6 +49,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000016, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 'deleted': True}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.JumpModel() @@ -60,6 +64,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.TroughModel() @@ -74,6 +79,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000016, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 
'deleted': True}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.DropModel() @@ -88,6 +94,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.GeneralModel() @@ -102,6 +109,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.JumpModel() @@ -116,6 +124,7 @@ class TestDataset(unittest.TestCase): dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': True, 'deleted': False}, {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.DropModel() @@ -129,6 +138,7 @@ class TestDataset(unittest.TestCase): data_val = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 5.0, 5.0, 4.0, 5.0, 5.0, 6.0, 5.0, 1.0, 2.0, 3.0, 4.0, 5.0,3.0,3.0,2.0,7.0,8.0,9.0,8.0,7.0,6.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000007, 'to': 1523889000011, 'labeled': True, 
'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] try: model = models.JumpModel() @@ -166,6 +176,7 @@ class TestDataset(unittest.TestCase): data_val = [1.0, 1.0, 1.0, 1.0, 1.0, 5.0, 2.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 2.0, 3.0, 4.0, 5.0, 4.0, 2.0, 1.0, 3.0, 4.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000004, 'to': 1523889000006, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] model_instances = [ models.GeneralModel(), @@ -183,6 +194,8 @@ class TestDataset(unittest.TestCase): data_val = [1.0, 2.0, 5.0, 2.0, 1.0, 1.0, 3.0, 6.0, 4.0, 2.0, 1.0, 0, 0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + model = models.GeneralModel() model.state = model.get_state(None) model.fit(dataframe, segments,'test') @@ -209,6 +222,8 @@ class TestDataset(unittest.TestCase): data_val = [2.0, 5.0, 1.0, 1.0, 1.0, 2.0, 5.0, 1.0, 1.0, 2.0, 3.0, 7.0, 1.0, 1.0, 1.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + model = models.PeakModel() model.state = model.get_state(cache) result = model.fit(dataframe, segments, 'test') @@ -228,6 +243,8 @@ class TestDataset(unittest.TestCase): data_val = [5.0, 5.0, 1.0, 4.0, 5.0, 5.0, 0.0, 4.0, 5.0, 5.0, 6.0, 1.0, 5.0, 5.0, 5.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}] + segments = 
[Segment.from_json(segment) for segment in segments] + model = models.TroughModel() model.state = model.get_state(cache) result = model.fit(dataframe, segments, 'test') @@ -247,6 +264,8 @@ class TestDataset(unittest.TestCase): data_val = [1.0, 1.0, 1.0, 4.0, 4.0, 0.0, 0.0, 5.0, 5.0, 0.0, 0.0, 4.0, 4.0, 4.0, 4.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + model = models.JumpModel() model.state = model.get_state(cache) result = model.fit(dataframe, segments, 'test') @@ -266,6 +285,8 @@ class TestDataset(unittest.TestCase): data_val = [5.0, 5.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 0, 0, 0, 0, 0, 0, 6.0, 6.0, 6.0, 1.0, 1.0, 1.0, 1.0, 1.0] dataframe = create_dataframe(data_val) segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000024, 'labeled': True, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + try: model = models.DropModel() model_name = model.__class__.__name__ diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py index 9981625..66a13f6 100644 --- a/analytics/tests/test_detectors.py +++ b/analytics/tests/test_detectors.py @@ -3,6 +3,8 @@ import pandas as pd from detectors import pattern_detector, threshold_detector, anomaly_detector from analytic_types.detector_typing import DetectionResult, ProcessingResult +from analytic_types.segment import Segment +from tests.test_dataset import create_dataframe class TestPatternDetector(unittest.TestCase): @@ -16,6 +18,35 @@ class TestPatternDetector(unittest.TestCase): with self.assertRaises(ValueError): detector.detect(dataframe, cache) + def test_only_negative_segments(self): + data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1] + data_ind = [1523889000000 + i for i in 
range(len(data_val))] + data = {'timestamp': data_ind, 'value': data_val} + dataframe = pd.DataFrame(data = data) + segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': False, 'deleted': False}, + {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + cache = {} + detector = pattern_detector.PatternDetector('PEAK', 'test_id') + expected_error_message = 'test_id has no positive labeled segments. Pattern detector needs at least 1 positive labeled segment' + + try: + detector.train(dataframe, segments, cache) + except ValueError as e: + self.assertEqual(str(e), expected_error_message) + + def test_positive_and_negative_segments(self): + data_val = [1.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0, 7.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] + dataframe = create_dataframe(data_val) + segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000004, 'to': 1523889000006, 'labeled': True, 'deleted': False}, + {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': False, 'deleted': False}] + segments = [Segment.from_json(segment) for segment in segments] + cache = {} + detector = pattern_detector.PatternDetector('PEAK', 'test_id') + try: + detector.train(dataframe, segments, cache) + except Exception as e: + self.fail('detector.train fail with error {}'.format(e)) class TestThresholdDetector(unittest.TestCase): diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 5dbd7df..6ffc1de 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -272,7 +272,7 @@ export async function runLearning(id:
AnalyticUnit.AnalyticUnitId, from?: number switch(detector) { case AnalyticUnit.DetectorType.PATTERN: - let segments = await Segment.findMany(id, { labeled: true }); + let segments = await Segment.findMany(id, { $or: [{ labeled: true }, { deleted: true }]} ); if(segments.length === 0) { throw new Error('Need at least 1 labeled segment'); } diff --git a/server/src/services/data_layer/mongodb.ts b/server/src/services/data_layer/mongodb.ts index 141afbb..fc83e5f 100644 --- a/server/src/services/data_layer/mongodb.ts +++ b/server/src/services/data_layer/mongodb.ts @@ -122,10 +122,10 @@ export class MongoDbQueryWrapper implements DbQueryWrapper { } function convertQueryToMongoFormat(query: any): object { - if(query.$or !== undefined) { + if(query.$or !== undefined && typeof query.$or === 'object') { query.$or = convertQueryFieldToMongoFormat(query.$or); } - if(query.$and !== undefined) { + if(query.$and !== undefined && typeof query.$and === 'object') { query.$and = convertQueryFieldToMongoFormat(query.$and); } return query;