Browse Source

send detection task + format code

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
975e9770ce
  1. 16
      server/src/api/segments.rs
  2. 12
      server/src/services/analytic_service/analytic_client.rs
  3. 68
      server/src/services/analytic_service/analytic_service.rs
  4. 1
      server/src/services/analytic_service/pattern_detector.rs
  5. 4
      server/src/services/analytic_service/types.rs

16
server/src/api/segments.rs

@ -1,6 +1,6 @@
pub mod filters { pub mod filters {
use super::handlers; use super::handlers;
use super::models::{Srv, ListOptions}; use super::models::{ListOptions, Srv};
use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::analytic_service::analytic_client::AnalyticClient;
use warp::Filter; use warp::Filter;
@ -52,7 +52,9 @@ pub mod filters {
.and_then(handlers::delete) .and_then(handlers::delete)
} }
fn with_srv(db: Srv) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone { fn with_srv(
db: Srv,
) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || db.clone()) warp::any().map(move || db.clone())
} }
} }
@ -61,7 +63,7 @@ mod handlers {
use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::analytic_service::analytic_client::AnalyticClient;
use hastic::services::segments_service; use hastic::services::segments_service;
use super::models::{Srv, ListOptions}; use super::models::{ListOptions, Srv};
use crate::api; use crate::api;
use crate::api::BadQuery; use crate::api::BadQuery;
use crate::api::API; use crate::api::API;
@ -92,14 +94,18 @@ mod handlers {
} }
} }
pub async fn delete(opts: ListOptions, db: Srv, ac: AnalyticClient,) -> Result<impl warp::Reply, warp::Rejection> { pub async fn delete(
opts: ListOptions,
db: Srv,
ac: AnalyticClient,
) -> Result<impl warp::Reply, warp::Rejection> {
match db.delete_segments_in_range(opts.from, opts.to) { match db.delete_segments_in_range(opts.from, opts.to) {
Ok(count) => { Ok(count) => {
ac.run_learning().await.unwrap(); ac.run_learning().await.unwrap();
Ok(API::json(&api::Message { Ok(API::json(&api::Message {
message: count.to_string(), message: count.to_string(),
})) }))
}, }
// TODO: return proper http error // TODO: return proper http error
Err(_e) => Err(warp::reject::custom(BadQuery)), Err(_e) => Err(warp::reject::custom(BadQuery)),
} }

12
server/src/services/analytic_service/analytic_client.rs

@ -3,6 +3,7 @@ use tokio::sync::oneshot;
use crate::services::segments_service::Segment; use crate::services::segments_service::Segment;
use super::types::DetectionTask;
use super::types::LearningStatus; use super::types::LearningStatus;
use super::types::{AnalyticServiceMessage, RequestType}; use super::types::{AnalyticServiceMessage, RequestType};
@ -33,6 +34,15 @@ impl AnalyticClient {
} }
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
return Ok(Vec::new()); let (tx, rx) = oneshot::channel();
let req = AnalyticServiceMessage::Request(RequestType::RunDetection(DetectionTask {
sender: tx,
from,
to,
}));
self.tx.send(req).await?;
// TODO: handle second error
let r = rx.await??;
return Ok(r);
} }
} }

68
server/src/services/analytic_service/analytic_service.rs

