Browse Source

Make drops work (#121)

pull/1/head
rozetko 6 years ago committed by GitHub
parent
commit
9b3a68d4cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      analytics/analytic_unit_worker.py
  2. 17
      analytics/detectors/pattern_detector.py
  3. 23
      analytics/models/step_model.py
  4. 1
      analytics/server.py
  5. 3
      analytics/utils/__init__.py
  6. 132
      server/src/controllers/analytics_controller.ts

12
analytics/analytic_unit_worker.py

@ -13,12 +13,8 @@ class AnalyticUnitWorker:
self.analytic_unit_id = analytic_unit_id self.analytic_unit_id = analytic_unit_id
self.detector = detector self.detector = detector
async def do_learn(self, segments: list, data: pd.DataFrame) -> None: async def do_learn(self, segments: list, data: pd.DataFrame, cache: dict) -> dict:
await self.detector.train(data, segments) return await self.detector.train(data, segments, cache)
async def do_predict(self, data: pd.DataFrame): async def do_predict(self, data: pd.DataFrame, cache: dict) -> dict:
segments, last_prediction_time = await self.detector.predict(data) return await self.detector.predict(data, cache)
return {
'segments': segments,
'lastPredictionTime': last_prediction_time
}

17
analytics/detectors/pattern_detector.py

@ -32,26 +32,19 @@ class PatternDetector(Detector):
async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict): async def train(self, dataframe: pd.DataFrame, segments: list, cache: dict):
# TODO: pass only part of dataframe that has segments # TODO: pass only part of dataframe that has segments
self.model.fit(dataframe, segments) self.model.fit(dataframe, segments, cache)
# TODO: save model after fit # TODO: save model after fit
return cache return cache
async def predict(self, dataframe: pd.DataFrame, cache: dict): async def predict(self, dataframe: pd.DataFrame, cache: dict):
predicted_indexes = await self.model.predict(dataframe) predicted = await self.model.predict(dataframe, cache)
segments = [] segments = [{ 'from': segment[0], 'to': segment[1] } for segment in predicted]
# for time_value in predicted_times:
# ts1 = int(time_value[0].timestamp() * 1000)
# ts2 = int(time_value[1].timestamp() * 1000)
# segments.append({
# 'start': min(ts1, ts2),
# 'finish': max(ts1, ts2)
# })
last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_dataframe_time = dataframe.iloc[-1]['timestamp']
last_prediction_time = int(last_dataframe_time.timestamp() * 1000) last_prediction_time = last_dataframe_time.value
return { return {
'cache': cache, 'cache': cache,
'segments': segments, 'segments': segments,
'last_prediction_time': last_prediction_time 'lastPredictionTime': last_prediction_time
} }

23
analytics/models/step_model.py

@ -61,12 +61,11 @@ class StepModel(Model):
#print(segment_min_line, segment_max_line) #print(segment_min_line, segment_max_line)
drop_height = 0.95 * (segment_max_line - segment_min_line) drop_height = 0.95 * (segment_max_line - segment_min_line)
drop_height_list.append(drop_height) drop_height_list.append(drop_height)
drop_lenght = utils.find_drop_length(segment_data, segment_min_line, segment_max_line) drop_length = utils.find_drop_length(segment_data, segment_min_line, segment_max_line)
#print(drop_lenght) drop_length_list.append(drop_length)
drop_length_list.append(drop_lenght)
cen_ind = utils.drop_intersection(flat_segment, segment_median) #finds all interseprions with median cen_ind = utils.drop_intersection(flat_segment, segment_median) #finds all interseprions with median
drop_center = cen_ind[0] drop_center = cen_ind[0]
segment_cent_index = drop_center - 5 + segment['start'] segment_cent_index = drop_center - 5 + segment_from_index
self.idrops.append(segment_cent_index) self.idrops.append(segment_cent_index)
labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE] labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE]
labeled_min = min(labeled_drop) labeled_min = min(labeled_drop)
@ -101,22 +100,20 @@ class StepModel(Model):
for i in range(0,len(dataframe['value'])): for i in range(0,len(dataframe['value'])):
dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min
data = dataframe['value'] result = await self.__predict(dataframe)
result = self.__predict(data)
result.sort()
if len(self.segments) > 0: if len(self.segments) > 0:
result = [segment for segment in result if not utils.is_intersect(segment, self.segments)] return [segment for segment in result if not utils.is_intersect(segment, self.segments)]
return result
def __predict(self, data): async def __predict(self, dataframe):
#window_size = 24 #window_size = 24
#all_max_flatten_data = data.rolling(window=window_size).mean() #all_max_flatten_data = data.rolling(window=window_size).mean()
#all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] #all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0]
#print(self.state['DROP_HEIGHT'],self.state['DROP_LENGTH'] ) #print(self.state['DROP_HEIGHT'],self.state['DROP_LENGTH'])
data = dataframe['value']
possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1) possible_drops = utils.find_drop(data, self.state['DROP_HEIGHT'], self.state['DROP_LENGTH'] + 1)
return [(x - 1, x + 1) for x in self.__filter_prediction(possible_drops, data)] filtered = self.__filter_prediction(possible_drops, data)
return [(dataframe['timestamp'][x - 1].value, dataframe['timestamp'][x + 1].value) for x in filtered]
def __filter_prediction(self, segments, data): def __filter_prediction(self, segments, data):
delete_list = [] delete_list = []

1
analytics/server.py

@ -40,7 +40,6 @@ async def handle_task(task: object):
'_id': task['_id'], '_id': task['_id'],
'task': task['type'], 'task': task['type'],
'analyticUnitId': task['analyticUnitId'], 'analyticUnitId': task['analyticUnitId'],
'cache': task['cache'],
'status': "IN_PROGRESS" 'status': "IN_PROGRESS"
} }

