diff --git a/server/src/services/analytic_service/pattern_detector.rs b/server/src/services/analytic_service/pattern_detector.rs
index aae2c89..5549005 100644
--- a/server/src/services/analytic_service/pattern_detector.rs
+++ b/server/src/services/analytic_service/pattern_detector.rs
@@ -1,18 +1,23 @@
-use std::{thread, time};
-use tokio::time::{sleep, Duration};
-
 #[derive(Debug, Clone)]
 pub struct LearningResults {
-    backet_size: usize,
-    // avg_min: f64,
-    // avg_max: f64,
+    model: Vec<f64>,
+    // avg_min: f64,
+    // avg_max: f64
 }
 
+const CORR_THRESHOLD: f64 = 0.9;
+
 #[derive(Clone)]
 pub struct PatternDetector {
     learning_results: LearningResults,
 }
 
+fn nan_to_zero(n: f64) -> f64 {
+    if n.is_nan() {
+        return 0.;
+    }
+    return n;
+}
+
 // TODO: move this to logic of analytic unit
 impl PatternDetector {
     pub fn new(learning_results: LearningResults) -> PatternDetector {
@@ -20,55 +25,88 @@ impl PatternDetector {
     }
 
     pub async fn learn(reads: &Vec<Vec<(u64, f64)>>) -> LearningResults {
-        // TODO: implement
-        let mut min_size = usize::MAX;
-        let mut max_size = 0usize;
-        for r in reads {
-            min_size = min_size.min(r.len());
-            max_size = max_size.max(r.len());
-        }
-        // let mut max_sum = 0;
-        // let mut min_sum = 0;
+        let size_avg = reads.iter().map(|r| r.len()).sum::<usize>() / reads.len();
 
-        // for read in reads {
-        //     let my_max = read.iter().map(|(t,v)| *v).max().unwrap();
-        //     let my_min = read.iter().min().unwrap();
-        // }
+        let mut stat = Vec::<(usize, f64)>::new();
+        for _i in 0..size_avg {
+            stat.push((0usize, 0f64));
+        }
 
-        LearningResults {
-            backet_size: (min_size + max_size) / 2,
+        for r in reads {
+            let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect();
+            if xs.len() > size_avg {
+                let offset = (xs.len() - size_avg) / 2;
+                for i in 0..size_avg {
+                    stat[i].0 += 1;
+                    stat[i].1 += xs[i + offset];
+                }
+            } else {
+                let offset = (size_avg - xs.len()) / 2;
+                for i in 0..xs.len() {
+                    stat[i + offset].0 += 1;
+                    stat[i + offset].1 += xs[i];
+                }
+            }
         }
+
+        let model = stat.iter().map(|(c, v)| v / *c as f64).collect();
+
+        LearningResults { model }
     }
 
     // TODO: get iterator instead of vector
     pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
         let mut results = Vec::new();
 
         let mut i = 0;
-        while i < ts.len() - self.learning_results.backet_size {
-            let backet: Vec<_> = ts
-                .iter()
-                .skip(i)
-                .take(self.learning_results.backet_size)
-                .collect();
-
-            let mut min = f64::MAX;
-            let mut max = f64::MIN;
-
-            for (t, v) in backet.iter() {
-                min = v.min(min);
-                max = v.max(max);
+        let m = &self.learning_results.model;
+
+        // TODO: here we are ignoring gaps in the data
+        while i < ts.len() - self.learning_results.model.len() {
+            let mut backet = Vec::<f64>::new();
+
+            for j in 0..m.len() {
+                backet.push(nan_to_zero(ts[j + i].1));
             }
 
-            if min > 10_000. && max < 100_000. {
-                let from = backet[0].0;
-                let to = backet[backet.len() - 1].0;
+            let c = PatternDetector::corr(&backet, &m);
+
+            if c >= CORR_THRESHOLD {
+                let from = ts[i].0;
+                let to = ts[i + backet.len() - 1].0;
                 results.push((from, to));
             }
 
-            i += 100;
+            i += m.len();
         }
 
         return results;
     }
+
+    fn corr(xs: &Vec<f64>, ys: &Vec<f64>) -> f64 {
+        assert_eq!(xs.len(), ys.len());
+
+        let n = xs.len() as f64;
+        // TODO: compute it faster, with one iteration over x and y
+        let s_xs: f64 = xs.iter().sum();
+        let s_ys: f64 = ys.iter().sum();
+        let s_xsys: f64 = xs.iter().zip(ys).map(|(xi, yi)| xi * yi).sum();
+        let s_xs_2: f64 = xs.iter().map(|xi| xi * xi).sum();
+        let s_ys_2: f64 = ys.iter().map(|yi| yi * yi).sum();
+
+        let numerator: f64 = n * s_xsys - s_xs * s_ys;
+        let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt();
+
+        let result: f64 = numerator / denominator;
+
+        // assert!(result.abs() <= 1.01);
+
+        if result.abs() > 1.0 {
+            println!("WARNING: corr result > 1: {}", result);
+        }
+
+        return result;
+    }
 }
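For reference, the new `corr` helper is the standard single-pass computational form of the Pearson correlation coefficient between a window $x$ and the model $y$:

$$r = \frac{n\sum_i x_i y_i - \sum_i x_i \sum_i y_i}{\sqrt{\left(n\sum_i x_i^2 - \left(\sum_i x_i\right)^2\right)\left(n\sum_i y_i^2 - \left(\sum_i y_i\right)^2\right)}}$$

In exact arithmetic $|r| \le 1$, so the `WARNING` branch can only fire on floating-point rounding error. Note also that a constant window (or constant model) makes the denominator zero and `result` NaN, which the `> 1.0` check does not catch, since comparisons with NaN are false.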
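For intuition about the center alignment in `learn`, take reads of lengths 3, 5 and 7: `size_avg` = (3 + 5 + 7) / 3 = 5 (integer division). The length-7 read is trimmed symmetrically with offset (7 - 5) / 2 = 1, so its samples 1..6 land on model positions 0-4; the length-3 read is shifted by offset (5 - 3) / 2 = 1, so its samples land on positions 1-3. Each model position then divides its accumulated sum by its own hit count (here [2, 3, 3, 3, 2]), which is exactly what the `(usize, f64)` pairs in `stat` track.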
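A minimal smoke test sketching the intended flow (hypothetical, not part of the diff; test name and data are invented, and `#[tokio::test]` assumes the crate's existing tokio dependency enables the test macro). `learn` averages the reads per position into a model; `detect` then slides over the series in model-sized steps and reports windows whose correlation clears `CORR_THRESHOLD`:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn detects_learned_pattern() {
        // Two occurrences of a triangular pattern, as (timestamp, value) pairs.
        let reads = vec![
            vec![(0, 0.0), (1, 5.0), (2, 10.0), (3, 5.0), (4, 0.0)],
            vec![(10, 0.0), (11, 6.0), (12, 11.0), (13, 6.0), (14, 1.0)],
        ];
        // size_avg = (5 + 5) / 2 = 5; model = [0.0, 5.5, 10.5, 5.5, 0.5]
        let results = PatternDetector::learn(&reads).await;
        let detector = PatternDetector::new(results);

        // A longer series: noise in the first window, the pattern in the second.
        let ts = vec![
            (20, 0.0), (21, 0.1), (22, 0.0), (23, 0.2), (24, 0.1),
            (25, 0.0), (26, 5.5), (27, 10.5), (28, 5.5), (29, 0.5),
            (30, 0.0),
        ];
        // Windows start at i = 0 (corr ~ -0.04) and i = 5 (corr = 1.0 >= 0.9).
        assert_eq!(detector.detect(&ts), vec![(25, 29)]);
    }
}

Two caveats the sketch exposes: because `i` advances by the model length, occurrences that straddle window boundaries are missed; and because the loop condition is `<` rather than `<=`, the last possible window start is skipped, which is why the test series carries one trailing sample.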