Browse Source

Unit testing for analytics project #216 (#223)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
43b027e470
  1. 5
      analytics/models/jump_model.py
  2. 4
      analytics/server.py
  3. 1
      analytics/tests/__init__.py
  4. 12
      analytics/tests/test_utils.py
  5. 225
      analytics/utils/__init__.py
  6. 222
      analytics/utils/common.py
  7. 9
      analytics/utils/segments.py
  8. 6
      server/src/services/analytics_service.ts

5
analytics/models/jump_model.py

@ -1,6 +1,7 @@
from models import Model from models import Model
import utils import utils
from utils.segments import parse_segment
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import scipy.signal import scipy.signal
@ -37,9 +38,7 @@ class JumpModel(Model):
patterns_list = [] patterns_list = []
for segment in segments: for segment in segments:
if segment['labeled']: if segment['labeled']:
segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms')) segment_from_index, segment_to_index, segment_data = parse_segment(segment, dataframe)
segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
segment_data = data[segment_from_index: segment_to_index + 1]
if len(segment_data) == 0: if len(segment_data) == 0:
continue continue
segment_min = min(segment_data) segment_min = min(segment_data)

4
analytics/server.py

@ -4,6 +4,10 @@ import logging
import sys import sys
import asyncio import asyncio
import traceback import traceback
import os
#TODO: make wrapper script that set PYTHONPATH instead
sys.path.append(os.path.join(os.path.dirname(__file__), 'utils'))
import services import services
from analytic_unit_manager import handle_analytic_task from analytic_unit_manager import handle_analytic_task

1
analytics/tests/__init__.py

@ -0,0 +1 @@
pass

12
analytics/tests/test_utils.py

@ -0,0 +1,12 @@
import unittest
class TestUtils(unittest.TestCase):
#example test for test's workflow purposes
def test_segment_parsion(self):
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()

225
analytics/utils/__init__.py

@ -1,223 +1,2 @@
import numpy as np from common import *
import pandas as pd from segments import *
def exponential_smoothing(series, alpha):
result = [series[0]]
for n in range(1, len(series)):
result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
return result
def find_steps(array, threshold):
"""
Finds local maxima by segmenting array based on positions at which
the threshold value is crossed. Note that this thresholding is
applied after the absolute value of the array is taken. Thus,
the distinction between upward and downward steps is lost. However,
get_step_sizes can be used to determine directionality after the
fact.
Parameters
----------
array : numpy array
1 dimensional array that represents time series of data points
threshold : int / float
Threshold value that defines a step
Returns
-------
steps : list
List of indices of the detected steps
"""
steps = []
array = np.abs(array)
above_points = np.where(array > threshold, 1, 0)
ap_dif = np.diff(above_points)
cross_ups = np.where(ap_dif == 1)[0]
cross_dns = np.where(ap_dif == -1)[0]
for upi, dni in zip(cross_ups,cross_dns):
steps.append(np.argmax(array[upi:dni]) + upi)
return steps
def anomalies_to_timestamp(anomalies):
for anomaly in anomalies:
anomaly['from'] = int(anomaly['from'].timestamp() * 1000)
anomaly['to'] = int(anomaly['to'].timestamp() * 1000)
return anomalies
def segments_box(segments):
max_time = 0
min_time = float("inf")
for segment in segments:
min_time = min(min_time, segment['from'])
max_time = max(max_time, segment['to'])
min_time = pd.to_datetime(min_time, unit='ms')
max_time = pd.to_datetime(max_time, unit='ms')
return min_time, max_time
def intersection_segment(data, median):
"""
Finds all intersections between flatten data and median
"""
cen_ind = []
for i in range(1, len(data)-1):
if data[i - 1] < median and data[i + 1] > median:
cen_ind.append(i)
del_ind = []
for i in range(1, len(cen_ind)):
if cen_ind[i] == cen_ind[i - 1] + 1:
del_ind.append(i - 1)
return [x for (idx, x) in enumerate(cen_ind) if idx not in del_ind]
def logistic_sigmoid_distribution(self, x1, x2, alpha, height):
return map(lambda x: logistic_sigmoid(x, alpha, height), range(x1, x2))
def logistic_sigmoid(x, alpha, height):
return height / (1 + math.exp(-x * alpha))
def MyLogisticSigmoid(interval, alpha, heigh):
distribution = []
for i in range(-interval, interval):
F = height / (1 + math.exp(-i * alpha))
distribution.append(F)
return distribution
def find_one_jump(data, x, size, height, err):
l = []
for i in range(x + 1, x + size):
if (data[i] > data[x] and data[x + size] > data[x] + height):
l.append(data[i])
if len(l) > size * err:
return x
else:
return 0
def find_all_jumps(data, size, height):
possible_jump_list = []
for i in range(len(data - size)):
x = find_one_jump(data, i, size, height, 0.9)
if x > 0:
possible_jump_list.append(x)
return possible_jump_list
def find_jump_center(cen_ind):
jump_center = cen_ind[0]
for i in range(len(cen_ind)):
x = cen_ind[i]
cx = scipy.signal.fftconvolve(pat_sigm, flat_data[x - WINDOW_SIZE : x + WINDOW_SIZE])
c.append(cx[2 * WINDOW_SIZE])
if i > 0 and cx > c[i - 1]:
jump_center = x
return jump_center
def find_ind_median(median, segment_data):
x = np.arange(0, len(segment_data))
f = []
for i in range(len(segment_data)):
f.append(median)
f = np.array(f)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
return idx
def find_jump_length(segment_data, min_line, max_line):
x = np.arange(0, len(segment_data))
f = []
l = []
for i in range(len(segment_data)):
f.append(min_line)
l.append(max_line)
f = np.array(f)
l = np.array(l)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
idl = np.argwhere(np.diff(np.sign(l - g)) != 0).reshape(-1) + 0
if (idl[0] - idx[-1] + 1) > 0:
return idl[0] - idx[-1] + 1
else:
print("retard alert!")
return 0
def find_jump(data, height, lenght):
j_list = []
for i in range(len(data)-lenght-1):
for x in range(1, lenght):
if(data[i+x] > data[i] + height):
j_list.append(i)
return(j_list)
def find_drop_length(segment_data, min_line, max_line):
x = np.arange(0, len(segment_data))
f = []
l = []
for i in range(len(segment_data)):
f.append(min_line)
l.append(max_line)
f = np.array(f)
l = np.array(l)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0 #min_line
idl = np.argwhere(np.diff(np.sign(l - g)) != 0).reshape(-1) + 0 #max_line
if (idx[0] - idl[-1] + 1) > 0:
return idx[0] - idl[-1] + 1
else:
print("retard alert!")
return 0
def drop_intersection(segment_data, median_line):
x = np.arange(0, len(segment_data))
f = []
for i in range(len(segment_data)):
f.append(median_line)
f = np.array(f)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
return idx
def find_drop(data, height, length):
d_list = []
for i in range(len(data)-length-1):
for x in range(1, length):
if(data[i+x] < data[i] - height):
d_list.append(i)
return(d_list)
def timestamp_to_index(dataframe, timestamp):
data = dataframe['timestamp']
for i in range(len(data)):
if data[i] >= timestamp:
return i
def peak_finder(data, size):
all_max = []
for i in range(size, len(data) - size):
if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]:
all_max.append(i)
return all_max
def ar_mean(numbers):
return float(sum(numbers)) / max(len(numbers), 1)
def get_av_model(patterns_list):
x = len(patterns_list[0])
if len(pattern_list) > 1 and len(patterns_list[1]) != x:
raise NameError('All elements of patterns_list should have same length')
model_pat = []
for i in range(x):
av_val = []
for j in patterns_list:
av_val.append(j.values[i])
model_pat.append(ar_mean(av_val))
return model_pat

