From ea18309ff54b351b31f6dd5bb713f83a6d32ea14 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Wed, 10 Nov 2021 21:34:48 +0300 Subject: [PATCH] basic hsr --- server/src/api/analytics.rs | 28 ++++++++++++++-- server/src/api/segments.rs | 33 +++++++++---------- .../analytic_service/analytic_service.rs | 26 ++++++++++----- .../analytic_unit/anomaly_analytic_unit.rs | 2 -- .../analytic_unit/pattern_analytic_unit.rs | 6 +++- .../analytic_service/analytic_unit/types.rs | 12 +++++-- server/src/services/analytic_service/types.rs | 2 +- 7 files changed, 76 insertions(+), 33 deletions(-) diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs index ec09851..64c199b 100644 --- a/server/src/api/analytics.rs +++ b/server/src/api/analytics.rs @@ -11,6 +11,7 @@ pub mod filters { .or(status(client.clone())) .or(get_config(client.clone())) .or(put_config(client.clone())) + .or(get_hsr(client.clone())) // .or(list_train(client.clone())) // .or(create(db.clone())) // // .or(update(db.clone())) @@ -69,6 +70,17 @@ pub mod filters { // .and_then(handlers::list_train) // } + /// GET /analytics/hsr + pub fn get_hsr( + client: Client, + ) -> impl Filter + Clone { + warp::path!("analytics" / "hsr") + .and(warp::get()) + .and(warp::query::()) + .and(with_client(client)) + .and_then(handlers::get_hsr) + } + fn with_client( client: Client, ) -> impl Filter + Clone { @@ -79,7 +91,6 @@ pub mod filters { mod handlers { use hastic::services::analytic_service::analytic_unit::types::PatchConfig; - use serde_json::Value; use super::models::{Client, ListOptions, Status}; use crate::api::{BadQuery, API}; @@ -132,6 +143,20 @@ mod handlers { } } + pub async fn get_hsr( + lo: ListOptions, + client: Client, + ) -> Result { + // println!("{:?}", patch); + match client.get_hsr(lo.from, lo.to).await { + Ok(hsr) => Ok(API::json(&hsr)), + Err(e) => { + println!("{:?}", e); + Err(warp::reject::custom(BadQuery)) + } + } + } + // pub async fn list_train(client: Client) -> Result { // match client.get_train().await { // Ok(lt) => Ok(API::json(<)), @@ -149,7 +174,6 @@ mod models { pub type Client = analytic_service::analytic_client::AnalyticClient; - // The query parameters for list_todos. #[derive(Debug, Deserialize)] pub struct ListOptions { pub from: u64, diff --git a/server/src/api/segments.rs b/server/src/api/segments.rs index dd66cb4..27bd421 100644 --- a/server/src/api/segments.rs +++ b/server/src/api/segments.rs @@ -6,56 +6,56 @@ pub mod filters { /// The 4 REST API filters combined. pub fn filters( - db: Srv, + srv: Srv, ac: AnalyticClient, ) -> impl Filter + Clone { - list(db.clone()) - .or(create(db.clone(), ac.clone())) + list(srv.clone()) + .or(create(srv.clone(), ac.clone())) // .or(update(db.clone())) - .or(delete(db.clone(), ac.clone())) + .or(delete(srv.clone(), ac.clone())) } /// GET /segments?from=3&to=5 pub fn list( - db: Srv, + srv: Srv, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::get()) .and(warp::query::()) - .and(with_srv(db)) + .and(with_srv(srv)) .and_then(handlers::list) } /// POST /segments with JSON body pub fn create( - db: Srv, + srv: Srv, ac: AnalyticClient, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::post()) .and(warp::body::json()) - .and(with_srv(db)) + .and(with_srv(srv)) .and(warp::any().map(move || ac.clone())) .and_then(handlers::create) } /// POST /segments with JSON body pub fn delete( - db: Srv, + srv: Srv, ac: AnalyticClient, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::delete()) .and(warp::query::()) - .and(with_srv(db)) + .and(with_srv(srv)) .and(warp::any().map(move || ac.clone())) .and_then(handlers::delete) } fn with_srv( - db: Srv, + srv: Srv, ) -> impl Filter + Clone { - warp::any().map(move || db.clone()) + warp::any().map(move || srv.clone()) } } @@ -78,10 +78,10 @@ mod handlers { pub async fn create( segment: segments_service::Segment, - src: Srv, + srv: Srv, ac: AnalyticClient, ) -> Result { - match src.insert_segment(&segment) { + match srv.insert_segment(&segment) { Ok(segment) => { ac.run_learning().await.unwrap(); Ok(API::json(&segment)) @@ -96,10 +96,10 @@ mod handlers { pub async fn delete( opts: ListOptions, - db: Srv, + srv: Srv, ac: AnalyticClient, ) -> Result { - match db.delete_segments_in_range(opts.from, opts.to) { + match srv.delete_segments_in_range(opts.from, opts.to) { Ok(count) => { ac.run_learning().await.unwrap(); Ok(API::json(&api::Message { @@ -118,7 +118,6 @@ mod models { pub type Srv = segments_service::SegmentsService; - // The query parameters for list_todos. #[derive(Debug, Deserialize)] pub struct ListOptions { pub from: u64, diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index cd7e762..1684a4e 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -19,7 +19,6 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; -use serde_json::Value; use tokio::sync::{mpsc, oneshot, RwLock}; use chrono::Utc; @@ -91,10 +90,14 @@ impl AnalyticService { let au = self.analytic_unit.as_ref().unwrap().clone(); async move { match learning_waiter { - LearningWaiter::Detection(task) => AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await, - LearningWaiter::HSR(task) => AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await, + LearningWaiter::Detection(task) => { + AnalyticService::get_detections(task.sender, au, ms, task.from, task.to) + .await + } + LearningWaiter::HSR(task) => { + AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await + } } - } }); } @@ -178,8 +181,11 @@ impl AnalyticService { // tx.send(()).unwrap(); } RequestType::GetHSR(task) => { - // self.analytic_unit. - // TODO: implement + if self.analytic_unit.is_some() { + self.run_learning_waiter(LearningWaiter::HSR(task)); + } else { + self.learning_waiters.push(LearningWaiter::HSR(task)); + } } }; } @@ -346,7 +352,12 @@ impl AnalyticService { from: u64, to: u64, ) { - let hsr = analytic_unit.read().await.get_hsr(ms, from, to).await.unwrap(); + let hsr = analytic_unit + .read() + .await + .get_hsr(ms, from, to) + .await + .unwrap(); match tx.send(Ok(hsr)) { Ok(_) => {} @@ -354,6 +365,5 @@ impl AnalyticService { println!("failed to send results"); } } - } } diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 02d9509..a4ef35b 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -52,7 +52,6 @@ impl AnalyticUnit for AnomalyAnalyticUnit { let ct = ts[0]; - // TODO: implement // TODO: decide what to do it from is Some() in the end @@ -65,7 +64,6 @@ impl AnalyticUnit for AnomalyAnalyticUnit { from: u64, to: u64, ) -> anyhow::Result> { - // TODO: implement let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index d028b2a..ba1c8c5 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -353,7 +353,11 @@ impl AnalyticUnit for PatternAnalyticUnit { let fs = get_features(&vs); let lk = lr.model.lock(); let p = lk.predict(Array::from_vec(fs.to_vec())); - if p { 1 } else { -1 } + if p { + 1 + } else { + -1 + } }; let score = positive_corr * self.config.correlation_score diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 0f63507..8ff178d 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -31,7 +31,10 @@ pub struct AnomalyConfig { impl Default for AnomalyConfig { fn default() -> Self { - AnomalyConfig { alpha: 0.5, confidence: 10.0 } + AnomalyConfig { + alpha: 0.5, + confidence: 10.0, + } } } @@ -128,7 +131,12 @@ pub trait AnalyticUnit { ) -> anyhow::Result>; fn set_config(&mut self, c: AnalyticUnitConfig); - async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result> ; + async fn get_hsr( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result>; } #[derive(Deserialize, Serialize, Debug)] diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index fd39126..f048a02 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -72,7 +72,7 @@ pub struct HSRTask { #[derive(Debug)] pub enum LearningWaiter { Detection(DetectionTask), - HSR(HSRTask) + HSR(HSRTask), } #[derive(Debug)]