diff --git a/analytics/config.py b/analytics/config.py
index d072594..fd0ae65 100644
--- a/analytics/config.py
+++ b/analytics/config.py
@@ -3,7 +3,6 @@ import json
 
 PARENT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-DATA_FOLDER = os.path.join(PARENT_FOLDER, 'data')
 
 CONFIG_FILE = os.path.join(PARENT_FOLDER, 'config.json')
 
 
@@ -14,7 +13,6 @@ if config_exists:
 
 
 def get_config_field(field, default_val = None):
-
     if field in os.environ:
         return os.environ[field]
 
@@ -26,12 +24,5 @@ def get_config_field(field, default_val = None):
 
     raise Exception('Please configure {}'.format(field))
 
-
-DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets')
-MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models')
-METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics')
-
-HASTIC_API_KEY = get_config_field('HASTIC_API_KEY')
-
 ZMQ_DEV_PORT = get_config_field('ZMQ_DEV_PORT', '8002')
 ZMQ_CONNECTION_STRING = get_config_field('ZMQ_CONNECTION_STRING', 'tcp://*:%s' % ZMQ_DEV_PORT)
diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py
index 7ddf1d3..1339678 100644
--- a/analytics/detectors/general_detector/general_detector.py
+++ b/analytics/detectors/general_detector/general_detector.py
@@ -17,28 +17,9 @@ logger = logging.getLogger('analytic_toolset')
 
 
 class GeneralDetector:
 
-    def __init__(self, anomaly_name):
+    def __init__(self, anomaly_name, data):
         self.anomaly_name = anomaly_name
-
-        parsedUrl = urlparse(self.anomaly_config['panelUrl'])
-        origin = parsedUrl.scheme + '://' + parsedUrl.netloc
-
-        datasource = self.anomaly_config['datasource']
-        datasource['origin'] = origin
-        metric_name = self.anomaly_config['metric']['targets'][0]
-
-        target_filename = os.path.join(config.METRICS_FOLDER, metric_name + ".json")
-
-        dataset_filename = os.path.join(config.DATASET_FOLDER, metric_name + ".csv")
-        augmented_path = os.path.join(config.DATASET_FOLDER, metric_name + "_augmented.csv")
-
-        with open(target_filename, 'r') as file:
-            target = json.load(file)
-
-        self.data_prov = GrafanaDataProvider(datasource, target, dataset_filename)
-        self.preprocessor = data_preprocessor(self.data_prov, augmented_path)
         self.model = None
-        self.__load_model()
 
     async def learn(self, segments):
@@ -112,13 +93,9 @@ class GeneralDetector:
             return SupervisedAlgorithm()
 
     def __save_model(self):
-        logger.info("Save model '%s'" % self.anomaly_name)
-        model_filename = os.path.join(config.MODELS_FOLDER, self.anomaly_name + ".m")
-        self.model.save(model_filename)
+        # TODO: use data_service to save anything
+        pass
 
     def __load_model(self):
-        logger.info("Load model '%s'" % self.anomaly_name)
-        model_filename = os.path.join(config.MODELS_FOLDER, self.anomaly_name + ".m")
-        if os.path.exists(model_filename):
-            self.model = self.create_algorithm()
-            self.model.load(model_filename)
+        # TODO: use data_service to save anything
+        pass
diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py
index b0de85d..55691bc 100644
--- a/analytics/detectors/pattern_detector.py
+++ b/analytics/detectors/pattern_detector.py
@@ -34,23 +34,6 @@ class PatternDetector:
         self.analytic_unit_id = analytic_unit_id
         self.pattern_type = pattern_type
 
-        self.__load_anomaly_config()
-
-        parsedUrl = urlparse(self.anomaly_config['panelUrl'])
-        origin = parsedUrl.scheme + '://' + parsedUrl.netloc
-
-        datasource = self.anomaly_config['datasource']
-        metric_name = self.anomaly_config['metric']['targets'][0]
-
-        target_filename = os.path.join(config.METRICS_FOLDER, metric_name + ".json")
-        datasource['origin'] = origin
-        dataset_filename = os.path.join(config.DATASET_FOLDER, metric_name + ".csv")
-
-        with open(target_filename, 'r') as file:
-            target = json.load(file)
-
-        self.data_prov = GrafanaDataProvider(datasource, target, dataset_filename)
-
         self.model = None
         self.__load_model(pattern_type)
 
@@ -99,13 +82,9 @@ class PatternDetector:
         self.data_prov.synchronize()
 
     def __save_model(self):
-        logger.info("Save model '%s'" % self.analytic_unit_id)
-        model_filename = os.path.join(config.MODELS_FOLDER, self.analytic_unit_id + ".m")
-        self.model.save(model_filename)
+        # TODO: use data_service to save anything
+        pass
 
     def __load_model(self, pattern):
-        logger.info("Load model '%s'" % self.analytic_unit_id)
-        model_filename = os.path.join(config.MODELS_FOLDER, self.pattern_type + ".m")
-        if os.path.exists(model_filename):
-            self.model = resolve_model_by_pattern(pattern)
-            self.model.load(model_filename)
+        # TODO: use data_service to save anything
+        pass
diff --git a/analytics/grafana_data_provider.py b/analytics/grafana_data_provider.py
deleted file mode 100644
index afd31e7..0000000
--- a/analytics/grafana_data_provider.py
+++ /dev/null
@@ -1,229 +0,0 @@
-import pandas as pd
-import os, re
-import numpy as np
-from urllib.parse import urlencode, urlparse
-import urllib.request
-import json
-from time import time
-from config import HASTIC_API_KEY
-
-
-MS_IN_WEEK = 604800000
-
-class GrafanaDataProvider:
-    chunk_size = 50000
-
-    def __init__(self, datasource, target, data_filename):
-        self.datasource = datasource
-        self.target = target
-        self.data_filename = data_filename
-        self.last_time = None
-        self.total_size = 0
-        self.last_chunk_index = 0
-        self.chunk_last_times = {}
-        self.__init_chunks()
-        self.synchronize()
-
-    def get_dataframe(self, after_time=None):
-        result = pd.DataFrame()
-        for chunk_index, last_chunk_time in self.chunk_last_times.items():
-            if after_time is None or after_time <= last_chunk_time:
-                chunk = self.__load_chunk(chunk_index)
-                if after_time is not None:
-                    chunk = chunk[chunk['timestamp'] >= after_time]
-                result = pd.concat([result, chunk])
-        return result
-
-    def get_upper_bound(self, after_time):
-        for chunk_index, last_chunk_time in self.chunk_last_times.items():
-            if after_time < last_chunk_time:
-                chunk = self.__load_chunk(chunk_index)
-                chunk = chunk[chunk['timestamp'] >= after_time]
-                return chunk.index[0]
-        return self.size()
-
-    def size(self):
-        return self.total_size
-
-    def get_data_range(self, start_index, stop_index=None):
-        return self.__get_data(start_index, stop_index)
-
-    def transform_anomalies(self, anomalies):
-        result = []
-        if len(anomalies) == 0:
-            return result
-        dataframe = self.get_dataframe(None)
-        for anomaly in anomalies:
-            start_time = pd.to_datetime(anomaly['start'] - 1, unit='ms')
-            finish_time = pd.to_datetime(anomaly['finish'] + 1, unit='ms')
-            current_index = (dataframe['timestamp'] >= start_time) & (dataframe['timestamp'] <= finish_time)
-            anomaly_frame = dataframe[current_index]
-            if anomaly_frame.empty:
-                continue
-
-            cur_anomaly = {
-                'start': anomaly_frame.index[0],
-                'finish': anomaly_frame.index[len(anomaly_frame) - 1],
-                'labeled': anomaly['labeled']
-            }
-            result.append(cur_anomaly)
-        return result
-
-    def inverse_transform_indexes(self, indexes):
-        if len(indexes) == 0:
-            return []
-        dataframe = self.get_data_range(indexes[0][0], indexes[-1][1] + 1)
-
-        return [(dataframe['timestamp'][i1], dataframe['timestamp'][i2]) for (i1, i2) in indexes]
-
-    def synchronize(self):
-        append_dataframe = self.load_from_db(self.last_time)
-        self.__append_data(append_dataframe)
-
-    def custom_query(self, after_time, before_time = None):
-        if self.datasource['type'] == 'influxdb':
-            query = self.datasource['params']['q']
-            if after_time is not None:
-                if before_time is not None:
-                    timeFilter = 'time >= %s AND time <= %s' % (after_time, before_time)
-                else:
-                    timeFilter = 'time >= "%s"' % (str(after_time))
-            else:
-                timeFilter = 'time > 0ms'
-            query = re.sub(r'(?:time >.+?)(GROUP.+)*$', timeFilter + r' \1', query)
-            return query
-        else:
-            raise 'Datasource type ' + self.datasource['type'] + ' is not supported yet'
-
-    def load_from_db(self, after_time=None):
-        result = self.__load_data_chunks(after_time)
-        if result == None or len(result['values']) == 0:
-            dataframe = pd.DataFrame([])
-        else:
-            dataframe = pd.DataFrame(result['values'], columns = result['columns'])
-            cols = dataframe.columns.tolist()
-            cols.remove('time')
-            cols = ['time'] + cols
-            dataframe = dataframe[cols]
-            dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
-            dataframe = dataframe.dropna(axis=0, how='any')
-
-        return dataframe
-
-    def __load_data_chunks(self, after_time = None):
-        params = self.datasource['params']
-
-        if after_time == None:
-            res = {
-                'columns': [],
-                'values': []
-            }
-
-            after_time = int(time() * 1000 - MS_IN_WEEK)
-            before_time = int(time() * 1000)
-            while True:
-                params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
-                serie = self.__query_grafana(params)
-
-                if serie != None:
-                    res['columns'] = serie['columns']
-                    res['values'] += serie['values']
-
-                    after_time -= MS_IN_WEEK
-                    before_time -= MS_IN_WEEK
-                else:
-                    return res
-        else:
-            params['q'] = self.custom_query(str(after_time))
-
-            return self.__query_grafana(params)
-
-    def __query_grafana(self, params):
-
-        headers = { 'Authorization': 'Bearer ' + HASTIC_API_KEY }
-        url = self.datasource['origin'] + '/' + self.datasource['url'] + '?' + urlencode(params)
-
-        req = urllib.request.Request(url, headers=headers)
-        with urllib.request.urlopen(req) as resp:
-            res = json.loads(resp.read().decode('utf-8'))['results'][0]
-            if 'series' in res:
-                return res['series'][0]
-            else:
-                return None
-
-    def __init_chunks(self):
-        chunk_index = 0
-        self.last_chunk_index = 0
-        while True:
-            filename = self.data_filename
-            if chunk_index > 0:
-                filename += "." + str(chunk_index)
-            if os.path.exists(filename):
-                self.last_chunk_index = chunk_index
-                chunk = self.__load_chunk(chunk_index)
-                chunk_last_time = chunk.iloc[len(chunk) - 1]['timestamp']
-                self.chunk_last_times[chunk_index] = chunk_last_time
-                self.last_time = chunk_last_time
-            else:
-                break
-            chunk_index += 1
-        self.total_size = self.last_chunk_index * self.chunk_size
-        last_chunk = self.__load_chunk(self.last_chunk_index)
-        self.total_size += len(last_chunk)
-
-    def __load_chunk(self, index):
-        filename = self.data_filename
-        if index > 0:
-            filename += "." + str(index)
-
-        if os.path.exists(filename):
-            chunk = pd.read_csv(filename, parse_dates=[0])
-            frame_index = np.arange(index * self.chunk_size, index * self.chunk_size + len(chunk))
-            chunk = chunk.set_index(frame_index)
-            return chunk.rename(columns={chunk.columns[0]: "timestamp", chunk.columns[1]: "value"})
-        return pd.DataFrame()
-
-    def __save_chunk(self, index, dataframe):
-        filename = self.data_filename
-        if index > 0:
-            filename += "." + str(index)
-
-        chunk_last_time = dataframe.iloc[len(dataframe) - 1]['time']
-        self.chunk_last_times[index] = chunk_last_time
-
-        if os.path.exists(filename):
-            dataframe.to_csv(filename, mode='a', index=False, header=False)
-        else:
-            dataframe.to_csv(filename, mode='w', index=False, header=True)
-
-    def __append_data(self, dataframe):
-        while len(dataframe) > 0:
-            chunk = self.__load_chunk(self.last_chunk_index)
-            rows_count = min(self.chunk_size - len(chunk), len(dataframe))
-
-            rows = dataframe.iloc[0:rows_count]
-
-            if len(rows) > 0:
-                self.__save_chunk(self.last_chunk_index, rows)
-                self.total_size += rows_count
-
-                self.last_time = rows.iloc[-1]['time']
-            dataframe = dataframe[rows_count:]
-
-            if len(dataframe) > 0:
-                self.last_chunk_index += 1
-
-    def __get_data(self, start_index, stop_index):
-        result = pd.DataFrame()
-        start_chunk = start_index // self.chunk_size
-        finish_chunk = self.last_chunk_index
-        if stop_index is not None:
-            finish_chunk = stop_index // self.chunk_size
-        for chunk_num in range(start_chunk, finish_chunk + 1):
-            chunk = self.__load_chunk(chunk_num)
-            if stop_index is not None and chunk_num == finish_chunk:
-                chunk = chunk[:stop_index % self.chunk_size]
-            if chunk_num == start_chunk:
-                chunk = chunk[start_index % self.chunk_size:]
-            result = pd.concat([result, chunk])
-        return result
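The stubbed `__save_model` / `__load_model` methods above drop the `config.MODELS_FOLDER` file persistence and leave only a TODO pointing at a data_service. A minimal sketch of what that could look like, assuming a hypothetical injected `data_service` object exposing async `save_file` / `load_file` methods (these names are illustrative and not part of this diff):

```python
import pickle


class DataServiceModelStore:
    # Hypothetical helper: `data_service` and its save_file/load_file API
    # are assumptions, not part of the patch above. It mirrors the removed
    # MODELS_FOLDER save/load logic, but delegates storage to a service.
    def __init__(self, data_service):
        self.data_service = data_service

    async def save_model(self, analytic_unit_id, model):
        # Serialize the model and hand the bytes to the data service.
        await self.data_service.save_file(analytic_unit_id + '.m', pickle.dumps(model))

    async def load_model(self, analytic_unit_id):
        # Returns None when nothing has been saved for this unit yet.
        payload = await self.data_service.load_file(analytic_unit_id + '.m')
        return pickle.loads(payload) if payload is not None else None
```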