diff --git a/server/Cargo.lock b/server/Cargo.lock index 9567a63..b6306c2 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -482,6 +482,7 @@ name = "hastic" version = "0.0.1" dependencies = [ "anyhow", + "async-trait", "bincode", "chrono", "config", diff --git a/server/Cargo.toml b/server/Cargo.toml index 3ebfe96..da5f8fe 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,6 +27,7 @@ linfa = "0.5.0" linfa-svm = { version="0.5.0", features=["serde"] } ndarray = "0.15.3" bincode = "1.3.3" +async-trait = "0.1.51" # TODO: remove this from prod # plotlib = "0.5.1" diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 449b229..74e8888 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,8 +1,8 @@ -use super::analytic_unit::types::{AnalyticUnitConfig, PatternDetectorConfig}; +use super::analytic_unit::types::{AnalyticUnitConfig, PatternConfig}; use super::types::{self, DetectionRunnerConfig, LearningTrain}; use super::{ analytic_client::AnalyticClient, - analytic_unit::pattern_detector::{self, LearningResults, PatternDetector}, + analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit}, types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}, }; @@ -51,9 +51,11 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re pub struct AnalyticService { metric_service: MetricService, segments_service: SegmentsService, - learning_results: Option, + + analytic_unit_learning_results: Option, analytic_unit_config: AnalyticUnitConfig, - learning_status: LearningStatus, + analytic_unit_learning_status: LearningStatus, + tx: mpsc::Sender, rx: mpsc::Receiver, @@ -82,13 +84,13 @@ impl AnalyticService { segments_service, // TODO: get it from persistance - learning_results: None, - analytic_unit_config: AnalyticUnitConfig::PatternDetector(PatternDetectorConfig { + analytic_unit_learning_results: None, + analytic_unit_config: AnalyticUnitConfig::Pattern(PatternConfig { correlation_score: 0.95, model_score: 0.95 }), - learning_status: LearningStatus::Initialization, + analytic_unit_learning_status: LearningStatus::Initialization, tx, rx, @@ -111,10 +113,10 @@ impl AnalyticService { fn run_detection_task(&self, task: DetectionTask) { // TODO: save handler of the task tokio::spawn({ - let lr = self.learning_results.as_ref().unwrap().clone(); + let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone(); let ms = self.metric_service.clone(); async move { - AnalyticService::get_pattern_detection(task.sender, lr, ms, task.from, task.to) + AnalyticService::get_detections(task.sender, lr, ms, task.from, task.to) .await; } }); @@ -126,7 +128,7 @@ impl AnalyticService { } // TODO: save handler of the task self.runner_handler = Some(tokio::spawn({ - let lr = self.learning_results.as_ref().unwrap().clone(); + let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone(); let ms = self.metric_service.clone(); async move { // TODO: implement @@ -142,7 +144,7 @@ impl AnalyticService { self.learning_handler = None; } self.learning_handler = Some(tokio::spawn({ - self.learning_status = LearningStatus::Starting; + self.analytic_unit_learning_status = LearningStatus::Starting; let tx = self.tx.clone(); let ms = self.metric_service.clone(); let ss = self.segments_service.clone(); @@ -152,7 +154,7 @@ impl AnalyticService { })); } RequestType::RunDetection(task) => { - if self.learning_status == LearningStatus::Initialization { + if self.analytic_unit_learning_status == LearningStatus::Initialization { match task .sender .send(Err(anyhow::format_err!("Analytics in initialization"))) @@ -165,21 +167,21 @@ impl AnalyticService { } return; } - if self.learning_status == LearningStatus::Ready { + if self.analytic_unit_learning_status == LearningStatus::Ready { self.run_detection_task(task); } else { self.learning_waiters.push(task); } } RequestType::GetStatus(tx) => { - tx.send(self.learning_status.clone()).unwrap(); + tx.send(self.analytic_unit_learning_status.clone()).unwrap(); } RequestType::GetLearningTrain(tx) => { - if self.learning_results.is_none() { + if self.analytic_unit_learning_results.is_none() { tx.send(LearningTrain::default()).unwrap(); } else { tx.send( - self.learning_results + self.analytic_unit_learning_results .as_ref() .unwrap() .learning_train @@ -197,11 +199,11 @@ impl AnalyticService { fn consume_response(&mut self, res: types::ResponseType) { match res { // TODO: handle when learning panic - ResponseType::LearningStarted => self.learning_status = LearningStatus::Learning, + ResponseType::LearningStarted => self.analytic_unit_learning_status = LearningStatus::Learning, ResponseType::LearningFinished(results) => { self.learning_handler = None; - self.learning_results = Some(results); - self.learning_status = LearningStatus::Ready; + self.analytic_unit_learning_results = Some(results); + self.analytic_unit_learning_status = LearningStatus::Ready; // TODO: run tasks from self.learning_waiter while self.learning_waiters.len() > 0 { @@ -219,13 +221,13 @@ impl AnalyticService { } ResponseType::LearningFinishedEmpty => { // TODO: drop all learning_waiters with empty results - self.learning_results = None; - self.learning_status = LearningStatus::Initialization; + self.analytic_unit_learning_results = None; + self.analytic_unit_learning_status = LearningStatus::Initialization; } ResponseType::LearningDatasourceError => { // TODO: drop all learning_waiters with error - self.learning_results = None; - self.learning_status = LearningStatus::Error; + self.analytic_unit_learning_results = None; + self.analytic_unit_learning_status = LearningStatus::Error; } } } @@ -311,7 +313,7 @@ impl AnalyticService { } } - let lr = PatternDetector::learn(&learn_tss, &learn_anti_tss).await; + let lr = PatternAnalyticUnit::learn(&learn_tss, &learn_anti_tss).await; match tx .send(AnalyticServiceMessage::Response( ResponseType::LearningFinished(lr), @@ -323,14 +325,14 @@ impl AnalyticService { } } - async fn get_pattern_detection( + async fn get_detections( tx: oneshot::Sender>>, lr: LearningResults, ms: MetricService, from: u64, to: u64, ) { - let pt = pattern_detector::PatternDetector::new(lr); + let pt = pattern_analytic_unit::PatternAnalyticUnit::new(lr); let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); if mr.data.keys().len() == 0 { @@ -367,58 +369,5 @@ impl AnalyticService { return; } - // TODO: move this to another analytic unit - async fn get_threshold_detections( - &self, - from: u64, - to: u64, - step: u64, - threashold: f64, - ) -> anyhow::Result> { - let mr = self.metric_service.query(from, to, step).await?; - if mr.data.keys().len() == 0 { - return Ok(Vec::new()); - } - - let key = mr.data.keys().nth(0).unwrap(); - let ts = &mr.data[key]; - - let mut result = Vec::::new(); - let mut from: Option = None; - for (t, v) in ts { - if *v > threashold { - if from.is_some() { - continue; - } else { - from = Some(*t); - } - } else { - if from.is_some() { - result.push(Segment { - // TODO: persist detections together with id - id: Some(get_random_str(ID_LENGTH)), - from: from.unwrap(), - to: *t, - segment_type: SegmentType::Detection, - }); - from = None; - } - } - } - - // TODO: don't repeat myself - if from.is_some() { - result.push(Segment { - id: Some(get_random_str(ID_LENGTH)), - from: from.unwrap(), - to, - segment_type: SegmentType::Detection, - }); - } - - // TODO: decide what to do it from is Some() in the end - - Ok(result) - } } diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index a185aea..5088df5 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -1,6 +1,12 @@ -pub mod pattern_detector; +pub mod pattern_analytic_unit; +pub mod threshold_analytic_unit; pub mod types; +use async_trait::async_trait; + +#[async_trait] trait AnalyticUnit { - -} \ No newline at end of file + async fn learn(reads: &Vec>, anti_reads: &Vec>); + fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)>; +} + diff --git a/server/src/services/analytic_service/analytic_unit/pattern_detector.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs similarity index 93% rename from server/src/services/analytic_service/analytic_unit/pattern_detector.rs rename to server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 7663aa5..80c920e 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_detector.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -55,7 +55,7 @@ pub type Features = [f64; FEATURES_SIZE]; pub const SCORE_THRESHOLD: f64 = 0.95; #[derive(Clone)] -pub struct PatternDetector { +pub struct PatternAnalyticUnit { learning_results: LearningResults, } @@ -67,9 +67,9 @@ fn nan_to_zero(n: f64) -> f64 { } // TODO: move this to loginc of analytic unit -impl PatternDetector { - pub fn new(learning_results: LearningResults) -> PatternDetector { - PatternDetector { learning_results } +impl PatternAnalyticUnit { + pub fn new(learning_results: LearningResults) -> PatternAnalyticUnit { + PatternAnalyticUnit { learning_results } } pub async fn learn( @@ -86,7 +86,7 @@ impl PatternDetector { for r in reads { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); - let fs = PatternDetector::get_features(&xs); + let fs = PatternAnalyticUnit::get_features(&xs); records_raw.push(fs); targets_raw.push(true); @@ -95,7 +95,7 @@ impl PatternDetector { for r in anti_reads { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); - let fs = PatternDetector::get_features(&xs); + let fs = PatternAnalyticUnit::get_features(&xs); records_raw.push(fs); targets_raw.push(false); anti_patterns.push(xs); @@ -168,7 +168,7 @@ impl PatternDetector { for j in 0..p.len() { backet.push(nan_to_zero(ts[i + j].1)); } - let score = PatternDetector::corr_aligned(p, &backet); + let score = PatternAnalyticUnit::corr_aligned(p, &backet); if score > pattern_match_score { pattern_match_score = score; pattern_match_len = p.len(); @@ -182,7 +182,7 @@ impl PatternDetector { for j in 0..p.len() { backet.push(nan_to_zero(ts[i + j].1)); } - let score = PatternDetector::corr_aligned(p, &backet); + let score = PatternAnalyticUnit::corr_aligned(p, &backet); if score > anti_pattern_match_score { anti_pattern_match_score = score; } @@ -194,7 +194,7 @@ impl PatternDetector { for j in 0..pattern_match_len { backet.push(nan_to_zero(ts[i + j].1)); } - let fs = PatternDetector::get_features(&backet); + let fs = PatternAnalyticUnit::get_features(&backet); let detected = self .learning_results .model @@ -245,8 +245,6 @@ impl PatternDetector { return 0.; } - // TODO: case when denominator = 0 - let result: f64 = numerator / denominator; // assert!(result.abs() <= 1.01); diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs new file mode 100644 index 0000000..e79b21f --- /dev/null +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -0,0 +1,40 @@ +use super::types::ThresholdConfig; + +struct ThresholdDetector { + config: ThresholdConfig +} + +impl ThresholdDetector { + fn new(config: ThresholdConfig) -> ThresholdDetector { + ThresholdDetector{ config } + } + + pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> { + + let mut result = Vec::<(u64, u64)>::new(); + let mut from: Option = None; + for (t, v) in ts { + if *v > self.config.threashold { + if from.is_some() { + continue; + } else { + from = Some(*t); + } + } else { + if from.is_some() { + result.push((from.unwrap(), *t)); + from = None; + } + } + } + + // TODO: don't repeat myself + if from.is_some() { + result.push((from.unwrap(), ts.last().unwrap().0)); + } + + // TODO: decide what to do it from is Some() in the end + + result + } +} \ No newline at end of file diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index d87f12f..f1b6ec3 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -1,12 +1,18 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct PatternDetectorConfig { +pub struct PatternConfig { pub correlation_score: f32, pub model_score: f32 } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ThresholdConfig { + pub threashold: f64, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub enum AnalyticUnitConfig { - PatternDetector(PatternDetectorConfig) + Pattern(PatternConfig), + Threshold(ThresholdConfig) } \ No newline at end of file diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 7aa5669..b9cb94d 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -1,6 +1,6 @@ use crate::services::segments_service::Segment; -use super::analytic_unit::{pattern_detector::{self, LearningResults}, types::AnalyticUnitConfig}; +use super::analytic_unit::{pattern_analytic_unit::{self, LearningResults}, types::AnalyticUnitConfig}; use anyhow::Result; use serde::Serialize; @@ -18,7 +18,7 @@ pub enum LearningStatus { // TODO: move to analytic_unit config of pattern detector #[derive(Clone, Serialize, Debug)] pub struct LearningTrain { - pub features: Vec, + pub features: Vec, pub target: Vec, }