|
|
|
@ -38,18 +38,19 @@ class Worker(object):
|
|
|
|
|
def add_task(self, task): |
|
|
|
|
self.queue.put(task) |
|
|
|
|
|
|
|
|
|
# TODO: get task as an object built from json |
|
|
|
|
def do_task(self, task): |
|
|
|
|
try: |
|
|
|
|
type = task['type'] |
|
|
|
|
predictor_id = task['predictor_id'] |
|
|
|
|
analytic_unit_id = task['analyticUnitId'] |
|
|
|
|
if type == "predict": |
|
|
|
|
last_prediction_time = task['last_prediction_time'] |
|
|
|
|
last_prediction_time = task['lastPredictionTime'] |
|
|
|
|
pattern = task['pattern'] |
|
|
|
|
result = self.do_predict(predictor_id, last_prediction_time, pattern) |
|
|
|
|
result = self.do_predict(analytic_unit_id, last_prediction_time, pattern) |
|
|
|
|
elif type == "learn": |
|
|
|
|
segments = task['segments'] |
|
|
|
|
pattern = task['pattern'] |
|
|
|
|
result = self.do_learn(predictor_id, segments, pattern) |
|
|
|
|
result = self.do_learn(analytic_unit_id, segments, pattern) |
|
|
|
|
else: |
|
|
|
|
result = { |
|
|
|
|
'status': "failed", |
|
|
|
@ -59,51 +60,54 @@ class Worker(object):
|
|
|
|
|
#traceback.extract_stack() |
|
|
|
|
error_text = traceback.format_exc() |
|
|
|
|
logger.error("Exception: '%s'" % error_text) |
|
|
|
|
# TODO: move result to a class which renders to json for messaging to analytics |
|
|
|
|
result = { |
|
|
|
|
'task': type, |
|
|
|
|
'status': "failed", |
|
|
|
|
'predictor_id': predictor_id, |
|
|
|
|
'analyticUnitId': analytic_unit_id, |
|
|
|
|
'error': str(e) |
|
|
|
|
} |
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
def do_learn(self, predictor_id, segments, pattern): |
|
|
|
|
model = self.get_model(predictor_id, pattern) |
|
|
|
|
def do_learn(self, analytic_unit_id, segments, pattern): |
|
|
|
|
model = self.get_model(analytic_unit_id, pattern) |
|
|
|
|
model.synchronize_data() |
|
|
|
|
last_prediction_time = model.learn(segments) |
|
|
|
|
# TODO: we should not do predict before labeling in all models, not just in drops |
|
|
|
|
|
|
|
|
|
if pattern == 'drops' and len(segments) == 0: |
|
|
|
|
# TODO: move result to a class which renders to json for messaging to analytics |
|
|
|
|
result = { |
|
|
|
|
'status': 'success', |
|
|
|
|
'predictor_id': predictor_id, |
|
|
|
|
'analyticUnitId': analytic_unit_id, |
|
|
|
|
'segments': [], |
|
|
|
|
'last_prediction_time': last_prediction_time |
|
|
|
|
'lastPredictionTime': last_prediction_time |
|
|
|
|
} |
|
|
|
|
else: |
|
|
|
|
result = self.do_predict(predictor_id, last_prediction_time, pattern) |
|
|
|
|
result = self.do_predict(analytic_unit_id, last_prediction_time, pattern) |
|
|
|
|
|
|
|
|
|
result['task'] = 'learn' |
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
def do_predict(self, predictor_id, last_prediction_time, pattern): |
|
|
|
|
model = self.get_model(predictor_id, pattern) |
|
|
|
|
def do_predict(self, analytic_unit_id, last_prediction_time, pattern): |
|
|
|
|
model = self.get_model(analytic_unit_id, pattern) |
|
|
|
|
model.synchronize_data() |
|
|
|
|
segments, last_prediction_time = model.predict(last_prediction_time) |
|
|
|
|
return { |
|
|
|
|
'task': "predict", |
|
|
|
|
'status': "success", |
|
|
|
|
'analyticUnitId': predictor_id, |
|
|
|
|
'analyticUnitId': analytic_unit_id, |
|
|
|
|
'segments': segments, |
|
|
|
|
'lastPredictionTime': last_prediction_time |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def get_model(self, predictor_id, pattern): |
|
|
|
|
if predictor_id not in self.models_cache: |
|
|
|
|
def get_model(self, analytic_unit_id, pattern): |
|
|
|
|
if analytic_unit_id not in self.models_cache: |
|
|
|
|
if pattern.find('general') != -1: |
|
|
|
|
model = AnomalyModel(predictor_id) |
|
|
|
|
model = AnomalyModel(analytic_unit_id) |
|
|
|
|
else: |
|
|
|
|
model = PatternDetectionModel(predictor_id, pattern) |
|
|
|
|
self.models_cache[predictor_id] = model |
|
|
|
|
return self.models_cache[predictor_id] |
|
|
|
|
model = PatternDetectionModel(analytic_unit_id, pattern) |
|
|
|
|
self.models_cache[analytic_unit_id] = model |
|
|
|
|
return self.models_cache[analytic_unit_id] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|