diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py
index 86add4f..34b0eb6 100644
--- a/analytics/analytic_unit_worker.py
+++ b/analytics/analytic_unit_worker.py
@@ -13,12 +13,8 @@ class AnalyticUnitWorker:
         self.analytic_unit_id = analytic_unit_id
         self.detector = detector
 
-    async def do_learn(self, segments: list, data: pd.DataFrame) -> None:
-        await self.detector.train(data, segments)
+    async def do_learn(self, segments: list, data: pd.DataFrame, cache: dict) -> dict:
+        return await self.detector.train(data, segments, cache)
 
-    async def do_predict(self, data: pd.DataFrame):
-        segments, last_prediction_time = await self.detector.predict(data)
-        return {
-            'segments': segments,
-            'lastPredictionTime': last_prediction_time
-        }
+    async def do_predict(self, data: pd.DataFrame, cache: dict) -> dict:
+        return await self.detector.predict(data, cache)
diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py
index a6f3340..06147d4 100644
--- a/analytics/detectors/pattern_detector.py
+++ b/analytics/detectors/pattern_detector.py
@@ -32,26 +32,19 @@ class PatternDetector(Detector):
 
     async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict):
         # TODO: pass only part of dataframe that has segments
-        self.model.fit(dataframe, segments)
+        self.model.fit(dataframe, segments, cache)
         # TODO: save model after fit
         return cache
 
     async def predict(self, dataframe: pd.DataFrame, cache: dict):
-        predicted_indexes = await self.model.predict(dataframe)
+        predicted = await self.model.predict(dataframe, cache)
 
-        segments = []
-        # for time_value in predicted_times:
-        #     ts1 = int(time_value[0].timestamp() * 1000)
-        #     ts2 = int(time_value[1].timestamp() * 1000)
-        #     segments.append({
-        #         'start': min(ts1, ts2),
-        #         'finish': max(ts1, ts2)
-        #     })
+        segments = [{ 'from': segment[0], 'to': segment[1] } for segment in predicted]
 
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
-        last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
+        last_prediction_time = last_dataframe_time.value
         return {
             'cache': cache,
             'segments': segments,
-            'last_prediction_time': last_prediction_time
+            'lastPredictionTime': last_prediction_time
         }
diff --git a/analytics/models/step_model.py b/analytics/models/step_model.py
index d735755..767bd36 100644
--- a/analytics/models/step_model.py
+++ b/analytics/models/step_model.py
@@ -61,12 +61,11 @@ class StepModel(Model):
             #print(segment_min_line, segment_max_line)
             drop_height = 0.95 * (segment_max_line - segment_min_line)
             drop_height_list.append(drop_height)
-            drop_lenght = utils.find_drop_length(segment_data, segment_min_line, segment_max_line)
-            #print(drop_lenght)
-            drop_length_list.append(drop_lenght)
+            drop_length = utils.find_drop_length(segment_data, segment_min_line, segment_max_line)
+            drop_length_list.append(drop_length)
             cen_ind = utils.drop_intersection(flat_segment, segment_median) #finds all interseprions with median
             drop_center = cen_ind[0]
-            segment_cent_index = drop_center - 5 + segment['start']
+            segment_cent_index = drop_center - 5 + segment_from_index
             self.idrops.append(segment_cent_index)
             labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE]
             labeled_min = min(labeled_drop)
@@ -101,22 +100,21 @@ class StepModel(Model):
 
         for i in range(0,len(dataframe['value'])):
             dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min
-        data = dataframe['value']
-
-        result = self.__predict(data)
-        result.sort()
+        result = await self.__predict(dataframe)
 
         if len(self.segments) > 0:
-            result = [segment for segment in result if not utils.is_intersect(segment, self.segments)]
-        return result
+            return [segment for segment in result if not utils.is_intersect(segment, self.segments)]
+        return result
 
-    def __predict(self, data):
+    async def __predict(self, dataframe):
         #window_size = 24
         #all_max_flatten_data = data.rolling(window=window_size).mean()
         #all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0]
-        #print(self.state['DROP_HEIGHT'],self.state['DROP_LENGTH'] )
+        #print(self.state['DROP_HEIGHT'],self.state['DROP_LENGTH'])
+        data = dataframe['value']
         possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)
-        return [(x - 1, x + 1) for x in self.__filter_prediction(possible_drops, data)]
+        filtered = self.__filter_prediction(possible_drops, data)
+        return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered]
 
     def __filter_prediction(self, segments, data):
         delete_list = []
diff --git a/analytics/server.py b/analytics/server.py
index b4e1ed6..2b49700 100644
--- a/analytics/server.py
+++ b/analytics/server.py
@@ -40,7 +40,6 @@ async def handle_task(task: object):
         '_id': task['_id'],
         'task': task['type'],
         'analyticUnitId': task['analyticUnitId'],
-        'cache': task['cache'],
         'status': "IN_PROGRESS"
     }
 
diff --git a/analytics/utils/__init__.py b/analytics/utils/__init__.py
index 4321d23..80eb7d3 100644
--- a/analytics/utils/__init__.py
+++ b/analytics/utils/__init__.py
@@ -176,7 +176,8 @@ def find_drop_length(segment_data, min_line, max_line):
     if (idx[0] - idl[-1] + 1) > 0:
         return idx[0] - idl[-1] + 1
     else:
-        return print("retard alert!")
+        print("Warning: find_drop_length got a non-positive length")
+        return 0
 
 def drop_intersection(segment_data, median_line):
     x = np.arange(0, len(segment_data))
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index 21f4f5c..55eb9e1 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -88,9 +88,6 @@ function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) {
 }
 
 export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
