diff --git a/client/src/utils.ts b/client/src/utils.ts index 72aa441..483371f 100644 --- a/client/src/utils.ts +++ b/client/src/utils.ts @@ -4,7 +4,7 @@ export async function *getGenerator( ...args ): AsyncIterableIterator { - let timeout = async () => new Promise( + const timeout = async () => new Promise( resolve => setTimeout(resolve, duration) ); diff --git a/server/src/api/segments.rs b/server/src/api/segments.rs index f7bf696..116bdfa 100644 --- a/server/src/api/segments.rs +++ b/server/src/api/segments.rs @@ -1,56 +1,58 @@ pub mod filters { use super::handlers; - use super::models::{Db, ListOptions}; + use super::models::{Srv, ListOptions}; use hastic::services::analytic_service::analytic_client::AnalyticClient; use warp::Filter; /// The 4 REST API filters combined. pub fn filters( - db: Db, + db: Srv, ac: AnalyticClient, ) -> impl Filter + Clone { list(db.clone()) - .or(create(db.clone(), ac)) + .or(create(db.clone(), ac.clone())) // .or(update(db.clone())) - .or(delete(db.clone())) + .or(delete(db.clone(), ac.clone())) } /// GET /segments?from=3&to=5 pub fn list( - db: Db, + db: Srv, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::get()) .and(warp::query::()) - .and(with_db(db)) + .and(with_srv(db)) .and_then(handlers::list) } /// POST /segments with JSON body pub fn create( - db: Db, + db: Srv, ac: AnalyticClient, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::post()) .and(warp::body::json()) - .and(with_db(db)) + .and(with_srv(db)) .and(warp::any().map(move || ac.clone())) .and_then(handlers::create) } /// POST /segments with JSON body pub fn delete( - db: Db, + db: Srv, + ac: AnalyticClient, ) -> impl Filter + Clone { warp::path!("segments") .and(warp::delete()) .and(warp::query::()) - .and(with_db(db)) + .and(with_srv(db)) + .and(warp::any().map(move || ac.clone())) .and_then(handlers::delete) } - fn with_db(db: Db) -> impl Filter + Clone { + fn with_srv(db: Srv) -> impl Filter + Clone { warp::any().map(move || db.clone()) } } @@ -59,13 +61,13 @@ mod handlers { use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::segments_service; - use super::models::{Db, ListOptions}; + use super::models::{Srv, ListOptions}; use crate::api; use crate::api::BadQuery; use crate::api::API; - pub async fn list(opts: ListOptions, db: Db) -> Result { - match db.get_segments_intersected(opts.from, opts.to) { + pub async fn list(opts: ListOptions, src: Srv) -> Result { + match src.get_segments_intersected(opts.from, opts.to) { Ok(segments) => Ok(API::json(&segments)), // TODO: return proper http error Err(_e) => Err(warp::reject::custom(BadQuery)), @@ -74,10 +76,10 @@ mod handlers { pub async fn create( segment: segments_service::Segment, - db: Db, + src: Srv, ac: AnalyticClient, ) -> Result { - match db.insert_segment(&segment) { + match src.insert_segment(&segment) { Ok(segment) => { ac.run_learning().await.unwrap(); Ok(API::json(&segment)) @@ -90,11 +92,14 @@ mod handlers { } } - pub async fn delete(opts: ListOptions, db: Db) -> Result { + pub async fn delete(opts: ListOptions, db: Srv, ac: AnalyticClient,) -> Result { match db.delete_segments_in_range(opts.from, opts.to) { - Ok(count) => Ok(API::json(&api::Message { - message: count.to_string(), - })), + Ok(count) => { + ac.run_learning().await.unwrap(); + Ok(API::json(&api::Message { + message: count.to_string(), + })) + }, // TODO: return proper http error Err(_e) => Err(warp::reject::custom(BadQuery)), } @@ -105,7 +110,7 @@ mod models { use hastic::services::segments_service::{self, SegmentId}; use serde::{Deserialize, Serialize}; - pub type Db = segments_service::SegmentsService; + pub type Srv = segments_service::SegmentsService; // The query parameters for list_todos. #[derive(Debug, Deserialize)] diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index e06cde5..ccb1b5d 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -82,6 +82,10 @@ impl AnalyticService { ResponseType::LearningFinished(results) => { self.learning_results = Some(results); self.learning_status = LearningStatus::Ready; + }, + ResponseType::LearningFinishedEmpty => { + self.learning_results = None; + self.learning_status = LearningStatus::Initialization; } } } @@ -95,7 +99,6 @@ impl AnalyticService { } } - // call this from api async fn run_learning( tx: mpsc::Sender, ms: MetricService, @@ -117,6 +120,20 @@ impl AnalyticService { // be careful if decide to store detections in db let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + + if segments.len() == 0 { + match tx + .send(AnalyticServiceMessage::Response( + ResponseType::LearningFinishedEmpty, + )) + .await + { + Ok(_) => {} + Err(_e) => println!("Fail to send learning results"), + } + + return; + } let fs = segments .iter() @@ -150,27 +167,32 @@ impl AnalyticService { } } - async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { - let prom = self.metric_service.get_prom(); - - while self.learning_status == LearningStatus::Learning { - sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await; - } + async fn get_pattern_detection(tx: mpsc::Sender, lr: LearningResults, ms: MetricService, from: u64, to: u64) { + + // TODO: move this loop to init closure + // while let status = ac.get_status().await.unwrap() { + // if status == LearningStatus::Learning { + // sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await; + // continue; + // } + // } - let lr = self.learning_results.as_ref().unwrap().clone(); + let prom = ms.get_prom(); + let pt = pattern_detector::PatternDetector::new(lr); - let mr = prom.query(from, to, DETECTION_STEP).await?; + let mr = prom.query(from, to, DETECTION_STEP).await.unwrap(); - if mr.data.keys().len() == 0 { - return Ok(Vec::new()); - } + // TODO: uncomment + // if mr.data.keys().len() == 0 { + // return Ok(Vec::new()); + // } let k = mr.data.keys().nth(0).unwrap(); let ts = &mr.data[k]; let result = pt.detect(ts); - let result_segments = result + let result_segments: Vec = result .iter() .map(|(p, q)| Segment { from: *p, @@ -182,7 +204,7 @@ impl AnalyticService { // TODO: run detections // TODO: convert detections to segments - Ok(result_segments) + // Ok(result_segments) } async fn get_threshold_detections( diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 72fbbf8..1a4600b 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -15,6 +15,7 @@ pub enum LearningStatus { pub enum ResponseType { LearningStarted, LearningFinished(LearningResults), + LearningFinishedEmpty } #[derive(Debug)]