# NOTE: this listing was copied from a web repository viewer. The viewer's
# banner text, the file statistics (217 lines, 6.5 KiB) and the per-line
# "6 years ago" blame annotations are not part of the Python source.

import math

import numpy as np
import pandas as pd
def is_intersect(target_segment, segments):
    """
    Check whether a target interval overlaps any of the given segments.

    Parameters
    ----------
    target_segment : sequence
        Interval to test; index 0 is its start and index 1 its end.
    segments : iterable of mapping
        Each item is read through its 'from' and 'to' keys.

    Returns
    -------
    bool
        True as soon as one segment shares at least one point with the
        target (touching end points count as an overlap), else False.
    """
    # Two intervals overlap when the later start is not past the earlier end.
    return any(
        max(item['from'], target_segment[0]) <= min(item['to'], target_segment[1])
        for item in segments
    )
def exponential_smoothing(series, alpha):
    """
    Apply simple exponential smoothing to a sequence.

    The first output equals the first input; every later output is
    ``alpha * series[n] + (1 - alpha) * previous_output``.

    Parameters
    ----------
    series : sequence of numbers
        Input values; must support ``len`` and integer indexing.
    alpha : float
        Weight given to the current sample.

    Returns
    -------
    list
        Smoothed values, same length as ``series``. An empty input yields
        an empty list instead of raising IndexError.
    """
    # len() rather than truthiness so numpy arrays are handled as well.
    if len(series) == 0:
        return []
    smoothed = [series[0]]
    for n in range(1, len(series)):
        smoothed.append(alpha * series[n] + (1 - alpha) * smoothed[-1])
    return smoothed
def find_steps(array, threshold):
    """
    Find local maxima by segmenting the array at threshold crossings.

    The threshold is applied after the absolute value of the array is
    taken, so the distinction between upward and downward steps is lost.

    Parameters
    ----------
    array : numpy array
        1 dimensional array that represents time series of data points
    threshold : int / float
        Threshold value that defines a step

    Returns
    -------
    steps : list
        One index per complete run of values above the threshold: the
        position of the largest absolute value inside that run. A run
        that is already in progress at index 0, or that never drops back
        below the threshold, is ignored.
    """
    array = np.abs(array)
    above_points = np.where(array > threshold, 1, 0)
    ap_dif = np.diff(above_points)
    # ap_dif[i] == 1  -> array[i + 1] is the first sample above the threshold.
    # ap_dif[i] == -1 -> array[i] is the last sample above the threshold.
    cross_ups = np.where(ap_dif == 1)[0]
    cross_dns = np.where(ap_dif == -1)[0]

    # If the signal starts above the threshold the first crossing is a
    # down-crossing with no matching up-crossing; drop it so the pairs
    # line up (otherwise argmax would be called on an empty slice).
    if cross_ups.size and cross_dns.size and cross_dns[0] < cross_ups[0]:
        cross_dns = cross_dns[1:]

    steps = []
    for upi, dni in zip(cross_ups, cross_dns):
        first_above = upi + 1
        # The run above the threshold is first_above..dni inclusive.
        steps.append(np.argmax(array[first_above:dni + 1]) + first_above)
    return steps
def anomalies_to_timestamp(anomalies):
    """
    Convert the 'from' and 'to' fields of each anomaly to integer milliseconds.

    Parameters
    ----------
    anomalies : iterable of dict
        Each dict holds 'from' and 'to' values exposing a ``timestamp()``
        method that returns seconds.

    Returns
    -------
    The same ``anomalies`` object; its dicts are modified in place.
    """
    for entry in anomalies:
        for key in ('from', 'to'):
            # seconds -> milliseconds, truncated to int
            entry[key] = int(entry[key].timestamp() * 1000)
    return anomalies
def segments_box(segments):
    """
    Return the time span that encloses all segments.

    Parameters
    ----------
    segments : iterable of mapping
        Each item provides numeric 'from' and 'to' values; they are
        converted with ``unit='ms'``.

    Returns
    -------
    tuple
        ``(min_time, max_time)`` as produced by ``pandas.to_datetime``.
        The search starts from +inf for the minimum and 0 for the maximum.
    """
    latest = 0
    earliest = float("inf")
    for item in segments:
        if item['from'] < earliest:
            earliest = item['from']
        if item['to'] > latest:
            latest = item['to']
    return pd.to_datetime(earliest, unit='ms'), pd.to_datetime(latest, unit='ms')
