diff --git a/client/src/views/Home.vue b/client/src/views/Home.vue index 1bdb770..3eacb8f 100644 --- a/client/src/views/Home.vue +++ b/client/src/views/Home.vue @@ -19,8 +19,12 @@
Correlation score:
+ Anti correlation score: +
Model score: -

+
+ Threshold score: +

@@ -54,10 +58,20 @@ export default defineComponent({ cfg.correlation_score = parseFloat(e.target.value); this.$store.dispatch('patchConfig', { Pattern: cfg }); }, + antiCorrelationScoreChange(e) { + let cfg = _.clone(this.analyticUnitConfig); + cfg.anti_correlation_score = parseFloat(e.target.value); + this.$store.dispatch('patchConfig', { Pattern: cfg }); + }, modelScoreChange(e) { let cfg = _.clone(this.analyticUnitConfig); cfg.model_score = parseFloat(e.target.value); this.$store.dispatch('patchConfig', { Pattern: cfg }); + }, + thresholdScoreChange(e) { + let cfg = _.clone(this.analyticUnitConfig); + cfg.threshold_score = parseFloat(e.target.value); + this.$store.dispatch('patchConfig', { Pattern: cfg }); } }, data: function () { diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs index e73a47b..ec09851 100644 --- a/server/src/api/analytics.rs +++ b/server/src/api/analytics.rs @@ -118,8 +118,10 @@ mod handlers { } } - pub async fn patch_config(client: Client, patch: PatchConfig) -> Result { - + pub async fn patch_config( + client: Client, + patch: PatchConfig, + ) -> Result { // println!("{:?}", patch); match client.patch_config(patch).await { Ok(cf) => Ok(API::json(&cf)), diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 403ec1d..d805127 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use super::analytic_unit::types::{AnalyticUnitConfig, PatternConfig, PatchConfig}; +use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig}; use super::types::{self, DetectionRunnerConfig, LearningTrain}; use super::{ analytic_client::AnalyticClient, @@ -24,8 +24,6 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use chrono::Utc; - - // TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { metric_service: MetricService, @@ -64,10 +62,7 @@ impl AnalyticService { // TODO: get it from persistance analytic_unit: None, - analytic_unit_config: AnalyticUnitConfig::Pattern(PatternConfig { - correlation_score: 0.95, - model_score: 0.95, - }), + analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()), analytic_unit_learning_status: LearningStatus::Initialization, tx, @@ -171,7 +166,7 @@ impl AnalyticService { // } RequestType::GetConfig(tx) => { tx.send(self.analytic_unit_config.clone()).unwrap(); - }, + } RequestType::PatchConfig(patch_obj, tx) => { // TODO: path config // TODO: run learning if config type changed @@ -224,6 +219,16 @@ impl AnalyticService { self.analytic_unit_config = new_conf; if need_learning { self.consume_request(RequestType::RunLearning); + } else { + if self.analytic_unit.is_some() { + tokio::spawn({ + let au = self.analytic_unit.clone(); + let cfg = self.analytic_unit_config.clone(); + async move { + au.unwrap().write().await.set_config(cfg); + } + }); + } } } @@ -277,6 +282,8 @@ impl AnalyticService { from: u64, to: u64, ) { + // It's important that we don't drop read() lock until end + // because there mght be attempt to make .write() with setting new config let result = analytic_unit .read() .await diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 5baba0f..66c6401 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -2,7 +2,7 @@ use crate::services::{ analytic_service::types, metric_service::MetricService, segments_service::SegmentsService, }; -use super::types::{AnalyticUnit, AnomalyConfig, LearningResult}; +use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult}; use async_trait::async_trait; @@ -21,6 +21,13 @@ impl AnomalyAnalyticUnit { #[async_trait] impl AnalyticUnit for AnomalyAnalyticUnit { + fn set_config(&mut self, config: AnalyticUnitConfig) { + if let AnalyticUnitConfig::Anomaly(cfg) = config { + self.config = cfg; + } else { + panic!("Bad config!"); + } + } async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult { return LearningResult::Finished; } diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index 6b84890..3bc2b36 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -1,14 +1,17 @@ +pub mod anomaly_analytic_unit; pub mod pattern_analytic_unit; pub mod threshold_analytic_unit; -pub mod anomaly_analytic_unit; pub mod types; -use self::{anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig}; +use self::{ + anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, + threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, +}; pub fn resolve(cfg: AnalyticUnitConfig) -> Box { match cfg { AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())), AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())), - AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new(c.clone())) + AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new(c.clone())), } } diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index c832cb3..56ffdf6 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -16,7 +16,7 @@ use crate::services::{ segments_service::{Segment, SegmentType, SegmentsService}, }; -use super::types::{AnalyticUnit, LearningResult, PatternConfig}; +use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, PatternConfig}; use async_trait::async_trait; @@ -60,8 +60,6 @@ pub const FEATURES_SIZE: usize = 4; pub type Features = [f64; FEATURES_SIZE]; -pub const SCORE_THRESHOLD: f64 = 0.95; - fn nan_to_zero(n: f64) -> f64 { if n.is_nan() { return 0.; @@ -107,7 +105,7 @@ impl PatternAnalyticUnit { } } - fn corr_aligned(xs: &Vec, ys: &Vec) -> f64 { + fn corr_aligned(xs: &Vec, ys: &Vec) -> f32 { let n = xs.len() as f64; let mut s_xs: f64 = 0f64; let mut s_ys: f64 = 0f64; @@ -146,7 +144,7 @@ impl PatternAnalyticUnit { println!("WARNING: corr result > 1: {}", result); } - return result; + return result as f32; // we know that it's in -1..1 } fn get_features(xs: &Vec) -> Features { @@ -184,6 +182,15 @@ impl PatternAnalyticUnit { #[async_trait] impl AnalyticUnit for PatternAnalyticUnit { + + fn set_config(&mut self, config: AnalyticUnitConfig) { + if let AnalyticUnitConfig::Pattern(cfg) = config { + self.config = cfg; + } else { + panic!("Bad config!"); + } + } + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { // be careful if decide to store detections in db let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); @@ -325,9 +332,9 @@ impl AnalyticUnit for PatternAnalyticUnit { let apt = &lr.anti_patterns; for i in 0..ts.len() { - let mut pattern_match_score = 0f64; + let mut pattern_match_score = 0f32; let mut pattern_match_len = 0usize; - let mut anti_pattern_match_score = 0f64; + let mut anti_pattern_match_score = 0f32; for p in pt { if i + p.len() < ts.len() { @@ -364,14 +371,12 @@ impl AnalyticUnit for PatternAnalyticUnit { let fs = PatternAnalyticUnit::get_features(&backet); let detected = lr.model.lock().predict(Array::from_vec(fs.to_vec())); if detected { - pattern_match_score += 0.1; - } else { - anti_pattern_match_score += 0.1; + pattern_match_score += self.config.model_score; } } - if pattern_match_score > anti_pattern_match_score - && pattern_match_score >= SCORE_THRESHOLD + if pattern_match_score - anti_pattern_match_score * self.config.anti_correlation_score + >= self.config.threshold_score { results.push((ts[i].0, ts[i + pattern_match_len - 1].0)); } diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index 4430db6..5eeb673 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -2,7 +2,7 @@ use crate::services::{ analytic_service::types, metric_service::MetricService, segments_service::SegmentsService, }; -use super::types::{AnalyticUnit, LearningResult, ThresholdConfig}; +use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig}; use async_trait::async_trait; @@ -17,6 +17,7 @@ impl ThresholdAnalyticUnit { pub fn new(config: ThresholdConfig) -> ThresholdAnalyticUnit { ThresholdAnalyticUnit { config } } + } #[async_trait] @@ -24,6 +25,15 @@ impl AnalyticUnit for ThresholdAnalyticUnit { async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult { return LearningResult::Finished; } + + fn set_config(&mut self, config: AnalyticUnitConfig) { + if let AnalyticUnitConfig::Threshold(cfg) = config { + self.config = cfg; + } else { + panic!("Bad config!"); + } + } + async fn detect( &self, ms: MetricService, diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 845b651..05b2291 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -3,22 +3,23 @@ use serde::{Deserialize, Serialize}; use async_trait::async_trait; -use crate::services::{ - metric_service::MetricService, segments_service::SegmentsService, -}; - +use crate::services::{metric_service::MetricService, segments_service::SegmentsService}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct PatternConfig { pub correlation_score: f32, + pub anti_correlation_score: f32, pub model_score: f32, + pub threshold_score: f32, } impl Default for PatternConfig { fn default() -> Self { PatternConfig { - correlation_score: 0.95, - model_score: 0.95, + correlation_score: 0.7, + anti_correlation_score: 0.7, + model_score: 0.5, + threshold_score: 1.0, } } } @@ -30,9 +31,7 @@ pub struct AnomalyConfig { impl Default for AnomalyConfig { fn default() -> Self { - AnomalyConfig { - sesonality: false - } + AnomalyConfig { sesonality: false } } } @@ -43,81 +42,71 @@ pub struct ThresholdConfig { impl Default for ThresholdConfig { fn default() -> Self { - ThresholdConfig { - threashold: 0.5 - } + ThresholdConfig { threashold: 0.5 } } } - - #[derive(Debug, Serialize, Deserialize, Clone)] pub enum AnalyticUnitConfig { Pattern(PatternConfig), Threshold(ThresholdConfig), - Anomaly(AnomalyConfig) + Anomaly(AnomalyConfig), } impl AnalyticUnitConfig { // return tru if patch is different type pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { - PatchConfig::Pattern(tcfg) => { - match self.clone() { - AnalyticUnitConfig::Pattern(_) => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false) - } else { - return (AnalyticUnitConfig::Pattern(Default::default()), false) - } - }, - _ => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true) - } else { - return (AnalyticUnitConfig::Pattern(Default::default()), true) - } - }, + PatchConfig::Pattern(tcfg) => match self.clone() { + AnalyticUnitConfig::Pattern(_) => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); + } else { + return (AnalyticUnitConfig::Pattern(Default::default()), false); + } } - } - - PatchConfig::Anomaly(tcfg) => { - match self.clone() { - AnalyticUnitConfig::Anomaly(_) => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), false) - } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), false) - } - }, - _ => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true) - } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), true) - } - }, + _ => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); + } else { + return (AnalyticUnitConfig::Pattern(Default::default()), true); + } } - } - - PatchConfig::Threshold(tcfg) => { - match self.clone() { - AnalyticUnitConfig::Threshold(_) => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false) - } else { - return (AnalyticUnitConfig::Threshold(Default::default()), false) - } - }, - _ => { - if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true) - } else { - return (AnalyticUnitConfig::Threshold(Default::default()), true) - } - }, + }, + + PatchConfig::Anomaly(tcfg) => match self.clone() { + AnalyticUnitConfig::Anomaly(_) => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), false); + } else { + return (AnalyticUnitConfig::Anomaly(Default::default()), false); + } } - } + _ => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); + } else { + return (AnalyticUnitConfig::Anomaly(Default::default()), true); + } + } + }, + + PatchConfig::Threshold(tcfg) => match self.clone() { + AnalyticUnitConfig::Threshold(_) => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); + } else { + return (AnalyticUnitConfig::Threshold(Default::default()), false); + } + } + _ => { + if tcfg.is_some() { + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); + } else { + return (AnalyticUnitConfig::Threshold(Default::default()), true); + } + } + }, } } } @@ -137,11 +126,13 @@ pub trait AnalyticUnit { from: u64, to: u64, ) -> anyhow::Result>; + + fn set_config(&mut self, c: AnalyticUnitConfig); } #[derive(Deserialize, Serialize, Debug)] pub enum PatchConfig { Pattern(Option), Threshold(Option), - Anomaly(Option) + Anomaly(Option), } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 1382589..d9472db 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -7,7 +7,7 @@ use super::analytic_unit::{ types::AnalyticUnitConfig, }; -use super::analytic_unit::types::{PatchConfig}; +use super::analytic_unit::types::PatchConfig; use anyhow::Result; use serde::Serialize;