diff --git a/server/src/main.rs b/server/src/main.rs index 03b5b47..a9ed0cd 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ mod api; -use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service}; +use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service}; use anyhow; diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 7c007dd..94fc5e6 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -59,7 +59,6 @@ impl AnalyticService { segments_service: segments_service::SegmentsService, alerting: Option, ) -> AnalyticService { - // TODO: move buffer size to config let (tx, rx) = mpsc::channel::(32); @@ -242,7 +241,9 @@ impl AnalyticService { println!("Detection runner started from {}", from) } ResponseType::DetectionRunnerUpdate(id, timestamp) => { - self.analytic_unit_service.set_last_detection(id, timestamp).unwrap(); + self.analytic_unit_service + .set_last_detection(id, timestamp) + .unwrap(); } ResponseType::LearningStarted => { self.analytic_unit_learning_status = LearningStatus::Learning @@ -274,34 +275,52 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { - - let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch); - self.analytic_unit_config = new_conf.clone(); - if need_learning { - self.consume_request(RequestType::RunLearning); - // TODO: it's not fully correct: we need to wait when the learning starts - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } - } else { + let my_id = self + .analytic_unit_service + .get_config_id(&self.analytic_unit_config); + + let patch_id = patch.get_type_id(); + + let same_type = my_id == patch_id; + + // TODO: need_learning and same_type logic overlaps, there is a way to optimise this + let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); + + if same_type { + // TODO: check when learning should be started + let new_conf = patch.get_new_config(); + self.analytic_unit_config = new_conf.clone(); + self.analytic_unit_service + .update_config_by_id(&my_id, &new_conf) + .unwrap(); + if self.analytic_unit.is_some() { - tokio::spawn({ - let au = self.analytic_unit.clone(); - let cfg = self.analytic_unit_config.clone(); - async move { - au.unwrap().write().await.set_config(cfg); - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } + if need_learning { + self.consume_request(RequestType::RunLearning); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); } } - }); + return; + } else { + tokio::spawn({ + let au = self.analytic_unit.clone(); + let cfg = self.analytic_unit_config.clone(); + async move { + au.unwrap().write().await.set_config(cfg); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } + } + }); + } } else { + // TODO: check if we need this else match tx.send(()) { Ok(_) => {} Err(_e) => { @@ -309,19 +328,20 @@ impl AnalyticService { } } } - } - - - if same_type { - // TODO: avoid using `unwrap` - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); } else { - // TODO: it's a hack, make it a better way - // TODO: avoid using unwrap - self.analytic_unit_service.resolve(&new_conf).unwrap(); - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); + let new_conf = self + .analytic_unit_service + .get_config_by_id(&patch_id) + .unwrap(); + self.analytic_unit_config = new_conf.clone(); + self.consume_request(RequestType::RunLearning); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } } - } pub async fn serve(&mut self) { @@ -351,7 +371,9 @@ impl AnalyticService { ) { let mut au = match aus.resolve(&aucfg) { Ok(a) => a, - Err(e) => { panic!("{}", e); } + Err(e) => { + panic!("{}", e); + } }; match tx diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index a33202d..98a3373 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -50,7 +50,10 @@ impl SARIMA { // TODO: trend detection if ts.len() < 2 { - return Err(anyhow::format_err!("too short timeserie to learn from, timeserie length: {}", ts.len())); + return Err(anyhow::format_err!( + "too short timeserie to learn from, timeserie length: {}", + ts.len() + )); } // TODO: ensure capacity with seasonality size let mut res_ts = Vec::<(u64, f64)>::new(); diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index 865bc99..ebb73c9 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -7,4 +7,3 @@ use self::{ anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, }; - diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 2ae8221..0a043f4 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -57,28 +57,77 @@ impl Default for ThresholdConfig { #[derive(Debug, Serialize, Deserialize, Clone)] pub enum AnalyticUnitConfig { - Pattern(PatternConfig), Threshold(ThresholdConfig), + Pattern(PatternConfig), Anomaly(AnomalyConfig), } impl AnalyticUnitConfig { - // return true if need needs relearning and true if the config of the same type - pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) { + pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { + let iid = id.as_str(); + match iid { + "1" => AnalyticUnitConfig::Threshold(Default::default()), + "2" => AnalyticUnitConfig::Pattern(Default::default()), + "3" => AnalyticUnitConfig::Anomaly(Default::default()), + _ => panic!("bad id for getting get_default_by_id"), + } + } + + pub fn patch_needs_learning(&self, patch: &PatchConfig) -> bool { + // TODO: maybe use type id's to optimise code + match patch { + PatchConfig::Pattern(tcfg) => match self.clone() { + AnalyticUnitConfig::Pattern(_) => { + return false; + } + _ => return true, + }, + + PatchConfig::Anomaly(tcfg) => match self.clone() { + AnalyticUnitConfig::Anomaly(scfg) => { + if tcfg.is_some() { + let t = tcfg.as_ref().unwrap(); + let mut need_learning = t.seasonality != scfg.seasonality; + need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; + return need_learning; + } else { + return false; + } + } + _ => { + return true; + } + }, + + PatchConfig::Threshold(tcfg) => match self.clone() { + AnalyticUnitConfig::Threshold(_) => { + return false; + } + _ => { + return true; + } + }, + } + } + + // TODO: maybe this method depricated + // return true if need needs relearning + pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { PatchConfig::Pattern(tcfg) => match self.clone() { AnalyticUnitConfig::Pattern(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), false, true); + // TODO: it should be extraced from db + return (AnalyticUnitConfig::Pattern(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), true, false); + return (AnalyticUnitConfig::Pattern(Default::default()), true); } } }, @@ -89,16 +138,16 @@ impl AnalyticUnitConfig { let t = tcfg.as_ref().unwrap(); let mut need_learning = t.seasonality != scfg.seasonality; need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), false, true); + return (AnalyticUnitConfig::Anomaly(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), true, false); + return (AnalyticUnitConfig::Anomaly(Default::default()), true); } } }, @@ -106,16 +155,16 @@ impl AnalyticUnitConfig { PatchConfig::Threshold(tcfg) => match self.clone() { AnalyticUnitConfig::Threshold(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), false, true); + return (AnalyticUnitConfig::Threshold(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), true, false); + return (AnalyticUnitConfig::Threshold(Default::default()), true); } } }, @@ -154,3 +203,23 @@ pub enum PatchConfig { Threshold(Option), Anomaly(Option), } + +impl PatchConfig { + pub fn get_type_id(&self) -> String { + match &self { + PatchConfig::Threshold(_) => "1".to_string(), + PatchConfig::Pattern(_) => "2".to_string(), + PatchConfig::Anomaly(_) => "3".to_string(), + } + } + + pub fn get_new_config(&self) -> AnalyticUnitConfig { + match &self { + PatchConfig::Threshold(cfg) => { + AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()) + } + PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()), + PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()), + } + } +} diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 6ec3e8e..66ff8a4 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,16 +1,14 @@ -use chrono::{Utc, DateTime}; +use chrono::{DateTime, Utc}; -use tokio::sync::{mpsc}; +use tokio::sync::mpsc; use crate::services::metric_service::MetricService; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; - const DETECTION_STEP: u64 = 10; - pub struct DetectionRunner { metric_service: MetricService, tx: mpsc::Sender, @@ -76,11 +74,16 @@ impl DetectionRunner { // TODO: run detection periodically // TODO: set info about detections to tx - - match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) - ))).await { - Ok(_) => {}, + match tx + .send(AnalyticServiceMessage::Response(Ok( + ResponseType::DetectionRunnerUpdate( + au.as_ref().read().await.get_id(), + t_to, + ), + ))) + .await + { + Ok(_) => {} Err(_e) => println!("Fail to send detection runner started notification"), } diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index bd7de52..cdcd84b 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -1,14 +1,19 @@ -use std::sync::{Arc, Mutex}; -use serde_json::{Result, Value}; use serde::{Deserialize, Serialize}; +use serde_json::{Result, Value}; +use std::sync::{Arc, Mutex}; use rusqlite::{params, Connection}; -use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; +use super::analytic_service::analytic_unit::{ + anomaly_analytic_unit::AnomalyAnalyticUnit, + pattern_analytic_unit::PatternAnalyticUnit, + threshold_analytic_unit::ThresholdAnalyticUnit, + types::{self, AnalyticUnitConfig}, +}; #[derive(Clone)] pub struct AnalyticUnitService { - connection: Arc> + connection: Arc>, } impl AnalyticUnitService { @@ -35,40 +40,50 @@ impl AnalyticUnitService { } // TODO: optional id - pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box { + pub fn resolve_au( + &self, + cfg: &AnalyticUnitConfig, + ) -> Box { match cfg { - AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), - AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), - AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), + AnalyticUnitConfig::Threshold(c) => { + Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())) + } + AnalyticUnitConfig::Pattern(c) => { + Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())) + } + AnalyticUnitConfig::Anomaly(c) => { + Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())) + } } } // TODO: get id of analytic_unit which be used also as it's type - pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result> { + pub fn resolve( + &self, + cfg: &AnalyticUnitConfig, + ) -> anyhow::Result> { let au = self.resolve_au(cfg); let id = au.as_ref().get_id(); let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id from analytic_unit WHERE id = ?1", - )?; + let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?; let res = stmt.exists(params![id])?; if res == false { let cfg_json = serde_json::to_string(&cfg)?; conn.execute( - "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", - params![id, cfg_json] + "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", + params![id, cfg_json], )?; } conn.execute( "UPDATE analytic_unit set active = FALSE where active = TRUE", - params![] + params![], )?; conn.execute( "UPDATE analytic_unit set active = TRUE where id = ?1", - params![id] + params![id], )?; return Ok(au); @@ -78,7 +93,7 @@ impl AnalyticUnitService { let conn = self.connection.lock().unwrap(); conn.execute( "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", - params![last_detection, id] + params![last_detection, id], )?; Ok(()) } @@ -86,9 +101,8 @@ impl AnalyticUnitService { pub fn get_active(&self) -> anyhow::Result> { // TODO: return default when there is no active let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id, type, config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = + conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?; let au = stmt.query_row([], |row| { let c: String = row.get(2)?; @@ -97,15 +111,12 @@ impl AnalyticUnitService { })??; return Ok(au); - } pub fn get_active_config(&self) -> anyhow::Result { let exists = { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; stmt.exists([])? }; @@ -115,9 +126,7 @@ impl AnalyticUnitService { return Ok(c); } else { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; let acfg = stmt.query_row([], |row| { let c: String = row.get(0)?; let cfg = serde_json::from_str(&c).unwrap(); @@ -127,6 +136,51 @@ impl AnalyticUnitService { } } + pub fn get_config_by_id(&self, id: &String) -> anyhow::Result { + let exists = { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; + stmt.exists([id])? + }; + + if exists == false { + let c = AnalyticUnitConfig::get_default_by_id(id); + self.resolve(&c)?; + return Ok(c); + } else { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; + let acfg = stmt.query_row([id], |row| { + let c: String = row.get(0)?; + let cfg = serde_json::from_str(&c).unwrap(); + Ok(cfg) + })?; + return Ok(acfg); + } + } + + pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String { + match cfg { + AnalyticUnitConfig::Threshold(_) => "1".to_string(), + AnalyticUnitConfig::Pattern(_) => "2".to_string(), + AnalyticUnitConfig::Anomaly(_) => "3".to_string(), + } + } + + pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { + // TODO: it's possble that config doesn't exist, but we trying to update it + let conn = self.connection.lock().unwrap(); + + let cfg_json = serde_json::to_string(&cfg)?; + + conn.execute( + "UPDATE analytic_unit SET config = ?1 WHERE id = ?2", + params![cfg_json, id], + )?; + + return Ok(()); + } + pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { let conn = self.connection.lock().unwrap(); @@ -134,7 +188,7 @@ impl AnalyticUnitService { conn.execute( "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", - params![cfg_json] + params![cfg_json], )?; return Ok(()); diff --git a/server/src/services/mod.rs b/server/src/services/mod.rs index ea17ea9..2f122bf 100644 --- a/server/src/services/mod.rs +++ b/server/src/services/mod.rs @@ -1,5 +1,5 @@ pub mod analytic_service; +pub mod analytic_unit_service; pub mod metric_service; pub mod segments_service; pub mod user_service; -pub mod analytic_unit_service;