def intersection_segment(data, median):
    """
    Find the positions where the data rises through the median.

    An index ``i`` is a candidate when ``data[i - 1] < median`` and
    ``data[i + 1] > median``. When several candidates are consecutive,
    only the last one of the run is kept.

    Parameters
    ----------
    data : sequence of numbers
        Flattened signal; must support ``len`` and integer indexing.
    median : number
        Level the signal is compared against.

    Returns
    -------
    list of int
        Selected candidate indices, in increasing order.
    """
    candidates = [
        i
        for i in range(1, len(data) - 1)
        if data[i - 1] < median and data[i + 1] > median
    ]
    # Drop a candidate when the next one directly follows it. Done in one
    # pass instead of collecting indices to delete and testing membership
    # in that list for every element.
    last = len(candidates) - 1
    return [
        value
        for pos, value in enumerate(candidates)
        if pos == last or candidates[pos + 1] != value + 1
    ]
def logistic_sigmoid_distribution(self, x1, x2, alpha, height):
    """
    Lazily evaluate ``logistic_sigmoid`` over the integers in [x1, x2).

    Parameters
    ----------
    self
        Not used by the body.
    x1, x2 : int
        Start (inclusive) and stop (exclusive) of the integer range.
    alpha, height
        Forwarded unchanged to ``logistic_sigmoid``.

    Returns
    -------
    map
        Iterator producing one sigmoid value per integer of the range.
    """
    def _value_at(x):
        return logistic_sigmoid(x, alpha, height)

    return map(_value_at, range(x1, x2))
def logistic_sigmoid(x, alpha, height):
    """
    Evaluate a scaled logistic sigmoid: ``height / (1 + exp(-x * alpha))``.

    Parameters
    ----------
    x : number
        Point at which the curve is evaluated.
    alpha : number
        Multiplier applied to ``x`` inside the exponent.
    height : number
        Numerator of the expression (the value approached for large
        positive ``x * alpha``).

    Returns
    -------
    float
        The sigmoid value. When ``exp(-x * alpha)`` is too large for a
        float, 0.0 is returned instead of raising OverflowError.
    """
    try:
        return height / (1 + math.exp(-x * alpha))
    except OverflowError:
        # The denominator is beyond float range, so the quotient is
        # indistinguishable from zero.
        return 0.0
def MyLogisticSigmoid(interval, alpha, heigh):
    """
    Sample a scaled logistic sigmoid at every integer in [-interval, interval).

    Parameters
    ----------
    interval : int
        Half-width of the sampled integer range.
    alpha : number
        Multiplier applied to the sample position inside the exponent.
    heigh : number
        Numerator of the sigmoid expression. The parameter keeps its
        original spelling so existing keyword callers continue to work.

    Returns
    -------
    list of float
        ``heigh / (1 + exp(-i * alpha))`` for each ``i`` in the range;
        ``2 * interval`` values when ``interval`` is positive.
    """
    # The body previously read an undefined name `height`; it now uses the
    # actual parameter.
    return [
        heigh / (1 + math.exp(-i * alpha))
        for i in range(-interval, interval)
    ]
def find_one_jump(data, x, size, height, err):
    """
    Test whether a jump starts at position ``x``.

    Counts the positions ``i`` in ``x + 1 .. x + size - 1`` for which
    ``data[i] > data[x]`` while ``data[x + size] > data[x] + height``.

    Parameters
    ----------
    data : sequence of numbers
        Signal; must support integer indexing.
    x : int
        Candidate start index.
    size : int
        Window length inspected after ``x``.
    height : number
        Minimum rise required between ``data[x]`` and ``data[x + size]``.
    err : float
        The count must exceed ``size * err`` for the jump to be accepted.

    Returns
    -------
    int
        ``x`` when the jump is accepted, otherwise 0.
    """
    hits = sum(
        1
        for offset in range(1, size)
        if data[x + offset] > data[x] and data[x + size] > data[x] + height
    )
    return x if hits > size * err else 0
def find_all_jumps(data, size, height):
    """
    Collect every start index that ``find_one_jump`` accepts as a jump.

    Parameters
    ----------
    data : sequence of numbers
        Signal to scan; must support ``len`` and integer indexing.
    size : int
        Window length passed to ``find_one_jump``.
    height : number
        Minimum rise passed to ``find_one_jump``.

    Returns
    -------
    list of int
        Accepted start indices in increasing order.
    """
    possible_jump_list = []
    # Stop `size` samples before the end: find_one_jump reads data[i + size].
    # (The original bound was len(data - size), which subtracts `size` from
    # the data itself instead of from its length.)
    for i in range(len(data) - size):
        x = find_one_jump(data, i, size, height, 0.9)
        # find_one_jump returns 0 for "no jump", so index 0 can never be
        # reported as a jump start.
        if x > 0:
            possible_jump_list.append(x)
    return possible_jump_list
