diff --git a/server/src/services/alerts.ts b/server/src/services/alerts.ts
index 55f177a..07e85d3 100644
--- a/server/src/services/alerts.ts
+++ b/server/src/services/alerts.ts
@@ -52,7 +52,7 @@ async function alertsTick() {
 
 const alertTimeout = 60000; // ms
 const activeAlerts = new Set();
 
-setTimeout(alertsTick, 5000);
+// setTimeout(alertsTick, 5000);
 
 export { getAlertsAnomalies, saveAlertsAnomalies }
diff --git a/server/src/services/analytics.ts b/server/src/services/analytics.ts
index 1e7e18f..9de55d9 100644
--- a/server/src/services/analytics.ts
+++ b/server/src/services/analytics.ts
@@ -15,7 +15,6 @@ const learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH })
 learnWorker.stdout.pipe(split())
   .pipe(
     mapSync(function(line){
-      console.log(line)
       onMessage(line)
     })
   );
@@ -26,6 +25,7 @@ const taskMap = {};
 let nextTaskId = 0;
 
 function onMessage(data) {
+  console.log(`worker stdout: ${data}`);
   let response = JSON.parse(data);
   let taskId = response.__task_id;
   // let anomalyName = response.anomaly_name;
diff --git a/src/anomaly_model.py b/src/anomaly_model.py
index bdd21b1..c582f56 100644
--- a/src/anomaly_model.py
+++ b/src/anomaly_model.py
@@ -4,6 +4,7 @@ from data_preprocessor import data_preprocessor
 import json
 import pandas as pd
 import logging
+from urllib.parse import urlparse
 
 datasource_folder = "datasources/"
 dataset_folder = "datasets/"
@@ -26,22 +27,22 @@ class AnomalyModel:
         self.anomaly_name = anomaly_name
         self.load_anomaly_config()
 
-        datasource = self.anomaly_config['metric']['datasource']
+        parsedUrl = urlparse(self.anomaly_config['panelUrl'])
+        origin = parsedUrl.scheme + '://' + parsedUrl.netloc
+
+        datasource = self.anomaly_config['datasource']
+        datasource['origin'] = origin
         metric_name = self.anomaly_config['metric']['targets'][0]
 
-        dbconfig_filename = os.path.join(datasource_folder, datasource + ".json")
         target_filename = os.path.join(metrics_folder, metric_name + ".json")
         dataset_filename = os.path.join(dataset_folder, metric_name + ".csv")
         augmented_path = os.path.join(dataset_folder, metric_name + "_augmented.csv")
 
-        with open(dbconfig_filename, 'r') as config_file:
-            dbconfig = json.load(config_file)
-
         with open(target_filename, 'r') as file:
             target = json.load(file)
 
-        self.data_prov = DataProvider(dbconfig, target, dataset_filename)
+        self.data_prov = DataProvider(datasource, target, dataset_filename)
         self.preprocessor = data_preprocessor(self.data_prov, augmented_path)
 
         self.model = None
 
@@ -96,8 +97,6 @@ class AnomalyModel:
         start_index = self.data_prov.get_upper_bound(last_prediction_time)
         stop_index = self.data_prov.size()
 
-        # last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
-        # dataframe = dataframe[dataframe['timestamp'] > last_prediction_time]
         last_prediction_time = int(last_prediction_time.timestamp() * 1000)
 
         predicted_anomalies = []
diff --git a/src/data_preprocessor.py b/src/data_preprocessor.py
index 4c6eb91..6fac01a 100644
--- a/src/data_preprocessor.py
+++ b/src/data_preprocessor.py
@@ -64,7 +64,6 @@ class data_preprocessor:
         start_frame = start_index
         stop_frame = stop_index
         augmented = self.__get_data(start_frame, stop_frame)
-
         if len(anomalies) > 0:
             anomalies_indexes = self.transform_anomalies(anomalies)
             augmented = augmented.drop(anomalies_indexes)
@@ -84,8 +83,8 @@ class data_preprocessor:
         anomaly_index = current_index
 
         rows = dataframe[anomaly_index]
-        # indexes = np.floor_divide(rows.index, self.frame_size)
-        indexes = np.unique(rows.index)
+        indexes = np.floor_divide(rows.index, self.frame_size)
+        # indexes = np.unique(rows.index)
         return indexes
 
     def inverse_transform_anomalies(self, prediction):
@@ -252,4 +251,4 @@ class data_preprocessor:
             augmented['time_of_day_column_x'] = np.cos(norm_seconds)
         if 'time_of_day_column_y' in features:
             augmented['time_of_day_column_y'] = np.sin(norm_seconds)
-        return augmented
\ No newline at end of file
+        return augmented
diff --git a/src/data_provider.py b/src/data_provider.py
index 63409a2..d2764a8 100644
--- a/src/data_provider.py
+++ b/src/data_provider.py
@@ -1,14 +1,18 @@
-from influxdb import InfluxDBClient
 import pandas as pd
-import os.path
+import os, re
 import numpy as np
+from urllib.parse import urlencode, urlparse
+import urllib.request
+import json
+from time import time
 
+MS_IN_WEEK = 604800000
 
 class DataProvider:
     chunk_size = 50000
 
-    def __init__(self, dbconfig, target, data_filename):
-        self.dbconfig = dbconfig
+    def __init__(self, datasource, target, data_filename):
+        self.datasource = datasource
         self.target = target
         self.data_filename = data_filename
         self.last_time = None
@@ -69,79 +73,80 @@ class DataProvider:
         return [(dataframe['timestamp'][i1], dataframe['timestamp'][i2]) for (i1, i2) in indexes]
 
     def synchronize(self):
-        # last_time = None
-        # if len(self.dataframe/) > 0:
-        #     last_time = self.dataframe['time'][len(self.dataframe)-1]
         append_dataframe = self.load_from_db(self.last_time)
         self.__append_data(append_dataframe)
-        # append_dataframe
-        # append_dataframe.to_csv(self.data_filename, mode='a', index=False, header=False)
-        # self.dataframe = pd.concat([self.dataframe, append_dataframe], ignore_index=True)
-
-    # def load(self):
-    #     if os.path.exists(self.data_filename):
-    #         self.dataframe = pd.read_csv(self.data_filename, parse_dates=[0])
-    #         self.synchronize()
-    #     else:
-    #         append_dataframe = self.load_from_db()
-    #         self.__append_data(append_dataframe)
-    #         #self.dataframe.to_csv(self.data_filename, index=False, header=True)
-
-    def custom_query(self, after_time):
-        query = self.target["query"]
-        timeFilter = "TRUE"
-        if after_time is not None:
-            timeFilter = "time > '%s'" % (str(after_time))
-        query = query.replace("$timeFilter", timeFilter)
-        return query
 
-    def load_from_db(self, after_time=None):
-        """Instantiate a connection to the InfluxDB."""
-        host = self.dbconfig['host']
-        port = self.dbconfig['port']
-        user = self.dbconfig['user']
-        password = self.dbconfig['password']
-        dbname = self.dbconfig['dbname']
-
-        client = InfluxDBClient(host, port, user, password, dbname)
-        # query = 'select k0, k1, k2 from vals;'
-
-        measurement = self.target['measurement']
-        select = self.target['select']
-        tags = self.target['tags']
-
-        if "query" in self.target:
-            query = self.custom_query(after_time)
+    def custom_query(self, after_time, before_time = None):
+        if self.datasource['type'] == 'influxdb':
+            query = self.datasource['params']['q']
+            if after_time is not None:
+                if before_time is not None:
+                    timeFilter = 'time >= %s AND time <= %s' % (after_time, before_time)
+                else:
+                    timeFilter = 'time >= "%s"' % (str(after_time))
+            else:
+                timeFilter = 'time > 0ms'
+            query = re.sub(r'(?:time >.+?)(GROUP.+)*$', timeFilter + r' \1', query)
+            return query
         else:
-            select_values = select[0][0]['params']
-            escaped_select_values = ["\"" + value + "\"" for value in select_values]
-
-            conditions_entries = []
-            if len(tags) > 0:
-                for tag in tags:
-                    conditions_entries.append("(\"" + tag['key'] + "\"" + tag['operator'] + "'" + tag['value'] + "')")
-            if after_time:
-                conditions_entries.append("time > '%s'" % (str(after_time)))
-
-            condition = ""
-            if len(conditions_entries) > 0:
-                condition = " where " + " AND ".join(conditions_entries)
-
-            query = "select %s from \"%s\"%s;" % (",".join(escaped_select_values), measurement, condition)
+            raise Exception('Datasource type ' + self.datasource['type'] + ' is not supported yet')
 
-        result = client.query(query, chunked=True, chunk_size=10000)
-        dataframe = pd.DataFrame(result.get_points())
-        if len(dataframe) > 0:
+    def load_from_db(self, after_time=None):
+        result = self.__load_data_chunks(after_time)
+        if result == None or len(result['values']) == 0:
+            dataframe = pd.DataFrame([])
+        else:
+            dataframe = pd.DataFrame(result['values'], columns = result['columns'])
             cols = dataframe.columns.tolist()
             cols.remove('time')
             cols = ['time'] + cols
             dataframe = dataframe[cols]
-
-            dataframe['time'] = pd.to_datetime(dataframe['time'])
+            dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
            dataframe = dataframe.dropna(axis=0, how='any')
         return dataframe
 
+    def __load_data_chunks(self, after_time = None):
+        params = self.datasource['params']
+
+        if after_time == None:
+            res = {
+                'columns': [],
+                'values': []
+            }
+
+            after_time = int(time()*1000 - MS_IN_WEEK)
+            before_time = int(time()*1000)
+            while True:
+                params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
+                serie = self.__query_grafana(params)
+
+                if serie != None:
+                    res['columns'] = serie['columns']
+                    res['values'] += serie['values']
+
+                    after_time -= MS_IN_WEEK
+                    before_time -= MS_IN_WEEK
+                else:
+                    return res
+        else:
+            params['q'] = self.custom_query(str(after_time) + 'ms')
+
+            return self.__query_grafana(params)
+
+    def __query_grafana(self, params):
+
+        headers = { 'Authorization': 'Bearer ' + os.environ['API_KEY'] }
+        url = self.datasource['origin'] + '/' + self.datasource['url'] + '?' + urlencode(params)
+
+        req = urllib.request.Request(url, headers=headers)
+        with urllib.request.urlopen(req) as resp:
+            res = json.loads(resp.read().decode('utf-8'))['results'][0]
+            if 'series' in res:
+                return res['series'][0]
+            else:
+                return None
+
     def __init_chunks(self):
         chunk_index = 0
         self.last_chunk_index = 0
 
diff --git a/src/learn.py b/src/learn.py
deleted file mode 100644
index 7925a03..0000000
--- a/src/learn.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-import csv
-import os
-from worker import worker
-
-
-def enqueue_task():
-    tasks_file = "tasks.csv"
-    tasks = []
-    with open(tasks_file) as csvfile:
-        rdr = csv.reader(csvfile, delimiter=',')
-        tasks = list(rdr)
-    if len(tasks) == 0:
-        return None
-    res = tasks[0][0]
-    tasks = tasks[1:]
-    with open(tasks_file, "w+") as csvfile:
-        writer = csv.writer(csvfile)
-        writer.writerows(tasks)
-    return res
-
-
-def set_lock(value):
-    lock_file = "learn.lock"
-    exists = os.path.exists(lock_file)
-    if exists == value:
-        return False
-
-    if value:
-        open(lock_file, "w+")
-    else:
-        os.remove(lock_file)
-    return True
-
-
-if __name__ == "__main__":
-    if not set_lock(True):
-        print("learn locked")
-        exit(0)
-
-    w = worker()
-    while True:
-        task = enqueue_task()
-        if task is None:
-            break
-
-        w.start()
-        w.add_task({"type": "learn", "anomaly_name": task})
-        w.add_task({"type": "predict", "anomaly_name": task})
-        w.stop()
-
-    set_lock(False)
\ No newline at end of file
diff --git a/src/predict.py b/src/predict.py
deleted file mode 100644
index bd975a8..0000000
--- a/src/predict.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import argparse
-import csv
-import time
-import datetime
-import pandas as pd
-import matplotlib.pyplot as plt
-
-from influxdb import InfluxDBClient
-from sklearn import svm
-import numpy as np
-import math
-import pickle
-
-
-host = "209.205.120.226"
-port = 8086
-datasetFile = "/tmp/dataset.csv"
-anomaliesFile = "anomalies.csv"
-predictedAnomaliesFile = "predicted_anomalies.csv"
-modelFilename = 'finalized_model.sav'
-
-
-def readAnomalies():
-    anomalies = []
-
-    with open(anomaliesFile) as csvfile:
-        rdr = csv.reader(csvfile, delimiter=',')
-        for row in rdr:
-            anomaly = (int(row[0]), int(row[1]))
-            anomalies.append(anomaly)
-
-    return anomalies
-
-
-"""Instantiate a connection to the InfluxDB."""
-user = ''
-password = ''
-dbname = 'accelerometer'
-query = 'select k0, k1, k2 from vals limit 10000;'
-
-
-client = InfluxDBClient(host, port, user, password, dbname)
-
-def predict(host=host, port=port):
-
-    result = client.query(query)
-    df = pd.DataFrame(result['vals'], columns=['time', 'k0', 'k1', 'k2'])
-
-    basedAnomalies = readAnomalies()
-
-    df2 = df.rolling(200, win_type='triang').sum()
-    df2['time'] = pd.to_datetime(df2['time'])
-    df2 = df2[np.isfinite(df2['k0'])]
-
-    print(len(df2))
-
-
-    anomalies = []
-    last_anomaly = (-1, -1)
-    with open(modelFilename, 'rb') as fid:
-        clf = pickle.load(fid)
-        prediction = clf.predict(df2[['k0', 'k1', 'k2']])
-        print(len(prediction))
-        #print(prediction)
-        for i in range(len(prediction)):
-            if prediction[i] > 0.:
-                t = df2['time'][i + 199].timestamp()
-                t = ((t + 0 * 3600) * 1000)
-                if t < basedAnomalies[len(basedAnomalies) - 1][1]:
-                    continue
-                if t < last_anomaly[1] + 1000:
-                    last_anomaly = (last_anomaly[0], t)
-                else:
-                    if last_anomaly[1] != -1:
-                        anomalies.append(last_anomaly)
-                    last_anomaly = (t, t)
-
-    with open(predictedAnomaliesFile, "w") as file:
-        for anomaly in anomalies:
-            file.write(str(int(anomaly[0])) + "," + str(int(anomaly[1])) + "\n")
-
-predict()
-