Browse Source

132-proxy-db-queries-through-grafana

pull/1/head
rozetko 6 years ago
parent
commit
b79200a552
  1. 2
      server/src/services/alerts.ts
  2. 2
      server/src/services/analytics.ts
  3. 15
      src/anomaly_model.py
  4. 7
      src/data_preprocessor.py
  5. 133
      src/data_provider.py
  6. 52
      src/learn.py
  7. 83
      src/predict.py

2
server/src/services/alerts.ts

@ -52,7 +52,7 @@ async function alertsTick() {
const alertTimeout = 60000; // ms const alertTimeout = 60000; // ms
const activeAlerts = new Set<string>(); const activeAlerts = new Set<string>();
setTimeout(alertsTick, 5000); // setTimeout(alertsTick, 5000);
export { getAlertsAnomalies, saveAlertsAnomalies } export { getAlertsAnomalies, saveAlertsAnomalies }

2
server/src/services/analytics.ts

@ -15,7 +15,6 @@ const learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH })
learnWorker.stdout.pipe(split()) learnWorker.stdout.pipe(split())
.pipe( .pipe(
mapSync(function(line){ mapSync(function(line){
console.log(line)
onMessage(line) onMessage(line)
}) })
); );
@ -26,6 +25,7 @@ const taskMap = {};
let nextTaskId = 0; let nextTaskId = 0;
function onMessage(data) { function onMessage(data) {
console.log(`worker stdout: ${data}`);
let response = JSON.parse(data); let response = JSON.parse(data);
let taskId = response.__task_id; let taskId = response.__task_id;
// let anomalyName = response.anomaly_name; // let anomalyName = response.anomaly_name;

15
src/anomaly_model.py

@ -4,6 +4,7 @@ from data_preprocessor import data_preprocessor
import json import json
import pandas as pd import pandas as pd
import logging import logging
from urllib.parse import urlparse
datasource_folder = "datasources/" datasource_folder = "datasources/"
dataset_folder = "datasets/" dataset_folder = "datasets/"
@ -26,22 +27,22 @@ class AnomalyModel:
self.anomaly_name = anomaly_name self.anomaly_name = anomaly_name
self.load_anomaly_config() self.load_anomaly_config()
datasource = self.anomaly_config['metric']['datasource'] parsedUrl = urlparse(self.anomaly_config['panelUrl'])
origin = parsedUrl.scheme + '://' + parsedUrl.netloc
datasource = self.anomaly_config['datasource']
datasource['origin'] = origin
metric_name = self.anomaly_config['metric']['targets'][0] metric_name = self.anomaly_config['metric']['targets'][0]
dbconfig_filename = os.path.join(datasource_folder, datasource + ".json")
target_filename = os.path.join(metrics_folder, metric_name + ".json") target_filename = os.path.join(metrics_folder, metric_name + ".json")
dataset_filename = os.path.join(dataset_folder, metric_name + ".csv") dataset_filename = os.path.join(dataset_folder, metric_name + ".csv")
augmented_path = os.path.join(dataset_folder, metric_name + "_augmented.csv") augmented_path = os.path.join(dataset_folder, metric_name + "_augmented.csv")
with open(dbconfig_filename, 'r') as config_file:
dbconfig = json.load(config_file)
with open(target_filename, 'r') as file: with open(target_filename, 'r') as file:
target = json.load(file) target = json.load(file)
self.data_prov = DataProvider(dbconfig, target, dataset_filename) self.data_prov = DataProvider(datasource, target, dataset_filename)
self.preprocessor = data_preprocessor(self.data_prov, augmented_path) self.preprocessor = data_preprocessor(self.data_prov, augmented_path)
self.model = None self.model = None
@ -96,8 +97,6 @@ class AnomalyModel:
start_index = self.data_prov.get_upper_bound(last_prediction_time) start_index = self.data_prov.get_upper_bound(last_prediction_time)
stop_index = self.data_prov.size() stop_index = self.data_prov.size()
# last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
# dataframe = dataframe[dataframe['timestamp'] > last_prediction_time]
last_prediction_time = int(last_prediction_time.timestamp() * 1000) last_prediction_time = int(last_prediction_time.timestamp() * 1000)
predicted_anomalies = [] predicted_anomalies = []

7
src/data_preprocessor.py

@ -64,7 +64,6 @@ class data_preprocessor:
start_frame = start_index start_frame = start_index
stop_frame = stop_index stop_frame = stop_index
augmented = self.__get_data(start_frame, stop_frame) augmented = self.__get_data(start_frame, stop_frame)
if len(anomalies) > 0: if len(anomalies) > 0:
anomalies_indexes = self.transform_anomalies(anomalies) anomalies_indexes = self.transform_anomalies(anomalies)
augmented = augmented.drop(anomalies_indexes) augmented = augmented.drop(anomalies_indexes)
@ -84,8 +83,8 @@ class data_preprocessor:
anomaly_index = current_index anomaly_index = current_index
rows = dataframe[anomaly_index] rows = dataframe[anomaly_index]
# indexes = np.floor_divide(rows.index, self.frame_size) indexes = np.floor_divide(rows.index, self.frame_size)
indexes = np.unique(rows.index) # indexes = np.unique(rows.index)
return indexes return indexes
def inverse_transform_anomalies(self, prediction): def inverse_transform_anomalies(self, prediction):
@ -252,4 +251,4 @@ class data_preprocessor:
augmented['time_of_day_column_x'] = np.cos(norm_seconds) augmented['time_of_day_column_x'] = np.cos(norm_seconds)
if 'time_of_day_column_y' in features: if 'time_of_day_column_y' in features:
augmented['time_of_day_column_y'] = np.sin(norm_seconds) augmented['time_of_day_column_y'] = np.sin(norm_seconds)
return augmented return augmented

133
src/data_provider.py

@ -1,14 +1,18 @@
from influxdb import InfluxDBClient
import pandas as pd import pandas as pd
import os.path import os, re
import numpy as np import numpy as np
from urllib.parse import urlencode, urlparse
import urllib.request
import json
from time import time
MS_IN_WEEK = 604800000
class DataProvider: class DataProvider:
chunk_size = 50000 chunk_size = 50000
def __init__(self, dbconfig, target, data_filename): def __init__(self, datasource, target, data_filename):
self.dbconfig = dbconfig self.datasource = datasource
self.target = target self.target = target
self.data_filename = data_filename self.data_filename = data_filename
self.last_time = None self.last_time = None
@ -69,79 +73,80 @@ class DataProvider:
return [(dataframe['timestamp'][i1], dataframe['timestamp'][i2]) for (i1, i2) in indexes] return [(dataframe['timestamp'][i1], dataframe['timestamp'][i2]) for (i1, i2) in indexes]
def synchronize(self): def synchronize(self):
# last_time = None
# if len(self.dataframe/) > 0:
# last_time = self.dataframe['time'][len(self.dataframe)-1]
append_dataframe = self.load_from_db(self.last_time) append_dataframe = self.load_from_db(self.last_time)
self.__append_data(append_dataframe) self.__append_data(append_dataframe)
# append_dataframe
# append_dataframe.to_csv(self.data_filename, mode='a', index=False, header=False)
# self.dataframe = pd.concat([self.dataframe, append_dataframe], ignore_index=True)
# def load(self):
# if os.path.exists(self.data_filename):
# self.dataframe = pd.read_csv(self.data_filename, parse_dates=[0])
# self.synchronize()
# else:
# append_dataframe = self.load_from_db()
# self.__append_data(append_dataframe)
# #self.dataframe.to_csv(self.data_filename, index=False, header=True)
def custom_query(self, after_time):
query = self.target["query"]
timeFilter = "TRUE"
if after_time is not None:
timeFilter = "time > '%s'" % (str(after_time))
query = query.replace("$timeFilter", timeFilter)
return query
def load_from_db(self, after_time=None): def custom_query(self, after_time, before_time = None):
"""Instantiate a connection to the InfluxDB.""" if self.datasource['type'] == 'influxdb':
host = self.dbconfig['host'] query = self.datasource['params']['q']
port = self.dbconfig['port'] if after_time is not None:
user = self.dbconfig['user'] if before_time is not None:
password = self.dbconfig['password'] timeFilter = 'time >= %s AND time <= %s' % (after_time, before_time)
dbname = self.dbconfig['dbname'] else:
timeFilter = 'time >= "%s"' % (str(after_time))
client = InfluxDBClient(host, port, user, password, dbname) else:
# query = 'select k0, k1, k2 from vals;' timeFilter = 'time > 0ms'
query = re.sub(r'(?:time >.+?)(GROUP.+)*$', timeFilter + r' \1', query)
measurement = self.target['measurement'] return query
select = self.target['select']
tags = self.target['tags']
if "query" in self.target:
query = self.custom_query(after_time)
else: else:
select_values = select[0][0]['params'] raise 'Datasource type ' + self.datasource['type'] + ' is not supported yet'
escaped_select_values = ["\"" + value + "\"" for value in select_values]
conditions_entries = []
if len(tags) > 0:
for tag in tags:
conditions_entries.append("(\"" + tag['key'] + "\"" + tag['operator'] + "'" + tag['value'] + "')")
if after_time:
conditions_entries.append("time > '%s'" % (str(after_time)))
condition = ""
if len(conditions_entries) > 0:
condition = " where " + " AND ".join(conditions_entries)
query = "select %s from \"%s\"%s;" % (",".join(escaped_select_values), measurement, condition)
result = client.query(query, chunked=True, chunk_size=10000) def load_from_db(self, after_time=None):
dataframe = pd.DataFrame(result.get_points()) result = self.__load_data_chunks(after_time)
if len(dataframe) > 0: if result == None or len(result['values']) == 0:
dataframe = pd.DataFrame([])
else:
dataframe = pd.DataFrame(result['values'], columns = result['columns'])
cols = dataframe.columns.tolist() cols = dataframe.columns.tolist()
cols.remove('time') cols.remove('time')
cols = ['time'] + cols cols = ['time'] + cols
dataframe = dataframe[cols] dataframe = dataframe[cols]
dataframe['time'] = pd.to_datetime(dataframe['time'], unit='ms')
dataframe['time'] = pd.to_datetime(dataframe['time'])
dataframe = dataframe.dropna(axis=0, how='any') dataframe = dataframe.dropna(axis=0, how='any')
return dataframe return dataframe
def __load_data_chunks(self, after_time = None):
params = self.datasource['params']
if after_time == None:
res = {
'columns': [],
'values': []
}
after_time = int(time()*1000 - MS_IN_WEEK)
before_time = int(time()*1000)
while True:
params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
serie = self.__query_grafana(params)
if serie != None:
res['columns'] = serie['columns']
res['values'] += serie['values']
after_time -= MS_IN_WEEK
before_time -= MS_IN_WEEK
else:
return res
else:
params['q'] = self.custom_query(str(after_time) + 'ms')
return self.__query_grafana(params)
def __query_grafana(self, params):
headers = { 'Authorization': 'Bearer ' + os.environ['API_KEY'] }
url = self.datasource['origin'] + '/' + self.datasource['url'] + '?' + urlencode(params)
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as resp:
res = json.loads(resp.read().decode('utf-8'))['results'][0]
if 'series' in res:
return res['series'][0]
else:
return None
def __init_chunks(self): def __init_chunks(self):
chunk_index = 0 chunk_index = 0
self.last_chunk_index = 0 self.last_chunk_index = 0

52
src/learn.py

@ -1,52 +0,0 @@
#!/usr/bin/env python
import csv
import os
from worker import worker
def enqueue_task():
tasks_file = "tasks.csv"
tasks = []
with open(tasks_file) as csvfile:
rdr = csv.reader(csvfile, delimiter=',')
tasks = list(rdr)
if len(tasks) == 0:
return None
res = tasks[0][0]
tasks = tasks[1:]
with open(tasks_file, "w+") as csvfile:
writer = csv.writer(csvfile)
writer.writerows(tasks)
return res
def set_lock(value):
lock_file = "learn.lock"
exists = os.path.exists(lock_file)
if exists == value:
return False
if value:
open(lock_file, "w+")
else:
os.remove(lock_file)
return True
if __name__ == "__main__":
if not set_lock(True):
print("learn locked")
exit(0)
w = worker()
while True:
task = enqueue_task()
if task is None:
break
w.start()
w.add_task({"type": "learn", "anomaly_name": task})
w.add_task({"type": "predict", "anomaly_name": task})
w.stop()
set_lock(False)

83
src/predict.py

@ -1,83 +0,0 @@
import argparse
import csv
import time
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from influxdb import InfluxDBClient
from sklearn import svm
import numpy as np
import math
import pickle
host = "209.205.120.226"
port = 8086
datasetFile = "/tmp/dataset.csv"
anomaliesFile = "anomalies.csv"
predictedAnomaliesFile = "predicted_anomalies.csv"
modelFilename = 'finalized_model.sav'
def readAnomalies():
anomalies = []
with open(anomaliesFile) as csvfile:
rdr = csv.reader(csvfile, delimiter=',')
for row in rdr:
anomaly = (int(row[0]), int(row[1]))
anomalies.append(anomaly)
return anomalies
"""Instantiate a connection to the InfluxDB."""
user = ''
password = ''
dbname = 'accelerometer'
query = 'select k0, k1, k2 from vals limit 10000;'
client = InfluxDBClient(host, port, user, password, dbname)
def predict(host=host, port=port):
result = client.query(query)
df = pd.DataFrame(result['vals'], columns=['time', 'k0', 'k1', 'k2'])
basedAnomalies = readAnomalies()
df2 = df.rolling(200, win_type='triang').sum()
df2['time'] = pd.to_datetime(df2['time'])
df2 = df2[np.isfinite(df2['k0'])]
print(len(df2))
anomalies = []
last_anomaly = (-1, -1)
with open(modelFilename, 'rb') as fid:
clf = pickle.load(fid)
prediction = clf.predict(df2[['k0', 'k1', 'k2']])
print(len(prediction))
#print(prediction)
for i in range(len(prediction)):
if prediction[i] > 0.:
t = df2['time'][i + 199].timestamp()
t = ((t + 0 * 3600) * 1000)
if t < basedAnomalies[len(basedAnomalies) - 1][1]:
continue
if t < last_anomaly[1] + 1000:
last_anomaly = (last_anomaly[0], t)
else:
if last_anomaly[1] != -1:
anomalies.append(last_anomaly)
last_anomaly = (t, t)
with open(predictedAnomaliesFile, "w") as file:
for anomaly in anomalies:
file.write(str(int(anomaly[0])) + "," + str(int(anomaly[1])) + "\n")
predict()
Loading…
Cancel
Save