diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 179d795..3a66900 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -13,6 +13,7 @@ use crate::utils::{self, get_random_str}; use anyhow; +use subbeat::metric::MetricResult; use tokio::sync::{mpsc, oneshot}; use futures::future; @@ -22,6 +23,30 @@ use chrono::Utc; // TODO: get this from pattern detector const DETECTION_STEP: u64 = 10; +struct SegData { + label: bool, + data: Vec<(u64, f64)>, +} + +async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Result { + let mut mr = ms.query(segment.from, segment.to, DETECTION_STEP).await?; + + if mr.data.keys().len() == 0 { + return Ok(SegData { + label: segment.segment_type == SegmentType::Label, + data: Default::default(), + }); + } + + let k = mr.data.keys().nth(0).unwrap().clone(); + let ts = mr.data.remove(&k).unwrap(); + + Ok(SegData { + label: segment.segment_type == SegmentType::Label, + data: ts, + }) +} + // TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { metric_service: MetricService, @@ -212,8 +237,9 @@ impl AnalyticService { // be careful if decide to store detections in db let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + let has_segments_label = segments.iter().find(|s| s.segment_type == SegmentType::Label).is_some(); - if segments.len() == 0 { + if !has_segments_label { match tx .send(AnalyticServiceMessage::Response( ResponseType::LearningFinishedEmpty, @@ -223,18 +249,15 @@ impl AnalyticService { Ok(_) => {} Err(_e) => println!("Fail to send learning results"), } - return; } - let fs = segments - .iter() - .map(|s| ms.query(s.from, s.to, DETECTION_STEP)); + let fs = segments.iter().map(|s| segment_to_segdata(&ms, s)); let rs = future::join_all(fs).await; - // TODO: run this on label adding - // TODO: save learning results in cache let mut learn_tss = Vec::new(); + let mut learn_anti_tss = Vec::new(); + for r in rs { if r.is_err() { println!("Error extracting metrics from datasource"); @@ -249,18 +272,19 @@ impl AnalyticService { } return; } - let mr = r.unwrap(); - if mr.data.keys().len() == 0 { + + let sd = r.unwrap(); + if sd.data.is_empty() { continue; } - let k = mr.data.keys().nth(0).unwrap(); - let ts = &mr.data[k]; - // TODO: maybe not clone - learn_tss.push(ts.clone()); + if sd.label { + learn_tss.push(sd.data); + } else { + learn_anti_tss.push(sd.data); + } } - let lr = PatternDetector::learn(&learn_tss).await; - + let lr = PatternDetector::learn(&learn_tss, &learn_anti_tss).await; match tx .send(AnalyticServiceMessage::Response( ResponseType::LearningFinished(lr), @@ -270,6 +294,7 @@ impl AnalyticService { Ok(_) => {} Err(_e) => println!("Fail to send learning results"), } + } async fn get_pattern_detection( diff --git a/server/src/services/analytic_service/pattern_detector.rs b/server/src/services/analytic_service/pattern_detector.rs index bda0cdd..1fd30ed 100644 --- a/server/src/services/analytic_service/pattern_detector.rs +++ b/server/src/services/analytic_service/pattern_detector.rs @@ -2,6 +2,7 @@ pub struct LearningResults { // model: Vec, patterns: Vec>, + anti_patterns: Vec>, } const CORR_THRESHOLD: f64 = 0.95; @@ -24,43 +25,32 @@ impl PatternDetector { PatternDetector { learning_results } } - pub async fn learn(reads: &Vec>) -> LearningResults { + pub async fn learn( + reads: &Vec>, + anti_reads: &Vec>, + ) -> LearningResults { // let size_avg = reads.iter().map(|r| r.len()).sum::() / reads.len(); - // let mut stat = Vec::<(usize, f64)>::new(); - // for _i in 0..size_avg { - // stat.push((0usize, 0f64)); - // } - let mut patterns = Vec::>::new(); + let mut anti_patterns = Vec::>::new(); - // for r in reads { - // let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); - // if xs.len() > size_avg { - // let offset = (xs.len() - size_avg) / 2; - // for i in 0..size_avg { - // stat[i].0 += 1; - // stat[i].1 += xs[i + offset]; - // } - // } else { - // let offset = (size_avg - xs.len()) / 2; - // for i in 0..xs.len() { - // stat[i + offset].0 += 1; - // stat[i + offset].1 += xs[i]; - // } - // } - // } + // TODO: implement actual learning for r in reads { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); patterns.push(xs); } + for r in anti_reads { + let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); + anti_patterns.push(xs); + } + // let model = stat.iter().map(|(c, v)| v / *c as f64).collect(); LearningResults { - patterns - //model + patterns, + anti_patterns, } } diff --git a/server/src/services/segments_service.rs b/server/src/services/segments_service.rs index bf8f1c0..52ea86b 100644 --- a/server/src/services/segments_service.rs +++ b/server/src/services/segments_service.rs @@ -14,7 +14,7 @@ pub type SegmentId = String; pub enum SegmentType { Detection = 0, Label = 1, - AntiLabel = 2 + AntiLabel = 2, } impl SegmentType {