From 975e9770ce07979717c98f3771d149374dd0d7ef Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 1 Nov 2021 18:25:01 +0300 Subject: [PATCH] send detection task + format code --- server/src/api/segments.rs | 16 +++-- .../analytic_service/analytic_client.rs | 12 +++- .../analytic_service/analytic_service.rs | 68 ++++++++++--------- .../analytic_service/pattern_detector.rs | 1 + server/src/services/analytic_service/types.rs | 4 +- 5 files changed, 61 insertions(+), 40 deletions(-) diff --git a/server/src/api/segments.rs b/server/src/api/segments.rs index 116bdfa..dd66cb4 100644 --- a/server/src/api/segments.rs +++ b/server/src/api/segments.rs @@ -1,6 +1,6 @@ pub mod filters { use super::handlers; - use super::models::{Srv, ListOptions}; + use super::models::{ListOptions, Srv}; use hastic::services::analytic_service::analytic_client::AnalyticClient; use warp::Filter; @@ -52,7 +52,9 @@ pub mod filters { .and_then(handlers::delete) } - fn with_srv(db: Srv) -> impl Filter + Clone { + fn with_srv( + db: Srv, + ) -> impl Filter + Clone { warp::any().map(move || db.clone()) } } @@ -61,7 +63,7 @@ mod handlers { use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::segments_service; - use super::models::{Srv, ListOptions}; + use super::models::{ListOptions, Srv}; use crate::api; use crate::api::BadQuery; use crate::api::API; @@ -92,14 +94,18 @@ mod handlers { } } - pub async fn delete(opts: ListOptions, db: Srv, ac: AnalyticClient,) -> Result { + pub async fn delete( + opts: ListOptions, + db: Srv, + ac: AnalyticClient, + ) -> Result { match db.delete_segments_in_range(opts.from, opts.to) { Ok(count) => { ac.run_learning().await.unwrap(); Ok(API::json(&api::Message { message: count.to_string(), })) - }, + } // TODO: return proper http error Err(_e) => Err(warp::reject::custom(BadQuery)), } diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs index 0ab7836..c918458 100644 --- a/server/src/services/analytic_service/analytic_client.rs +++ b/server/src/services/analytic_service/analytic_client.rs @@ -3,6 +3,7 @@ use tokio::sync::oneshot; use crate::services::segments_service::Segment; +use super::types::DetectionTask; use super::types::LearningStatus; use super::types::{AnalyticServiceMessage, RequestType}; @@ -33,6 +34,15 @@ impl AnalyticClient { } pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { - return Ok(Vec::new()); + let (tx, rx) = oneshot::channel(); + let req = AnalyticServiceMessage::Request(RequestType::RunDetection(DetectionTask { + sender: tx, + from, + to, + })); + self.tx.send(req).await?; + // TODO: handle second error + let r = rx.await??; + return Ok(r); } } diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 2a3a8f1..4ec3495 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,4 +1,9 @@ -use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}}; +use super::{ + analytic_client::AnalyticClient, + pattern_detector::{self, LearningResults, PatternDetector}, + types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}, +}; +use super::types; use crate::services::{ metric_service::MetricService, @@ -14,13 +19,12 @@ use tokio::sync::{mpsc, oneshot}; use futures::future; -use super::types; -const DETECTION_STEP: u64 = 10; -const LEARNING_WAITING_INTERVAL: u64 = 100; +// TODO: get this from pattern detector +const DETECTION_STEP: u64 = 10; -// TODO: now it's basically single analytic unit, service will opreate many AU +// TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { metric_service: MetricService, segments_service: SegmentsService, @@ -33,8 +37,7 @@ pub struct AnalyticService { learning_handler: Option>, // awaiters - learning_waiters: Vec - + learning_waiters: Vec, } impl AnalyticService { @@ -57,7 +60,7 @@ impl AnalyticService { learning_handler: None, // awaiters - learning_waiters: Vec::new() + learning_waiters: Vec::new(), } } @@ -71,10 +74,10 @@ impl AnalyticService { let lr = self.learning_results.as_ref().unwrap().clone(); let ms = self.metric_service.clone(); async move { - AnalyticService::get_pattern_detection( - task.sender, lr, ms, task.from, task.to - ).await; - }}); + AnalyticService::get_pattern_detection(task.sender, lr, ms, task.from, task.to) + .await; + } + }); } fn consume_request(&mut self, req: types::RequestType) -> () { @@ -95,19 +98,12 @@ impl AnalyticService { })); } RequestType::RunDetection(task) => { - // TODO: move this loop to init closure - // while let status = ac.get_status().await.unwrap() { - // if status == LearningStatus::Learning { - // sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await; - // continue; - // } - // } if self.learning_status == LearningStatus::Ready { self.run_detection_task(task); } else { self.learning_waiters.push(task); } - }, + } RequestType::GetStatus(tx) => { tx.send(self.learning_status.clone()).unwrap(); } @@ -128,7 +124,7 @@ impl AnalyticService { let task = self.learning_waiters.pop().unwrap(); self.run_detection_task(task); } - }, + } ResponseType::LearningFinishedEmpty => { self.learning_results = None; self.learning_status = LearningStatus::Initialization; @@ -169,7 +165,7 @@ impl AnalyticService { // be careful if decide to store detections in db let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); - + if segments.len() == 0 { match tx .send(AnalyticServiceMessage::Response( @@ -216,18 +212,24 @@ impl AnalyticService { } } - async fn get_pattern_detection(tx: oneshot::Sender>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) { - + async fn get_pattern_detection( + tx: oneshot::Sender>>, + lr: LearningResults, + ms: MetricService, + from: u64, + to: u64, + ) { let prom = ms.get_prom(); - + let pt = pattern_detector::PatternDetector::new(lr); let mr = prom.query(from, to, DETECTION_STEP).await.unwrap(); - if mr.data.keys().len() == 0 { match tx.send(Ok(Vec::new())) { - Ok(_) => {}, - Err(_e) => { println!("failed to send empty results"); } + Ok(_) => {} + Err(_e) => { + println!("failed to send empty results"); + } } return; } @@ -250,15 +252,17 @@ impl AnalyticService { // TODO: run detections // TODO: convert detections to segments // Ok(result_segments) - + match tx.send(Ok(result_segments)) { - Ok(_) => {}, - Err(_e) => { println!("failed to send results"); } + Ok(_) => {} + Err(_e) => { + println!("failed to send results"); + } } return; - } + // TODO: move this to another analytic unit async fn get_threshold_detections( &self, from: u64, diff --git a/server/src/services/analytic_service/pattern_detector.rs b/server/src/services/analytic_service/pattern_detector.rs index f5d03ee..d34b605 100644 --- a/server/src/services/analytic_service/pattern_detector.rs +++ b/server/src/services/analytic_service/pattern_detector.rs @@ -11,6 +11,7 @@ pub struct PatternDetector { learning_results: LearningResults, } +// TODO: move this to loginc of analytic unit impl PatternDetector { pub fn new(learning_results: LearningResults) -> PatternDetector { PatternDetector { learning_results } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 9608485..54d0828 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -18,14 +18,14 @@ pub enum LearningStatus { pub enum ResponseType { LearningStarted, LearningFinished(LearningResults), - LearningFinishedEmpty + LearningFinishedEmpty, } #[derive(Debug)] pub struct DetectionTask { pub sender: oneshot::Sender>>, pub from: u64, - pub to: u64 + pub to: u64, } #[derive(Debug)]