@ -1,4 +1,9 @@
use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}}; use super::{
analytic_client::AnalyticClient,
pattern_detector::{self, LearningResults, PatternDetector},
types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType},
};
use super::types;
use crate::services::{ use crate::services::{
metric_service::MetricService, metric_service::MetricService,
@ -14,13 +19,12 @@ use tokio::sync::{mpsc, oneshot};
use futures::future; use futures::future;
use super::types;
const DETECTION_STEP: u64 = 10;
const LEARNING_WAITING_INTERVAL: u64 = 100;
// TODO: get this from pattern detector
const DETECTION_STEP: u64 = 10;
// TODO: now it's basically single analytic unit, service will opreate many AU // TODO: now it's basically single analytic unit, service will operate on many AU
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: SegmentsService, segments_service: SegmentsService,
@ -33,8 +37,7 @@ pub struct AnalyticService {
learning_handler: Option<tokio::task::JoinHandle<()>>, learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters // awaiters
learning_waiters: Vec<DetectionTask> learning_waiters: Vec<DetectionTask>,
} }
impl AnalyticService { impl AnalyticService {
@ -57,7 +60,7 @@ impl AnalyticService {
learning_handler: None, learning_handler: None,
// awaiters // awaiters
learning_waiters: Vec::new() learning_waiters: Vec::new(),
} }
} }
@ -71,10 +74,10 @@ impl AnalyticService {
let lr = self.learning_results.as_ref().unwrap().clone(); let lr = self.learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone(); let ms = self.metric_service.clone();
async move { async move {
AnalyticService::get_pattern_detection( AnalyticService::get_pattern_detection(task.sender, lr, ms, task.from, task.to)
task.sender, lr, ms, task.from, task.to .await;
).await; }
}}); });
} }
fn consume_request(&mut self, req: types::RequestType) -> () { fn consume_request(&mut self, req: types::RequestType) -> () {
@ -95,19 +98,12 @@ impl AnalyticService {
})); }));
} }
RequestType::RunDetection(task) => { RequestType::RunDetection(task) => {
// TODO: move this loop to init closure
// while let status = ac.get_status().await.unwrap() {
// if status == LearningStatus::Learning {
// sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await;
// continue;
// }
// }
if self.learning_status == LearningStatus::Ready { if self.learning_status == LearningStatus::Ready {
self.run_detection_task(task); self.run_detection_task(task);
} else { } else {
self.learning_waiters.push(task); self.learning_waiters.push(task);
} }
}, }
RequestType::GetStatus(tx) => { RequestType::GetStatus(tx) => {
tx.send(self.learning_status.clone()).unwrap(); tx.send(self.learning_status.clone()).unwrap();
} }
@ -128,7 +124,7 @@ impl AnalyticService {
let task = self.learning_waiters.pop().unwrap(); let task = self.learning_waiters.pop().unwrap();
self.run_detection_task(task); self.run_detection_task(task);
} }
}, }
ResponseType::LearningFinishedEmpty => { ResponseType::LearningFinishedEmpty => {
self.learning_results = None; self.learning_results = None;
self.learning_status = LearningStatus::Initialization; self.learning_status = LearningStatus::Initialization;
@ -169,7 +165,7 @@ impl AnalyticService {
// be careful if decide to store detections in db // be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
if segments.len() == 0 { if segments.len() == 0 {
match tx match tx
.send(AnalyticServiceMessage::Response( .send(AnalyticServiceMessage::Response(
@ -216,18 +212,24 @@ impl AnalyticService {
} }
} }
async fn get_pattern_detection(tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>, lr: LearningResults, ms: MetricService, from: u64, to: u64) { async fn get_pattern_detection(
tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>,
lr: LearningResults,
ms: MetricService,
from: u64,
to: u64,
) {
let prom = ms.get_prom(); let prom = ms.get_prom();
let pt = pattern_detector::PatternDetector::new(lr); let pt = pattern_detector::PatternDetector::new(lr);
let mr = prom.query(from, to, DETECTION_STEP).await.unwrap(); let mr = prom.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 { if mr.data.keys().len() == 0 {
match tx.send(Ok(Vec::new())) { match tx.send(Ok(Vec::new())) {
Ok(_) => {}, Ok(_) => {}
Err(_e) => { println!("failed to send empty results"); } Err(_e) => {
println!("failed to send empty results");
}
} }
return; return;
} }
@ -250,15 +252,17 @@ impl AnalyticService {
// TODO: run detections // TODO: run detections
// TODO: convert detections to segments // TODO: convert detections to segments
// Ok(result_segments) // Ok(result_segments)
match tx.send(Ok(result_segments)) { match tx.send(Ok(result_segments)) {
Ok(_) => {}, Ok(_) => {}
Err(_e) => { println!("failed to send results"); } Err(_e) => {
println!("failed to send results");
}
} }
return; return;
} }
// TODO: move this to another analytic unit
async fn get_threshold_detections( async fn get_threshold_detections(
&self, &self,
from: u64, from: u64,

1
server/src/services/analytic_service/pattern_detector.rs

@ -11,6 +11,7 @@ pub struct PatternDetector {
learning_results: LearningResults, learning_results: LearningResults,
} }
// TODO: move this to loginc of analytic unit
impl PatternDetector { impl PatternDetector {
pub fn new(learning_results: LearningResults) -> PatternDetector { pub fn new(learning_results: LearningResults) -> PatternDetector {
PatternDetector { learning_results } PatternDetector { learning_results }

4
server/src/services/analytic_service/types.rs

@ -18,14 +18,14 @@ pub enum LearningStatus {
pub enum ResponseType { pub enum ResponseType {
LearningStarted, LearningStarted,
LearningFinished(LearningResults), LearningFinished(LearningResults),
LearningFinishedEmpty LearningFinishedEmpty,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct DetectionTask { pub struct DetectionTask {
pub sender: oneshot::Sender<Result<Vec<Segment>>>, pub sender: oneshot::Sender<Result<Vec<Segment>>>,
pub from: u64, pub from: u64,
pub to: u64 pub to: u64,
} }
#[derive(Debug)] #[derive(Debug)]

Loading…
Cancel
Save