222
analytics/utils/common.py

@ -0,0 +1,222 @@
import numpy as np
import pandas as pd
def exponential_smoothing(series, alpha):
result = [series[0]]
for n in range(1, len(series)):
result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
return result
def find_steps(array, threshold):
"""
Finds local maxima by segmenting array based on positions at which
the threshold value is crossed. Note that this thresholding is
applied after the absolute value of the array is taken. Thus,
the distinction between upward and downward steps is lost. However,
get_step_sizes can be used to determine directionality after the
fact.
Parameters
----------
array : numpy array
1 dimensional array that represents time series of data points
threshold : int / float
Threshold value that defines a step
Returns
-------
steps : list
List of indices of the detected steps
"""
steps = []
array = np.abs(array)
above_points = np.where(array > threshold, 1, 0)
ap_dif = np.diff(above_points)
cross_ups = np.where(ap_dif == 1)[0]
cross_dns = np.where(ap_dif == -1)[0]
for upi, dni in zip(cross_ups,cross_dns):
steps.append(np.argmax(array[upi:dni]) + upi)
return steps
def anomalies_to_timestamp(anomalies):
for anomaly in anomalies:
anomaly['from'] = int(anomaly['from'].timestamp() * 1000)
anomaly['to'] = int(anomaly['to'].timestamp() * 1000)
return anomalies
def segments_box(segments):
max_time = 0
min_time = float("inf")
for segment in segments:
min_time = min(min_time, segment['from'])
max_time = max(max_time, segment['to'])
min_time = pd.to_datetime(min_time, unit='ms')
max_time = pd.to_datetime(max_time, unit='ms')
return min_time, max_time
def intersection_segment(data, median):
"""
Finds all intersections between flatten data and median
"""
cen_ind = []
for i in range(1, len(data)-1):
if data[i - 1] < median and data[i + 1] > median:
cen_ind.append(i)
del_ind = []
for i in range(1, len(cen_ind)):
if cen_ind[i] == cen_ind[i - 1] + 1:
del_ind.append(i - 1)
return [x for (idx, x) in enumerate(cen_ind) if idx not in del_ind]
def logistic_sigmoid_distribution(self, x1, x2, alpha, height):
return map(lambda x: logistic_sigmoid(x, alpha, height), range(x1, x2))
def logistic_sigmoid(x, alpha, height):
return height / (1 + math.exp(-x * alpha))
def MyLogisticSigmoid(interval, alpha, heigh):
distribution = []
for i in range(-interval, interval):
F = height / (1 + math.exp(-i * alpha))
distribution.append(F)
return distribution
def find_one_jump(data, x, size, height, err):
l = []
for i in range(x + 1, x + size):
if (data[i] > data[x] and data[x + size] > data[x] + height):
l.append(data[i])
if len(l) > size * err:
return x
else:
return 0
def find_all_jumps(data, size, height):
possible_jump_list = []
for i in range(len(data - size)):
x = find_one_jump(data, i, size, height, 0.9)
if x > 0:
possible_jump_list.append(x)
return possible_jump_list
def find_jump_center(cen_ind):
jump_center = cen_ind[0]
for i in range(len(cen_ind)):
x = cen_ind[i]
cx = scipy.signal.fftconvolve(pat_sigm, flat_data[x - WINDOW_SIZE : x + WINDOW_SIZE])
c.append(cx[2 * WINDOW_SIZE])
if i > 0 and cx > c[i - 1]:
jump_center = x
return jump_center
def find_ind_median(median, segment_data):
x = np.arange(0, len(segment_data))
f = []
for i in range(len(segment_data)):
f.append(median)
f = np.array(f)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
return idx
def find_jump_length(segment_data, min_line, max_line):
x = np.arange(0, len(segment_data))
f = []
l = []
for i in range(len(segment_data)):
f.append(min_line)
l.append(max_line)
f = np.array(f)
l = np.array(l)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
idl = np.argwhere(np.diff(np.sign(l - g)) != 0).reshape(-1) + 0
if (idl[0] - idx[-1] + 1) > 0:
return idl[0] - idx[-1] + 1
else:
print("retard alert!")
return 0
def find_jump(data, height, lenght):
j_list = []
for i in range(len(data)-lenght-1):
for x in range(1, lenght):
if(data[i+x] > data[i] + height):
j_list.append(i)
return(j_list)
def find_drop_length(segment_data, min_line, max_line):
x = np.arange(0, len(segment_data))
f = []
l = []
for i in range(len(segment_data)):
f.append(min_line)
l.append(max_line)
f = np.array(f)
l = np.array(l)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0 #min_line
idl = np.argwhere(np.diff(np.sign(l - g)) != 0).reshape(-1) + 0 #max_line
if (idx[0] - idl[-1] + 1) > 0:
return idx[0] - idl[-1] + 1
else:
print("retard alert!")
return 0
def drop_intersection(segment_data, median_line):
x = np.arange(0, len(segment_data))
f = []
for i in range(len(segment_data)):
f.append(median_line)
f = np.array(f)
g = []
for i in segment_data:
g.append(i)
g = np.array(g)
idx = np.argwhere(np.diff(np.sign(f - g)) != 0).reshape(-1) + 0
return idx
def find_drop(data, height, length):
d_list = []
for i in range(len(data)-length-1):
for x in range(1, length):
if(data[i+x] < data[i] - height):
d_list.append(i)
return(d_list)
def timestamp_to_index(dataframe, timestamp):
data = dataframe['timestamp']
for i in range(len(data)):
if data[i] >= timestamp:
return i
def peak_finder(data, size):
all_max = []
for i in range(size, len(data) - size):
if data[i] == max(data[i - size: i + size]) and data[i] > data[i + 1]:
all_max.append(i)
return all_max
def ar_mean(numbers):
return float(sum(numbers)) / max(len(numbers), 1)
def get_av_model(patterns_list):
x = len(patterns_list[0])
if len(patterns_list[1]) != x:
raise NameError('All elements of patterns_list should have same length')
model_pat = []
for i in range(x):
av_val = []
for j in patterns_list:
av_val.append(j.values[i])
model_pat.append(ar_mean(av_val))
return model_pat

9
analytics/utils/segments.py

@ -0,0 +1,9 @@
import pandas as pd
from common import timestamp_to_index
def parse_segment(segment, dataframe):
start = timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms'))
end = timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
data = dataframe['value'][start: end + 1]
return start, end, data

6
server/src/services/analytics_service.ts

@ -138,7 +138,7 @@ export class AnalyticsService {
var resolved = false; var resolved = false;
cp.stdout.on('data', (data) => { cp.stdout.on('data', (data) => {
console.log(data); console.log(data.toString());
if(resolved) { if(resolved) {
return; return;
} }
@ -146,8 +146,8 @@ export class AnalyticsService {
resolve(cp); resolve(cp);
}); });
cp.stderr.on('data', function(data) { cp.stderr.on('data', (data) => {
console.error(data); console.error(data.toString());
if(resolved) { if(resolved) {
return; return;
} }

Loading…
Cancel
Save