From e0cba0326db6e39ab3fc67f67776936db58bdb6f Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 31 Oct 2021 15:36:58 +0300 Subject: [PATCH] learning continue --- .../analytic_service/analytic_client.rs | 8 +- .../analytic_service/analytic_service.rs | 120 ++++++++++-------- .../analytic_service/pattern_detector.rs | 2 +- server/src/services/analytic_service/types.rs | 20 ++- 4 files changed, 85 insertions(+), 65 deletions(-) diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs index bf8b551..bbb7ac7 100644 --- a/server/src/services/analytic_service/analytic_client.rs +++ b/server/src/services/analytic_service/analytic_client.rs @@ -2,22 +2,22 @@ use tokio::sync::mpsc; use crate::services::segments_service::Segment; -use super::types::{AnalyticRequest}; +use super::types::{AnalyticServiceMessage, RequestType}; /// CLient to be used multithreaded /// /// #[derive(Clone)] pub struct AnalyticClient { - tx: mpsc::Sender, + tx: mpsc::Sender, } impl AnalyticClient { - pub fn new(tx: mpsc::Sender) -> AnalyticClient { + pub fn new(tx: mpsc::Sender) -> AnalyticClient { AnalyticClient { tx } } pub async fn run_learning(&self) -> anyhow::Result<()> { - self.tx.send(AnalyticRequest::RunLearning).await?; + self.tx.send(AnalyticServiceMessage::Request(RequestType::RunLearning)).await?; Ok(()) } diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 11cd337..fae0858 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,8 +1,4 @@ -use super::{ - analytic_client::AnalyticClient, - pattern_detector::{self, LearningResults, PatternDetector}, - types::AnalyticRequest, -}; +use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, RequestType, ResponseType}}; use crate::services::{ metric_service::MetricService, @@ -14,7 +10,7 @@ use subbeat::metric::Metric; use anyhow; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc}; use tokio::time::{sleep, Duration}; use futures::future; @@ -34,13 +30,14 @@ enum LearningStatus { Ready, } +// TODO: now it's basically single analytic unit, service will opreate many AU pub struct AnalyticService { metric_service: MetricService, segments_service: SegmentsService, learning_results: Option, learning_status: LearningStatus, - tx: mpsc::Sender, - rx: mpsc::Receiver, + tx: mpsc::Sender, + rx: mpsc::Receiver, } impl AnalyticService { @@ -48,7 +45,7 @@ impl AnalyticService { metric_service: MetricService, segments_service: segments_service::SegmentsService, ) -> AnalyticService { - let (tx, rx) = mpsc::channel::(32); + let (tx, rx) = mpsc::channel::(32); AnalyticService { metric_service, @@ -65,68 +62,79 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } + fn consume_request(&mut self, req: types::RequestType) { + match req { + RequestType::RunLearning => tokio::spawn({ + self.learning_status = LearningStatus::Starting; + let tx = self.tx.clone(); + let ms = self.metric_service.clone(); + let ss = self.segments_service.clone(); + async move { + AnalyticService::run_learning(tx, ms, ss) + }}) + }; + } + + fn consume_response(&mut self, res: types::ResponseType) { + match res { + // TODO: handle when learning panic + ResponseType::LearningStarted => self.learning_status = LearningStatus::Learning, + ResponseType::LearningFinished(results) => { + self.learning_results = Some(results) + } + } + } + pub async fn serve(&mut self) { - while let Some(request) = self.rx.recv().await { - match request { - types::AnalyticRequest::RunLearning => { - // TODO: not block and do logic when it's finished - self.run_learning().await; - } + while let Some(message) = self.rx.recv().await { + match message { + AnalyticServiceMessage::Request(req) => self.consume_request(req), + AnalyticServiceMessage::Response(res) => self.consume_response(res) } } } // call this from api - async fn run_learning(&mut self) { - self.learning_status = LearningStatus::Starting; - println!("Learning starting"); - let prom = self.metric_service.get_prom(); - let ss = self.segments_service.clone(); + async fn run_learning(tx: mpsc::Sender, ms: MetricService, ss : SegmentsService) { - let (tx, rx) = oneshot::channel(); + match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningStarted)).await { + Ok(_) => println!("Learning starting"), + Err(_e) => println!("Fail to send notification about learning start") + } - tokio::spawn(async move { - // TODO: logic for returning error + let prom = ms.get_prom(); - // be careful if decide to store detections in db - let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + // TODO: logic for returning error - let fs = segments - .iter() - .map(|s| prom.query(s.from, s.to, DETECTION_STEP)); - let rs = future::join_all(fs).await; + // be careful if decide to store detections in db + let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); - // TODO: run this on label adding - // TODO: save learning results in cache - let mut learn_tss = Vec::new(); - for r in rs { - let mr = r.unwrap(); - if mr.data.keys().len() == 0 { - continue; - } - let k = mr.data.keys().nth(0).unwrap(); - let ts = &mr.data[k]; - // TODO: maybe not clone - learn_tss.push(ts.clone()); + let fs = segments + .iter() + .map(|s| prom.query(s.from, s.to, DETECTION_STEP)); + let rs = future::join_all(fs).await; + + // TODO: run this on label adding + // TODO: save learning results in cache + let mut learn_tss = Vec::new(); + for r in rs { + let mr = r.unwrap(); + if mr.data.keys().len() == 0 { + continue; } + let k = mr.data.keys().nth(0).unwrap(); + let ts = &mr.data[k]; + // TODO: maybe not clone + learn_tss.push(ts.clone()); + } - let lr = PatternDetector::learn(&learn_tss).await; - - if let Err(_) = tx.send(lr) { - println!("Error: receive of learning results dropped"); - } - }); + let lr = PatternDetector::learn(&learn_tss).await; - match rx.await { - Ok(lr) => { - self.learning_results = Some(lr); - self.learning_status = LearningStatus::Ready; - } - Err(_) => { - self.learning_status = LearningStatus::Error; - println!("learning dropped") - } + match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningFinished(lr))).await { + Ok(_) => println!("Learning resuls sent"), + Err(_e) => println!("Fail to send learning results") } + } async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result> { diff --git a/server/src/services/analytic_service/pattern_detector.rs b/server/src/services/analytic_service/pattern_detector.rs index 35bc44a..f5d03ee 100644 --- a/server/src/services/analytic_service/pattern_detector.rs +++ b/server/src/services/analytic_service/pattern_detector.rs @@ -1,7 +1,7 @@ use std::{thread, time}; use tokio::time::{sleep, Duration}; -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct LearningResults { backet_size: usize, } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 5e33db3..4da167b 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -1,8 +1,20 @@ -use tokio::sync::oneshot; +use super::pattern_detector::LearningResults; -#[derive(Debug, PartialEq)] -pub enum AnalyticRequest { +#[derive(Debug)] +pub enum ResponseType { + LearningStarted, + LearningFinished(LearningResults) +} + +#[derive(Debug)] +pub enum RequestType { + RunLearning +} + +#[derive(Debug)] +pub enum AnalyticServiceMessage { // Status, - RunLearning, + Request(RequestType), + Response(ResponseType) // Detect { from: u64, to: u64 }, }