Browse Source

tx to detection runner + formet

detection_runner_updade
Alexey Velikiy 2 years ago
parent
commit
2126025abd
  1. 42
      server/src/services/analytic_service/analytic_service.rs
  2. 45
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  3. 10
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  4. 6
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  5. 12
      server/src/services/analytic_service/analytic_unit/types.rs
  6. 28
      server/src/services/analytic_service/detection_runner.rs
  7. 7
      server/src/services/analytic_service/types.rs

42
server/src/services/analytic_service/analytic_service.rs

@ -2,7 +2,9 @@ use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig};
use super::detection_runner::DetectionRunner;
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask};
use super::types::{
self, AnalyticUnitRF, DetectionRunnerConfig, DetectionRunnerTask, LearningWaiter, HSR,
};
use super::{
analytic_client::AnalyticClient,
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
@ -20,7 +22,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
use anyhow;
use chrono::{TimeZone, DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use tokio::sync::{mpsc, oneshot};
// TODO: now it's basically single analytic unit, service will operate on many AU
@ -112,25 +114,25 @@ impl AnalyticService {
}
}
// TODO: make
// TODO: make
fn run_detection_runner(&mut self, from: u64) {
// TODO: handle case or make it impossible to run_detection_runner second time
if self.analytic_unit_learning_status != LearningStatus::Ready {
let task = DetectionRunnerTask {
from
};
self.learning_waiters.push(LearningWaiter::DetectionRunner(task));
let task = DetectionRunnerTask { from };
self.learning_waiters
.push(LearningWaiter::DetectionRunner(task));
return;
}
let AlertingType::Webhook(acfg) = self.alerting.as_ref().unwrap().alerting_type.clone();
let drcfg = DetectionRunnerConfig {
endpoint: acfg.endpoint.clone(),
interval: self.alerting.as_ref().unwrap().interval
interval: self.alerting.as_ref().unwrap().interval,
};
let dr = DetectionRunner::new(drcfg, self.analytic_unit.as_ref().unwrap().clone());
let tx = self.tx.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
let dr = DetectionRunner::new(tx, drcfg, au);
self.detection_runner = Some(dr);
self.detection_runner.as_mut().unwrap().run(from);
@ -250,7 +252,7 @@ impl AnalyticService {
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
}
},
}
// TODO: create custom DatasourceError error type
Err(_) => {
self.analytic_unit = None;
@ -324,9 +326,9 @@ impl AnalyticService {
let mut au = resolve(aucfg);
match tx
.send(AnalyticServiceMessage::Response(
Ok(ResponseType::LearningStarted),
))
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::LearningStarted,
)))
.await
{
Ok(_) => {}
@ -335,13 +337,11 @@ impl AnalyticService {
// TODO: maybe to spawn_blocking here
let lr = match au.learn(ms, ss).await {
Ok(res) => {
match res {
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)),
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty)
}
}
Err(e) => Err(e)
Ok(res) => match res {
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)),
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty),
},
Err(e) => Err(e),
};
match tx.send(AnalyticServiceMessage::Response(lr)).await {

45
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -1,4 +1,8 @@
use crate::services::{analytic_service::types::{AnomalyHSRConfig, HSR}, metric_service::MetricService, segments_service::SegmentsService};
use crate::services::{
analytic_service::types::{AnomalyHSRConfig, HSR},
metric_service::MetricService,
segments_service::SegmentsService,
};
use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult};
@ -7,14 +11,13 @@ use subbeat::metric::MetricResult;
use chrono::prelude::*;
// TODO: move to config
const DETECTION_STEP: u64 = 10;
// timerange offset in seconds backwards from end of ts in assumption that ts has no gaps
fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)>{
fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)> {
// TODO: remove dependency to DETECTION_STEP
let indexes_offset = (offset / DETECTION_STEP) as usize;
let n = ts.len() - 1;
if n < indexes_offset {
@ -24,12 +27,11 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)
return Some(ts[i]);
}
struct SARIMA {
pub ts: Vec<(u64, f64)>,
pub seasonality: u64,
pub confidence: f64,
pub seasonality_iterations: u64
pub seasonality_iterations: u64,
}
impl SARIMA {
@ -38,12 +40,11 @@ impl SARIMA {
ts: Vec::new(),
seasonality,
confidence,
seasonality_iterations
seasonality_iterations,
};
}
pub fn learn(&mut self, ts: &Vec<(u64, f64)>) -> anyhow::Result<()> {
// TODO: don't count NaNs in model
// TODO: add exponental smooting to model
// TODO: trend detection
@ -59,7 +60,10 @@ impl SARIMA {
let iter_steps = (self.seasonality / DETECTION_STEP) as usize;
if to - from != self.seasonality_iterations * self.seasonality {
return Err(anyhow::format_err!("timeserie to learn from should be {} * sasonality", self.seasonality_iterations));
return Err(anyhow::format_err!(
"timeserie to learn from should be {} * sasonality",
self.seasonality_iterations
));
}
for k in 0..iter_steps {
@ -91,7 +95,6 @@ impl SARIMA {
let len_from = timestamp - from;
// TODO: take avg if timestamp in between
let index_diff = (len_from / DETECTION_STEP) % self.ts.len() as u64;
let p = self.ts[index_diff as usize].1;
return (p, (p + self.confidence, p - self.confidence));
@ -100,10 +103,8 @@ impl SARIMA {
pub fn push_point() {
// TODO: inmplement
}
}
pub struct AnomalyAnalyticUnit {
config: AnomalyConfig,
sarima: Option<SARIMA>,
@ -126,7 +127,7 @@ impl AnomalyAnalyticUnit {
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: Vec::new()
ts: Vec::new(),
}));
}
@ -137,7 +138,7 @@ impl AnomalyAnalyticUnit {
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: Vec::new()
ts: Vec::new(),
}));
}
@ -145,13 +146,13 @@ impl AnomalyAnalyticUnit {
let sarima = self.sarima.as_ref().unwrap();
for vt in ts {
let x = sarima.predict(vt.0);
sts.push((vt.0, x.0, (x.1.0, x.1.1)));
sts.push((vt.0, x.0, (x.1 .0, x.1 .1)));
}
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: sts
ts: sts,
}));
}
}
@ -168,8 +169,16 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
panic!("Bad config!");
}
}
async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> anyhow::Result<LearningResult> {
let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence, self.config.seasonality_iterations);
async fn learn(
&mut self,
ms: MetricService,
_ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
let mut sarima = SARIMA::new(
self.config.seasonality,
self.config.confidence,
self.config.seasonality_iterations,
);
let utc: DateTime<Utc> = Utc::now();
let to = utc.timestamp() as u64;

10
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -222,7 +222,11 @@ impl AnalyticUnit for PatternAnalyticUnit {
}
}
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result<LearningResult> {
async fn learn(
&mut self,
ms: MetricService,
ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
// TODO: move to config
let mut cfg = Config::new();
cfg.set_feature_size(FEATURES_SIZE);
@ -255,7 +259,9 @@ impl AnalyticUnit for PatternAnalyticUnit {
for r in rs {
if r.is_err() {
// TODO: custom DatasourceError error type
return Err(anyhow::format_err!("Error extracting metrics from datasource"));
return Err(anyhow::format_err!(
"Error extracting metrics from datasource"
));
}
let sd = r?;

6
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -21,7 +21,11 @@ impl ThresholdAnalyticUnit {
#[async_trait]
impl AnalyticUnit for ThresholdAnalyticUnit {
async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> anyhow::Result<LearningResult> {
async fn learn(
&mut self,
_ms: MetricService,
_ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
return Ok(LearningResult::Finished);
}

12
server/src/services/analytic_service/analytic_unit/types.rs

@ -30,7 +30,7 @@ pub struct AnomalyConfig {
pub alpha: f64,
pub confidence: f64,
pub seasonality: u64, // step in seconds, can be zero
pub seasonality_iterations: u64
pub seasonality_iterations: u64,
}
impl Default for AnomalyConfig {
@ -39,7 +39,7 @@ impl Default for AnomalyConfig {
alpha: 0.5,
confidence: 10.0,
seasonality: 60 * 60,
seasonality_iterations: 3
seasonality_iterations: 3,
}
}
}
@ -125,12 +125,16 @@ impl AnalyticUnitConfig {
pub enum LearningResult {
Finished,
FinishedEmpty
FinishedEmpty,
}
#[async_trait]
pub trait AnalyticUnit {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result<LearningResult>;
async fn learn(
&mut self,
ms: MetricService,
ss: SegmentsService,
) -> anyhow::Result<LearningResult>;
async fn detect(
&self,
ms: MetricService,

28
server/src/services/analytic_service/detection_runner.rs

@ -6,18 +6,24 @@ use chrono::Utc;
use tokio::sync::{mpsc, RwLock};
use super::types::{AnalyticUnitRF, DetectionRunnerConfig};
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig};
use tokio::time::{sleep, Duration};
pub struct DetectionRunner {
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
running_handler: Option<tokio::task::JoinHandle<()>>,
}
impl DetectionRunner {
pub fn new(config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF) -> DetectionRunner {
pub fn new(
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
) -> DetectionRunner {
DetectionRunner {
tx,
config,
analytic_unit,
running_handler: None,
@ -34,14 +40,14 @@ impl DetectionRunner {
// TODO: clone channel
let cfg = self.config.clone();
async move {
// AnalyticService::run_learning(tx, cfg, ms, ss).await;
// TODO: run detection "from"
// TODO: run detection "from" for big timespan
// TODO: parse detections to webhooks
// TODO: define window for detection
// TODO: save last detection
// TODO: handle case when detection is in the end and continues after "now"
println!("detection runner started from {}", from);
loop {
// TODO: run detection periodically
sleep(Duration::from_secs(cfg.interval)).await;
}
@ -49,10 +55,10 @@ impl DetectionRunner {
}));
}
pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF,
) {
self.analytic_unit = analytic_unit;
// TODO: stop running_handler
// TODO: rerun detection with new anomaly units
}
// pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF,
// ) {
// self.analytic_unit = analytic_unit;
// // TODO: stop running_handler
// // TODO: rerun detection with new anomaly units
// }
}

7
server/src/services/analytic_service/types.rs

@ -45,7 +45,7 @@ impl Default for LearningTrain {
pub enum ResponseType {
LearningStarted,
LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
LearningFinishedEmpty
LearningFinishedEmpty,
}
impl fmt::Debug for ResponseType {
@ -67,12 +67,11 @@ pub struct DetectionRunnerTask {
pub from: u64,
}
#[derive(Debug, Serialize)]
pub struct AnomalyHSRConfig {
pub timestamp: u64,
pub seasonality: u64,
pub ts: Vec<(u64, f64, (f64, f64))>
pub ts: Vec<(u64, f64, (f64, f64))>,
}
// HSR Stands for Hastic Signal Representation,
// varies for different analytic units
@ -103,7 +102,7 @@ pub struct DetectionRunnerConfig {
// pub sender: mpsc::Sender<Result<Vec<Segment>>>,
pub endpoint: String,
// pub from: u64,
pub interval: u64
pub interval: u64,
}
#[derive(Debug)]

Loading…
Cancel
Save