-
-  let previousLastPredictionTime: number = undefined;
-
   try {
     let analyticUnit = await AnalyticUnit.findById(id);
 
@@ -117,94 +114,101 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
       oldCache = oldCache.data;
     }
     let task = new AnalyticsTask(
-      id, AnalyticsTaskType.LEARN,
-      { pattern, segments: segmentObjs, data, cache: oldCache }
+      id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data, cache: oldCache }
     );
 
     AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
     let result = await runTask(task);
-    let { lastPredictionTime, segments: predictedSegments, cache: newCache } = await processLearningResult(result);
-    AnalyticUnitCache.setData(id, newCache);
-    previousLastPredictionTime = analyticUnit.lastPredictionTime;
-
-    await Promise.all([
-      Segment.insertSegments(predictedSegments),
-      AnalyticUnit.setPredictionTime(id, lastPredictionTime)
-    ]);
-    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
+    if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
+      throw new Error(result.error);
+    }
+    AnalyticUnitCache.setData(id, result.payload.cache);
 
   } catch (err) {
     let message = err.message || JSON.stringify(err);
    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
-    if(previousLastPredictionTime !== undefined) {
-      await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
-    }
   }
 }
 
-async function processLearningResult(taskResult: any): Promise<{
+function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, taskResult: any): {
   lastPredictionTime: number,
   segments: Segment.Segment[],
   cache: any
-}> {
-  if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
-    return Promise.reject(taskResult.error);
+} {
+  let payload = taskResult.payload;
+  if(payload === undefined) {
+    throw new Error(`Missing payload in result: ${taskResult}`);
+  }
+  if(payload.segments === undefined || !Array.isArray(payload.segments)) {
+    throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(payload)}`);
+  }
+  if(payload.lastPredictionTime === undefined || isNaN(+payload.lastPredictionTime)) {
+    throw new Error(
+      `Missing lastPredictionTime in result or it is corrupted: ${JSON.stringify(payload)}`
+    );
   }
-  console.log(taskResult)
-  // if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
-  //   throw new Error('Missing segments in result or it is corrupted: ' + taskResult);
-  // }
-  // if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
-  //   throw new Error(
-  //     'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
-  //   );
-  // }
+
+  let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false));
 
   return {
-    lastPredictionTime: 0,
-    segments: [],
+    lastPredictionTime: payload.lastPredictionTime,
+    segments: segments,
     cache: {}
   };
 }
 
 export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
-  let unit = await AnalyticUnit.findById(id);
-  let pattern = unit.type;
+  let previousLastPredictionTime: number = undefined;
 
-  let segments = await Segment.findMany(id, { labeled: true });
-  if (segments.length < 2) {
-    throw new Error('Need at least 2 labeled segments');
-  }
+  try {
+    let unit = await AnalyticUnit.findById(id);
+    previousLastPredictionTime = unit.lastPredictionTime;
+    let pattern = unit.type;
 
-  let { from, to } = getQueryRangeForLearningBySegments(segments);
-  let data = await queryByMetric(unit.metric, unit.panelUrl, from, to);
-  if (data.length === 0) {
-    throw new Error('Empty data to predict on');
-  }
+    let segments = await Segment.findMany(id, { labeled: true });
+    if (segments.length < 2) {
+      throw new Error('Need at least 2 labeled segments');
+    }
 
-  let task = new AnalyticsTask(
-    id,
-    AnalyticsTaskType.PREDICT,
-    { pattern, lastPredictionTime: unit.lastPredictionTime, data }
-  );
-  let result = await runTask(task);
-  if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
-    return [];
-  }
-  // Merging segments
-  if(segments.length > 0 && result.segments.length > 0) {
-    let lastOldSegment = segments[segments.length - 1];
-    let firstNewSegment = result.segments[0];
-
-    if(firstNewSegment.from <= lastOldSegment.to) {
-      result.segments[0].from = lastOldSegment.from;
-      Segment.removeSegments([lastOldSegment.id])
+    let { from, to } = getQueryRangeForLearningBySegments(segments);
+    let data = await queryByMetric(unit.metric, unit.panelUrl, from, to);
+    if (data.length === 0) {
+      throw new Error('Empty data to predict on');
     }
-  }
-  Segment.insertSegments(result.segments);
-  AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
-  return result.segments;
 
+    let task = new AnalyticsTask(
+      id,
+      AnalyticsTaskType.PREDICT,
+      { pattern, lastPredictionTime: unit.lastPredictionTime, data, cache: {} }
+    );
+    let result = await runTask(task);
+    if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
+      return [];
+    }
+
+    let payload = processPredictionResult(id, result);
+
+    // Merging segments
+    if(segments.length > 0 && payload.segments.length > 0) {
+      let lastOldSegment = segments[segments.length - 1];
+      let firstNewSegment = payload.segments[0];
+
+      if(firstNewSegment.from <= lastOldSegment.to) {
+        payload.segments[0].from = lastOldSegment.from;
+        Segment.removeSegments([lastOldSegment.id]);
+      }
+    }
+
+    Segment.insertSegments(payload.segments);
+    AnalyticUnit.setPredictionTime(id, payload.lastPredictionTime);
+    AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
+  } catch(err) {
+    let message = err.message || JSON.stringify(err);
+    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
+    if(previousLastPredictionTime !== undefined) {
+      await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
+    }
+  }
 }
 
 export function isAnalyticReady(): boolean {