diff --git a/server/src/config.ts b/server/src/config.ts
index 6a486f5..66fc89d 100644
--- a/server/src/config.ts
+++ b/server/src/config.ts
@@ -18,6 +18,7 @@
 export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
 export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null);
 export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join(os.tmpdir(), 'hastic'));
 export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002');
+export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY');
 
 export const ANLYTICS_PING_INTERVAL = 500; // ms
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index 53ab9b1..609a5ef 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -3,6 +3,7 @@ import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/ana
 import * as Segment from '../models/segment_model';
 import * as AnalyticUnit from '../models/analytic_unit_model';
 import { AnalyticsService } from '../services/analytics_service';
+import { queryByMetric } from '../services/grafana_service';
 
 type TaskResult = any;
 
@@ -75,10 +76,17 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
   let previousLastPredictionTime: number = undefined;
 
   try {
+    let segments = await Segment.findMany(id, { labeled: true });
+    let analyticUnit = await AnalyticUnit.findById(id);
+    let segmentObjs = segments.map(s => s.toObject());
+    let data = await queryByMetric(analyticUnit.metric);
+
+    if(data.length === 0) {
+      throw new Error('Empty data to learn on');
+    }
-
-    let analyticUnit = await AnalyticUnit.findById(id);
     if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
       throw new Error('Can`t starn learning when it`s already started [' + id + ']');
     }
 
