
Detection return empty result #347 (#348)

* set constant window size
* improve logging, save detected segments from push/pull process
Evgeny Smyshlyaev, 5 years ago, committed by GitHub
commit 6bf1114b7b
  1. analytics/analytics/analytic_unit_manager.py (6 lines changed)
  2. analytics/analytics/detectors/pattern_detector.py (18 lines changed)
  3. analytics/analytics/detectors/threshold_detector.py (3 lines changed)
  4. server/src/services/data_puller.ts (2 lines changed)

analytics/analytics/analytic_unit_manager.py (6 lines changed)

@@ -16,9 +16,9 @@ WORKERS_EXECUTORS = 20
 AnalyticUnitId = str
 
-def get_detector_by_type(detector_type: str, analytic_unit_type: str) -> detectors.Detector:
+def get_detector_by_type(detector_type: str, analytic_unit_type: str, analytic_unit_id: AnalyticUnitId) -> detectors.Detector:
     if detector_type == 'pattern':
-        return detectors.PatternDetector(analytic_unit_type)
+        return detectors.PatternDetector(analytic_unit_type, analytic_unit_id)
     elif detector_type == 'threshold':
         return detectors.ThresholdDetector()
@@ -52,7 +52,7 @@ class AnalyticUnitManager:
         if analytic_unit_id in self.analytic_workers:
            # TODO: check that type is the same
            return self.analytic_workers[analytic_unit_id]
-        detector = get_detector_by_type(detector_type, analytic_unit_type)
+        detector = get_detector_by_type(detector_type, analytic_unit_type, analytic_unit_id)
        worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor)
        self.analytic_workers[analytic_unit_id] = worker
        return worker
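
The extra argument threads the analytic unit's id from the manager down to the detector, so the detector's log messages can say which unit they belong to. A minimal usage sketch of the new factory signature (the detector type, pattern type and id below are made-up illustration values, not taken from this commit):

    # hypothetical values, only to show the new three-argument call
    unit_id = 'u1337abc'
    detector = get_detector_by_type('pattern', 'general', unit_id)
    # detector is a PatternDetector that now knows its own unit id
    assert isinstance(detector, detectors.PatternDetector)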

analytics/analytics/detectors/pattern_detector.py (18 lines changed)

@@ -29,14 +29,16 @@ def resolve_model_by_pattern(pattern: str) -> models.Model:
         return models.CustomModel()
     raise ValueError('Unknown pattern "%s"' % pattern)
 
+AnalyticUnitId = str
 
 class PatternDetector(Detector):
 
-    def __init__(self, pattern_type):
+    def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
+        self.analytic_unit_id = analytic_unit_id
         self.pattern_type = pattern_type
         self.model = resolve_model_by_pattern(self.pattern_type)
-        self.window_size = 100
+        self.window_size = 150
         self.bucket = DataBucket()
+        self.bucket_full_reported = False
 
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
         # TODO: pass only part of dataframe that has segments
@@ -46,6 +48,7 @@ class PatternDetector(Detector):
         }
 
     def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
+        logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
         detected = self.model.detect(dataframe, cache)
@@ -63,14 +66,19 @@ class PatternDetector(Detector):
     def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
         self.bucket.receive_data(data.dropna())
+        if cache != None:
+            self.window_size = cache['WINDOW_SIZE']
 
         if len(self.bucket.data) >= self.window_size and cache != None:
+            if not self.bucket_full_reported:
+                logging.debug('{} unit`s bucket full, run detect'.format(self.analytic_unit_id))
+                self.bucket_full_reported = True
             res = self.detect(self.bucket.data, cache)
             excess_data = len(self.bucket.data) - self.window_size
             self.bucket.drop_data(excess_data)
             return res
+        else:
+            filling = len(self.bucket.data)*100 / self.window_size
+            logging.debug('bucket for {} {}% full'.format(self.analytic_unit_id, filling))
         return None
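
recieve_data buffers incoming points in the DataBucket until at least window_size points have accumulated, runs detection over the buffered window, then drops everything beyond the window so the bucket keeps roughly the last window_size points; until the bucket is full it only logs the fill percentage. DataBucket itself is not part of this diff; a minimal sketch consistent with the calls used above (receive_data, data, drop_data), with the storage format being an assumption, could look like this:

    import pandas as pd

    class DataBucket:
        """Sketch only: a growing buffer of received data points."""

        def __init__(self):
            self.data = pd.DataFrame([])

        def receive_data(self, data: pd.DataFrame):
            # append the newly received points to the end of the buffer
            self.data = pd.concat([self.data, data], ignore_index=True)

        def drop_data(self, count: int):
            # drop the `count` oldest points from the head of the buffer
            if count > 0:
                self.data = self.data.iloc[count:].reset_index(drop=True)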

analytics/analytics/detectors/threshold_detector.py (3 lines changed)

@@ -17,7 +17,6 @@ class ThresholdDetector(Detector):
         pass
 
     def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
-        log.debug('run train for threshold detector')
         return {
             'cache': {
                 'value': threshold['value'],
@@ -26,7 +25,6 @@ class ThresholdDetector(Detector):
         }
 
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
-        log.debug('run detect for threshold detector')
         value = cache['value']
         condition = cache['condition']
@@ -62,5 +60,4 @@ class ThresholdDetector(Detector):
         }
 
     def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
-        log.debug('threshhold recieve data')
         return self.detect(data, cache)
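
The threshold detector's cache stores the configured value and condition produced by train, and detect compares incoming data against them. The comparison itself lies outside the changed lines; a sketch of how such a value/condition pair might be evaluated is shown below (the set of supported condition strings and the 'value' column name are assumptions, not taken from this commit):

    import operator
    import pandas as pd

    # assumed condition names; the real detector may use a different set
    CONDITIONS = {
        '>': operator.gt,
        '>=': operator.ge,
        '<': operator.lt,
        '<=': operator.le,
        '=': operator.eq,
    }

    def check_threshold(dataframe: pd.DataFrame, value: float, condition: str) -> bool:
        # compare the most recent point in the frame against the configured threshold
        last_value = dataframe['value'].values[-1]
        return CONDITIONS[condition](last_value, value)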

server/src/services/data_puller.ts (2 lines changed)

@@ -58,7 +58,7 @@ export class DataPuller {
     try {
       this.analyticsService.sendTask(task);
-      console.log(`data puller successfuly pushed data for unit id: ${unit.id}`);
+      console.log(`data puller successfuly pushed ${data.data.length} points for unit id: ${unit.id}`);
     } catch(e) {
       console.log(`data puller got error while push data ${e.message}`);
     }