def find_jump_center(cen_ind):
    """
    Choose a jump center among candidate indices using a convolution score.

    For each candidate ``x`` the window
    ``flat_data[x - WINDOW_SIZE : x + WINDOW_SIZE]`` is convolved with
    ``pat_sigm`` and the sample at index ``2 * WINDOW_SIZE`` of the result
    is taken as the candidate's score. A candidate becomes the current
    center whenever its score exceeds the score of the candidate just
    before it.

    NOTE(review): ``pat_sigm``, ``flat_data`` and ``WINDOW_SIZE`` are not
    defined in this function; they are assumed to be module-level names
    provided elsewhere -- confirm before use.

    Parameters
    ----------
    cen_ind : sequence of int
        Candidate indices; must contain at least one element.

    Returns
    -------
    int
        The selected candidate (the first one when no later candidate
        out-scores its predecessor).
    """
    # Imported here because the module does not import scipy at file level.
    import scipy.signal

    jump_center = cen_ind[0]
    # Scores are kept per call; the original appended to a name `c` that
    # was never initialised.
    scores = []
    for i, x in enumerate(cen_ind):
        window = flat_data[x - WINDOW_SIZE : x + WINDOW_SIZE]
        cx = scipy.signal.fftconvolve(pat_sigm, window)
        score = cx[2 * WINDOW_SIZE]
        scores.append(score)
        # Compare scalar scores; the original compared the whole
        # convolution array, which cannot be used as an `if` condition.
        if i > 0 and score > scores[i - 1]:
            jump_center = x
    return jump_center
def find_ind_median(median, segment_data):
    """
    Locate where the segment data crosses a constant level.

    Parameters
    ----------
    median : number
        Constant level to compare against.
    segment_data : sized iterable of numbers
        Data points of the segment.

    Returns
    -------
    numpy array
        Every index ``i`` at which ``sign(median - data[i])`` differs from
        ``sign(median - data[i + 1])``.
    """
    level = np.array([median] * len(segment_data))
    samples = np.array(list(segment_data))
    sign_changes = np.diff(np.sign(level - samples)) != 0
    return np.argwhere(sign_changes).reshape(-1) + 0
def find_jump_length(segment_data, min_line, max_line):
    """
    Measure the span between the last min_line crossing and the first
    max_line crossing of a segment.

    A crossing of a line is an index ``i`` at which
    ``sign(line - data[i])`` differs from ``sign(line - data[i + 1])``.

    Parameters
    ----------
    segment_data : sized iterable of numbers
        Data points of the segment.
    min_line : number
        Lower level.
    max_line : number
        Upper level.

    Returns
    -------
    int
        ``first_max_crossing - last_min_crossing + 1`` when that value is
        positive. Otherwise, or when either line is never crossed, a
        diagnostic is printed and 0 is returned.
    """
    samples = np.array(list(segment_data))
    count = len(samples)
    min_level = np.full(count, min_line)
    max_level = np.full(count, max_line)
    idx = np.argwhere(np.diff(np.sign(min_level - samples)) != 0).reshape(-1)
    idl = np.argwhere(np.diff(np.sign(max_level - samples)) != 0).reshape(-1)

    # Guard against a line that is never crossed: indexing an empty
    # crossing array used to raise IndexError.
    if idx.size and idl.size:
        length = idl[0] - idx[-1] + 1
        if length > 0:
            return length

    # NOTE(review): message text kept unchanged for output compatibility;
    # it should be replaced with a descriptive log message.
    print("retard alert!")
    return 0
def find_jump(data, height, lenght):
    """
    List start indices that are followed by a rise of more than ``height``.

    For every start ``i`` in ``range(len(data) - lenght - 1)`` and every
    offset ``x`` in ``1 .. lenght - 1``, ``i`` is recorded when
    ``data[i + x] > data[i] + height``.

    Parameters
    ----------
    data : sequence of numbers
        Signal; must support ``len`` and integer indexing.
    height : number
        Rise that has to be exceeded.
    lenght : int
        Look-ahead bound (spelling kept from the original signature).

    Returns
    -------
    list of int
        Start indices; an index appears once per offset that satisfies
        the condition, so duplicates are possible.
    """
    return [
        start
        for start in range(len(data) - lenght - 1)
        for offset in range(1, lenght)
        if data[start + offset] > data[start] + height
    ]
