diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index fc885fb..113b1ae 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -33,7 +33,8 @@ pub struct AnalyticService { learning_handler: Option>, // awaiters - learning_waiter: Vec + learning_waiters: Vec + } impl AnalyticService { @@ -56,7 +57,7 @@ impl AnalyticService { learning_handler: None, // awaiters - learning_waiter: Vec::new() + learning_waiters: Vec::new() } } @@ -64,8 +65,16 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } - fn run_detection_task(task: DetectionTask) { - // TODO: implement + fn run_detection_task(&self, task: DetectionTask) { + // TODO: save handler of the task + tokio::spawn({ + let lr = self.learning_results.unwrap().clone(); + let ms = self.metric_service.clone(); + async move { + AnalyticService::get_pattern_detection( + task.sender, lr, ms, task.from, task.to + ); + }}); } fn consume_request(&mut self, req: types::RequestType) -> () { @@ -94,9 +103,9 @@ impl AnalyticService { // } // } if self.learning_status == LearningStatus::Ready { - AnalyticService::run_detection_task(task) + AnalyticService::run_detection_task(task); } else { - self.learning_waiter.push(task); + self.learning_waiters.push(task); } }, RequestType::GetStatus(tx) => { @@ -115,6 +124,10 @@ impl AnalyticService { self.learning_status = LearningStatus::Ready; // TODO: run tasks from self.learning_waiter + while self.learning_waiters.len() > 0 { + let task = self.learning_waiters.pop().unwrap(); + AnalyticService::run_detection_task(task); + } }, ResponseType::LearningFinishedEmpty => { self.learning_results = None; @@ -203,10 +216,8 @@ impl AnalyticService { } } - async fn get_pattern_detection(tx: oneshot::Sender>, lr: LearningResults, ms: MetricService, from: u64, to: u64) { - + async fn get_pattern_detection(tx: oneshot::Sender>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) { - let prom = ms.get_prom(); let pt = pattern_detector::PatternDetector::new(lr); diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index f1398b3..9608485 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -1,6 +1,7 @@ use crate::services::segments_service::Segment; use super::pattern_detector::LearningResults; +use anyhow::Result; use serde::Serialize; use tokio::sync::oneshot; @@ -22,8 +23,8 @@ pub enum ResponseType { #[derive(Debug)] pub struct DetectionTask { - pub sender: oneshot::Sender, - pub from: u64, + pub sender: oneshot::Sender>>, + pub from: u64, pub to: u64 }