From 6197641bf6bad0f870d50d7f36efdfdb8c617b03 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 9 Nov 2021 03:20:37 +0300 Subject: [PATCH] analytic unit --- server/src/api/analytics.rs | 36 +-- .../analytic_service/analytic_client.rs | 14 +- .../analytic_service/analytic_service.rs | 225 ++++--------- .../analytic_service/analytic_unit/mod.rs | 15 +- .../analytic_unit/pattern_analytic_unit.rs | 299 ++++++++++++------ .../analytic_unit/threshold_analytic_unit.rs | 46 ++- .../analytic_service/analytic_unit/types.rs | 29 +- server/src/services/analytic_service/mod.rs | 2 +- server/src/services/analytic_service/types.rs | 21 +- 9 files changed, 379 insertions(+), 308 deletions(-) diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs index 0cbb229..2119010 100644 --- a/server/src/api/analytics.rs +++ b/server/src/api/analytics.rs @@ -10,7 +10,7 @@ pub mod filters { list(client.clone()) .or(status(client.clone())) .or(get_config(client.clone())) - .or(list_train(client.clone())) + // .or(list_train(client.clone())) // .or(create(db.clone())) // // .or(update(db.clone())) // .or(delete(db.clone())) @@ -48,14 +48,14 @@ pub mod filters { } /// GET /analytics/model - pub fn list_train( - client: Client, - ) -> impl Filter + Clone { - warp::path!("analytics" / "model") - .and(warp::get()) - .and(with_client(client)) - .and_then(handlers::list_train) - } + // pub fn list_train( + // client: Client, + // ) -> impl Filter + Clone { + // warp::path!("analytics" / "model") + // .and(warp::get()) + // .and(with_client(client)) + // .and_then(handlers::list_train) + // } fn with_client( client: Client, @@ -103,15 +103,15 @@ mod handlers { } } - pub async fn list_train(client: Client) -> Result { - match client.get_train().await { - Ok(lt) => Ok(API::json(<)), - Err(e) => { - println!("{:?}", e); - Err(warp::reject::custom(BadQuery)) - } - } - } + // pub async fn list_train(client: Client) -> Result { + // match client.get_train().await { + // Ok(lt) => Ok(API::json(<)), + // Err(e) => { + // println!("{:?}", e); + // Err(warp::reject::custom(BadQuery)) + // } + // } + // } } mod models { diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs index c01c64a..de544e5 100644 --- a/server/src/services/analytic_service/analytic_client.rs +++ b/server/src/services/analytic_service/analytic_client.rs @@ -43,13 +43,13 @@ impl AnalyticClient { Ok(r) } - pub async fn get_train(&self) -> anyhow::Result { - let (tx, rx) = oneshot::channel(); - let req = AnalyticServiceMessage::Request(RequestType::GetLearningTrain(tx)); - self.tx.send(req).await?; - let r = rx.await?; - Ok(r) - } + // pub async fn get_train(&self) -> anyhow::Result { + // let (tx, rx) = oneshot::channel(); + // let req = AnalyticServiceMessage::Request(RequestType::GetLearningTrain(tx)); + // self.tx.send(req).await?; + // let r = rx.await?; + // Ok(r) + // } pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { let (tx, rx) = oneshot::channel(); diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 74e8888..dbc4f71 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use super::analytic_unit::types::{AnalyticUnitConfig, PatternConfig}; use super::types::{self, DetectionRunnerConfig, LearningTrain}; use super::{ @@ -6,53 +8,27 @@ use super::{ types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}, }; +use crate::services::analytic_service::analytic_unit::resolve; use crate::services::{ metric_service::MetricService, segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH}, }; use crate::utils::{self, get_random_str}; -use anyhow; +use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, LearningResult}; -use tokio::sync::{mpsc, oneshot}; +use anyhow; -use futures::future; +use tokio::sync::{mpsc, oneshot, RwLock}; use chrono::Utc; -// TODO: get this from pattern detector -const DETECTION_STEP: u64 = 10; - -struct SegData { - label: bool, - data: Vec<(u64, f64)>, -} - -async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Result { - let mut mr = ms.query(segment.from, segment.to, DETECTION_STEP).await?; - - if mr.data.keys().len() == 0 { - return Ok(SegData { - label: segment.segment_type == SegmentType::Label, - data: Default::default(), - }); - } - - let k = mr.data.keys().nth(0).unwrap().clone(); - let ts = mr.data.remove(&k).unwrap(); - - Ok(SegData { - label: segment.segment_type == SegmentType::Label, - data: ts, - }) -} - // TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { metric_service: MetricService, segments_service: SegmentsService, - analytic_unit_learning_results: Option, + analytic_unit: Option>>>, analytic_unit_config: AnalyticUnitConfig, analytic_unit_learning_status: LearningStatus, @@ -84,10 +60,10 @@ impl AnalyticService { segments_service, // TODO: get it from persistance - analytic_unit_learning_results: None, + analytic_unit: None, analytic_unit_config: AnalyticUnitConfig::Pattern(PatternConfig { correlation_score: 0.95, - model_score: 0.95 + model_score: 0.95, }), analytic_unit_learning_status: LearningStatus::Initialization, @@ -113,28 +89,27 @@ impl AnalyticService { fn run_detection_task(&self, task: DetectionTask) { // TODO: save handler of the task tokio::spawn({ - let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone(); let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); async move { - AnalyticService::get_detections(task.sender, lr, ms, task.from, task.to) - .await; + AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await; } }); } - fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { - if self.runner_handler.is_some() { - self.runner_handler.as_mut().unwrap().abort(); - } - // TODO: save handler of the task - self.runner_handler = Some(tokio::spawn({ - let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone(); - let ms = self.metric_service.clone(); - async move { - // TODO: implement - } - })); - } + // fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { + // if self.runner_handler.is_some() { + // self.runner_handler.as_mut().unwrap().abort(); + // } + // // TODO: save handler of the task + // self.runner_handler = Some(tokio::spawn({ + // let au = self.analytic_unit.unwrap(); + // let ms = self.metric_service.clone(); + // async move { + // // TODO: implement + // } + // })); + // } fn consume_request(&mut self, req: types::RequestType) -> () { match req { @@ -148,8 +123,9 @@ impl AnalyticService { let tx = self.tx.clone(); let ms = self.metric_service.clone(); let ss = self.segments_service.clone(); + let cfg = self.analytic_unit_config.clone(); async move { - AnalyticService::run_learning(tx, ms, ss).await; + AnalyticService::run_learning(tx, cfg, ms, ss).await; } })); } @@ -176,20 +152,20 @@ impl AnalyticService { RequestType::GetStatus(tx) => { tx.send(self.analytic_unit_learning_status.clone()).unwrap(); } - RequestType::GetLearningTrain(tx) => { - if self.analytic_unit_learning_results.is_none() { - tx.send(LearningTrain::default()).unwrap(); - } else { - tx.send( - self.analytic_unit_learning_results - .as_ref() - .unwrap() - .learning_train - .clone(), - ) - .unwrap(); - } - } + // RequestType::GetLearningTrain(tx) => { + // if self.analytic_unit_learning_results.is_none() { + // tx.send(LearningTrain::default()).unwrap(); + // } else { + // tx.send( + // self.analytic_unit_learning_results + // .as_ref() + // .unwrap() + // .learning_train + // .clone(), + // ) + // .unwrap(); + // } + // } RequestType::GetConfig(tx) => { tx.send(self.analytic_unit_config.clone()).unwrap(); } @@ -199,10 +175,12 @@ impl AnalyticService { fn consume_response(&mut self, res: types::ResponseType) { match res { // TODO: handle when learning panic - ResponseType::LearningStarted => self.analytic_unit_learning_status = LearningStatus::Learning, + ResponseType::LearningStarted => { + self.analytic_unit_learning_status = LearningStatus::Learning + } ResponseType::LearningFinished(results) => { self.learning_handler = None; - self.analytic_unit_learning_results = Some(results); + self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); self.analytic_unit_learning_status = LearningStatus::Ready; // TODO: run tasks from self.learning_waiter @@ -212,21 +190,21 @@ impl AnalyticService { } // TODO: fix this - if self.endpoint.is_some() { - self.run_detection_runner(DetectionRunnerConfig { - endpoint: self.endpoint.as_ref().unwrap().clone(), - from: Utc::now().timestamp() as u64, - }); - } + // if self.endpoint.is_some() { + // self.run_detection_runner(DetectionRunnerConfig { + // endpoint: self.endpoint.as_ref().unwrap().clone(), + // from: Utc::now().timestamp() as u64, + // }); + // } } ResponseType::LearningFinishedEmpty => { // TODO: drop all learning_waiters with empty results - self.analytic_unit_learning_results = None; + self.analytic_unit = None; self.analytic_unit_learning_status = LearningStatus::Initialization; } ResponseType::LearningDatasourceError => { // TODO: drop all learning_waiters with error - self.analytic_unit_learning_results = None; + self.analytic_unit = None; self.analytic_unit_learning_status = LearningStatus::Error; } } @@ -246,9 +224,12 @@ impl AnalyticService { async fn run_learning( tx: mpsc::Sender, + aucfg: AnalyticUnitConfig, ms: MetricService, ss: SegmentsService, ) { + let mut au = resolve(aucfg); + match tx .send(AnalyticServiceMessage::Response( ResponseType::LearningStarted, @@ -256,70 +237,17 @@ impl AnalyticService { .await { Ok(_) => {} - Err(_e) => println!("Fail to send notification about learning start"), + Err(_e) => println!("Fail to send learning started notification"), } - // TODO: logic for returning error - - // be careful if decide to store detections in db - let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); - let has_segments_label = segments - .iter() - .find(|s| s.segment_type == SegmentType::Label) - .is_some(); - - if !has_segments_label { - match tx - .send(AnalyticServiceMessage::Response( - ResponseType::LearningFinishedEmpty, - )) - .await - { - Ok(_) => {} - Err(_e) => println!("Fail to send learning results"), - } - return; - } - - let fs = segments.iter().map(|s| segment_to_segdata(&ms, s)); - let rs = future::join_all(fs).await; - - let mut learn_tss = Vec::new(); - let mut learn_anti_tss = Vec::new(); - - for r in rs { - if r.is_err() { - println!("Error extracting metrics from datasource"); - match tx - .send(AnalyticServiceMessage::Response( - ResponseType::LearningDatasourceError, - )) - .await - { - Ok(_) => {} - Err(_e) => println!("Fail send error abour extracting error"), - } - return; - } - - let sd = r.unwrap(); - if sd.data.is_empty() { - continue; - } - if sd.label { - learn_tss.push(sd.data); - } else { - learn_anti_tss.push(sd.data); - } - } + // TODO: maybe to spawn_blocking here + let lr = match au.learn(ms, ss).await { + LearningResult::Finished => ResponseType::LearningFinished(au), + LearningResult::DatasourceError => ResponseType::LearningDatasourceError, + LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty, + }; - let lr = PatternAnalyticUnit::learn(&learn_tss, &learn_anti_tss).await; - match tx - .send(AnalyticServiceMessage::Response( - ResponseType::LearningFinished(lr), - )) - .await - { + match tx.send(AnalyticServiceMessage::Response(lr)).await { Ok(_) => {} Err(_e) => println!("Fail to send learning results"), } @@ -327,28 +255,17 @@ impl AnalyticService { async fn get_detections( tx: oneshot::Sender>>, - lr: LearningResults, + analytic_unit: Arc>>, ms: MetricService, from: u64, to: u64, ) { - let pt = pattern_analytic_unit::PatternAnalyticUnit::new(lr); - let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); - - if mr.data.keys().len() == 0 { - match tx.send(Ok(Vec::new())) { - Ok(_) => {} - Err(_e) => { - println!("failed to send empty results"); - } - } - return; - } - - let k = mr.data.keys().nth(0).unwrap(); - let ts = &mr.data[k]; - - let result = pt.detect(ts); + let result = analytic_unit + .read() + .await + .detect(ms, from, to) + .await + .unwrap(); let result_segments: Vec = result .iter() @@ -368,6 +285,4 @@ impl AnalyticService { } return; } - - } diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index 5088df5..56f7e98 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -2,11 +2,14 @@ pub mod pattern_analytic_unit; pub mod threshold_analytic_unit; pub mod types; -use async_trait::async_trait; +use self::{ + pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit, + types::AnalyticUnitConfig, +}; -#[async_trait] -trait AnalyticUnit { - async fn learn(reads: &Vec>, anti_reads: &Vec>); - fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)>; +pub fn resolve(cfg: AnalyticUnitConfig) -> Box { + match cfg { + AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())), + AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())), + } } - diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 80c920e..c832cb3 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -1,19 +1,27 @@ use std::{fmt, sync::Arc}; +use futures::future; use parking_lot::Mutex; -use serde::{Deserialize, Serialize}; -use serde_json; use linfa::prelude::*; use linfa; -use linfa_svm::{error::Result, Svm}; +use linfa_svm::Svm; -use ndarray::{Array, ArrayView, Axis}; +use ndarray::Array; -use crate::services::analytic_service::types::LearningTrain; +use crate::services::{ + analytic_service::types::{self, LearningTrain}, + metric_service::MetricService, + segments_service::{Segment, SegmentType, SegmentsService}, +}; +use super::types::{AnalyticUnit, LearningResult, PatternConfig}; +use async_trait::async_trait; + +// TODO: move to config +const DETECTION_STEP: u64 = 10; #[derive(Clone)] pub struct LearningResults { @@ -54,11 +62,6 @@ pub type Features = [f64; FEATURES_SIZE]; pub const SCORE_THRESHOLD: f64 = 0.95; -#[derive(Clone)] -pub struct PatternAnalyticUnit { - learning_results: LearningResults, -} - fn nan_to_zero(n: f64) -> f64 { if n.is_nan() { return 0.; @@ -66,16 +69,159 @@ fn nan_to_zero(n: f64) -> f64 { return n; } +struct SegData { + label: bool, + data: Vec<(u64, f64)>, +} + +async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Result { + let mut mr = ms.query(segment.from, segment.to, DETECTION_STEP).await?; + + if mr.data.keys().len() == 0 { + return Ok(SegData { + label: segment.segment_type == SegmentType::Label, + data: Default::default(), + }); + } + + let k = mr.data.keys().nth(0).unwrap().clone(); + let ts = mr.data.remove(&k).unwrap(); + + Ok(SegData { + label: segment.segment_type == SegmentType::Label, + data: ts, + }) +} + +pub struct PatternAnalyticUnit { + config: PatternConfig, + learning_results: Option, +} + // TODO: move this to loginc of analytic unit impl PatternAnalyticUnit { - pub fn new(learning_results: LearningResults) -> PatternAnalyticUnit { - PatternAnalyticUnit { learning_results } + pub fn new(cfg: PatternConfig) -> PatternAnalyticUnit { + PatternAnalyticUnit { + config: cfg, + learning_results: None, + } + } + + fn corr_aligned(xs: &Vec, ys: &Vec) -> f64 { + let n = xs.len() as f64; + let mut s_xs: f64 = 0f64; + let mut s_ys: f64 = 0f64; + let mut s_xsys: f64 = 0f64; + let mut s_xs_2: f64 = 0f64; + let mut s_ys_2: f64 = 0f64; + + let min = xs.len().min(ys.len()); + xs.iter() + .take(min) + .zip(ys.iter().take(min)) + .for_each(|(xi, yi)| { + s_xs += xi; + s_ys += yi; + s_xsys += xi * yi; + s_xs_2 += xi * xi; + s_ys_2 += yi * yi; + }); + + let numerator: f64 = n * s_xsys - s_xs * s_ys; + let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt(); + + // IT"s a hack + if denominator < 0.01 { + return 0.; + } + + let result: f64 = numerator / denominator; + + // assert!(result.abs() <= 1.01); + + if result.abs() > 1.1 { + println!("{:?}", xs); + println!("------------"); + println!("{:?}", ys); + println!("WARNING: corr result > 1: {}", result); + } + + return result; } - pub async fn learn( - reads: &Vec>, - anti_reads: &Vec>, - ) -> LearningResults { + fn get_features(xs: &Vec) -> Features { + let mut min = f64::MAX; + let mut max = f64::MIN; + let mut sum = 0f64; + + for x in xs { + min = min.min(*x); + max = max.max(*x); + sum += x; + } + + let mean = sum / xs.len() as f64; + + sum = 0f64; + + for x in xs { + sum += (x - mean) * (x - mean); + } + + let sd = sum.sqrt(); + + // TODO: add autocorrelation + // TODO: add FFT + // TODO: add DWT + + return [ + min, max, mean, sd, + // 0f64,0f64, + // 0f64,0f64,0f64, 0f64 + ]; + } +} + +#[async_trait] +impl AnalyticUnit for PatternAnalyticUnit { + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { + // be careful if decide to store detections in db + let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + let has_segments_label = segments + .iter() + .find(|s| s.segment_type == SegmentType::Label) + .is_some(); + + if !has_segments_label { + return LearningResult::FinishedEmpty; + } + + let fs = segments.iter().map(|s| segment_to_segdata(&ms, s)); + let rs = future::join_all(fs).await; + + let mut learn_tss = Vec::new(); + let mut learn_anti_tss = Vec::new(); + + for r in rs { + if r.is_err() { + println!("Error extracting metrics from datasource"); + return LearningResult::DatasourceError; + } + + let sd = r.unwrap(); + if sd.data.is_empty() { + continue; + } + if sd.label { + learn_tss.push(sd.data); + } else { + learn_anti_tss.push(sd.data); + } + } + + // let reads: &Vec> = // TODO + // let anti_reads: &Vec> // TODO + // let size_avg = reads.iter().map(|r| r.len()).sum::() / reads.len(); let mut patterns = Vec::>::new(); @@ -84,7 +230,7 @@ impl PatternAnalyticUnit { let mut records_raw = Vec::::new(); let mut targets_raw = Vec::::new(); - for r in reads { + for r in learn_tss { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); let fs = PatternAnalyticUnit::get_features(&xs); @@ -93,7 +239,7 @@ impl PatternAnalyticUnit { patterns.push(xs); } - for r in anti_reads { + for r in learn_anti_tss { let xs: Vec = r.iter().map(|e| e.1).map(nan_to_zero).collect(); let fs = PatternAnalyticUnit::get_features(&xs); records_raw.push(fs); @@ -137,7 +283,7 @@ impl PatternAnalyticUnit { // println!("pridiction: {}", prediction ); - LearningResults { + self.learning_results = Some(LearningResults { model: Arc::new(Mutex::new(model)), learning_train: LearningTrain { @@ -147,15 +293,36 @@ impl PatternAnalyticUnit { patterns, anti_patterns, - } + }); + + return LearningResult::Finished; } // TODO: get iterator instead of vector - pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> { + async fn detect( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result> { + if self.learning_results.is_none() { + return Err(anyhow::format_err!("Learning results are not ready")); + } + + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } + + let k = mr.data.keys().nth(0).unwrap(); + let ts = &mr.data[k]; + + let lr = self.learning_results.as_ref().unwrap(); let mut results = Vec::new(); - let pt = &self.learning_results.patterns; - let apt = &self.learning_results.anti_patterns; + let pt = &lr.patterns; + let apt = &lr.anti_patterns; for i in 0..ts.len() { let mut pattern_match_score = 0f64; @@ -168,7 +335,7 @@ impl PatternAnalyticUnit { for j in 0..p.len() { backet.push(nan_to_zero(ts[i + j].1)); } - let score = PatternAnalyticUnit::corr_aligned(p, &backet); + let score = PatternAnalyticUnit::corr_aligned(&p, &backet); if score > pattern_match_score { pattern_match_score = score; pattern_match_len = p.len(); @@ -182,7 +349,7 @@ impl PatternAnalyticUnit { for j in 0..p.len() { backet.push(nan_to_zero(ts[i + j].1)); } - let score = PatternAnalyticUnit::corr_aligned(p, &backet); + let score = PatternAnalyticUnit::corr_aligned(&p, &backet); if score > anti_pattern_match_score { anti_pattern_match_score = score; } @@ -195,11 +362,7 @@ impl PatternAnalyticUnit { backet.push(nan_to_zero(ts[i + j].1)); } let fs = PatternAnalyticUnit::get_features(&backet); - let detected = self - .learning_results - .model - .lock() - .predict(Array::from_vec(fs.to_vec())); + let detected = lr.model.lock().predict(Array::from_vec(fs.to_vec())); if detected { pattern_match_score += 0.1; } else { @@ -214,80 +377,6 @@ impl PatternAnalyticUnit { } } - return results; - } - - fn corr_aligned(xs: &Vec, ys: &Vec) -> f64 { - let n = xs.len() as f64; - let mut s_xs: f64 = 0f64; - let mut s_ys: f64 = 0f64; - let mut s_xsys: f64 = 0f64; - let mut s_xs_2: f64 = 0f64; - let mut s_ys_2: f64 = 0f64; - - let min = xs.len().min(ys.len()); - xs.iter() - .take(min) - .zip(ys.iter().take(min)) - .for_each(|(xi, yi)| { - s_xs += xi; - s_ys += yi; - s_xsys += xi * yi; - s_xs_2 += xi * xi; - s_ys_2 += yi * yi; - }); - - let numerator: f64 = n * s_xsys - s_xs * s_ys; - let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt(); - - // IT"s a hack - if denominator < 0.01 { - return 0.; - } - - let result: f64 = numerator / denominator; - - // assert!(result.abs() <= 1.01); - - if result.abs() > 1.1 { - println!("{:?}", xs); - println!("------------"); - println!("{:?}", ys); - println!("WARNING: corr result > 1: {}", result); - } - - return result; - } - - fn get_features(xs: &Vec) -> Features { - let mut min = f64::MAX; - let mut max = f64::MIN; - let mut sum = 0f64; - - for x in xs { - min = min.min(*x); - max = max.max(*x); - sum += x; - } - - let mean = sum / xs.len() as f64; - - sum = 0f64; - - for x in xs { - sum += (x - mean) * (x - mean); - } - - let sd = sum.sqrt(); - - // TODO: add autocorrelation - // TODO: add FFT - // TODO: add DWT - - return [ - min, max, mean, sd, - // 0f64,0f64, - // 0f64,0f64,0f64, 0f64 - ]; + Ok(results) } } diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index e79b21f..4430db6 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -1,15 +1,43 @@ -use super::types::ThresholdConfig; +use crate::services::{ + analytic_service::types, metric_service::MetricService, segments_service::SegmentsService, +}; -struct ThresholdDetector { - config: ThresholdConfig +use super::types::{AnalyticUnit, LearningResult, ThresholdConfig}; + +use async_trait::async_trait; + +// TODO: move to config +const DETECTION_STEP: u64 = 10; + +pub struct ThresholdAnalyticUnit { + config: ThresholdConfig, } -impl ThresholdDetector { - fn new(config: ThresholdConfig) -> ThresholdDetector { - ThresholdDetector{ config } +impl ThresholdAnalyticUnit { + pub fn new(config: ThresholdConfig) -> ThresholdAnalyticUnit { + ThresholdAnalyticUnit { config } } +} - pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> { +#[async_trait] +impl AnalyticUnit for ThresholdAnalyticUnit { + async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult { + return LearningResult::Finished; + } + async fn detect( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result> { + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } + + let k = mr.data.keys().nth(0).unwrap(); + let ts = &mr.data[k]; let mut result = Vec::<(u64, u64)>::new(); let mut from: Option = None; @@ -35,6 +63,6 @@ impl ThresholdDetector { // TODO: decide what to do it from is Some() in the end - result + Ok(result) } -} \ No newline at end of file +} diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index f1b6ec3..0470212 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -1,9 +1,15 @@ use serde::{Deserialize, Serialize}; +use async_trait::async_trait; + +use crate::services::{ + analytic_service::types, metric_service::MetricService, segments_service::SegmentsService, +}; + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct PatternConfig { pub correlation_score: f32, - pub model_score: f32 + pub model_score: f32, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -14,5 +20,22 @@ pub struct ThresholdConfig { #[derive(Debug, Serialize, Deserialize, Clone)] pub enum AnalyticUnitConfig { Pattern(PatternConfig), - Threshold(ThresholdConfig) -} \ No newline at end of file + Threshold(ThresholdConfig), +} + +pub enum LearningResult { + Finished, + FinishedEmpty, + DatasourceError, +} + +#[async_trait] +pub trait AnalyticUnit { + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult; + async fn detect( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result>; +} diff --git a/server/src/services/analytic_service/mod.rs b/server/src/services/analytic_service/mod.rs index fb15d78..6f83efc 100644 --- a/server/src/services/analytic_service/mod.rs +++ b/server/src/services/analytic_service/mod.rs @@ -1,5 +1,5 @@ mod analytic_service; -mod analytic_unit; +pub mod analytic_unit; pub mod types; pub mod analytic_client; diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index b9cb94d..dd4e976 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -1,11 +1,18 @@ +use std::fmt; + use crate::services::segments_service::Segment; -use super::analytic_unit::{pattern_analytic_unit::{self, LearningResults}, types::AnalyticUnitConfig}; +use super::analytic_unit::{ + pattern_analytic_unit::{self, LearningResults}, + types::AnalyticUnitConfig, +}; use anyhow::Result; use serde::Serialize; use tokio::sync::oneshot; +use crate::services::analytic_service::analytic_unit::types::AnalyticUnit; + #[derive(Debug, Clone, PartialEq, Serialize)] pub enum LearningStatus { Initialization, @@ -31,14 +38,20 @@ impl Default for LearningTrain { } } -#[derive(Debug)] pub enum ResponseType { LearningStarted, - LearningFinished(LearningResults), + LearningFinished(Box), LearningFinishedEmpty, LearningDatasourceError, } +impl fmt::Debug for ResponseType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: implement + f.debug_tuple("foo").finish() + } +} + #[derive(Debug)] pub struct DetectionTask { pub sender: oneshot::Sender>>, @@ -60,7 +73,7 @@ pub enum RequestType { RunDetection(DetectionTask), GetStatus(oneshot::Sender), GetConfig(oneshot::Sender), - GetLearningTrain(oneshot::Sender), + // GetLearningTrain(oneshot::Sender), } #[derive(Debug)]