diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 032e558..9fc506e 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -2,7 +2,9 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; use super::detection_runner::DetectionRunner; -use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask}; +use super::types::{ + self, AnalyticUnitRF, DetectionRunnerConfig, DetectionRunnerTask, LearningWaiter, HSR, +}; use super::{ analytic_client::AnalyticClient, types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, @@ -20,7 +22,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; -use chrono::{TimeZone, DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use tokio::sync::{mpsc, oneshot}; // TODO: now it's basically single analytic unit, service will operate on many AU @@ -112,25 +114,25 @@ impl AnalyticService { } } - // TODO: make + // TODO: make fn run_detection_runner(&mut self, from: u64) { // TODO: handle case or make it impossible to run_detection_runner second time if self.analytic_unit_learning_status != LearningStatus::Ready { - let task = DetectionRunnerTask { - from - }; - self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); + let task = DetectionRunnerTask { from }; + self.learning_waiters + .push(LearningWaiter::DetectionRunner(task)); return; } let AlertingType::Webhook(acfg) = self.alerting.as_ref().unwrap().alerting_type.clone(); let drcfg = DetectionRunnerConfig { endpoint: acfg.endpoint.clone(), - interval: self.alerting.as_ref().unwrap().interval + interval: self.alerting.as_ref().unwrap().interval, }; - - let dr = DetectionRunner::new(drcfg, self.analytic_unit.as_ref().unwrap().clone()); + let tx = self.tx.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + let dr = DetectionRunner::new(tx, drcfg, au); self.detection_runner = Some(dr); self.detection_runner.as_mut().unwrap().run(from); @@ -250,7 +252,7 @@ impl AnalyticService { self.analytic_unit_learning_status = LearningStatus::Initialization; } } - }, + } // TODO: create custom DatasourceError error type Err(_) => { self.analytic_unit = None; @@ -324,9 +326,9 @@ impl AnalyticService { let mut au = resolve(aucfg); match tx - .send(AnalyticServiceMessage::Response( - Ok(ResponseType::LearningStarted), - )) + .send(AnalyticServiceMessage::Response(Ok( + ResponseType::LearningStarted, + ))) .await { Ok(_) => {} @@ -335,13 +337,11 @@ impl AnalyticService { // TODO: maybe to spawn_blocking here let lr = match au.learn(ms, ss).await { - Ok(res) => { - match res { - LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), - LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty) - } - } - Err(e) => Err(e) + Ok(res) => match res { + LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), + LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty), + }, + Err(e) => Err(e), }; match tx.send(AnalyticServiceMessage::Response(lr)).await { diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index b02c6b2..0ce7c87 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -1,4 +1,8 @@ -use crate::services::{analytic_service::types::{AnomalyHSRConfig, HSR}, metric_service::MetricService, segments_service::SegmentsService}; +use crate::services::{ + analytic_service::types::{AnomalyHSRConfig, HSR}, + metric_service::MetricService, + segments_service::SegmentsService, +}; use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult}; @@ -7,14 +11,13 @@ use subbeat::metric::MetricResult; use chrono::prelude::*; - // TODO: move to config const DETECTION_STEP: u64 = 10; // timerange offset in seconds backwards from end of ts in assumption that ts has no gaps -fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)>{ +fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)> { // TODO: remove dependency to DETECTION_STEP - + let indexes_offset = (offset / DETECTION_STEP) as usize; let n = ts.len() - 1; if n < indexes_offset { @@ -24,12 +27,11 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64) return Some(ts[i]); } - struct SARIMA { pub ts: Vec<(u64, f64)>, pub seasonality: u64, pub confidence: f64, - pub seasonality_iterations: u64 + pub seasonality_iterations: u64, } impl SARIMA { @@ -38,12 +40,11 @@ impl SARIMA { ts: Vec::new(), seasonality, confidence, - seasonality_iterations + seasonality_iterations, }; } pub fn learn(&mut self, ts: &Vec<(u64, f64)>) -> anyhow::Result<()> { - // TODO: don't count NaNs in model // TODO: add exponental smooting to model // TODO: trend detection @@ -59,7 +60,10 @@ impl SARIMA { let iter_steps = (self.seasonality / DETECTION_STEP) as usize; if to - from != self.seasonality_iterations * self.seasonality { - return Err(anyhow::format_err!("timeserie to learn from should be {} * sasonality", self.seasonality_iterations)); + return Err(anyhow::format_err!( + "timeserie to learn from should be {} * sasonality", + self.seasonality_iterations + )); } for k in 0..iter_steps { @@ -91,7 +95,6 @@ impl SARIMA { let len_from = timestamp - from; // TODO: take avg if timestamp in between let index_diff = (len_from / DETECTION_STEP) % self.ts.len() as u64; - let p = self.ts[index_diff as usize].1; return (p, (p + self.confidence, p - self.confidence)); @@ -100,10 +103,8 @@ impl SARIMA { pub fn push_point() { // TODO: inmplement } - } - pub struct AnomalyAnalyticUnit { config: AnomalyConfig, sarima: Option, @@ -126,7 +127,7 @@ impl AnomalyAnalyticUnit { return Ok(HSR::AnomalyHSR(AnomalyHSRConfig { seasonality: self.config.seasonality, timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0, - ts: Vec::new() + ts: Vec::new(), })); } @@ -137,7 +138,7 @@ impl AnomalyAnalyticUnit { return Ok(HSR::AnomalyHSR(AnomalyHSRConfig { seasonality: self.config.seasonality, timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0, - ts: Vec::new() + ts: Vec::new(), })); } @@ -145,13 +146,13 @@ impl AnomalyAnalyticUnit { let sarima = self.sarima.as_ref().unwrap(); for vt in ts { let x = sarima.predict(vt.0); - sts.push((vt.0, x.0, (x.1.0, x.1.1))); + sts.push((vt.0, x.0, (x.1 .0, x.1 .1))); } return Ok(HSR::AnomalyHSR(AnomalyHSRConfig { seasonality: self.config.seasonality, timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0, - ts: sts + ts: sts, })); } } @@ -168,8 +169,16 @@ impl AnalyticUnit for AnomalyAnalyticUnit { panic!("Bad config!"); } } - async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> anyhow::Result { - let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence, self.config.seasonality_iterations); + async fn learn( + &mut self, + ms: MetricService, + _ss: SegmentsService, + ) -> anyhow::Result { + let mut sarima = SARIMA::new( + self.config.seasonality, + self.config.confidence, + self.config.seasonality_iterations, + ); let utc: DateTime = Utc::now(); let to = utc.timestamp() as u64; diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index f8f153f..e403751 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -222,7 +222,11 @@ impl AnalyticUnit for PatternAnalyticUnit { } } - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result { + async fn learn( + &mut self, + ms: MetricService, + ss: SegmentsService, + ) -> anyhow::Result { // TODO: move to config let mut cfg = Config::new(); cfg.set_feature_size(FEATURES_SIZE); @@ -255,7 +259,9 @@ impl AnalyticUnit for PatternAnalyticUnit { for r in rs { if r.is_err() { // TODO: custom DatasourceError error type - return Err(anyhow::format_err!("Error extracting metrics from datasource")); + return Err(anyhow::format_err!( + "Error extracting metrics from datasource" + )); } let sd = r?; diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index e3745a4..6d2d461 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -21,7 +21,11 @@ impl ThresholdAnalyticUnit { #[async_trait] impl AnalyticUnit for ThresholdAnalyticUnit { - async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> anyhow::Result { + async fn learn( + &mut self, + _ms: MetricService, + _ss: SegmentsService, + ) -> anyhow::Result { return Ok(LearningResult::Finished); } diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index cf89d3c..10fa704 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -30,7 +30,7 @@ pub struct AnomalyConfig { pub alpha: f64, pub confidence: f64, pub seasonality: u64, // step in seconds, can be zero - pub seasonality_iterations: u64 + pub seasonality_iterations: u64, } impl Default for AnomalyConfig { @@ -39,7 +39,7 @@ impl Default for AnomalyConfig { alpha: 0.5, confidence: 10.0, seasonality: 60 * 60, - seasonality_iterations: 3 + seasonality_iterations: 3, } } } @@ -125,12 +125,16 @@ impl AnalyticUnitConfig { pub enum LearningResult { Finished, - FinishedEmpty + FinishedEmpty, } #[async_trait] pub trait AnalyticUnit { - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result; + async fn learn( + &mut self, + ms: MetricService, + ss: SegmentsService, + ) -> anyhow::Result; async fn detect( &self, ms: MetricService, diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index c77e4ce..a2ebee8 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -6,18 +6,24 @@ use chrono::Utc; use tokio::sync::{mpsc, RwLock}; -use super::types::{AnalyticUnitRF, DetectionRunnerConfig}; +use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig}; use tokio::time::{sleep, Duration}; pub struct DetectionRunner { + tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, running_handler: Option>, } impl DetectionRunner { - pub fn new(config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF) -> DetectionRunner { + pub fn new( + tx: mpsc::Sender, + config: DetectionRunnerConfig, + analytic_unit: AnalyticUnitRF, + ) -> DetectionRunner { DetectionRunner { + tx, config, analytic_unit, running_handler: None, @@ -34,14 +40,14 @@ impl DetectionRunner { // TODO: clone channel let cfg = self.config.clone(); async move { - // AnalyticService::run_learning(tx, cfg, ms, ss).await; - // TODO: run detection "from" + // TODO: run detection "from" for big timespan + // TODO: parse detections to webhooks // TODO: define window for detection // TODO: save last detection + // TODO: handle case when detection is in the end and continues after "now" println!("detection runner started from {}", from); loop { - // TODO: run detection periodically sleep(Duration::from_secs(cfg.interval)).await; } @@ -49,10 +55,10 @@ impl DetectionRunner { })); } - pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF, - ) { - self.analytic_unit = analytic_unit; - // TODO: stop running_handler - // TODO: rerun detection with new anomaly units - } + // pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF, + // ) { + // self.analytic_unit = analytic_unit; + // // TODO: stop running_handler + // // TODO: rerun detection with new anomaly units + // } } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 767c7b1..e5bcc7d 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -45,7 +45,7 @@ impl Default for LearningTrain { pub enum ResponseType { LearningStarted, LearningFinished(Box), - LearningFinishedEmpty + LearningFinishedEmpty, } impl fmt::Debug for ResponseType { @@ -67,12 +67,11 @@ pub struct DetectionRunnerTask { pub from: u64, } - #[derive(Debug, Serialize)] pub struct AnomalyHSRConfig { pub timestamp: u64, pub seasonality: u64, - pub ts: Vec<(u64, f64, (f64, f64))> + pub ts: Vec<(u64, f64, (f64, f64))>, } // HSR Stands for Hastic Signal Representation, // varies for different analytic units @@ -103,7 +102,7 @@ pub struct DetectionRunnerConfig { // pub sender: mpsc::Sender>>, pub endpoint: String, // pub from: u64, - pub interval: u64 + pub interval: u64, } #[derive(Debug)]