
"Error: Need at least 1 labeled segment" when labeling only negative segment #790 (#791)

Alexandr Velikiy, 5 years ago (committed by rozetko) · commit fa9673e347
Changed files (lines changed):

  1. analytics/analytics/analytic_types/segment.py (25)
  2. analytics/analytics/analytic_unit_manager.py (5)
  3. analytics/analytics/detectors/pattern_detector.py (13)
  4. analytics/analytics/models/model.py (30)
  5. analytics/analytics/utils/meta.py (5)
  6. analytics/tests/test_dataset.py (21)
  7. analytics/tests/test_detectors.py (31)
  8. server/src/controllers/analytics_controller.ts (2)
  9. server/src/services/data_layer/mongodb.ts (4)

analytics/analytics/analytic_types/segment.py (25 changed lines)

@@ -1,20 +1,29 @@
+from typing import Optional
+
 import utils.meta

 @utils.meta.JSONClass
 class Segment:
     '''
     Used for segment manipulation instead of { 'from': ..., 'to': ... } dict
     '''

-    def __init__(self, from_timestamp: int, to_timestamp: int, message: str = None):
+    def __init__(
+        self,
+        from_timestamp: int,
+        to_timestamp: int,
+        _id: Optional[str] = None,
+        analytic_unit_id: Optional[str] = None,
+        labeled: Optional[bool] = None,
+        deleted: Optional[bool] = None,
+        message: Optional[str] = None
+    ):
         if to_timestamp < from_timestamp:
             raise ValueError(f'Can`t create segment with to < from: {to_timestamp} < {from_timestamp}')
         self.from_timestamp = from_timestamp
         self.to_timestamp = to_timestamp
+        self._id = _id
+        self.analytic_unit_id = analytic_unit_id
+        self.labeled = labeled
+        self.deleted = deleted
         self.message = message
-
-    def to_json(self):
-        return {
-            'from': self.from_timestamp,
-            'to': self.to_timestamp,
-            'message': self.message
-        }
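The hand-written to_json goes away because the @utils.meta.JSONClass decorator generates the serialization, with the 'from'/'to' wire keys handled by the meta.py change below. A minimal round-trip sketch, assuming from_json maps the camelCase wire keys to the snake_case constructor arguments as the updated tests suggest:

    # Sketch only; the exact from_json key mapping is assumed from the tests below.
    raw = {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8',
           'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}
    segment = Segment.from_json(raw)
    assert segment.from_timestamp == 1523889000010
    assert segment.labeled is True and segment.deleted is False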

analytics/analytics/analytic_unit_manager.py (5 changed lines)

@@ -5,6 +5,7 @@ from concurrent.futures import Executor, ThreadPoolExecutor

 from analytic_unit_worker import AnalyticUnitWorker
 from analytic_types import AnalyticUnitId, ModelCache
+from analytic_types.segment import Segment
 import detectors

@@ -66,7 +67,9 @@ class AnalyticUnitManager:
             return res
         elif task['type'] == 'LEARN':
             if 'segments' in payload:
-                return await worker.do_train(payload['segments'], data, payload['cache'])
+                segments = payload['segments']
+                segments = [Segment.from_json(segment) for segment in segments]
+                return await worker.do_train(segments, data, payload['cache'])
             elif 'threshold' in payload:
                 return await worker.do_train(payload['threshold'], data, payload['cache'])
             elif 'anomaly' in payload:
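With this change, do_train receives Segment objects instead of raw dicts. A sketch of the LEARN payload this branch handles, with field names taken from the tests; the task envelope beyond 'segments' and 'cache' is an assumption:

    # Hypothetical LEARN payload; only 'segments' and 'cache' appear in this diff.
    payload = {
        'cache': None,
        'segments': [
            {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8',
             'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}
        ]
    }
    segments = [Segment.from_json(s) for s in payload['segments']]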

analytics/analytics/detectors/pattern_detector.py (13 changed lines)

@@ -46,8 +46,14 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         self.bucket = DataBucket()

-    def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[ModelCache]) -> ModelCache:
+    def train(self, dataframe: pd.DataFrame, segments: List[Segment], cache: Optional[ModelCache]) -> ModelCache:
         # TODO: pass only part of dataframe that has segments
+        if self.contains_labeled_segments(segments) == False:
+            msg = f'{self.analytic_unit_id} has no positive labeled segments. Pattern detector needs at least 1 positive labeled segment'
+            logger.error(msg)
+            raise ValueError(msg)
+
         self.model.state: models.ModelState = self.model.get_state(cache)
         new_cache: models.ModelState = self.model.fit(dataframe, segments, self.analytic_unit_id)

@@ -134,3 +140,8 @@ class PatternDetector(Detector):
         # TODO: windowSize -> window_size
         return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE)
+
+    def contains_labeled_segments(self, segments: List[Segment]) -> bool:
+        for segment in segments:
+            if segment.labeled == True:
+                return True
+        return False
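contains_labeled_segments counts only positively labeled segments, so a segment list that is entirely negative now fails fast in train with a descriptive message instead of the generic server error from the issue title. A small behavioral sketch, using the Segment constructor defined above:

    # Behavioral sketch; constructor arguments as defined in segment.py above.
    detector = PatternDetector('PEAK', 'test_id')
    negative_only = [Segment(1523889000002, 1523889000008, labeled=False, deleted=True)]
    assert detector.contains_labeled_segments(negative_only) is False  # train() would raise ValueError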

analytics/analytics/models/model.py (30 changed lines)

@@ -26,14 +26,23 @@ class AnalyticSegment(Segment):
         self,
         from_timestamp: int,
         to_timestamp: int,
+        _id: str,
+        analytic_unit_id: str,
         labeled: bool,
         deleted: bool,
+        message: str,
         dataframe: pd.DataFrame,
         center_finder = None
     ):
-        super().__init__(from_timestamp, to_timestamp)
-        self.labeled = labeled
-        self.deleted = deleted
+        super().__init__(
+            from_timestamp,
+            to_timestamp,
+            _id,
+            analytic_unit_id,
+            labeled,
+            deleted,
+            message
+        )
         self.from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(self.from_timestamp, unit='ms'))
         self.to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(self.to_timestamp, unit='ms'))

@@ -119,19 +128,22 @@ class Model(ABC):
     def get_state(self, cache: Optional[ModelCache] = None) -> ModelState:
         pass

-    def fit(self, dataframe: pd.DataFrame, segments: List[dict], id: AnalyticUnitId) -> ModelState:
+    def fit(self, dataframe: pd.DataFrame, segments: List[Segment], id: AnalyticUnitId) -> ModelState:
         logging.debug('Start method fit for analytic unit {}'.format(id))
         data = dataframe['value']
         max_length = 0
         labeled = []
         deleted = []
         for segment_map in segments:
-            if segment_map['labeled'] or segment_map['deleted']:
+            if segment_map.labeled or segment_map.deleted:
                 segment = AnalyticSegment(
-                    segment_map['from'],
-                    segment_map['to'],
-                    segment_map['labeled'],
-                    segment_map['deleted'],
+                    segment_map.from_timestamp,
+                    segment_map.to_timestamp,
+                    segment_map._id,
+                    segment_map.analytic_unit_id,
+                    segment_map.labeled,
+                    segment_map.deleted,
+                    segment_map.message,
                     dataframe,
                     self.find_segment_center
                 )

analytics/analytics/utils/meta.py (5 changed lines)

@@ -8,9 +8,14 @@ CAMEL_REGEX = re.compile(r'([A-Z])')
 UNDERSCORE_REGEX = re.compile(r'_([a-z])')

 def camel_to_underscore(name):
+    #TODO: need to rename 'from'/'to' to 'from_timestamp'/'to_timestamp' everywhere(in analytics, server, panel)
+    if name == 'from' or name == 'to':
+        name += '_timestamp'
     return CAMEL_REGEX.sub(lambda x: '_' + x.group(1).lower(), name)

 def underscore_to_camel(name):
+    if name == 'from_timestamp' or name == 'to_timestamp':
+        name = name.replace('_timestamp', '')
     return UNDERSCORE_REGEX.sub(lambda x: x.group(1).upper(), name)

 def is_field_private(field_name: str) -> Optional[str]:
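The special-casing keeps the wire format ('from'/'to') stable while the Python side uses from_timestamp/to_timestamp, pending the rename in the TODO. Concretely:

    # Field-name mapping after this change.
    assert camel_to_underscore('from') == 'from_timestamp'
    assert camel_to_underscore('analyticUnitId') == 'analytic_unit_id'
    assert underscore_to_camel('to_timestamp') == 'to'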

analytics/tests/test_dataset.py (21 changed lines)

@@ -6,6 +6,8 @@ import models
 import random
 import scipy.signal

+from analytic_types.segment import Segment
+
 class TestDataset(unittest.TestCase):

     def test_models_with_corrupted_dataframe(self):
@@ -32,6 +34,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.PeakModel()
@@ -46,6 +49,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000016, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 'deleted': True}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.JumpModel()
@@ -60,6 +64,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.TroughModel()
@@ -74,6 +79,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000016, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 'deleted': True}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.DropModel()
@@ -88,6 +94,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000003, 'to': 1523889000005, 'labeled': False, 'deleted': True}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.GeneralModel()
@@ -102,6 +109,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.JumpModel()
@@ -116,6 +124,7 @@ class TestDataset(unittest.TestCase):
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': True, 'deleted': False},
                     {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.DropModel()
@@ -129,6 +138,7 @@ class TestDataset(unittest.TestCase):
         data_val = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 5.0, 5.0, 4.0, 5.0, 5.0, 6.0, 5.0, 1.0, 2.0, 3.0, 4.0, 5.0, 3.0, 3.0, 2.0, 7.0, 8.0, 9.0, 8.0, 7.0, 6.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000007, 'to': 1523889000011, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
         try:
             model = models.JumpModel()
@@ -166,6 +176,7 @@ class TestDataset(unittest.TestCase):
         data_val = [1.0, 1.0, 1.0, 1.0, 1.0, 5.0, 2.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 2.0, 3.0, 4.0, 5.0, 4.0, 2.0, 1.0, 3.0, 4.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000004, 'to': 1523889000006, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
         model_instances = [
             models.GeneralModel(),
@@ -183,6 +194,8 @@ class TestDataset(unittest.TestCase):
         data_val = [1.0, 2.0, 5.0, 2.0, 1.0, 1.0, 3.0, 6.0, 4.0, 2.0, 1.0, 0, 0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+
         model = models.GeneralModel()
         model.state = model.get_state(None)
         model.fit(dataframe, segments, 'test')
@@ -209,6 +222,8 @@ class TestDataset(unittest.TestCase):
         data_val = [2.0, 5.0, 1.0, 1.0, 1.0, 2.0, 5.0, 1.0, 1.0, 2.0, 3.0, 7.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+
         model = models.PeakModel()
         model.state = model.get_state(cache)
         result = model.fit(dataframe, segments, 'test')
@@ -228,6 +243,8 @@ class TestDataset(unittest.TestCase):
         data_val = [5.0, 5.0, 1.0, 4.0, 5.0, 5.0, 0.0, 4.0, 5.0, 5.0, 6.0, 1.0, 5.0, 5.0, 5.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000010, 'to': 1523889000012, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+
         model = models.TroughModel()
         model.state = model.get_state(cache)
         result = model.fit(dataframe, segments, 'test')
@@ -247,6 +264,8 @@ class TestDataset(unittest.TestCase):
         data_val = [1.0, 1.0, 1.0, 4.0, 4.0, 0.0, 0.0, 5.0, 5.0, 0.0, 0.0, 4.0, 4.0, 4.0, 4.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 152388900009, 'to': 1523889000013, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+
         model = models.JumpModel()
         model.state = model.get_state(cache)
         result = model.fit(dataframe, segments, 'test')
@@ -266,6 +285,8 @@ class TestDataset(unittest.TestCase):
        data_val = [5.0, 5.0, 5.0, 5.0, 1.0, 1.0, 1.0, 1.0, 9.0, 9.0, 9.0, 9.0, 0, 0, 0, 0, 0, 0, 6.0, 6.0, 6.0, 1.0, 1.0, 1.0, 1.0, 1.0]
         dataframe = create_dataframe(data_val)
         segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000024, 'labeled': True, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+
         try:
             model = models.DropModel()
             model_name = model.__class__.__name__
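Every test now repeats the same conversion line after its fixture list. A small helper could fold that repetition away (a sketch, not part of this commit):

    # Hypothetical helper, not part of this commit.
    def to_segments(raw_segments: list) -> list:
        return [Segment.from_json(s) for s in raw_segments]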

analytics/tests/test_detectors.py (31 changed lines)

@@ -3,6 +3,8 @@ import pandas as pd

 from detectors import pattern_detector, threshold_detector, anomaly_detector
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.segment import Segment
+
 from tests.test_dataset import create_dataframe

 class TestPatternDetector(unittest.TestCase):
@@ -16,6 +18,35 @@ class TestPatternDetector(unittest.TestCase):
         with self.assertRaises(ValueError):
             detector.detect(dataframe, cache)

+    def test_only_negative_segments(self):
+        data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1]
+        data_ind = [1523889000000 + i for i in range(len(data_val))]
+        data = {'timestamp': data_ind, 'value': data_val}
+        dataframe = pd.DataFrame(data = data)
+        segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000019, 'to': 1523889000025, 'labeled': False, 'deleted': False},
+                    {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000002, 'to': 1523889000008, 'labeled': False, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+        cache = {}
+        detector = pattern_detector.PatternDetector('PEAK', 'test_id')
+        expected_error_message = 'test_id has no positive labeled segments. Pattern detector needs at least 1 positive labeled segment'
+        try:
+            detector.train(dataframe, segments, cache)
+        except ValueError as e:
+            self.assertEqual(str(e), expected_error_message)
+
+    def test_positive_and_negative_segments(self):
+        data_val = [1.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0, 7.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
+        dataframe = create_dataframe(data_val)
+        segments = [{'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000004, 'to': 1523889000006, 'labeled': True, 'deleted': False},
+                    {'_id': 'Esl7uetLhx4lCqHa', 'analyticUnitId': 'opnICRJwOmwBELK8', 'from': 1523889000001, 'to': 1523889000003, 'labeled': False, 'deleted': False}]
+        segments = [Segment.from_json(segment) for segment in segments]
+        cache = {}
+        detector = pattern_detector.PatternDetector('PEAK', 'test_id')
+        try:
+            detector.train(dataframe, segments, cache)
+        except Exception as e:
+            self.fail('detector.train fail with error {}'.format(e))
+
 class TestThresholdDetector(unittest.TestCase):
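One caveat on test_only_negative_segments above: if train does not raise at all, the try/except passes silently and the test still succeeds. unittest's assertRaisesRegex would also catch that case (a suggested tightening, not part of this commit):

    # Hypothetical tighter variant, not part of this commit.
    with self.assertRaisesRegex(ValueError, 'no positive labeled segments'):
        detector.train(dataframe, segments, cache)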

server/src/controllers/analytics_controller.ts (2 changed lines)

@@ -272,7 +272,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
   switch(detector) {
     case AnalyticUnit.DetectorType.PATTERN:
-      let segments = await Segment.findMany(id, { labeled: true });
+      let segments = await Segment.findMany(id, { $or: [{ labeled: true }, { deleted: true }] });
       if(segments.length === 0) {
         throw new Error('Need at least 1 labeled segment');
       }

server/src/services/data_layer/mongodb.ts (4 changed lines)

@@ -122,10 +122,10 @@ export class MongoDbQueryWrapper implements DbQueryWrapper {
 }

 function convertQueryToMongoFormat(query: any): object {
-  if(query.$or !== undefined) {
+  if(query.$or !== undefined && typeof query.$or === 'object') {
     query.$or = convertQueryFieldToMongoFormat(query.$or);
   }
-  if(query.$and !== undefined) {
+  if(query.$and !== undefined && typeof query.$and === 'object') {
     query.$and = convertQueryFieldToMongoFormat(query.$and);
   }
   return query;
