
Rename predict to detect #279 (#284)

* dummy rename

* fixes

* renaming in analytics

Authored by Evgeny Smyshlyaev (6 years ago), committed by Alexey Velikiy
commit c02be4cbfc
16 changed files (changed line counts in parentheses):

  1. analytics/analytics/analytic_unit_manager.py (4)
  2. analytics/analytics/analytic_unit_worker.py (4)
  3. analytics/analytics/detectors/detector.py (2)
  4. analytics/analytics/detectors/pattern_detector.py (12)
  5. analytics/analytics/models/custom_model.py (2)
  6. analytics/analytics/models/drop_model.py (6)
  7. analytics/analytics/models/general_model.py (6)
  8. analytics/analytics/models/jump_model.py (6)
  9. analytics/analytics/models/model.py (6)
  10. analytics/analytics/models/peak_model.py (6)
  11. analytics/analytics/models/trough_model.py (6)
  12. server/src/controllers/alerts_controller.ts (8)
  13. server/src/controllers/analytics_controller.ts (12)
  14. server/src/models/analytic_unit_model.ts (10)
  15. server/src/models/analytics_message_model.ts (4)
  16. server/src/models/analytics_task_model.ts (2)

analytics/analytics/analytic_unit_manager.py

@@ -65,8 +65,8 @@ class AnalyticUnitManager:
         data = prepare_data(payload['data'])
         if task['type'] == 'LEARN':
             return await worker.do_train(payload['segments'], data, payload['cache'])
-        elif task['type'] == 'PREDICT':
-            return await worker.do_predict(data, payload['cache'])
+        elif task['type'] == 'DETECT':
+            return await worker.do_detect(data, payload['cache'])

         raise ValueError('Unknown task type "%s"' % task['type'])
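
For reference, a minimal sketch of a task message the manager now accepts; the 'DETECT' type string and the payload keys come from this hunk, while the concrete values are hypothetical:

# Hypothetical DETECT task as dispatched above (values are made up;
# only the 'type' string and payload keys are taken from this diff).
task = {
    'type': 'DETECT',
    'payload': {
        'data': [[1536000000000, 0.0], [1536000060000, 1.1]],  # assumed [timestamp, value] rows fed to prepare_data()
        'cache': None  # Optional[AnalyticUnitCache]
    }
}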

analytics/analytics/analytic_unit_worker.py

@@ -30,8 +30,8 @@ class AnalyticUnitWorker:
         except CancelledError as e:
             return cache

-    async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
-        return self._detector.predict(data, cache)
+    async def do_detect(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
+        return self._detector.detect(data, cache)

     def cancel(self):
         if self._training_feature is not None:

analytics/analytics/detectors/detector.py

@@ -14,5 +14,5 @@ class Detector(ABC):
         pass

     @abstractmethod
-    def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
+    def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
         pass
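
Every detector now implements detect instead of predict. A sketch of a conforming subclass, assuming the imports from detector.py are available and that train is the other abstract method (its exact signature is an assumption); the body is illustrative only:

# Illustrative only: a Detector that flags nothing. The `detect`
# signature matches the renamed ABC above; `train` is assumed.
from typing import Optional
from pandas import DataFrame

class NoopDetector(Detector):
    def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
        return cache

    def detect(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
        # No patterns found; returns the shape PatternDetector produces below.
        return { 'cache': cache, 'segments': [], 'lastDetectionTime': None }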

analytics/analytics/detectors/pattern_detector.py

@@ -42,17 +42,17 @@ class PatternDetector(Detector):
             'cache': new_cache
         }

-    def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict:
+    def detect(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict:
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
-        predicted = self.model.predict(dataframe, cache)
-        segments = [{ 'from': segment[0], 'to': segment[1] } for segment in predicted['segments']]
-        newCache = predicted['cache']
+        detected = self.model.detect(dataframe, cache)
+        segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
+        newCache = detected['cache']

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
-        last_prediction_time = last_dataframe_time.value
+        last_detection_time = last_dataframe_time.value
         return {
             'cache': newCache,
             'segments': segments,
-            'lastPredictionTime': last_prediction_time
+            'lastDetectionTime': last_detection_time
         }
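
So a detection result returned to the worker now looks like this (field names from the hunk above; concrete values hypothetical):

# Hypothetical result of PatternDetector.detect after the rename.
result = {
    'cache': { 'WINDOW_SIZE': 120 },  # model state; contents assumed
    'segments': [{ 'from': 1536000000000, 'to': 1536000060000 }],
    'lastDetectionTime': 1536000060000000000  # pandas Timestamp.value, i.e. nanoseconds
}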

analytics/analytics/models/custom_model.py

@@ -7,5 +7,5 @@ class CustomModel(Model):
     def do_fit(self, dataframe: pd.DataFrame, segments: list) -> None:
         pass

-    def do_predict(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> list:
         return []

analytics/analytics/models/drop_model.py

@@ -100,13 +100,13 @@ class DropModel(Model):
         else:
             self.state['conv_del_max'] = self.state['WINDOW_SIZE']

-    def do_predict(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> list:
         data = dataframe['value']
         possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)

-        return self.__filter_prediction(possible_drops, data)
+        return self.__filter_detection(possible_drops, data)

-    def __filter_prediction(self, segments: list, data: list):
+    def __filter_detection(self, segments: list, data: list):
         delete_list = []
         variance_error = self.state['WINDOW_SIZE']
         close_patterns = utils.close_filtering(segments, variance_error)

analytics/analytics/models/general_model.py

@@ -78,7 +78,7 @@ class GeneralModel(Model):
         else:
             self.state['conv_del_max'] = self.state['WINDOW_SIZE']

-    def do_predict(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> list:
         data = dataframe['value']
         pat_data = self.model_gen
         y = max(pat_data)
@@ -90,10 +90,10 @@ class GeneralModel(Model):
             self.all_conv.append(max(conv))
         all_conv_peaks = utils.peak_finder(self.all_conv, self.state['WINDOW_SIZE'] * 2)

-        filtered = self.__filter_prediction(all_conv_peaks, data)
+        filtered = self.__filter_detection(all_conv_peaks, data)
         return set(item + self.state['WINDOW_SIZE'] for item in filtered)

-    def __filter_prediction(self, segments: list, data: list):
+    def __filter_detection(self, segments: list, data: list):
         if len(segments) == 0 or len(self.ipats) == 0:
             return []
         delete_list = []

analytics/analytics/models/jump_model.py

@@ -102,13 +102,13 @@ class JumpModel(Model):
         else:
             self.state['conv_del_max'] = self.state['WINDOW_SIZE']

-    def do_predict(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> list:
         data = dataframe['value']
         possible_jumps = utils.find_jump(data, self.state['JUMP_HEIGHT'], self.state['JUMP_LENGTH'] + 1)

-        return self.__filter_prediction(possible_jumps, data)
+        return self.__filter_detection(possible_jumps, data)

-    def __filter_prediction(self, segments, data):
+    def __filter_detection(self, segments, data):
         delete_list = []
         variance_error = self.state['WINDOW_SIZE']
         close_patterns = utils.close_filtering(segments, variance_error)

analytics/analytics/models/model.py

@@ -14,7 +14,7 @@ class Model(ABC):
         pass

     @abstractmethod
-    def do_predict(self, dataframe: pd.DataFrame) -> list:
+    def do_detect(self, dataframe: pd.DataFrame) -> list:
         pass

     def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
@@ -34,11 +34,11 @@ class Model(ABC):
         self.do_fit(dataframe, segments)
         return self.state

-    def predict(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
+    def detect(self, dataframe: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
         if type(cache) is AnalyticUnitCache:
             self.state = cache

-        result = self.do_predict(dataframe)
+        result = self.do_detect(dataframe)
         # TODO: convert from ns to ms more proper way (not dividing by 10^6)
         segments = [(
             dataframe['timestamp'][x - 1].value / 1000000,
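
The division by 1000000 converts pandas' nanosecond timestamps to milliseconds, as the TODO notes. A standalone illustration of the conversion Model.detect relies on:

# pandas Timestamp.value is nanoseconds since the epoch; dividing by
# 10**6 yields milliseconds.
import pandas as pd

ts = pd.Timestamp('2018-09-10 12:00:00')
print(ts.value)            # 1536580800000000000 (ns)
print(ts.value / 1000000)  # 1536580800000.0 (ms)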

analytics/analytics/models/peak_model.py

@@ -86,7 +86,7 @@ class PeakModel(Model):
         else:
             self.state['conv_del_max'] = self.state['WINDOW_SIZE']

-    def do_predict(self, dataframe: pd.DataFrame):
+    def do_detect(self, dataframe: pd.DataFrame):
         data = dataframe['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_maxs = argrelextrema(np.array(data), np.greater)[0]
@@ -100,9 +100,9 @@ class PeakModel(Model):
             if data[i] > extrema_list[i]:
                 segments.append(i)

-        return self.__filter_prediction(segments, data)
+        return self.__filter_detection(segments, data)

-    def __filter_prediction(self, segments: list, data: list) -> list:
+    def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
         variance_error = self.state['WINDOW_SIZE']
         close_patterns = utils.close_filtering(segments, variance_error)
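
do_detect seeds its candidate segments from local maxima. What argrelextrema contributes, shown in isolation with toy data:

# scipy's argrelextrema returns a tuple of index arrays; with
# np.greater it picks strict local maxima, as in PeakModel.do_detect.
import numpy as np
from scipy.signal import argrelextrema

data = np.array([0.0, 2.0, 1.0, 3.0, 1.0])
print(argrelextrema(data, np.greater)[0])  # [1 3]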

analytics/analytics/models/trough_model.py

@@ -87,7 +87,7 @@ class TroughModel(Model):
         else:
             self.state['conv_del_max'] = self.state['WINDOW_SIZE']

-    def do_predict(self, dataframe: pd.DataFrame):
+    def do_detect(self, dataframe: pd.DataFrame):
         data = dataframe['value']
         window_size = int(len(data)/SMOOTHING_COEFF) #test ws on flat data
         all_mins = argrelextrema(np.array(data), np.less)[0]
@@ -101,9 +101,9 @@ class TroughModel(Model):
             if data[i] < extrema_list[i]:
                 segments.append(i)

-        return self.__filter_prediction(segments, data)
+        return self.__filter_detection(segments, data)

-    def __filter_prediction(self, segments: list, data: list) -> list:
+    def __filter_detection(self, segments: list, data: list) -> list:
         delete_list = []
         variance_error = self.state['WINDOW_SIZE']
         close_patterns = utils.close_filtering(segments, variance_error)

server/src/controllers/alerts_controller.ts

@@ -5,7 +5,7 @@
 throw new Error('not supported');

-// import { runPredict } from './analytics_controller';
+// import { runDetect } from './analytics_controller';
 // import { getLabeledSegments } from './segments_controller';
 // import { AnalyticUnitId } from '../models/analytic_unit';
 // import { sendNotification } from '../services/notification_service';
@@ -56,10 +56,10 @@ throw new Error('not supported');
 // async function alertsTick() {
 //   let alertsAnomalies = getAlertsAnomalies();
-//   for (let predictorId of alertsAnomalies) {
+//   for (let detectorId of alertsAnomalies) {
 //     try {
-//       await runPredict(predictorId);
-//       processAlerts(predictorId);
+//       await runDetect(detectorId);
+//       processAlerts(detectorId);
 //     } catch (e) {
 //       console.error(e);
 //     }

server/src/controllers/analytics_controller.ts

@@ -51,7 +51,7 @@ async function onMessage(message: AnalyticsMessage) {
     methodResolved = true;
   }

-  if(message.method === AnalyticsMessageMethod.PREDICT) {
+  if(message.method === AnalyticsMessageMethod.DETECT) {
     onPredict(message.payload);
     methodResolved = true;
   }
@@ -159,7 +159,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
   try {
     let unit = await AnalyticUnit.findById(id);
-    previousLastPredictionTime = unit.lastPredictionTime;
+    previousLastPredictionTime = unit.lastDetectionTime;

     let pattern = unit.type;
     let segments = await Segment.findMany(id, { labeled: true });
@@ -183,8 +183,8 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
     }
     let task = new AnalyticsTask(
       id,
-      AnalyticsTaskType.PREDICT,
-      { pattern, lastPredictionTime: unit.lastPredictionTime, data, cache: oldCache }
+      AnalyticsTaskType.DETECT,
+      { pattern, lastPredictionTime: unit.lastDetectionTime, data, cache: oldCache }
     );
     console.debug(`run task, id:${id}`);
     let result = await runTask(task);
@@ -209,13 +209,13 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
     Segment.insertSegments(payload.segments);
     AnalyticUnitCache.setData(id, payload.cache);
-    AnalyticUnit.setPredictionTime(id, payload.lastPredictionTime);
+    AnalyticUnit.setDetectionTime(id, payload.lastPredictionTime);
     AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
   } catch(err) {
     let message = err.message || JSON.stringify(err);
     await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
     if(previousLastPredictionTime !== undefined) {
-      await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
+      await AnalyticUnit.setDetectionTime(id, previousLastPredictionTime);
     }
   }
 }

server/src/models/analytic_unit_model.ts

@@ -22,7 +22,7 @@ export class AnalyticUnit {
     public type: string,
     public metric: Metric,
     public id?: AnalyticUnitId,
-    public lastPredictionTime?: number,
+    public lastDetectionTime?: number,
     public status?: AnalyticUnitStatus,
     public error?: string,
   ) {
@@ -47,7 +47,7 @@ export class AnalyticUnit {
       panelUrl: this.panelUrl,
       type: this.type,
       metric: this.metric.toObject(),
-      lastPredictionTime: this.lastPredictionTime,
+      lastDetectionTime: this.lastDetectionTime,
       status: this.status,
       error: this.error
     };
@@ -63,7 +63,7 @@ export class AnalyticUnit {
       obj.type,
       Metric.fromObject(obj.metric),
       obj._id,
-      obj.lastPredictionTime,
+      obj.lastDetectionTime,
       obj.status as AnalyticUnitStatus,
       obj.error,
     );
@@ -106,6 +106,6 @@ export async function setStatus(id: AnalyticUnitId, status: string, error?: string) {
   return db.updateOne(id, { status, error });
 }

-export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) {
-  return db.updateOne(id, { lastPredictionTime });
+export async function setDetectionTime(id: AnalyticUnitId, lastDetectionTime: number) {
+  return db.updateOne(id, { lastDetectionTime });
 }

server/src/models/analytics_message_model.ts

@@ -1,7 +1,7 @@
 export enum AnalyticsMessageMethod {
   TASK = 'TASK',
   TASK_RESULT = 'TASK_RESULT',
-  PREDICT = 'PREDICT'
+  DETECT = 'DETECT'
 }

 export class AnalyticsMessage {
@@ -27,4 +27,4 @@ export class AnalyticsMessage {
     }
     return new AnalyticsMessage(obj.method, obj.payload, obj.requestId);
   }
 }

server/src/models/analytics_task_model.ts

@@ -8,7 +8,7 @@ const UID_LENGTH = 16;
 export type AnalyticsTaskId = string;

 export enum AnalyticsTaskType {
   LEARN = 'LEARN',
-  PREDICT = 'PREDICT',
+  DETECT = 'DETECT',
   CANCEL = 'CANCEL'
 };
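
With this, the full set of task types the server can send is LEARN, DETECT and CANCEL. A sketch of what the analytics side has to dispatch on (Python to match the worker; the guard mirrors the ValueError in analytic_unit_manager.py above, while the helper itself is illustrative, not code from the commit):

# The three task types after the rename; unknown types raise, as in
# AnalyticUnitManager.
KNOWN_TASK_TYPES = { 'LEARN', 'DETECT', 'CANCEL' }

def check_task_type(task_type: str) -> None:
    if task_type not in KNOWN_TASK_TYPES:
        raise ValueError('Unknown task type "%s"' % task_type)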
