Browse Source

begin detection runner and seasonality

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
e71d87e6be
  1. 4
      server/Cargo.lock
  2. 36
      server/src/services/analytic_service/analytic_service.rs
  3. 15
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  4. 27
      server/src/services/analytic_service/detection_runner.rs
  5. 2
      server/src/services/analytic_service/mod.rs
  6. 2
      server/src/services/analytic_service/types.rs
  7. 12
      server/src/services/metric_service.rs

4
server/Cargo.lock generated

@ -1538,7 +1538,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "subbeat"
version = "0.0.14"
version = "0.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fbf55cd694bd61f334302faf844f8eecff37fdc731a53c9c0a547793cb9c432"
dependencies = [
"anyhow",
"async-trait",

36
server/src/services/analytic_service/analytic_service.rs

@ -21,7 +21,6 @@ use anyhow;
use tokio::sync::{mpsc, oneshot, RwLock};
use chrono::Utc;
// TODO: now it's basically single analytic unit, service will operate on many AU
pub struct AnalyticService {
@ -35,16 +34,14 @@ pub struct AnalyticService {
tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticServiceMessage>,
endpoint: Option<String>,
// handlers
learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters
learning_waiters: Vec<LearningWaiter>,
// runner
runner_handler: Option<tokio::task::JoinHandle<()>>,
detection_runner: Option<DetectionRunnerConfig>
}
impl AnalyticService {
@ -67,7 +64,6 @@ impl AnalyticService {
tx,
rx,
endpoint,
// handlers
learning_handler: None,
@ -75,7 +71,7 @@ impl AnalyticService {
// awaiters
learning_waiters: Vec::new(),
runner_handler: None,
detection_runner: None
}
}
@ -102,19 +98,19 @@ impl AnalyticService {
});
}
// fn run_detection_runner(&mut self, task: DetectionRunnerConfig) {
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
// // TODO: save handler of the task
// self.runner_handler = Some(tokio::spawn({
// let au = self.analytic_unit.unwrap();
// let ms = self.metric_service.clone();
// async move {
// // TODO: implement
// }
// }));
// }
fn run_detection_runner(&mut self, task: DetectionRunnerConfig) {
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
// // TODO: save handler of the task
// self.runner_handler = Some(tokio::spawn({
// let au = self.analytic_unit.unwrap();
// let ms = self.metric_service.clone();
// async move {
// // TODO: implement
// }
// }));
}
// TODO: maybe make `consume_request` async
fn consume_request(&mut self, req: types::RequestType) -> () {

15
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -8,6 +8,17 @@ use subbeat::metric::MetricResult;
// TODO: move to config
const DETECTION_STEP: u64 = 10;
// offset from intex in timrange in seconds
fn get_value_with_offset(ts: &Vec<(u64, f64)>, index: usize, offset: u64) -> anyhow::Result<f64> {
if index == 0 {
return Err(anyhow::format_err!("index should be > 0"));
}
return Ok(0.0);
// let step =
// let index_candidate =
// let intex_candidate =
}
pub struct AnomalyAnalyticUnit {
config: AnomalyConfig,
}
@ -60,7 +71,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, u64)>> {
let mr = ms.query(from - self.config.seasonality, to, DETECTION_STEP).await.unwrap();
let mr = ms.query(from - self.config.seasonality * 5, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
@ -119,4 +130,6 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
return self.get_hsr_from_metric_result(&mr);
}
}

27
server/src/services/analytic_service/detection_runner.rs

@ -0,0 +1,27 @@
use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit};
use std::sync::Arc;
use crate::config::Config;
use chrono::Utc;
use tokio::sync::{mpsc, RwLock};
struct DetectionRunner {
config: Config,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
}
impl DetectionRunner {
pub fn new(config: Config, analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>) -> DetectionRunner {
DetectionRunner { config, analytic_unit }
}
pub async fn run() {
}
}

2
server/src/services/analytic_service/mod.rs

@ -4,4 +4,6 @@ pub mod types;
pub mod analytic_client;
mod detection_runner;
pub use analytic_service::AnalyticService;

2
server/src/services/analytic_service/types.rs

@ -84,6 +84,8 @@ pub enum LearningWaiter {
HSR(HSRTask),
}
// TODO: review if it's needed
#[derive(Debug)]
pub struct DetectionRunnerConfig {
// pub sender: mpsc::Sender<Result<Vec<Segment>>>,

12
server/src/services/metric_service.rs

@ -19,6 +19,16 @@ impl MetricService {
}
}
pub async fn query(&self, from: u64, to: u64, step: u64) -> anyhow::Result<MetricResult> {
return self.datasource.query(from, to, step).await;
let mut mr = self.datasource.query(from, to, step).await?;
// let keys: Vec<_> = mr.data.keys().into_iter().collect();
if mr.data.keys().len() > 0 {
// TODO: it's a hack, should replace all metrics
let key = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.get_mut(&key).unwrap();
*ts = subbeat::utils::interpolate_nans_and_gaps_with_zeros(&ts, from, to, step);
// mr.data.insert(*k, ts_interpolated);
}
return Ok(mr);
}
}

Loading…
Cancel
Save