diff --git a/server/src/main.rs b/server/src/main.rs index 03b5b47..a9ed0cd 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ mod api; -use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service}; +use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service}; use anyhow; diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index ea3df6f..94fc5e6 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -59,7 +59,6 @@ impl AnalyticService { segments_service: segments_service::SegmentsService, alerting: Option, ) -> AnalyticService { - // TODO: move buffer size to config let (tx, rx) = mpsc::channel::(32); @@ -242,7 +241,9 @@ impl AnalyticService { println!("Detection runner started from {}", from) } ResponseType::DetectionRunnerUpdate(id, timestamp) => { - self.analytic_unit_service.set_last_detection(id, timestamp).unwrap(); + self.analytic_unit_service + .set_last_detection(id, timestamp) + .unwrap(); } ResponseType::LearningStarted => { self.analytic_unit_learning_status = LearningStatus::Learning @@ -274,14 +275,14 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { + let my_id = self + .analytic_unit_service + .get_config_id(&self.analytic_unit_config); - let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); - let patch_id = patch.get_type_id(); let same_type = my_id == patch_id; - // TODO: need_learning and same_type logic overlaps, there is a way to optimise this let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); @@ -289,7 +290,9 @@ impl AnalyticService { // TODO: check when learning should be started let new_conf = patch.get_new_config(); self.analytic_unit_config = new_conf.clone(); - self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); + self.analytic_unit_service + .update_config_by_id(&my_id, &new_conf) + .unwrap(); if self.analytic_unit.is_some() { if need_learning { @@ -326,7 +329,10 @@ impl AnalyticService { } } } else { - let new_conf = self.analytic_unit_service.get_config_by_id(&patch_id).unwrap(); + let new_conf = self + .analytic_unit_service + .get_config_by_id(&patch_id) + .unwrap(); self.analytic_unit_config = new_conf.clone(); self.consume_request(RequestType::RunLearning); match tx.send(()) { @@ -365,7 +371,9 @@ impl AnalyticService { ) { let mut au = match aus.resolve(&aucfg) { Ok(a) => a, - Err(e) => { panic!("{}", e); } + Err(e) => { + panic!("{}", e); + } }; match tx diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index a33202d..98a3373 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -50,7 +50,10 @@ impl SARIMA { // TODO: trend detection if ts.len() < 2 { - return Err(anyhow::format_err!("too short timeserie to learn from, timeserie length: {}", ts.len())); + return Err(anyhow::format_err!( + "too short timeserie to learn from, timeserie length: {}", + ts.len() + )); } // TODO: ensure capacity with seasonality size let mut res_ts = Vec::<(u64, f64)>::new(); diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index 865bc99..ebb73c9 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -7,4 +7,3 @@ use self::{ anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, }; - diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index f42d40a..0a043f4 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -63,14 +63,13 @@ pub enum AnalyticUnitConfig { } impl AnalyticUnitConfig { - pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { let iid = id.as_str(); match iid { "1" => AnalyticUnitConfig::Threshold(Default::default()), "2" => AnalyticUnitConfig::Pattern(Default::default()), "3" => AnalyticUnitConfig::Anomaly(Default::default()), - _ => panic!("bad id for getting get_default_by_id") + _ => panic!("bad id for getting get_default_by_id"), } } @@ -81,9 +80,7 @@ impl AnalyticUnitConfig { AnalyticUnitConfig::Pattern(_) => { return false; } - _ => { - return true - } + _ => return true, }, PatchConfig::Anomaly(tcfg) => match self.clone() { @@ -212,15 +209,17 @@ impl PatchConfig { match &self { PatchConfig::Threshold(_) => "1".to_string(), PatchConfig::Pattern(_) => "2".to_string(), - PatchConfig::Anomaly(_) => "3".to_string() + PatchConfig::Anomaly(_) => "3".to_string(), } } pub fn get_new_config(&self) -> AnalyticUnitConfig { match &self { - PatchConfig::Threshold(cfg) => AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()), + PatchConfig::Threshold(cfg) => { + AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()) + } PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()), - PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()) + PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()), } - } -} \ No newline at end of file + } +} diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 6ec3e8e..66ff8a4 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,16 +1,14 @@ -use chrono::{Utc, DateTime}; +use chrono::{DateTime, Utc}; -use tokio::sync::{mpsc}; +use tokio::sync::mpsc; use crate::services::metric_service::MetricService; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; - const DETECTION_STEP: u64 = 10; - pub struct DetectionRunner { metric_service: MetricService, tx: mpsc::Sender, @@ -76,11 +74,16 @@ impl DetectionRunner { // TODO: run detection periodically // TODO: set info about detections to tx - - match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) - ))).await { - Ok(_) => {}, + match tx + .send(AnalyticServiceMessage::Response(Ok( + ResponseType::DetectionRunnerUpdate( + au.as_ref().read().await.get_id(), + t_to, + ), + ))) + .await + { + Ok(_) => {} Err(_e) => println!("Fail to send detection runner started notification"), } diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index f769e46..cdcd84b 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -1,14 +1,19 @@ -use std::sync::{Arc, Mutex}; -use serde_json::{Result, Value}; use serde::{Deserialize, Serialize}; +use serde_json::{Result, Value}; +use std::sync::{Arc, Mutex}; use rusqlite::{params, Connection}; -use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; +use super::analytic_service::analytic_unit::{ + anomaly_analytic_unit::AnomalyAnalyticUnit, + pattern_analytic_unit::PatternAnalyticUnit, + threshold_analytic_unit::ThresholdAnalyticUnit, + types::{self, AnalyticUnitConfig}, +}; #[derive(Clone)] pub struct AnalyticUnitService { - connection: Arc> + connection: Arc>, } impl AnalyticUnitService { @@ -35,40 +40,50 @@ impl AnalyticUnitService { } // TODO: optional id - pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box { + pub fn resolve_au( + &self, + cfg: &AnalyticUnitConfig, + ) -> Box { match cfg { - AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), - AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), - AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), + AnalyticUnitConfig::Threshold(c) => { + Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())) + } + AnalyticUnitConfig::Pattern(c) => { + Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())) + } + AnalyticUnitConfig::Anomaly(c) => { + Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())) + } } } // TODO: get id of analytic_unit which be used also as it's type - pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result> { + pub fn resolve( + &self, + cfg: &AnalyticUnitConfig, + ) -> anyhow::Result> { let au = self.resolve_au(cfg); let id = au.as_ref().get_id(); let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id from analytic_unit WHERE id = ?1", - )?; + let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?; let res = stmt.exists(params![id])?; if res == false { let cfg_json = serde_json::to_string(&cfg)?; conn.execute( - "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", - params![id, cfg_json] + "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", + params![id, cfg_json], )?; } conn.execute( "UPDATE analytic_unit set active = FALSE where active = TRUE", - params![] + params![], )?; conn.execute( "UPDATE analytic_unit set active = TRUE where id = ?1", - params![id] + params![id], )?; return Ok(au); @@ -78,7 +93,7 @@ impl AnalyticUnitService { let conn = self.connection.lock().unwrap(); conn.execute( "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", - params![last_detection, id] + params![last_detection, id], )?; Ok(()) } @@ -86,9 +101,8 @@ impl AnalyticUnitService { pub fn get_active(&self) -> anyhow::Result> { // TODO: return default when there is no active let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id, type, config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = + conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?; let au = stmt.query_row([], |row| { let c: String = row.get(2)?; @@ -97,15 +111,12 @@ impl AnalyticUnitService { })??; return Ok(au); - } pub fn get_active_config(&self) -> anyhow::Result { let exists = { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; stmt.exists([])? }; @@ -115,9 +126,7 @@ impl AnalyticUnitService { return Ok(c); } else { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; let acfg = stmt.query_row([], |row| { let c: String = row.get(0)?; let cfg = serde_json::from_str(&c).unwrap(); @@ -130,9 +139,7 @@ impl AnalyticUnitService { pub fn get_config_by_id(&self, id: &String) -> anyhow::Result { let exists = { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE id = ?1" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; stmt.exists([id])? }; @@ -142,9 +149,7 @@ impl AnalyticUnitService { return Ok(c); } else { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE id = ?1" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; let acfg = stmt.query_row([id], |row| { let c: String = row.get(0)?; let cfg = serde_json::from_str(&c).unwrap(); @@ -163,7 +168,6 @@ impl AnalyticUnitService { } pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { - // TODO: it's possble that config doesn't exist, but we trying to update it let conn = self.connection.lock().unwrap(); @@ -171,7 +175,7 @@ impl AnalyticUnitService { conn.execute( "UPDATE analytic_unit SET config = ?1 WHERE id = ?2", - params![cfg_json, id] + params![cfg_json, id], )?; return Ok(()); @@ -184,7 +188,7 @@ impl AnalyticUnitService { conn.execute( "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", - params![cfg_json] + params![cfg_json], )?; return Ok(()); diff --git a/server/src/services/mod.rs b/server/src/services/mod.rs index ea17ea9..2f122bf 100644 --- a/server/src/services/mod.rs +++ b/server/src/services/mod.rs @@ -1,5 +1,5 @@ pub mod analytic_service; +pub mod analytic_unit_service; pub mod metric_service; pub mod segments_service; pub mod user_service; -pub mod analytic_unit_service;