diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs index dd66658..499dd6f 100644 --- a/server/src/api/analytics.rs +++ b/server/src/api/analytics.rs @@ -1,13 +1,13 @@ pub mod filters { use super::handlers; - use super::models::{ListOptions, Srv}; + use super::models::{Client, ListOptions}; use warp::Filter; /// The 4 REST API filters combined. pub fn filters( - srv: Srv, + client: Client, ) -> impl Filter + Clone { - list(srv.clone()) + list(client.clone()) // TODO: /status endpoint // .or(create(db.clone())) // // .or(update(db.clone())) @@ -16,30 +16,30 @@ pub mod filters { /// GET /analytics?from=3&to=5 pub fn list( - db: Srv, + client: Client, ) -> impl Filter + Clone { warp::path!("analytics") .and(warp::get()) .and(warp::query::()) - .and(with_srv(db)) + .and(with_client(client)) .and_then(handlers::list) } - fn with_srv( - srv: Srv, - ) -> impl Filter + Clone { - warp::any().map(move || srv.clone()) + fn with_client( + client: Client, + ) -> impl Filter + Clone { + warp::any().map(move || client.clone()) } } mod handlers { - use super::models::{ListOptions, Srv}; + use super::models::{Client, ListOptions}; use crate::api::{BadQuery, API}; - pub async fn list(opts: ListOptions, srv: Srv) -> Result { + pub async fn list(opts: ListOptions, srv: Client) -> Result { // match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await { - match srv.read().get_pattern_detection(opts.from, opts.to).await { + match srv.get_pattern_detection(opts.from, opts.to).await { Ok(segments) => Ok(API::json(&segments)), Err(e) => { println!("{:?}", e); @@ -53,13 +53,10 @@ mod models { use std::sync::Arc; use hastic::services::analytic_service; - use parking_lot::RwLock; - use serde::{Deserialize, Serialize}; - // use parking_lot::RwLock; - // use std::sync::Arc; + use serde::{Deserialize, Serialize}; - pub type Srv = Arc>; + pub type Client = analytic_service::analytic_client::AnalyticClient; // The query parameters for list_todos. #[derive(Debug, Deserialize)] diff --git a/server/src/api.rs b/server/src/api/mod.rs similarity index 94% rename from server/src/api.rs rename to server/src/api/mod.rs index b3feaa5..90888e8 100644 --- a/server/src/api.rs +++ b/server/src/api/mod.rs @@ -32,7 +32,8 @@ pub struct API<'a> { user_service: Arc>, metric_service: metric_service::MetricService, data_service: segments_service::SegmentsService, - analytic_service: Arc>, + // TODO: get analytic service as reference + analytic_service: AnalyticService, } impl API<'_> { @@ -45,7 +46,7 @@ impl API<'_> { user_service: Arc::new(RwLock::new(user_service::UserService::new())), metric_service: ms.clone(), data_service: ss.clone(), - analytic_service: Arc::new(RwLock::new(AnalyticService::new(ms, ss))) + analytic_service: AnalyticService::new(ms, ss), }) } @@ -81,7 +82,7 @@ impl API<'_> { let metrics = metric::get_route(self.metric_service.clone()); let login = auth::get_route(self.user_service.clone()); let segments = segments::filters::filters(self.data_service.clone()); - let analytics = analytics::filters::filters(self.analytic_service.clone()); + let analytics = analytics::filters::filters(self.analytic_service.get_client()); let public = warp::fs::dir("public"); println!("Start server on {} port", self.config.port); diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs new file mode 100644 index 0000000..bf8b551 --- /dev/null +++ b/server/src/services/analytic_service/analytic_client.rs @@ -0,0 +1,27 @@ +use tokio::sync::mpsc; + +use crate::services::segments_service::Segment; + +use super::types::{AnalyticRequest}; + +/// CLient to be used multithreaded +/// +/// +#[derive(Clone)] +pub struct AnalyticClient { + tx: mpsc::Sender, +} + +impl AnalyticClient { + pub fn new(tx: mpsc::Sender) -> AnalyticClient { + AnalyticClient { tx } + } + pub async fn run_learning(&self) -> anyhow::Result<()> { + self.tx.send(AnalyticRequest::RunLearning).await?; + Ok(()) + } + + pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { + return Ok(Vec::new()); + } +} diff --git a/server/src/services/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs similarity index 82% rename from server/src/services/analytic_service.rs rename to server/src/services/analytic_service/analytic_service.rs index 912a039..11cd337 100644 --- a/server/src/services/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,21 +1,26 @@ -use crate::utils::{self, get_random_str}; - -use self::pattern_detector::{LearningResults, PatternDetector}; - use super::{ + analytic_client::AnalyticClient, + pattern_detector::{self, LearningResults, PatternDetector}, + types::AnalyticRequest, +}; + +use crate::services::{ metric_service::MetricService, segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH}, }; +use crate::utils::{self, get_random_str}; use subbeat::metric::Metric; use anyhow; -mod pattern_detector; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::{sleep, Duration}; use futures::future; -use tokio::sync::oneshot; -use tokio::time::{sleep, Duration}; + +use super::types; + const DETECTION_STEP: u64 = 10; const LEARNING_WAITING_INTERVAL: u64 = 100; @@ -23,17 +28,19 @@ const LEARNING_WAITING_INTERVAL: u64 = 100; #[derive(Clone, PartialEq)] enum LearningStatus { Initialization, + Starting, Learning, Error, Ready, } -#[derive(Clone)] pub struct AnalyticService { metric_service: MetricService, segments_service: SegmentsService, learning_results: Option, learning_status: LearningStatus, + tx: mpsc::Sender, + rx: mpsc::Receiver, } impl AnalyticService { @@ -41,17 +48,38 @@ impl AnalyticService { metric_service: MetricService, segments_service: segments_service::SegmentsService, ) -> AnalyticService { + let (tx, rx) = mpsc::channel::(32); + AnalyticService { metric_service, segments_service, // TODO: get it from persistance learning_results: None, learning_status: LearningStatus::Initialization, + tx, + rx, + } + } + + pub fn get_client(&self) -> AnalyticClient { + AnalyticClient::new(self.tx.clone()) + } + + pub async fn serve(&mut self) { + while let Some(request) = self.rx.recv().await { + match request { + types::AnalyticRequest::RunLearning => { + // TODO: not block and do logic when it's finished + self.run_learning().await; + } + } } } // call this from api - pub async fn run_learning(&mut self) { + async fn run_learning(&mut self) { + self.learning_status = LearningStatus::Starting; + println!("Learning starting"); let prom = self.metric_service.get_prom(); let ss = self.segments_service.clone(); @@ -101,7 +129,7 @@ impl AnalyticService { } } - pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { + async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { let prom = self.metric_service.get_prom(); while self.learning_status == LearningStatus::Learning { @@ -136,7 +164,7 @@ impl AnalyticService { Ok(result_segments) } - pub async fn get_threshold_detections( + async fn get_threshold_detections( &self, from: u64, to: u64, diff --git a/server/src/services/analytic_service/mod.rs b/server/src/services/analytic_service/mod.rs new file mode 100644 index 0000000..e67b90b --- /dev/null +++ b/server/src/services/analytic_service/mod.rs @@ -0,0 +1,7 @@ +mod analytic_service; +mod pattern_detector; +mod types; + +pub mod analytic_client; + +pub use analytic_service::AnalyticService; diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs new file mode 100644 index 0000000..5e33db3 --- /dev/null +++ b/server/src/services/analytic_service/types.rs @@ -0,0 +1,8 @@ +use tokio::sync::oneshot; + +#[derive(Debug, PartialEq)] +pub enum AnalyticRequest { + // Status, + RunLearning, + // Detect { from: u64, to: u64 }, +} diff --git a/server/src/services.rs b/server/src/services/mod.rs similarity index 100% rename from server/src/services.rs rename to server/src/services/mod.rs