Browse Source

detections continue

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
ed6818bbea
  1. 60
      server/src/services/analytic_service/analytic_service.rs
  2. 10
      server/src/services/analytic_service/types.rs

60
server/src/services/analytic_service/analytic_service.rs

@ -1,8 +1,4 @@
use super::{
analytic_client::AnalyticClient,
pattern_detector::{self, LearningResults, PatternDetector},
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
};
use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}};
use crate::services::{
metric_service::MetricService,
@ -14,7 +10,7 @@ use subbeat::metric::Metric;
use anyhow;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use futures::future;
@ -23,6 +19,7 @@ use super::types;
const DETECTION_STEP: u64 = 10;
const LEARNING_WAITING_INTERVAL: u64 = 100;
// TODO: now it's basically single analytic unit, service will opreate many AU
pub struct AnalyticService {
metric_service: MetricService,
@ -33,7 +30,10 @@ pub struct AnalyticService {
rx: mpsc::Receiver<AnalyticServiceMessage>,
// handlers
learning_handler: Option<tokio::task::JoinHandle<()>>
learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters
learning_waiter: Vec<DetectionTask>
}
impl AnalyticService {
@ -53,7 +53,10 @@ impl AnalyticService {
rx,
// handlers
learning_handler: None
learning_handler: None,
// awaiters
learning_waiter: Vec::new()
}
}
@ -61,6 +64,10 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone())
}
fn run_detection_task(task: DetectionTask) {
// TODO: implement
}
fn consume_request(&mut self, req: types::RequestType) -> () {
match req {
RequestType::RunLearning => {
@ -78,6 +85,20 @@ impl AnalyticService {
}
}));
}
RequestType::RunDetection(task) => {
// TODO: move this loop to init closure
// while let status = ac.get_status().await.unwrap() {
// if status == LearningStatus::Learning {
// sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await;
// continue;
// }
// }
if self.learning_status == LearningStatus::Ready {
AnalyticService::run_detection_task(task)
} else {
self.learning_waiter.push(task);
}
},
RequestType::GetStatus(tx) => {
tx.send(self.learning_status.clone()).unwrap();
}
@ -92,6 +113,8 @@ impl AnalyticService {
self.learning_handler = None;
self.learning_results = Some(results);
self.learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
},
ResponseType::LearningFinishedEmpty => {
self.learning_results = None;
@ -180,25 +203,20 @@ impl AnalyticService {
}
}
async fn get_pattern_detection(tx: mpsc::Sender<AnalyticServiceMessage>, lr: LearningResults, ms: MetricService, from: u64, to: u64) {
async fn get_pattern_detection(tx: oneshot::Sender<Vec<Segment>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) {
// TODO: move this loop to init closure
// while let status = ac.get_status().await.unwrap() {
// if status == LearningStatus::Learning {
// sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await;
// continue;
// }
// }
let prom = ms.get_prom();
let pt = pattern_detector::PatternDetector::new(lr);
let mr = prom.query(from, to, DETECTION_STEP).await.unwrap();
// TODO: uncomment
// if mr.data.keys().len() == 0 {
// return Ok(Vec::new());
// }
if mr.data.keys().len() == 0 {
tx.send(Vec::new());
return;
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
@ -218,6 +236,8 @@ impl AnalyticService {
// TODO: run detections
// TODO: convert detections to segments
// Ok(result_segments)
tx.send(result_segments);
}
async fn get_threshold_detections(

10
server/src/services/analytic_service/types.rs

@ -1,3 +1,5 @@
use crate::services::segments_service::Segment;
use super::pattern_detector::LearningResults;
use serde::Serialize;
use tokio::sync::oneshot;
@ -18,9 +20,17 @@ pub enum ResponseType {
LearningFinishedEmpty
}
#[derive(Debug)]
pub struct DetectionTask {
pub sender: oneshot::Sender<LearningStatus>,
pub from: u64,
pub to: u64
}
#[derive(Debug)]
pub enum RequestType {
RunLearning,
RunDetection(DetectionTask),
GetStatus(oneshot::Sender<LearningStatus>),
}

Loading…
Cancel
Save