diff --git a/server/src/config.rs b/server/src/config.rs index ed5c936..466d790 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -1,23 +1,21 @@ use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig}; - #[derive(Clone)] pub struct WebhookAlertingConfig { - endpoint: String + endpoint: String, } #[derive(Clone)] pub enum AlertingType { - Webhook(WebhookAlertingConfig) + Webhook(WebhookAlertingConfig), } #[derive(Clone)] pub struct AlertingConfig { - alerting_type: AlertingType, - interval: u64 // interval in seconds + alerting_type: AlertingType, + interval: u64, // interval in seconds } - #[derive(Clone)] pub struct Config { pub port: u16, @@ -25,16 +23,6 @@ pub struct Config { pub alerting: Option, } -// impl Clone for Config { -// fn clone(&self) -> Self { -// return Config { -// port: self.port, -// datasource_config: self.datasource_config.clone(), -// alerting: self.alerting.clone() -// }; -// } -// } - fn resolve_datasource(config: &mut config::Config) -> anyhow::Result { if config.get::("prometheus.url").is_ok() { return Ok(DatasourceConfig::Prometheus(PrometheusConfig { @@ -58,7 +46,6 @@ fn resolve_datasource(config: &mut config::Config) -> anyhow::Result anyhow::Result { - // TODO: parse alerting config // TODO: throw error on bad config diff --git a/server/src/main.rs b/server/src/main.rs index 6864f11..87e7d44 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> { let mut analytic_service = analytic_service::AnalyticService::new( metric_service.clone(), segments_service.clone(), - config.alerting + config.alerting, ); let api = api::API::new( diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs index 48d60a1..b7ee885 100644 --- a/server/src/services/analytic_service/analytic_client.rs +++ b/server/src/services/analytic_service/analytic_client.rs @@ -7,10 +7,10 @@ use crate::services::segments_service::Segment; use super::analytic_unit::types::AnalyticUnitConfig; use super::analytic_unit::types::PatchConfig; use super::types::DetectionTask; -use super::types::HSR; use super::types::HSRTask; use super::types::LearningStatus; use super::types::LearningTrain; +use super::types::HSR; use super::types::{AnalyticServiceMessage, RequestType}; /// Client to be used multithreaded diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 3f05369..1b3e187 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig}; -use super::types::{self, DetectionRunnerConfig, HSR, LearningTrain, LearningWaiter}; +use super::types::{self, DetectionRunnerConfig, LearningTrain, LearningWaiter, HSR}; use super::{ analytic_client::AnalyticClient, analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit}, @@ -22,7 +22,6 @@ use anyhow; use tokio::sync::{mpsc, oneshot, RwLock}; - // TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { metric_service: MetricService, @@ -41,8 +40,7 @@ pub struct AnalyticService { // awaiters learning_waiters: Vec, - - detection_runner: Option + detection_runner: Option, } impl AnalyticService { @@ -65,14 +63,13 @@ impl AnalyticService { tx, rx, - // handlers learning_handler: None, // awaiters learning_waiters: Vec::new(), - detection_runner: None + detection_runner: None, } } diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 4cbba0d..dc4ba3b 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -1,4 +1,8 @@ -use crate::services::{analytic_service::types::{self, HSR}, metric_service::MetricService, segments_service::SegmentsService}; +use crate::services::{ + analytic_service::types::{self, HSR}, + metric_service::MetricService, + segments_service::SegmentsService, +}; use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult}; @@ -14,9 +18,9 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, index: usize, offset: u64) -> any return Err(anyhow::format_err!("index should be > 0")); } return Ok(0.0); - // let step = - // let index_candidate = - // let intex_candidate = + // let step = + // let index_candidate = + // let intex_candidate = } pub struct AnomalyAnalyticUnit { @@ -41,12 +45,23 @@ impl AnomalyAnalyticUnit { } let mut sts = Vec::new(); - sts.push((ts[0].0, ts[0].1, ((ts[0].1 + self.config.confidence, ts[0].1 - self.config.confidence)))); + sts.push(( + ts[0].0, + ts[0].1, + (( + ts[0].1 + self.config.confidence, + ts[0].1 - self.config.confidence, + )), + )); for t in 1..ts.len() { let alpha = self.config.alpha; let stv = alpha * ts[t].1 + (1.0 - alpha) * sts[t - 1].1; - sts.push((ts[t].0, stv, (stv + self.config.confidence, stv - self.config.confidence))); + sts.push(( + ts[t].0, + stv, + (stv + self.config.confidence, stv - self.config.confidence), + )); } Ok(HSR::ConfidenceTimeSerie(sts)) @@ -71,7 +86,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit { from: u64, to: u64, ) -> anyhow::Result> { - let mr = ms.query(from - self.config.seasonality * 5, to, DETECTION_STEP).await.unwrap(); + let mr = ms + .query(from - self.config.seasonality * 5, to, DETECTION_STEP) + .await + .unwrap(); if mr.data.keys().len() == 0 { return Ok(Vec::new()); @@ -89,12 +107,11 @@ impl AnalyticUnit for AnomalyAnalyticUnit { let confidence_time_serie = self.get_hsr_from_metric_result(&mr)?; if let HSR::ConfidenceTimeSerie(hsr) = confidence_time_serie { - let mut from = None; for ((t, _, (u, l)), (t1, rv)) in hsr.iter().zip(ts.iter()) { if *t != *t1 { - return Err(anyhow::format_err!("incompatible hsr/ts")) + return Err(anyhow::format_err!("incompatible hsr/ts")); } if rv > u || rv < l { if from.is_none() { @@ -113,23 +130,14 @@ impl AnalyticUnit for AnomalyAnalyticUnit { } return Ok(result); - } else { return Err(anyhow::format_err!("bad hsr")); } - } // TODO: use hsr for learning and detections - async fn get_hsr( - &self, - ms: MetricService, - from: u64, - to: u64, - ) -> anyhow::Result { + async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result { let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); return self.get_hsr_from_metric_result(&mr); } - - } diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index c8ecbe1..550898e 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -3,19 +3,21 @@ use std::{collections::VecDeque, fmt, sync::Arc}; use futures::future; use parking_lot::Mutex; - use gbdt::config::Config; use gbdt::decision_tree::{Data, DataVec, PredVec}; use gbdt::gradient_boost::GBDT; - -use crate::services::{analytic_service::types::{self, HSR, LearningTrain}, metric_service::MetricService, segments_service::{Segment, SegmentType, SegmentsService}}; +use crate::services::{ + analytic_service::types::{self, LearningTrain, HSR}, + metric_service::MetricService, + segments_service::{Segment, SegmentType, SegmentsService}, +}; use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, PatternConfig}; use async_trait::async_trait; -use rustfft::{self, FftPlanner, num_complex::Complex}; +use rustfft::{self, num_complex::Complex, FftPlanner}; // TODO: move to config const DETECTION_STEP: u64 = 10; @@ -119,9 +121,14 @@ fn get_features(xs: &Vec) -> Features { let mut planner = FftPlanner::::new(); - let fft = planner.plan_fft_forward(FFT_LEN); - let mut c_buffer = vec![Complex{ re: 0.0f64, im: 0.0f64 }; FFT_LEN]; + let mut c_buffer = vec![ + Complex { + re: 0.0f64, + im: 0.0f64 + }; + FFT_LEN + ]; let p = 1.0 / FFT_LEN as f64; for i in 0..FFT_LEN.min(xs.len()) { @@ -137,24 +144,42 @@ fn get_features(xs: &Vec) -> Features { } return vec![ - min, max, - mean, sd, - c_buffer[0].re, c_buffer[0].im, - c_buffer[1].re, c_buffer[1].im, - c_buffer[2].re, c_buffer[2].im, - c_buffer[3].re, c_buffer[3].im, - c_buffer[4].re, c_buffer[4].im, - c_buffer[5].re, c_buffer[5].im, - c_buffer[6].re, c_buffer[6].im, - c_buffer[7].re, c_buffer[7].im, - c_buffer[8].re, c_buffer[8].im, - c_buffer[9].re, c_buffer[9].im, - c_buffer[10].re, c_buffer[10].im, - c_buffer[11].re, c_buffer[11].im, - c_buffer[12].re, c_buffer[12].im, - c_buffer[13].re, c_buffer[13].im, - c_buffer[14].re, c_buffer[14].im, - c_buffer[15].re, c_buffer[15].im, + min, + max, + mean, + sd, + c_buffer[0].re, + c_buffer[0].im, + c_buffer[1].re, + c_buffer[1].im, + c_buffer[2].re, + c_buffer[2].im, + c_buffer[3].re, + c_buffer[3].im, + c_buffer[4].re, + c_buffer[4].im, + c_buffer[5].re, + c_buffer[5].im, + c_buffer[6].re, + c_buffer[6].im, + c_buffer[7].re, + c_buffer[7].im, + c_buffer[8].re, + c_buffer[8].im, + c_buffer[9].re, + c_buffer[9].im, + c_buffer[10].re, + c_buffer[10].im, + c_buffer[11].re, + c_buffer[11].im, + c_buffer[12].re, + c_buffer[12].im, + c_buffer[13].re, + c_buffer[13].im, + c_buffer[14].re, + c_buffer[14].im, + c_buffer[15].re, + c_buffer[15].im, // 0f64,0f64, // 0f64,0f64,0f64, 0f64 ]; @@ -240,14 +265,13 @@ impl AnalyticUnit for PatternAnalyticUnit { } async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { - // TODO: move to config let mut cfg = Config::new(); cfg.set_feature_size(FEATURES_SIZE); cfg.set_max_depth(3); cfg.set_iterations(50); cfg.set_shrinkage(0.1); - cfg.set_loss("LogLikelyhood"); + cfg.set_loss("LogLikelyhood"); cfg.set_debug(false); cfg.set_data_sample_ratio(1.0); cfg.set_feature_sample_ratio(1.0); @@ -326,13 +350,12 @@ impl AnalyticUnit for PatternAnalyticUnit { records_raw[i].iter().map(|e| *e as f32).collect(), 1.0, if targets_raw[i] { 1.0 } else { -1.0 }, - Some(0.5) + Some(0.5), ); // println!("{:?}", targets_raw[i]); train_dv.push(data); } - let mut model = GBDT::new(&cfg); model.fit(&mut train_dv); @@ -458,12 +481,7 @@ impl AnalyticUnit for PatternAnalyticUnit { } // TODO: use hsr for learning and detections - async fn get_hsr( - &self, - ms: MetricService, - from: u64, - to: u64, - ) -> anyhow::Result { + async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result { let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); if mr.data.keys().len() == 0 { diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index 53fe45c..97923db 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -1,4 +1,8 @@ -use crate::services::{analytic_service::types::{self, HSR}, metric_service::MetricService, segments_service::SegmentsService}; +use crate::services::{ + analytic_service::types::{self, HSR}, + metric_service::MetricService, + segments_service::SegmentsService, +}; use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig}; @@ -74,12 +78,7 @@ impl AnalyticUnit for ThresholdAnalyticUnit { } // TODO: use hsr for learning and detections - async fn get_hsr( - &self, - ms: MetricService, - from: u64, - to: u64, - ) -> anyhow::Result { + async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result { let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); if mr.data.keys().len() == 0 { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 6a3f9c6..9d1635c 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -2,7 +2,9 @@ use serde::{Deserialize, Serialize}; use async_trait::async_trait; -use crate::services::{analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService}; +use crate::services::{ + analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService, +}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct PatternConfig { @@ -27,7 +29,7 @@ impl Default for PatternConfig { pub struct AnomalyConfig { pub alpha: f64, pub confidence: f64, - pub seasonality: u64 // step in seconds, can be zero + pub seasonality: u64, // step in seconds, can be zero } impl Default for AnomalyConfig { @@ -35,7 +37,7 @@ impl Default for AnomalyConfig { AnomalyConfig { alpha: 0.5, confidence: 10.0, - seasonality: 60 * 60 + seasonality: 60 * 60, } } } @@ -133,12 +135,7 @@ pub trait AnalyticUnit { ) -> anyhow::Result>; fn set_config(&mut self, c: AnalyticUnitConfig); - async fn get_hsr( - &self, - ms: MetricService, - from: u64, - to: u64, - ) -> anyhow::Result; + async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result; } #[derive(Deserialize, Serialize, Debug)] diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 9fd8956..fe9cbca 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,4 +1,4 @@ -use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit}; +use crate::services::analytic_service::analytic_unit::types::AnalyticUnit; use std::sync::Arc; @@ -8,22 +8,25 @@ use chrono::Utc; use tokio::sync::{mpsc, RwLock}; - - - struct DetectionRunner { config: Config, analytic_unit: Arc>>, } impl DetectionRunner { - pub fn new(config: Config, analytic_unit: Arc>>) -> DetectionRunner { - DetectionRunner { config, analytic_unit } + pub fn new( + config: Config, + analytic_unit: Arc>>, + ) -> DetectionRunner { + DetectionRunner { + config, + analytic_unit, + } } pub async fn run() { - // TODO: await detection step - // TODO: get last detection timestamp from persistance - // TODO: set lst detection from "now" + // TODO: await detection step + // TODO: get last detection timestamp from persistance + // TODO: set lst detection from "now" } } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 8a00d03..cc0ccaa 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -61,13 +61,12 @@ pub struct DetectionTask { pub to: u64, } - // HSR Stands for Hastic Signal Representation, // varies for different analytic units #[derive(Debug, Serialize)] pub enum HSR { TimeSerie(Vec<(u64, f64)>), - ConfidenceTimeSerie(Vec<(u64, f64, (f64, f64))>) + ConfidenceTimeSerie(Vec<(u64, f64, (f64, f64))>), } #[derive(Debug)] @@ -84,7 +83,6 @@ pub enum LearningWaiter { HSR(HSRTask), } - // TODO: review if it's needed #[derive(Debug)] pub struct DetectionRunnerConfig { diff --git a/server/src/services/metric_service.rs b/server/src/services/metric_service.rs index 3f6c076..7a1a476 100644 --- a/server/src/services/metric_service.rs +++ b/server/src/services/metric_service.rs @@ -23,11 +23,11 @@ impl MetricService { // let keys: Vec<_> = mr.data.keys().into_iter().collect(); if mr.data.keys().len() > 0 { - // TODO: it's a hack, should replace all metrics + // TODO: it's a hack, should replace all metrics let key = mr.data.keys().nth(0).unwrap().clone(); let ts = mr.data.get_mut(&key).unwrap(); *ts = subbeat::utils::interpolate_nans_and_gaps_with_zeros(&ts, from, to, step); - // mr.data.insert(*k, ts_interpolated); + // mr.data.insert(*k, ts_interpolated); } return Ok(mr); }