def find_drop_length(segment_data, min_line, max_line):
    """
    Measure the span between the last max_line crossing and the first
    min_line crossing of a segment.

    A crossing of a line is an index ``i`` at which
    ``sign(line - data[i])`` differs from ``sign(line - data[i + 1])``.

    Parameters
    ----------
    segment_data : sized iterable of numbers
        Data points of the segment.
    min_line : number
        Lower level.
    max_line : number
        Upper level.

    Returns
    -------
    int
        ``first_min_crossing - last_max_crossing + 1`` when that value is
        positive. Otherwise, or when either line is never crossed, a
        diagnostic is printed and 0 is returned.
    """
    samples = np.array(list(segment_data))
    count = len(samples)
    min_level = np.full(count, min_line)
    max_level = np.full(count, max_line)
    idx = np.argwhere(np.diff(np.sign(min_level - samples)) != 0).reshape(-1)  # min_line
    idl = np.argwhere(np.diff(np.sign(max_level - samples)) != 0).reshape(-1)  # max_line

    # Guard against a line that is never crossed: indexing an empty
    # crossing array used to raise IndexError.
    if idx.size and idl.size:
        length = idx[0] - idl[-1] + 1
        if length > 0:
            return length

    # NOTE(review): message text kept unchanged for output compatibility;
    # it should be replaced with a descriptive log message.
    print("retard alert!")
    return 0
def drop_intersection(segment_data, median_line):
    """
    Locate where the segment data crosses a constant line.

    Parameters
    ----------
    segment_data : sized iterable of numbers
        Data points of the segment.
    median_line : number
        Constant level to compare against.

    Returns
    -------
    numpy array
        Every index ``i`` at which ``sign(median_line - data[i])`` differs
        from ``sign(median_line - data[i + 1])``.
    """
    reference = np.array([median_line] * len(segment_data))
    observed = np.array(list(segment_data))
    side = np.sign(reference - observed)
    crossings = np.argwhere(np.diff(side) != 0)
    return crossings.reshape(-1) + 0
def find_drop(data, height, length):
    """
    List start indices that are followed by a fall of more than ``height``.

    For every start ``i`` in ``range(len(data) - length - 1)`` and every
    offset ``x`` in ``1 .. length - 1``, ``i`` is recorded when
    ``data[i + x] < data[i] - height``.

    Parameters
    ----------
    data : sequence of numbers
        Signal; must support ``len`` and integer indexing.
    height : number
        Fall that has to be exceeded.
    length : int
        Look-ahead bound.

    Returns
    -------
    list of int
        Start indices; an index appears once per offset that satisfies
        the condition, so duplicates are possible.
    """
    return [
        start
        for start in range(len(data) - length - 1)
        for offset in range(1, length)
        if data[start + offset] < data[start] - height
    ]
def timestamp_to_index(dataframe, timestamp):
    """
    Find the first position whose 'timestamp' value is not before ``timestamp``.

    Parameters
    ----------
    dataframe : mapping
        Object whose 'timestamp' entry supports ``len`` and lookup by the
        integers ``0 .. len - 1``.
    timestamp
        Value compared against each entry with ``>=``.

    Returns
    -------
    int or None
        The first ``i`` with ``dataframe['timestamp'][i] >= timestamp``,
        or None when no entry qualifies.
    """
    column = dataframe['timestamp']
    matches = (pos for pos in range(len(column)) if column[pos] >= timestamp)
    return next(matches, None)
def peak_finder(data, size):
    """
    Find indices that are the maximum of their surrounding window.

    An index ``i`` (with ``size <= i < len(data) - size``) is reported
    when ``data[i]`` equals the maximum of ``data[i - size : i + size]``
    and is strictly greater than ``data[i + 1]``.

    Parameters
    ----------
    data : sequence of numbers
        Signal; must support ``len``, slicing and integer indexing.
    size : int
        Number of samples taken on each side when forming the window
        (the slice end ``i + size`` itself is excluded).

    Returns
    -------
    list of int
        Peak positions in increasing order.
    """
    return [
        pos
        for pos in range(size, len(data) - size)
        if data[pos] == max(data[pos - size: pos + size])
        and data[pos] > data[pos + 1]
    ]