Browse Source

code format

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
360ffeec63
  1. 26
      server/src/config.rs
  2. 4
      server/src/services/analytic_service/analytic_service.rs
  3. 29
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  4. 2
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  5. 4
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  6. 15
      server/src/services/analytic_service/detection_runner.rs
  7. 3
      server/src/services/analytic_service/mod.rs
  8. 2
      server/src/services/analytic_service/types.rs

26
server/src/config.rs

@ -49,33 +49,34 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingCo
} }
if config.get::<String>("alerting.endpoint").is_err() { if config.get::<String>("alerting.endpoint").is_err() {
return Err(anyhow::format_err!("missing endpoint param in alerting")); return Err(anyhow::format_err!("missing endpoint param in alerting"));
} }
if config.get::<String>("alerting.interval").is_err() { if config.get::<String>("alerting.interval").is_err() {
return Err(anyhow::format_err!("missing interval param in alerting")); return Err(anyhow::format_err!("missing interval param in alerting"));
} }
if config.get::<u64>("alerting.interval").is_err() { if config.get::<u64>("alerting.interval").is_err() {
return Err(anyhow::format_err!("alerting interval should be a positive integer number")); return Err(anyhow::format_err!(
"alerting interval should be a positive integer number"
));
} }
let analytic_type = config.get::<String>("alerting.type").unwrap(); let analytic_type = config.get::<String>("alerting.type").unwrap();
if analytic_type != "webhook" { if analytic_type != "webhook" {
return Err(anyhow::format_err!("unknown alerting typy {}", analytic_type)); return Err(anyhow::format_err!(
"unknown alerting typy {}",
analytic_type
));
} }
let endpoint = config.get::<String>("alerting.endpoint").unwrap(); let endpoint = config.get::<String>("alerting.endpoint").unwrap();
let interval = config.get::<u64>("alerting.interval").unwrap(); let interval = config.get::<u64>("alerting.interval").unwrap();
return Ok(Some(AlertingConfig { return Ok(Some(AlertingConfig {
alerting_type: AlertingType::Webhook(WebhookAlertingConfig{ alerting_type: AlertingType::Webhook(WebhookAlertingConfig { endpoint }),
endpoint interval,
}), }));
interval
}))
} }
impl Config { impl Config {
pub fn new() -> anyhow::Result<Config> { pub fn new() -> anyhow::Result<Config> {
let mut config = config::Config::default(); let mut config = config::Config::default();
if std::path::Path::new("config.toml").exists() { if std::path::Path::new("config.toml").exists() {
@ -90,11 +91,10 @@ impl Config {
config.set("port", "8000").unwrap(); config.set("port", "8000").unwrap();
} }
Ok(Config { Ok(Config {
port: config.get::<u16>("port").unwrap(), port: config.get::<u16>("port").unwrap(),
datasource_config: resolve_datasource(&config)?, datasource_config: resolve_datasource(&config)?,
alerting: resolve_alerting(&config)? alerting: resolve_alerting(&config)?,
}) })
} }
} }

4
server/src/services/analytic_service/analytic_service.rs

@ -2,7 +2,7 @@ use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig};
use super::detection_runner::DetectionRunner; use super::detection_runner::DetectionRunner;
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, HSR, LearningWaiter}; use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR};
use super::{ use super::{
analytic_client::AnalyticClient, analytic_client::AnalyticClient,
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
@ -265,7 +265,7 @@ impl AnalyticService {
pub async fn serve(&mut self) { pub async fn serve(&mut self) {
// TODO: remove this hack // TODO: remove this hack
self.consume_request(RequestType::RunLearning); self.consume_request(RequestType::RunLearning);
// TODO: start detection runner if // TODO: start detection runner if
if self.alerting.is_some() { if self.alerting.is_some() {
self.run_detection_runner(); self.run_detection_runner();
} }

29
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -1,7 +1,5 @@
use crate::services::{ use crate::services::{
analytic_service::types::{HSR}, analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService,
metric_service::MetricService,
segments_service::SegmentsService,
}; };
use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult}; use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult};
@ -13,18 +11,17 @@ use chrono::prelude::*;
struct SARIMA { struct SARIMA {
pub ts: Vec<f64>, pub ts: Vec<f64>,
pub seasonality: u64 pub seasonality: u64,
} }
impl SARIMA { impl SARIMA {
pub fn new(seasonality: u64) -> SARIMA { pub fn new(seasonality: u64) -> SARIMA {
return SARIMA { return SARIMA {
ts: Vec::new(), ts: Vec::new(),
seasonality seasonality,
} };
} }
pub fn learn(&mut self, ts: &Vec<(u64, f64)>) { pub fn learn(&mut self, ts: &Vec<(u64, f64)>) {
// TODO: compute avg based on seasonality // TODO: compute avg based on seasonality
} }
@ -36,12 +33,10 @@ impl SARIMA {
pub fn push_point() { pub fn push_point() {
// TODO: inmplement // TODO: inmplement
} }
// TODO: don't count NaNs in model // TODO: don't count NaNs in model
} }
// TODO: move to config // TODO: move to config
const DETECTION_STEP: u64 = 10; const DETECTION_STEP: u64 = 10;
@ -59,12 +54,15 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, index: usize, offset: u64) -> any
pub struct AnomalyAnalyticUnit { pub struct AnomalyAnalyticUnit {
config: AnomalyConfig, config: AnomalyConfig,
sarima: Option<SARIMA> sarima: Option<SARIMA>,
} }
impl AnomalyAnalyticUnit { impl AnomalyAnalyticUnit {
pub fn new(config: AnomalyConfig) -> AnomalyAnalyticUnit { pub fn new(config: AnomalyConfig) -> AnomalyAnalyticUnit {
AnomalyAnalyticUnit { config, sarima: None } AnomalyAnalyticUnit {
config,
sarima: None,
}
} }
fn get_hsr_from_metric_result(&self, mr: &MetricResult) -> anyhow::Result<HSR> { fn get_hsr_from_metric_result(&self, mr: &MetricResult) -> anyhow::Result<HSR> {
@ -114,7 +112,6 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
} }
} }
async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult { async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult {
let mut sarima = SARIMA::new(self.config.seasonality); let mut sarima = SARIMA::new(self.config.seasonality);
let utc: DateTime<Utc> = Utc::now(); let utc: DateTime<Utc> = Utc::now();
@ -129,10 +126,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
let k = mr.data.keys().nth(0).unwrap(); let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k]; let ts = &mr.data[k];
sarima.learn(ts); sarima.learn(ts);
// TODO: ensure that learning reruns on seasonaliy change // TODO: ensure that learning reruns on seasonaliy change
// TODO: load data to learning // TODO: load data to learning
// TODO: update model to work online // TODO: update model to work online
return LearningResult::Finished; return LearningResult::Finished;
} }

2
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -4,7 +4,7 @@ use futures::future;
use parking_lot::Mutex; use parking_lot::Mutex;
use gbdt::config::Config; use gbdt::config::Config;
use gbdt::decision_tree::{Data}; use gbdt::decision_tree::Data;
use gbdt::gradient_boost::GBDT; use gbdt::gradient_boost::GBDT;
use crate::services::{ use crate::services::{

4
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -1,7 +1,5 @@
use crate::services::{ use crate::services::{
analytic_service::types::{HSR}, analytic_service::types::HSR, metric_service::MetricService, segments_service::SegmentsService,
metric_service::MetricService,
segments_service::SegmentsService,
}; };
use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig}; use super::types::{AnalyticUnit, AnalyticUnitConfig, LearningResult, ThresholdConfig};

15
server/src/services/analytic_service/detection_runner.rs

@ -9,7 +9,6 @@ use tokio::sync::{mpsc, RwLock};
use super::types::{AnalyticUnitRF, DetectionRunnerConfig}; use super::types::{AnalyticUnitRF, DetectionRunnerConfig};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
pub struct DetectionRunner { pub struct DetectionRunner {
config: DetectionRunnerConfig, config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF, analytic_unit: AnalyticUnitRF,
@ -17,19 +16,15 @@ pub struct DetectionRunner {
} }
impl DetectionRunner { impl DetectionRunner {
pub fn new( pub fn new(config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF) -> DetectionRunner {
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
) -> DetectionRunner {
DetectionRunner { DetectionRunner {
config, config,
analytic_unit, analytic_unit,
running_handler: None running_handler: None,
} }
} }
pub async fn run(&mut self, from: u64) { pub async fn run(&mut self, from: u64) {
// TODO: get last detection timestamp from persistance // TODO: get last detection timestamp from persistance
// TODO: set lst detection from "now" // TODO: set lst detection from "now"
if self.running_handler.is_some() { if self.running_handler.is_some() {
@ -48,10 +43,10 @@ impl DetectionRunner {
} }
} }
})); }));
} }
pub async fn set_analytic_unit(analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>) { pub async fn set_analytic_unit(
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
) {
} }
} }

3
server/src/services/analytic_service/mod.rs

@ -1,9 +1,8 @@
mod analytic_service; mod analytic_service;
pub mod analytic_unit; pub mod analytic_unit;
pub mod types;
pub mod detection_runner; pub mod detection_runner;
pub mod types;
pub mod analytic_client; pub mod analytic_client;
pub use analytic_service::AnalyticService; pub use analytic_service::AnalyticService;

2
server/src/services/analytic_service/types.rs

@ -11,7 +11,7 @@ use super::analytic_unit::types::PatchConfig;
use anyhow::Result; use anyhow::Result;
use serde::Serialize; use serde::Serialize;
use tokio::sync::{RwLock, oneshot}; use tokio::sync::{oneshot, RwLock};
use crate::services::analytic_service::analytic_unit::types::AnalyticUnit; use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;

Loading…
Cancel
Save