3
analytics/utils/__init__.py

@ -176,7 +176,8 @@ def find_drop_length(segment_data, min_line, max_line):
if (idx[0] - idl[-1] + 1) > 0: if (idx[0] - idl[-1] + 1) > 0:
return idx[0] - idl[-1] + 1 return idx[0] - idl[-1] + 1
else: else:
return print("retard alert!") print("retard alert!")
return 0
def drop_intersection(segment_data, median_line): def drop_intersection(segment_data, median_line):
x = np.arange(0, len(segment_data)) x = np.arange(0, len(segment_data))

132
server/src/controllers/analytics_controller.ts

@ -88,9 +88,6 @@ function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) {
} }
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let previousLastPredictionTime: number = undefined;
try { try {
let analyticUnit = await AnalyticUnit.findById(id); let analyticUnit = await AnalyticUnit.findById(id);
@ -117,94 +114,101 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
oldCache = oldCache.data; oldCache = oldCache.data;
} }
let task = new AnalyticsTask( let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data, cache: oldCache }
{ pattern, segments: segmentObjs, data, cache: oldCache }
); );
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
let result = await runTask(task); let result = await runTask(task);
let { lastPredictionTime, segments: predictedSegments, cache: newCache } = await processLearningResult(result); if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
AnalyticUnitCache.setData(id, newCache); throw new Error(result.error)
previousLastPredictionTime = analyticUnit.lastPredictionTime; }
AnalyticUnitCache.setData(id, result.payload.cache);
await Promise.all([
Segment.insertSegments(predictedSegments),
AnalyticUnit.setPredictionTime(id, lastPredictionTime)
]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch (err) { } catch (err) {
let message = err.message || JSON.stringify(err); let message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
if(previousLastPredictionTime !== undefined) {
await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
}
} }
} }
async function processLearningResult(taskResult: any): Promise<{ function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, taskResult: any): {
lastPredictionTime: number, lastPredictionTime: number,
segments: Segment.Segment[], segments: Segment.Segment[],
cache: any cache: any
}> { } {
if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { let payload = taskResult.payload;
return Promise.reject(taskResult.error); if(payload === undefined) {
throw new Error(`Missing payload in result: ${taskResult}`);
}
if(payload.segments === undefined || !Array.isArray(payload.segments)) {
throw new Error(`Missing segments in result or it is corrupted: ${JSON.stringify(payload)}`);
}
if(payload.lastPredictionTime === undefined || isNaN(+payload.lastPredictionTime)) {
throw new Error(
`Missing lastPredictionTime is result or it is corrupted: ${JSON.stringify(payload)}`
);
} }
console.log(taskResult)
// if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) { let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false));
// throw new Error('Missing segments in result or it is corrupted: ' + taskResult);
// }
// if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
// throw new Error(
// 'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
// );
// }
return { return {
lastPredictionTime: 0, lastPredictionTime: payload.lastPredictionTime,
segments: [], segments: segments,
cache: {} cache: {}
}; };
} }
export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
let unit = await AnalyticUnit.findById(id); let previousLastPredictionTime: number = undefined;
let pattern = unit.type;
let segments = await Segment.findMany(id, { labeled: true }); try {
if (segments.length < 2) { let unit = await AnalyticUnit.findById(id);
throw new Error('Need at least 2 labeled segments'); previousLastPredictionTime = unit.lastPredictionTime;
} let pattern = unit.type;
let { from, to } = getQueryRangeForLearningBySegments(segments); let segments = await Segment.findMany(id, { labeled: true });
let data = await queryByMetric(unit.metric, unit.panelUrl, from, to); if (segments.length < 2) {
if (data.length === 0) { throw new Error('Need at least 2 labeled segments');
throw new Error('Empty data to predict on'); }
}
let task = new AnalyticsTask( let { from, to } = getQueryRangeForLearningBySegments(segments);
id, let data = await queryByMetric(unit.metric, unit.panelUrl, from, to);
AnalyticsTaskType.PREDICT, if (data.length === 0) {
{ pattern, lastPredictionTime: unit.lastPredictionTime, data } throw new Error('Empty data to predict on');
);
let result = await runTask(task);
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
return [];
}
// Merging segments
if(segments.length > 0 && result.segments.length > 0) {
let lastOldSegment = segments[segments.length - 1];
let firstNewSegment = result.segments[0];
if(firstNewSegment.from <= lastOldSegment.to) {
result.segments[0].from = lastOldSegment.from;
Segment.removeSegments([lastOldSegment.id])
} }
}
Segment.insertSegments(result.segments); let task = new AnalyticsTask(
AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); id,
return result.segments; AnalyticsTaskType.PREDICT,
{ pattern, lastPredictionTime: unit.lastPredictionTime, data, cache: {} }
);
let result = await runTask(task);
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
return [];
}
let payload = processPredictionResult(id, result);
// Merging segments
if(segments.length > 0 && payload.segments.length > 0) {
let lastOldSegment = segments[segments.length - 1];
let firstNewSegment = payload.segments[0];
if(firstNewSegment.from <= lastOldSegment.to) {
payload.segments[0].from = lastOldSegment.from;
Segment.removeSegments([lastOldSegment.id])
}
}
Segment.insertSegments(payload.segments);
AnalyticUnit.setPredictionTime(id, payload.lastPredictionTime);
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch(err) {
let message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
if(previousLastPredictionTime !== undefined) {
await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
}
}
} }
export function isAnalyticReady(): boolean { export function isAnalyticReady(): boolean {

Loading…
Cancel
Save