Browse Source

format code

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
4ecf6f80cf
  1. 21
      server/src/config.rs
  2. 2
      server/src/main.rs
  3. 2
      server/src/services/analytic_service/analytic_client.rs
  4. 9
      server/src/services/analytic_service/analytic_service.rs
  5. 46
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  6. 86
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  7. 13
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  8. 15
      server/src/services/analytic_service/analytic_unit/types.rs
  9. 21
      server/src/services/analytic_service/detection_runner.rs
  10. 4
      server/src/services/analytic_service/types.rs
  11. 4
      server/src/services/metric_service.rs

21
server/src/config.rs

@ -1,23 +1,21 @@
use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig}; use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig};
#[derive(Clone)] #[derive(Clone)]
pub struct WebhookAlertingConfig { pub struct WebhookAlertingConfig {
endpoint: String endpoint: String,
} }
#[derive(Clone)] #[derive(Clone)]
pub enum AlertingType { pub enum AlertingType {
Webhook(WebhookAlertingConfig) Webhook(WebhookAlertingConfig),
} }
#[derive(Clone)] #[derive(Clone)]
pub struct AlertingConfig { pub struct AlertingConfig {
alerting_type: AlertingType, alerting_type: AlertingType,
interval: u64 // interval in seconds interval: u64, // interval in seconds
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Config { pub struct Config {
pub port: u16, pub port: u16,
@ -25,16 +23,6 @@ pub struct Config {
pub alerting: Option<AlertingConfig>, pub alerting: Option<AlertingConfig>,
} }
// impl Clone for Config {
// fn clone(&self) -> Self {
// return Config {
// port: self.port,
// datasource_config: self.datasource_config.clone(),
// alerting: self.alerting.clone()
// };
// }
// }
fn resolve_datasource(config: &mut config::Config) -> anyhow::Result<DatasourceConfig> { fn resolve_datasource(config: &mut config::Config) -> anyhow::Result<DatasourceConfig> {
if config.get::<String>("prometheus.url").is_ok() { if config.get::<String>("prometheus.url").is_ok() {
return Ok(DatasourceConfig::Prometheus(PrometheusConfig { return Ok(DatasourceConfig::Prometheus(PrometheusConfig {
@ -58,7 +46,6 @@ fn resolve_datasource(config: &mut config::Config) -> anyhow::Result<DatasourceC
// TODO: use actual config and env variables // TODO: use actual config and env variables
impl Config { impl Config {
pub fn new() -> anyhow::Result<Config> { pub fn new() -> anyhow::Result<Config> {
// TODO: parse alerting config // TODO: parse alerting config
// TODO: throw error on bad config // TODO: throw error on bad config

2
server/src/main.rs

@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
let mut analytic_service = analytic_service::AnalyticService::new( let mut analytic_service = analytic_service::AnalyticService::new(
metric_service.clone(), metric_service.clone(),
segments_service.clone(), segments_service.clone(),
config.alerting config.alerting,
); );
let api = api::API::new( let api = api::API::new(

2
server/src/services/analytic_service/analytic_client.rs

@ -7,10 +7,10 @@ use crate::services::segments_service::Segment;
use super::analytic_unit::types::AnalyticUnitConfig; use super::analytic_unit::types::AnalyticUnitConfig;
use super::analytic_unit::types::PatchConfig; use super::analytic_unit::types::PatchConfig;
use super::types::DetectionTask; use super::types::DetectionTask;
use super::types::HSR;
use super::types::HSRTask; use super::types::HSRTask;
use super::types::LearningStatus; use super::types::LearningStatus;
use super::types::LearningTrain; use super::types::LearningTrain;
use super::types::HSR;
use super::types::{AnalyticServiceMessage, RequestType}; use super::types::{AnalyticServiceMessage, RequestType};
/// Client to be used multithreaded /// Client to be used multithreaded

9
server/src/services/analytic_service/analytic_service.rs

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig}; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig};
use super::types::{self, DetectionRunnerConfig, HSR, LearningTrain, LearningWaiter}; use super::types::{self, DetectionRunnerConfig, LearningTrain, LearningWaiter, HSR};
use super::{ use super::{
analytic_client::AnalyticClient, analytic_client::AnalyticClient,
analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit}, analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit},
@ -22,7 +22,6 @@ use anyhow;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
// TODO: now it's basically single analytic unit, service will operate on many AU // TODO: now it's basically single analytic unit, service will operate on many AU
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
@ -41,8 +40,7 @@ pub struct AnalyticService {
// awaiters // awaiters
learning_waiters: Vec<LearningWaiter>, learning_waiters: Vec<LearningWaiter>,
detection_runner: Option<DetectionRunnerConfig>,
detection_runner: Option<DetectionRunnerConfig>
} }
impl AnalyticService { impl AnalyticService {
@ -65,14 +63,13 @@ impl AnalyticService {
tx, tx,
rx, rx,
// handlers // handlers
learning_handler: None, learning_handler: None,
// awaiters // awaiters
learning_waiters: Vec::new(), learning_waiters: Vec::new(),
detection_runner: None detection_runner: None,
} }
} }

46
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -1,4 +1,8 @@
use crate::services::{analytic_service::types::{self, HSR}, metric_service::MetricService, segments_service::SegmentsService}; use crate::services::{
analytic_service::types::{self, HSR},
metric_service::MetricService,
segments_service::SegmentsService,
};
use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult}; use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult};
@ -14,9 +18,9 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, index: usize, offset: u64) -> any
return Err(anyhow::format_err!("index should be > 0")); return Err(anyhow::format_err!("index should be > 0"));
} }
return Ok(0.0); return Ok(0.0);
// let step = // let step =
// let index_candidate = // let index_candidate =
// let intex_candidate = // let intex_candidate =
} }
pub struct AnomalyAnalyticUnit { pub struct AnomalyAnalyticUnit {
@ -41,12 +45,23 @@ impl AnomalyAnalyticUnit {
} }
let mut sts = Vec::new(); let mut sts = Vec::new();
sts.push((ts[0].0, ts[0].1, ((ts[0].1 + self.config.confidence, ts[0].1 - self.config.confidence)))); sts.push((
ts[0].0,
ts[0].1,
((
ts[0].1 + self.config.confidence,
ts[0].1 - self.config.confidence,
)),
));
for t in 1..ts.len() { for t in 1..ts.len() {
let alpha = self.config.alpha; let alpha = self.config.alpha;
let stv = alpha * ts[t].1 + (1.0 - alpha) * sts[t - 1].1; let stv = alpha * ts[t].1 + (1.0 - alpha) * sts[t - 1].1;
sts.push((ts[t].0, stv, (stv + self.config.confidence, stv - self.config.confidence))); sts.push((
ts[t].0,
stv,
(stv + self.config.confidence, stv - self.config.confidence),
));
} }
Ok(HSR::ConfidenceTimeSerie(sts)) Ok(HSR::ConfidenceTimeSerie(sts))
@ -71,7 +86,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
from: u64, from: u64,
to: u64, to: u64,
) -> anyhow::Result<Vec<(u64, u64)>> { ) -> anyhow::Result<Vec<(u64, u64)>> {
let mr = ms.query(from - self.config.seasonality * 5, to, DETECTION_STEP).await.unwrap(); let mr = ms
.query(from - self.config.seasonality * 5, to, DETECTION_STEP)
.await
.unwrap();
if mr.data.keys().len() == 0 { if mr.data.keys().len() == 0 {
return Ok(Vec::new()); return Ok(Vec::new());
@ -89,12 +107,11 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
let confidence_time_serie = self.get_hsr_from_metric_result(&mr)?; let confidence_time_serie = self.get_hsr_from_metric_result(&mr)?;
if let HSR::ConfidenceTimeSerie(hsr) = confidence_time_serie { if let HSR::ConfidenceTimeSerie(hsr) = confidence_time_serie {
let mut from = None; let mut from = None;
for ((t, _, (u, l)), (t1, rv)) in hsr.iter().zip(ts.iter()) { for ((t, _, (u, l)), (t1, rv)) in hsr.iter().zip(ts.iter()) {
if *t != *t1 { if *t != *t1 {
return Err(anyhow::format_err!("incompatible hsr/ts")) return Err(anyhow::format_err!("incompatible hsr/ts"));
} }
if rv > u || rv < l { if rv > u || rv < l {
if from.is_none() { if from.is_none() {
@ -113,23 +130,14 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
} }
return Ok(result); return Ok(result);
} else { } else {
return Err(anyhow::format_err!("bad hsr")); return Err(anyhow::format_err!("bad hsr"));
} }
} }
// TODO: use hsr for learning and detections // TODO: use hsr for learning and detections
async fn get_hsr( async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result<HSR> {
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<HSR> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
return self.get_hsr_from_metric_result(&mr); return self.get_hsr_from_metric_result(&mr);
} }
} }

86
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -3,19 +3,21 @@ use std::{collections::VecDeque, fmt, sync::Arc};
use futures::future; use futures::future;
use parking_lot::Mutex; use parking_lot::Mutex;
use gbdt::config::Config; use gbdt::config::Config;
use gbdt::decision_tree::{Data, DataVec, PredVec}; use gbdt::decision_tree::{Data, DataVec, PredVec};
use gbdt::gradient_boost::GBDT; use gbdt::gradient_boost::GBDT;
use crate::services::{
use crate::services::{analytic_service::types::{self, HSR, LearningTrain}, metric_service::MetricService, segments_service::{Segment, SegmentType, SegmentsService}}; analytic_service::types::{self, LearningTrain, HSR},
metric_service::MetricService,
segments_service::{Segment, SegmentType, SegmentsService},
};
use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, PatternConfig}; use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, PatternConfig};
use async_trait::async_trait; use async_trait::async_trait;
use rustfft::{self, FftPlanner, num_complex::Complex}; use rustfft::{self, num_complex::Complex, FftPlanner};
// TODO: move to config // TODO: move to config
const DETECTION_STEP: u64 = 10; const DETECTION_STEP: u64 = 10;
@ -119,9 +121,14 @@ fn get_features(xs: &Vec<f64>) -> Features {
let mut planner = FftPlanner::<f64>::new(); let mut planner = FftPlanner::<f64>::new();
let fft = planner.plan_fft_forward(FFT_LEN); let fft = planner.plan_fft_forward(FFT_LEN);
let mut c_buffer = vec![Complex{ re: 0.0f64, im: 0.0f64 }; FFT_LEN]; let mut c_buffer = vec![
Complex {
re: 0.0f64,
im: 0.0f64
};
FFT_LEN
];
let p = 1.0 / FFT_LEN as f64; let p = 1.0 / FFT_LEN as f64;
for i in 0..FFT_LEN.min(xs.len()) { for i in 0..FFT_LEN.min(xs.len()) {
@ -137,24 +144,42 @@ fn get_features(xs: &Vec<f64>) -> Features {
} }
return vec![ return vec![
min, max, min,
mean, sd, max,
c_buffer[0].re, c_buffer[0].im, mean,
c_buffer[1].re, c_buffer[1].im, sd,
c_buffer[2].re, c_buffer[2].im, c_buffer[0].re,
c_buffer[3].re, c_buffer[3].im, c_buffer[0].im,
c_buffer[4].re, c_buffer[4].im, c_buffer[1].re,
c_buffer[5].re, c_buffer[5].im, c_buffer[1].im,
c_buffer[6].re, c_buffer[6].im, c_buffer[2].re,
c_buffer[7].re, c_buffer[7].im, c_buffer[2].im,
c_buffer[8].re, c_buffer[8].im, c_buffer[3].re,
c_buffer[9].re, c_buffer[9].im, c_buffer[3].im,
c_buffer[10].re, c_buffer[10].im, c_buffer[4].re,
c_buffer[11].re, c_buffer[11].im, c_buffer[4].im,
c_buffer[12].re, c_buffer[12].im, c_buffer[5].re,
c_buffer[13].re, c_buffer[13].im, c_buffer[5].im,
c_buffer[14].re, c_buffer[14].im, c_buffer[6].re,
c_buffer[15].re, c_buffer[15].im, c_buffer[6].im,
c_buffer[7].re,
c_buffer[7].im,
c_buffer[8].re,
c_buffer[8].im,
c_buffer[9].re,
c_buffer[9].im,
c_buffer[10].re,
c_buffer[10].im,
c_buffer[11].re,
c_buffer[11].im,
c_buffer[12].re,
c_buffer[12].im,
c_buffer[13].re,
c_buffer[13].im,
c_buffer[14].re,
c_buffer[14].im,
c_buffer[15].re,
c_buffer[15].im,
// 0f64,0f64, // 0f64,0f64,
// 0f64,0f64,0f64, 0f64 // 0f64,0f64,0f64, 0f64
]; ];
@ -240,14 +265,13 @@ impl AnalyticUnit for PatternAnalyticUnit {
} }
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult {
// TODO: move to config // TODO: move to config
let mut cfg = Config::new(); let mut cfg = Config::new();
cfg.set_feature_size(FEATURES_SIZE); cfg.set_feature_size(FEATURES_SIZE);
cfg.set_max_depth(3); cfg.set_max_depth(3);
cfg.set_iterations(50); cfg.set_iterations(50);
cfg.set_shrinkage(0.1); cfg.set_shrinkage(0.1);
cfg.set_loss("LogLikelyhood"); cfg.set_loss("LogLikelyhood");
cfg.set_debug(false); cfg.set_debug(false);
cfg.set_data_sample_ratio(1.0); cfg.set_data_sample_ratio(1.0);
cfg.set_feature_sample_ratio(1.0); cfg.set_feature_sample_ratio(1.0);
@ -326,13 +350,12 @@ impl AnalyticUnit for PatternAnalyticUnit {
records_raw[i].iter().map(|e| *e as f32).collect(), records_raw[i].iter().map(|e| *e as f32).collect(),
1.0, 1.0,
if targets_raw[i] { 1.0 } else { -1.0 }, if targets_raw[i] { 1.0 } else { -1.0 },
Some(0.5) Some(0.5),
); );
// println!("{:?}", targets_raw[i]); // println!("{:?}", targets_raw[i]);
train_dv.push(data); train_dv.push(data);
} }
let mut model = GBDT::new(&cfg); let mut model = GBDT::new(&cfg);
model.fit(&mut train_dv); model.fit(&mut train_dv);
@ -458,12 +481,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
} }
// TODO: use hsr for learning and detections // TODO: use hsr for learning and detections
async fn get_hsr( async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result<HSR> {
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<HSR> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 { if mr.data.keys().len() == 0 {

13
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -1,4 +1,8 @@
use crate::services::{analytic_service::types::{self, HSR}, metric_service::MetricService, segments_service::SegmentsService}; use crate::services::{
analytic_service::types::{self, HSR},
metric_service::MetricService,
segments_service::SegmentsService,
};
use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig}; use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig};
@ -74,12 +78,7 @@ impl AnalyticUnit for ThresholdAnalyticUnit {
} }
// TODO: use hsr for learning and detections // TODO: use hsr for learning and detections
async fn get_hsr( async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result<HSR> {
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<HSR> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 { if mr.data.keys().len() == 0 {

15
server/src/services/analytic_service/analytic_unit/types.rs

@ -2,7 +2,9 @@ use serde::{Deserialize, Serialize};
use async_trait::async_trait; use async_trait::async_trait;
use crate::services::{analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService}; use crate::services::{
analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService,
};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PatternConfig { pub struct PatternConfig {
@ -27,7 +29,7 @@ impl Default for PatternConfig {
pub struct AnomalyConfig { pub struct AnomalyConfig {
pub alpha: f64, pub alpha: f64,
pub confidence: f64, pub confidence: f64,
pub seasonality: u64 // step in seconds, can be zero pub seasonality: u64, // step in seconds, can be zero
} }
impl Default for AnomalyConfig { impl Default for AnomalyConfig {
@ -35,7 +37,7 @@ impl Default for AnomalyConfig {
AnomalyConfig { AnomalyConfig {
alpha: 0.5, alpha: 0.5,
confidence: 10.0, confidence: 10.0,
seasonality: 60 * 60 seasonality: 60 * 60,
} }
} }
} }
@ -133,12 +135,7 @@ pub trait AnalyticUnit {
) -> anyhow::Result<Vec<(u64, u64)>>; ) -> anyhow::Result<Vec<(u64, u64)>>;
fn set_config(&mut self, c: AnalyticUnitConfig); fn set_config(&mut self, c: AnalyticUnitConfig);
async fn get_hsr( async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result<HSR>;
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<HSR>;
} }
#[derive(Deserialize, Serialize, Debug)] #[derive(Deserialize, Serialize, Debug)]

21
server/src/services/analytic_service/detection_runner.rs

@ -1,4 +1,4 @@
use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit}; use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;
use std::sync::Arc; use std::sync::Arc;
@ -8,22 +8,25 @@ use chrono::Utc;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
struct DetectionRunner { struct DetectionRunner {
config: Config, config: Config,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>, analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
} }
impl DetectionRunner { impl DetectionRunner {
pub fn new(config: Config, analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>) -> DetectionRunner { pub fn new(
DetectionRunner { config, analytic_unit } config: Config,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
) -> DetectionRunner {
DetectionRunner {
config,
analytic_unit,
}
} }
pub async fn run() { pub async fn run() {
// TODO: await detection step // TODO: await detection step
// TODO: get last detection timestamp from persistance // TODO: get last detection timestamp from persistance
// TODO: set lst detection from "now" // TODO: set lst detection from "now"
} }
} }

4
server/src/services/analytic_service/types.rs

@ -61,13 +61,12 @@ pub struct DetectionTask {
pub to: u64, pub to: u64,
} }
// HSR Stands for Hastic Signal Representation, // HSR Stands for Hastic Signal Representation,
// varies for different analytic units // varies for different analytic units
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub enum HSR { pub enum HSR {
TimeSerie(Vec<(u64, f64)>), TimeSerie(Vec<(u64, f64)>),
ConfidenceTimeSerie(Vec<(u64, f64, (f64, f64))>) ConfidenceTimeSerie(Vec<(u64, f64, (f64, f64))>),
} }
#[derive(Debug)] #[derive(Debug)]
@ -84,7 +83,6 @@ pub enum LearningWaiter {
HSR(HSRTask), HSR(HSRTask),
} }
// TODO: review if it's needed // TODO: review if it's needed
#[derive(Debug)] #[derive(Debug)]
pub struct DetectionRunnerConfig { pub struct DetectionRunnerConfig {

4
server/src/services/metric_service.rs

@ -23,11 +23,11 @@ impl MetricService {
// let keys: Vec<_> = mr.data.keys().into_iter().collect(); // let keys: Vec<_> = mr.data.keys().into_iter().collect();
if mr.data.keys().len() > 0 { if mr.data.keys().len() > 0 {
// TODO: it's a hack, should replace all metrics // TODO: it's a hack, should replace all metrics
let key = mr.data.keys().nth(0).unwrap().clone(); let key = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.get_mut(&key).unwrap(); let ts = mr.data.get_mut(&key).unwrap();
*ts = subbeat::utils::interpolate_nans_and_gaps_with_zeros(&ts, from, to, step); *ts = subbeat::utils::interpolate_nans_and_gaps_with_zeros(&ts, from, to, step);
// mr.data.insert(*k, ts_interpolated); // mr.data.insert(*k, ts_interpolated);
} }
return Ok(mr); return Ok(mr);
} }

Loading…
Cancel
Save