diff --git a/server/src/api.rs b/server/src/api.rs index 7832764..b3feaa5 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -32,23 +32,20 @@ pub struct API<'a> { user_service: Arc>, metric_service: metric_service::MetricService, data_service: segments_service::SegmentsService, - analytic_service: AnalyticService, + analytic_service: Arc>, } impl API<'_> { pub fn new(config: &Config) -> anyhow::Result> { let ss = segments_service::SegmentsService::new()?; - let ms = metric_service::MetricService::new( - &config.prom_url, - &config.query, - ); + let ms = metric_service::MetricService::new(&config.prom_url, &config.query); Ok(API { config: config, user_service: Arc::new(RwLock::new(user_service::UserService::new())), metric_service: ms.clone(), data_service: ss.clone(), - analytic_service: AnalyticService::new(ms, ss), + analytic_service: Arc::new(RwLock::new(AnalyticService::new(ms, ss))) }) } diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs index a48dff2..dd66658 100644 --- a/server/src/api/analytics.rs +++ b/server/src/api/analytics.rs @@ -8,6 +8,7 @@ pub mod filters { srv: Srv, ) -> impl Filter + Clone { list(srv.clone()) + // TODO: /status endpoint // .or(create(db.clone())) // // .or(update(db.clone())) // .or(delete(db.clone())) @@ -38,7 +39,7 @@ mod handlers { pub async fn list(opts: ListOptions, srv: Srv) -> Result { // match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await { - match srv.get_pattern_detection(opts.from, opts.to).await { + match srv.read().get_pattern_detection(opts.from, opts.to).await { Ok(segments) => Ok(API::json(&segments)), Err(e) => { println!("{:?}", e); @@ -49,13 +50,16 @@ mod handlers { } mod models { + use std::sync::Arc; + use hastic::services::analytic_service; + use parking_lot::RwLock; use serde::{Deserialize, Serialize}; // use parking_lot::RwLock; // use std::sync::Arc; - pub type Srv = analytic_service::AnalyticService; + pub type Srv = Arc>; // The query parameters for list_todos. #[derive(Debug, Deserialize)] diff --git a/server/src/api/metric.rs b/server/src/api/metric.rs index 6ea43da..81dc062 100644 --- a/server/src/api/metric.rs +++ b/server/src/api/metric.rs @@ -17,7 +17,6 @@ use crate::api::{self, API}; use std::collections::HashMap; - use super::BadQuery; #[derive(Serialize)] diff --git a/server/src/services/analytic_service.rs b/server/src/services/analytic_service.rs index 8e937b4..912a039 100644 --- a/server/src/services/analytic_service.rs +++ b/server/src/services/analytic_service.rs @@ -1,7 +1,11 @@ +use crate::utils::{self, get_random_str}; -use crate::{utils::get_random_str}; +use self::pattern_detector::{LearningResults, PatternDetector}; -use super::{metric_service::MetricService, segments_service::{self, ID_LENGTH, Segment, SegmentType, SegmentsService}}; +use super::{ + metric_service::MetricService, + segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH}, +}; use subbeat::metric::Metric; @@ -10,58 +14,126 @@ use anyhow; mod pattern_detector; use futures::future; - +use tokio::sync::oneshot; +use tokio::time::{sleep, Duration}; + +const DETECTION_STEP: u64 = 10; +const LEARNING_WAITING_INTERVAL: u64 = 100; + +#[derive(Clone, PartialEq)] +enum LearningStatus { + Initialization, + Learning, + Error, + Ready, +} #[derive(Clone)] pub struct AnalyticService { metric_service: MetricService, - segments_service: SegmentsService + segments_service: SegmentsService, + learning_results: Option, + learning_status: LearningStatus, } impl AnalyticService { - pub fn new(metric_service: MetricService, segments_service: segments_service::SegmentsService) -> AnalyticService { + pub fn new( + metric_service: MetricService, + segments_service: segments_service::SegmentsService, + ) -> AnalyticService { AnalyticService { metric_service, - segments_service + segments_service, + // TODO: get it from persistance + learning_results: None, + learning_status: LearningStatus::Initialization, } } - pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { + // call this from api + pub async fn run_learning(&mut self) { + let prom = self.metric_service.get_prom(); + let ss = self.segments_service.clone(); - let innter_step = 10u64; - let segments = self.segments_service.get_segments_inside(0, u64::MAX / 2)?; + let (tx, rx) = oneshot::channel(); - let prom = self.metric_service.get_prom(); - let fs = segments.iter().map(|s| prom.query(s.from, s.to, innter_step)); - let rs = future::join_all(fs).await; - - let mut pt = pattern_detector::PatternDetector::new(); - - // TODO: run this on label adding - // TODO: save learning results in cache - let mut learn_tss = Vec::new(); - for r in rs { - let mr = r.unwrap(); - if mr.data.keys().len() == 0 { - continue; + tokio::spawn(async move { + // TODO: logic for returning error + + // be careful if decide to store detections in db + let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + + let fs = segments + .iter() + .map(|s| prom.query(s.from, s.to, DETECTION_STEP)); + let rs = future::join_all(fs).await; + + // TODO: run this on label adding + // TODO: save learning results in cache + let mut learn_tss = Vec::new(); + for r in rs { + let mr = r.unwrap(); + if mr.data.keys().len() == 0 { + continue; + } + let k = mr.data.keys().nth(0).unwrap(); + let ts = &mr.data[k]; + // TODO: maybe not clone + learn_tss.push(ts.clone()); + } + + let lr = PatternDetector::learn(&learn_tss).await; + + if let Err(_) = tx.send(lr) { + println!("Error: receive of learning results dropped"); + } + }); + + match rx.await { + Ok(lr) => { + self.learning_results = Some(lr); + self.learning_status = LearningStatus::Ready; + } + Err(_) => { + self.learning_status = LearningStatus::Error; + println!("learning dropped") } - let k = mr.data.keys().nth(0).unwrap(); - let ts = &mr.data[k]; - // TODO: maybe not clone - learn_tss.push(ts.clone()); } + } - pt.learn(&learn_tss); + pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { + let prom = self.metric_service.get_prom(); - let ts = prom.query(from, to, innter_step).await?; + while self.learning_status == LearningStatus::Learning { + sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await; + } + let lr = self.learning_results.as_ref().unwrap().clone(); + let pt = pattern_detector::PatternDetector::new(lr); + let mr = prom.query(from, to, DETECTION_STEP).await?; + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } - // pt.detect(ts); + let k = mr.data.keys().nth(0).unwrap(); + let ts = &mr.data[k]; + + let result = pt.detect(ts); + + let result_segments = result + .iter() + .map(|(p, q)| Segment { + from: *p, + to: *q, + id: Some(utils::get_random_str(ID_LENGTH)), + segment_type: SegmentType::Detection, + }) + .collect(); // TODO: run detections // TODO: convert detections to segments - Ok(Vec::new()) + Ok(result_segments) } pub async fn get_threshold_detections( @@ -69,7 +141,7 @@ impl AnalyticService { from: u64, to: u64, step: u64, - threashold: f64 + threashold: f64, ) -> anyhow::Result> { let prom = self.metric_service.get_prom(); let mr = prom.query(from, to, step).await?; diff --git a/server/src/services/analytic_service/pattern_detector.rs b/server/src/services/analytic_service/pattern_detector.rs index 6e2feb2..35bc44a 100644 --- a/server/src/services/analytic_service/pattern_detector.rs +++ b/server/src/services/analytic_service/pattern_detector.rs @@ -1,20 +1,22 @@ +use std::{thread, time}; +use tokio::time::{sleep, Duration}; -struct LearningResults { - backet_size: usize +#[derive(Clone)] +pub struct LearningResults { + backet_size: usize, } +#[derive(Clone)] pub struct PatternDetector { - learning_results: Option + learning_results: LearningResults, } impl PatternDetector { - pub fn new() -> PatternDetector { - PatternDetector{ - learning_results: None - } + pub fn new(learning_results: LearningResults) -> PatternDetector { + PatternDetector { learning_results } } - pub fn learn(&mut self, reads: &Vec>) { + pub async fn learn(reads: &Vec>) -> LearningResults { // TODO: implement let mut min_size = usize::MAX; let mut max_size = 0usize; @@ -23,13 +25,18 @@ impl PatternDetector { max_size = max_size.max(r.len()); } - self.learning_results = Some(LearningResults{ - backet_size: (min_size + max_size) / 2 - }); + let ten_millis = time::Duration::from_millis(1000); + thread::sleep(ten_millis); + + sleep(Duration::from_millis(1000)).await; + + LearningResults { + backet_size: (min_size + max_size) / 2, + } } pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> { // fill backet return Vec::new(); } -} \ No newline at end of file +}