Browse Source

run_detection_task

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
f124aa4717
  1. 29
      server/src/services/analytic_service/analytic_service.rs
  2. 5
      server/src/services/analytic_service/types.rs

29
server/src/services/analytic_service/analytic_service.rs

@ -33,7 +33,8 @@ pub struct AnalyticService {
learning_handler: Option<tokio::task::JoinHandle<()>>, learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters // awaiters
learning_waiter: Vec<DetectionTask> learning_waiters: Vec<DetectionTask>
} }
impl AnalyticService { impl AnalyticService {
@ -56,7 +57,7 @@ impl AnalyticService {
learning_handler: None, learning_handler: None,
// awaiters // awaiters
learning_waiter: Vec::new() learning_waiters: Vec::new()
} }
} }
@ -64,8 +65,16 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone()) AnalyticClient::new(self.tx.clone())
} }
fn run_detection_task(task: DetectionTask) { fn run_detection_task(&self, task: DetectionTask) {
// TODO: implement // TODO: save handler of the task
tokio::spawn({
let lr = self.learning_results.unwrap().clone();
let ms = self.metric_service.clone();
async move {
AnalyticService::get_pattern_detection(
task.sender, lr, ms, task.from, task.to
);
}});
} }
fn consume_request(&mut self, req: types::RequestType) -> () { fn consume_request(&mut self, req: types::RequestType) -> () {
@ -94,9 +103,9 @@ impl AnalyticService {
// } // }
// } // }
if self.learning_status == LearningStatus::Ready { if self.learning_status == LearningStatus::Ready {
AnalyticService::run_detection_task(task) AnalyticService::run_detection_task(task);
} else { } else {
self.learning_waiter.push(task); self.learning_waiters.push(task);
} }
}, },
RequestType::GetStatus(tx) => { RequestType::GetStatus(tx) => {
@ -115,6 +124,10 @@ impl AnalyticService {
self.learning_status = LearningStatus::Ready; self.learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter // TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
AnalyticService::run_detection_task(task);
}
}, },
ResponseType::LearningFinishedEmpty => { ResponseType::LearningFinishedEmpty => {
self.learning_results = None; self.learning_results = None;
@ -203,10 +216,8 @@ impl AnalyticService {
} }
} }
async fn get_pattern_detection(tx: oneshot::Sender<Vec<Segment>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) { async fn get_pattern_detection(tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) {
let prom = ms.get_prom(); let prom = ms.get_prom();
let pt = pattern_detector::PatternDetector::new(lr); let pt = pattern_detector::PatternDetector::new(lr);

5
server/src/services/analytic_service/types.rs

@ -1,6 +1,7 @@
use crate::services::segments_service::Segment; use crate::services::segments_service::Segment;
use super::pattern_detector::LearningResults; use super::pattern_detector::LearningResults;
use anyhow::Result;
use serde::Serialize; use serde::Serialize;
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -22,8 +23,8 @@ pub enum ResponseType {
#[derive(Debug)] #[derive(Debug)]
pub struct DetectionTask { pub struct DetectionTask {
pub sender: oneshot::Sender<LearningStatus>, pub sender: oneshot::Sender<Result<Vec<Segment>>>,
pub from: u64, pub from: u64,
pub to: u64 pub to: u64
} }

Loading…
Cancel
Save