From 7471f4df9c4c091530e2eb4b763b992c6a334bc7 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 9 Nov 2021 23:54:27 +0300 Subject: [PATCH] Window - style backet in pattern detector #7 --- .../analytic_unit/pattern_analytic_unit.rs | 293 +++++++++--------- server/src/services/analytic_service/types.rs | 1 - 2 files changed, 154 insertions(+), 140 deletions(-) diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 6377581..e7f30a0 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -1,4 +1,4 @@ -use std::{fmt, sync::Arc}; +use std::{collections::VecDeque, fmt, sync::Arc}; use futures::future; use parking_lot::Mutex; @@ -25,12 +25,15 @@ const DETECTION_STEP: u64 = 10; #[derive(Clone)] pub struct LearningResults { + // TODO: replace with RWLock model: Arc>>, pub learning_train: LearningTrain, patterns: Vec>, anti_patterns: Vec>, + + avg_pattern_length: usize, } // impl Clone for LearningResults { @@ -91,92 +94,104 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re }) } -pub struct PatternAnalyticUnit { - config: PatternConfig, - learning_results: Option, -} +fn get_features(xs: &Vec) -> Features { + let mut min = f64::MAX; + let mut max = f64::MIN; + let mut sum = 0f64; -// TODO: move this to loginc of analytic unit -impl PatternAnalyticUnit { - pub fn new(cfg: PatternConfig) -> PatternAnalyticUnit { - PatternAnalyticUnit { - config: cfg, - learning_results: None, - } + for x in xs { + min = min.min(*x); + max = max.max(*x); + sum += x; } - fn corr_aligned(xs: &Vec, ys: &Vec) -> f32 { - let n = xs.len() as f64; - let mut s_xs: f64 = 0f64; - let mut s_ys: f64 = 0f64; - let mut s_xsys: f64 = 0f64; - let mut s_xs_2: f64 = 0f64; - let mut s_ys_2: f64 = 0f64; - - let min = xs.len().min(ys.len()); - xs.iter() - .take(min) - .zip(ys.iter().take(min)) - .for_each(|(xi, yi)| { - s_xs += xi; - s_ys += yi; - s_xsys += xi * yi; - s_xs_2 += xi * xi; - s_ys_2 += yi * yi; - }); - - let numerator: f64 = n * s_xsys - s_xs * s_ys; - let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt(); - - // IT"s a hack - if denominator < 0.01 { - return 0.; - } + let mean = sum / xs.len() as f64; - let result: f64 = numerator / denominator; + sum = 0f64; - // assert!(result.abs() <= 1.01); + for x in xs { + sum += (x - mean) * (x - mean); + } - if result.abs() > 1.1 { - println!("{:?}", xs); - println!("------------"); - println!("{:?}", ys); - println!("WARNING: corr result > 1: {}", result); - } + let sd = sum.sqrt(); + + // TODO: add autocorrelation + // TODO: add FFT + // TODO: add DWT + + return [ + min, max, mean, sd, + // 0f64,0f64, + // 0f64,0f64,0f64, 0f64 + ]; +} - return result as f32; // we know that it's in -1..1 +fn corr_aligned(xs: &VecDeque, ys: &Vec) -> f32 { + let n = xs.len() as f64; + let mut s_xs: f64 = 0f64; + let mut s_ys: f64 = 0f64; + let mut s_xsys: f64 = 0f64; + let mut s_xs_2: f64 = 0f64; + let mut s_ys_2: f64 = 0f64; + + let min = xs.len().min(ys.len()); + xs.iter() + .take(min) + .zip(ys.iter().take(min)) + .for_each(|(xi, yi)| { + s_xs += xi; + s_ys += yi; + s_xsys += xi * yi; + s_xs_2 += xi * xi; + s_ys_2 += yi * yi; + }); + + let numerator: f64 = n * s_xsys - s_xs * s_ys; + let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt(); + + // IT"s a hack + if 
denominator < 0.01 { + return 0.; } - fn get_features(xs: &Vec) -> Features { - let mut min = f64::MAX; - let mut max = f64::MIN; - let mut sum = 0f64; + let result: f64 = numerator / denominator; - for x in xs { - min = min.min(*x); - max = max.max(*x); - sum += x; - } + // assert!(result.abs() <= 1.01); - let mean = sum / xs.len() as f64; + if result.abs() > 1.1 { + println!("{:?}", xs); + println!("------------"); + println!("{:?}", ys); + println!("WARNING: corr result > 1: {}", result); + } - sum = 0f64; + return result as f32; // we know that it's in -1..1 +} - for x in xs { - sum += (x - mean) * (x - mean); +fn max_corr_with_segments(xs: &VecDeque, yss: &Vec>) -> f32 { + let mut max_corr = 0.0; // we just take positive part of correlation + for ys in yss.iter() { + let c = corr_aligned(xs, ys); + // TODO: check that here no NaNs + if c > max_corr { + max_corr = c; } + } + return max_corr; +} - let sd = sum.sqrt(); - - // TODO: add autocorrelation - // TODO: add FFT - // TODO: add DWT +pub struct PatternAnalyticUnit { + config: PatternConfig, + learning_results: Option, +} - return [ - min, max, mean, sd, - // 0f64,0f64, - // 0f64,0f64,0f64, 0f64 - ]; +// TODO: move this to loginc of analytic unit +impl PatternAnalyticUnit { + pub fn new(cfg: PatternConfig) -> PatternAnalyticUnit { + PatternAnalyticUnit { + config: cfg, + learning_results: None, + } } } @@ -225,31 +240,30 @@ impl AnalyticUnit for PatternAnalyticUnit { } } - // let reads: &Vec> = // TODO - // let anti_reads: &Vec> // TODO - - // let size_avg = reads.iter().map(|r| r.len()).sum::() / reads.len(); - let mut patterns = Vec::>::new(); let mut anti_patterns = Vec::>::new(); let mut records_raw = Vec::::new(); let mut targets_raw = Vec::::new(); + let mut pattern_length_size_sum = 0usize; + for r in learn_tss { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); - let fs = PatternAnalyticUnit::get_features(&xs); + let fs = get_features(&xs); records_raw.push(fs); targets_raw.push(true); + pattern_length_size_sum += xs.len(); patterns.push(xs); } for r in learn_anti_tss { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); - let fs = PatternAnalyticUnit::get_features(&xs); + let fs = get_features(&xs); records_raw.push(fs); targets_raw.push(false); + pattern_length_size_sum += xs.len(); anti_patterns.push(xs); } @@ -259,35 +273,15 @@ impl AnalyticUnit for PatternAnalyticUnit { let targets = Array::from_vec(targets_raw.clone()); - // println!("{:?}", records); - // println!("{:?}", targets); - let train = linfa::Dataset::new(records, targets); - // The 'view' describes what set of data is drawn - // let v = ContinuousView::new() - // .add(s1) - // // .add(s2) - // .x_range(-500., 100.) - // .y_range(-200., 600.) - // .x_label("Some varying variable") - // .y_label("The response of something"); - - // Page::single(&v).save("scatter.svg").unwrap(); - - // let model = stat.iter().map(|(c, v)| v / *c as f64).collect(); - let model = Svm::<_, bool>::params() .pos_neg_weights(50000., 5000.) 
.gaussian_kernel(80.0) .fit(&train) .unwrap(); - // let prediction = model.predict(Array::from_vec(vec![ - // 715.3122807017543, 761.1228070175438, 745.0, 56.135764727158595, 0.0, 0.0 - // ])); - - // println!("pridiction: {}", prediction ); + let avg_pattern_length = pattern_length_size_sum / (&patterns.len() + &anti_patterns.len()); self.learning_results = Some(LearningResults { model: Arc::new(Mutex::new(model)), @@ -299,6 +293,8 @@ impl AnalyticUnit for PatternAnalyticUnit { patterns, anti_patterns, + + avg_pattern_length, }); return LearningResult::Finished; @@ -330,55 +326,74 @@ impl AnalyticUnit for PatternAnalyticUnit { let pt = &lr.patterns; let apt = &lr.anti_patterns; - for i in 0..ts.len() { - let mut pattern_match_score = 0f32; - let mut pattern_match_len = 0usize; - let mut anti_pattern_match_score = 0f32; + if lr.avg_pattern_length > ts.len() { + // TODO: handle case when we inside pattern + return Ok(results); + } - for p in pt { - if i + p.len() < ts.len() { - let mut backet = Vec::::new(); - for j in 0..p.len() { - backet.push(nan_to_zero(ts[i + j].1)); - } - let score = PatternAnalyticUnit::corr_aligned(&p, &backet); - if score > pattern_match_score { - pattern_match_score = score; - pattern_match_len = p.len(); - } - } - } + let mut window = VecDeque::::new(); + for i in 0..lr.avg_pattern_length { + window.push_back(nan_to_zero(ts[i].1)); + } - for p in apt { - if i + p.len() < ts.len() { - let mut backet = Vec::::new(); - for j in 0..p.len() { - backet.push(nan_to_zero(ts[i + j].1)); - } - let score = PatternAnalyticUnit::corr_aligned(&p, &backet); - if score > anti_pattern_match_score { - anti_pattern_match_score = score; - } - } - } + let mut i = lr.avg_pattern_length - 1; + + let mut from: Option = None; + let mut to: Option = None; + + loop { + let positive_corr = max_corr_with_segments(&window, pt); + let negative_corr = max_corr_with_segments(&window, apt); let model_weight = { - let mut backet = Vec::::new(); - for j in 0..pattern_match_len { - backet.push(nan_to_zero(ts[i + j].1)); + let mut vs: Vec = Vec::new(); + for v in window.iter() { + vs.push(*v); } - let fs = PatternAnalyticUnit::get_features(&backet); + let fs = get_features(&vs); let lk = lr.model.lock(); - lk.weighted_sum(&Array::from_vec(fs.to_vec())) - lk.rho + let p = lk.predict(Array::from_vec(fs.to_vec())); + if p { 1 } else { -1 } }; - let mut score = pattern_match_score * self.config.correlation_score; - score -= anti_pattern_match_score * self.config.anti_correlation_score; - score += (model_weight as f32) * self.config.model_score; - - if score >= self.config.threshold_score { - results.push((ts[i].0, ts[i + pattern_match_len - 1].0)); + let score = positive_corr * self.config.correlation_score + - negative_corr * self.config.anti_correlation_score + + model_weight as f32 * self.config.model_score; + + // TODO: replace it with score > config.score_treshold + if score > self.config.threshold_score { + // inside pattern + if from.is_none() { + from = Some(ts[i - (lr.avg_pattern_length - 1)].0); + } + to = Some(ts[i].0); + } else { + if to.is_some() { + // merge with last + if results.len() > 0 && results.last().unwrap().1 >= from.unwrap() { + let (prev_from, _) = results.pop().unwrap(); + results.push((prev_from, to.unwrap())); + } else { + results.push((from.unwrap(), to.unwrap())); + } + from = None; + to = None; + } + } + + i += 1; + if i == ts.len() { + break; } + + window.pop_front(); + window.push_back(ts[i].1); + } + + if to.is_some() { + results.push((from.unwrap(), to.unwrap())); + 
from = None; + to = None; } Ok(results) diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index d9472db..9654e65 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -11,7 +11,6 @@ use super::analytic_unit::types::PatchConfig; use anyhow::Result; use serde::Serialize; -use serde_json::Value; use tokio::sync::oneshot; use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;
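
The core of the new detection path is a Pearson correlation between the current sliding window and each stored pattern or anti-pattern. Below is a minimal, self-contained sketch of that computation, using the same shapes as the diff (a `VecDeque<f64>` window scored against `Vec<f64>` patterns) and the same `denominator < 0.01` guard; the names `pearson` and `max_corr` are illustrative, not part of the repository. One small difference: `n` is taken here as the overlap length, whereas the diff uses the full window length, which only agrees when window and pattern have equal length.

```rust
use std::collections::VecDeque;

/// Pearson correlation over the overlapping prefix of `window` and `pattern`.
/// Returns 0.0 when the denominator is near zero, mirroring the guard in the patch.
fn pearson(window: &VecDeque<f64>, pattern: &[f64]) -> f32 {
    let n = window.len().min(pattern.len());
    if n == 0 {
        return 0.0;
    }
    let nf = n as f64;
    let (mut s_x, mut s_y, mut s_xy, mut s_xx, mut s_yy) = (0.0, 0.0, 0.0, 0.0, 0.0);
    for (x, y) in window.iter().take(n).zip(pattern.iter().take(n)) {
        s_x += x;
        s_y += y;
        s_xy += x * y;
        s_xx += x * x;
        s_yy += y * y;
    }
    let numerator = nf * s_xy - s_x * s_y;
    let denominator = ((nf * s_xx - s_x * s_x) * (nf * s_yy - s_y * s_y)).sqrt();
    if denominator < 0.01 {
        return 0.0; // near-constant input: correlation is not meaningful
    }
    (numerator / denominator) as f32
}

/// Best positive correlation of the window against a set of stored patterns.
fn max_corr(window: &VecDeque<f64>, patterns: &[Vec<f64>]) -> f32 {
    patterns
        .iter()
        .map(|p| pearson(window, p))
        .filter(|c| c.is_finite())
        .fold(0.0, f32::max) // only the positive part matters, as in the diff
}

fn main() {
    let window: VecDeque<f64> = vec![1.0, 2.0, 3.0, 4.0].into();
    let patterns = vec![vec![2.0, 4.0, 6.0, 8.0], vec![4.0, 3.0, 2.0, 1.0]];
    // The first pattern is a scaled copy of the window, so the score is ~1.0.
    println!("max corr = {}", max_corr(&window, &patterns));
}
```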
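The detection loop itself no longer rebuilds a fresh bucket for every index; it advances a single window of `avg_pattern_length` points one sample at a time, opens a segment while the combined score stays above `threshold_score`, and merges a new segment into the previous result when they touch. The sketch below mirrors that control flow over the same `(timestamp, value)` layout, with the scoring abstracted behind a closure; `detect_segments` and the toy mean-based scorer in `main` are made up for illustration.

```rust
use std::collections::VecDeque;

/// Slide a fixed-size window over `(timestamp, value)` points and collect
/// `(from, to)` segments where `score(window)` exceeds `threshold`.
/// Overlapping or adjacent hits are merged into one segment, as in the patch.
fn detect_segments<F>(ts: &[(u64, f64)], window_len: usize, threshold: f32, score: F) -> Vec<(u64, u64)>
where
    F: Fn(&VecDeque<f64>) -> f32,
{
    let mut results: Vec<(u64, u64)> = Vec::new();
    if window_len == 0 || window_len > ts.len() {
        return results; // series shorter than the window: nothing to detect
    }

    let mut window: VecDeque<f64> = ts[..window_len].iter().map(|p| p.1).collect();
    let mut i = window_len - 1; // index of the last point currently in the window
    let (mut from, mut to): (Option<u64>, Option<u64>) = (None, None);

    loop {
        if score(&window) > threshold {
            // Inside a detected segment: remember where it started and extend its end.
            if from.is_none() {
                from = Some(ts[i + 1 - window_len].0);
            }
            to = Some(ts[i].0);
        } else if let (Some(f), Some(t)) = (from, to) {
            // Segment just ended: merge with the previous one if they touch.
            let merge = matches!(results.last(), Some(&(_, prev_to)) if prev_to >= f);
            if merge {
                let (prev_from, prev_to) = results.pop().unwrap();
                results.push((prev_from, t.max(prev_to)));
            } else {
                results.push((f, t));
            }
            from = None;
            to = None;
        }

        i += 1;
        if i == ts.len() {
            break;
        }
        window.pop_front();
        window.push_back(ts[i].1);
    }

    // Flush a segment that runs to the end of the series.
    if let (Some(f), Some(t)) = (from, to) {
        results.push((f, t));
    }
    results
}

fn main() {
    // Ten points, elevated between t = 30 and t = 60.
    let ts: Vec<(u64, f64)> = (0..10)
        .map(|t| (t * 10, if (3..7).contains(&t) { 5.0 } else { 0.0 }))
        .collect();
    // Toy scorer: mean of the window; flags the elevated region as one segment.
    let segs = detect_segments(&ts, 2, 2.0, |w| (w.iter().sum::<f64>() / w.len() as f64) as f32);
    println!("{:?}", segs); // [(20, 70)]
}
```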
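One behavioural change worth noting: the model's contribution is now the SVM's boolean prediction mapped to plus or minus one, rather than the raw decision-function margin (`weighted_sum - rho`) used before, so it acts as a fixed-size vote next to the two correlation terms. A hypothetical illustration of that blending, with made-up weights standing in for the `correlation_score`, `anti_correlation_score` and `model_score` config fields:

```rust
/// Combine the correlation scores with the SVM vote, as the detection step does.
/// `svm_says_pattern` stands in for `model.predict(features)` from the patch.
fn blended_score(
    positive_corr: f32,
    negative_corr: f32,
    svm_says_pattern: bool,
    correlation_score: f32,
    anti_correlation_score: f32,
    model_score: f32,
) -> f32 {
    let vote = if svm_says_pattern { 1.0 } else { -1.0 };
    positive_corr * correlation_score - negative_corr * anti_correlation_score + vote * model_score
}

fn main() {
    // With equal weights, a strong positive correlation plus a positive SVM vote
    // comfortably clears a threshold of, say, 1.0.
    let s = blended_score(0.9, 0.1, true, 1.0, 1.0, 1.0);
    assert!(s > 1.0);
    println!("score = {}", s);
}
```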