diff --git a/server/src/services/grafana_service.ts b/server/src/services/grafana_service.ts
new file mode 100644
index 0000000..ac6d83b
--- /dev/null
+++ b/server/src/services/grafana_service.ts
@@ -0,0 +1,32 @@
+import { Metric } from '../models/metric_model';
+
+import { HASTIC_API_KEY } from '../config';
+
+import axios from 'axios';
+
+
+export type Timestamp = number;
+/**
+ * @param metric metric to query from Grafana
+ * @returns [time, value][] array
+ */
+export async function queryByMetric(metric: Metric): Promise<[number, number][]> {
+  // NOTE: assumes the datasource keeps its query parameters under datasource['params'],
+  // as the removed Python GrafanaDataProvider did (see metrics_service.ts below)
+  let params = metric.datasource['params'] || {};
+  let query = Object.keys(params)
+    .map(key => encodeURIComponent(key) + '=' + encodeURIComponent(params[key]))
+    .join('&');
+  let headers = { 'Authorization': 'Bearer ' + HASTIC_API_KEY };
+  let url = metric.datasource['origin'] + '/' + metric.datasource['url'] + '?' + query;
+  let res = await axios.get(url, { headers });
+  let results = res.data['results'];
+  if(results === undefined) {
+    throw new Error('results field is undefined in response');
+  }
+  // InfluxDB nests the series under results[0]; series values are [time, value] rows
+  if(results[0] === undefined || results[0].series === undefined) {
+    return [];
+  }
+  return results[0].series[0].values;
+}
diff --git a/server/src/services/metrics_service.ts b/server/src/services/metrics_service.ts
deleted file mode 100644
index 34fa1b2..0000000
--- a/server/src/services/metrics_service.ts
+++ /dev/null
@@ -1,229 +0,0 @@
-// import pandas as pd
-// import os, re
-// import numpy as np
-// from urllib.parse import urlencode, urlparse
-// import urllib.request
-// import json
-// from time import time
-// from config import HASTIC_API_KEY
-
-
-// MS_IN_WEEK = 604800000
-
-// class GrafanaDataProvider:
-//     chunk_size = 50000
-
-//     def __init__(self, datasource, target, data_filename):
-//         self.datasource = datasource
-//         self.target = target
-//         self.data_filename = data_filename
-//         self.last_time = None
-//         self.total_size = 0
-//         self.last_chunk_index = 0
-//         self.chunk_last_times = {}
-//         self.__init_chunks()
-//         self.synchronize()
-
-//     def get_dataframe(self, after_time=None):
-//         result = pd.DataFrame()
-//         for chunk_index, last_chunk_time in self.chunk_last_times.items():
-//             if after_time is None or after_time <= last_chunk_time:
-//                 chunk = self.__load_chunk(chunk_index)
-//                 if after_time is not None:
-//                     chunk = chunk[chunk['timestamp'] >= after_time]
-//                 result = pd.concat([result, chunk])
-//         return result
-
-//     def get_upper_bound(self, after_time):
-//         for chunk_index, last_chunk_time in self.chunk_last_times.items():
-//             if after_time < last_chunk_time:
-//                 chunk = self.__load_chunk(chunk_index)
-//                 chunk = chunk[chunk['timestamp'] >= after_time]
-//                 return chunk.index[0]
-//         return self.size()
-
-//     def size(self):
-//         return self.total_size
-
-//     def get_data_range(self, start_index, stop_index=None):
-//         return self.__get_data(start_index, stop_index)
-
-//     def transform_anomalies(self, anomalies):
-//         result = []
-//         if len(anomalies) == 0:
-//             return result
-//         dataframe = self.get_dataframe(None)
-//         for anomaly in anomalies:
-//             start_time = pd.to_datetime(anomaly['start'] - 1, unit='ms')
-//             finish_time = pd.to_datetime(anomaly['finish'] + 1, unit='ms')
-//             current_index = (dataframe['timestamp'] >= start_time) & (dataframe['timestamp'] <= finish_time)
-//             anomaly_frame = dataframe[current_index]
-//             if anomaly_frame.empty:
-//                 continue
-
-//             cur_anomaly = {
-//                 'start': anomaly_frame.index[0],
-//                 'finish': anomaly_frame.index[len(anomaly_frame) - 1],
-//                 'labeled': anomaly['labeled']
-//             }
-//             result.append(cur_anomaly)
-//         return result
-
-//     def inverse_transform_indexes(self, indexes):
-//         if len(indexes) == 0:
-//             return []
-//         dataframe = self.get_data_range(indexes[0][0], indexes[-1][1] + 1)
-
-//         return [(dataframe['timestamp'][i1], dataframe['timestamp'][i2]) for (i1, i2) in indexes]
-
-//     def synchronize(self):
-//         append_dataframe = self.load_from_db(self.last_time)
-//         self.__append_data(append_dataframe)
-
-//     def custom_query(self, after_time, before_time = None):
-//         if self.datasource['type'] == 'influxdb':
-//             query = self.datasource['params']['q']
-//             if after_time is not None:
-//                 if before_time is not None:
-//                     timeFilter = 'time >= %s AND time <= %s' % (after_time, before_time)
-//                 else:
-//                     timeFilter = 'time >= "%s"' % (str(after_time))
-//             else:
-//                 timeFilter = 'time > 0ms'
-//             query = re.sub(r'(?:time >.+?)(GROUP.+)*$', timeFilter + r' \1', query)
-//             return query
-//         else:
-//             raise 'Datasource type ' + self.datasource['type'] + ' is not supported yet'
-
-//     def load_from_db(self, after_time=None):
-//         result = self.__load_data_chunks(after_time)
-//         if result == None or len(result['values']) == 0:
-//             dataframe = pd.DataFrame([])
-//         else:
-//             dataframe = pd.DataFrame(result['values'], columns = result['columns'])
-//             cols = dataframe.columns.tolist()
-//             cols.remove('time')
-//             cols = ['time'] + cols
-//             dataframe = dataframe[cols]
-//             dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
-//             dataframe = dataframe.dropna(axis=0, how='any')
-
-//         return dataframe
-
-//     def __load_data_chunks(self, after_time = None):
-//         params = self.datasource['params']
-
-//         if after_time == None:
-//             res = {
-//                 'columns': [],
-//                 'values': []
-//             }
-
-//             after_time = int(time() * 1000 - MS_IN_WEEK)
-//             before_time = int(time() * 1000)
-//             while True:
-//                 params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
-//                 serie = self.__query_grafana(params)
-
-//                 if serie != None:
-//                     res['columns'] = serie['columns']
-//                     res['values'] += serie['values']
-
-//                     after_time -= MS_IN_WEEK
-//                     before_time -= MS_IN_WEEK
-//                 else:
-//                     return res
-//         else:
-//             params['q'] = self.custom_query(str(after_time))
-
-//             return self.__query_grafana(params)
-
-//     def __query_grafana(self, params):
-
-//         headers = { 'Authorization': 'Bearer ' + HASTIC_API_KEY }
-//         url = self.datasource['origin'] + '/' + self.datasource['url'] + '?' + urlencode(params)
-
-//         req = urllib.request.Request(url, headers=headers)
-//         with urllib.request.urlopen(req) as resp:
-//             res = json.loads(resp.read().decode('utf-8'))['results'][0]
-//             if 'series' in res:
-//                 return res['series'][0]
-//             else:
-//                 return None
-
-//     def __init_chunks(self):
-//         chunk_index = 0
-//         self.last_chunk_index = 0
-//         while True:
-//             filename = self.data_filename
-//             if chunk_index > 0:
-//                 filename += "." + str(chunk_index)
-//             if os.path.exists(filename):
-//                 self.last_chunk_index = chunk_index
-//                 chunk = self.__load_chunk(chunk_index)
-//                 chunk_last_time = chunk.iloc[len(chunk) - 1]['timestamp']
-//                 self.chunk_last_times[chunk_index] = chunk_last_time
-//                 self.last_time = chunk_last_time
-//             else:
-//                 break
-//             chunk_index += 1
-//         self.total_size = self.last_chunk_index * self.chunk_size
-//         last_chunk = self.__load_chunk(self.last_chunk_index)
-//         self.total_size += len(last_chunk)
-
-//     def __load_chunk(self, index):
-//         filename = self.data_filename
-//         if index > 0:
-//             filename += "." + str(index)
-
-//         if os.path.exists(filename):
-//             chunk = pd.read_csv(filename, parse_dates=[0])
-//             frame_index = np.arange(index * self.chunk_size, index * self.chunk_size + len(chunk))
-//             chunk = chunk.set_index(frame_index)
-//             return chunk.rename(columns={chunk.columns[0]: "timestamp", chunk.columns[1]: "value"})
-//         return pd.DataFrame()
-
-//     def __save_chunk(self, index, dataframe):
-//         filename = self.data_filename
-//         if index > 0:
-//             filename += "." + str(index)
-
-//         chunk_last_time = dataframe.iloc[len(dataframe) - 1]['time']
-//         self.chunk_last_times[index] = chunk_last_time
-
-//         if os.path.exists(filename):
-//             dataframe.to_csv(filename, mode='a', index=False, header=False)
-//         else:
-//             dataframe.to_csv(filename, mode='w', index=False, header=True)
-
-//     def __append_data(self, dataframe):
-//         while len(dataframe) > 0:
-//             chunk = self.__load_chunk(self.last_chunk_index)
-//             rows_count = min(self.chunk_size - len(chunk), len(dataframe))
-
-//             rows = dataframe.iloc[0:rows_count]
-
-//             if len(rows) > 0:
-//                 self.__save_chunk(self.last_chunk_index, rows)
-//                 self.total_size += rows_count
-
-//                 self.last_time = rows.iloc[-1]['time']
-//             dataframe = dataframe[rows_count:]
-
-//             if len(dataframe) > 0:
-//                 self.last_chunk_index += 1
-
-//     def __get_data(self, start_index, stop_index):
-//         result = pd.DataFrame()
-//         start_chunk = start_index // self.chunk_size
-//         finish_chunk = self.last_chunk_index
-//         if stop_index is not None:
-//             finish_chunk = stop_index // self.chunk_size
-//         for chunk_num in range(start_chunk, finish_chunk + 1):
-//             chunk = self.__load_chunk(chunk_num)
-//             if stop_index is not None and chunk_num == finish_chunk:
-//                 chunk = chunk[:stop_index % self.chunk_size]
-//             if chunk_num == start_chunk:
-//                 chunk = chunk[start_index % self.chunk_size:]
-//             result = pd.concat([result, chunk])
-//         return result