Browse Source

learning continue

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
e0cba0326d
  1. 8
      server/src/services/analytic_service/analytic_client.rs
  2. 120
      server/src/services/analytic_service/analytic_service.rs
  3. 2
      server/src/services/analytic_service/pattern_detector.rs
  4. 20
      server/src/services/analytic_service/types.rs

8
server/src/services/analytic_service/analytic_client.rs

@ -2,22 +2,22 @@ use tokio::sync::mpsc;
use crate::services::segments_service::Segment; use crate::services::segments_service::Segment;
use super::types::{AnalyticRequest}; use super::types::{AnalyticServiceMessage, RequestType};
/// CLient to be used multithreaded /// CLient to be used multithreaded
/// ///
/// ///
#[derive(Clone)] #[derive(Clone)]
pub struct AnalyticClient { pub struct AnalyticClient {
tx: mpsc::Sender<AnalyticRequest>, tx: mpsc::Sender<AnalyticServiceMessage>,
} }
impl AnalyticClient { impl AnalyticClient {
pub fn new(tx: mpsc::Sender<AnalyticRequest>) -> AnalyticClient { pub fn new(tx: mpsc::Sender<AnalyticServiceMessage>) -> AnalyticClient {
AnalyticClient { tx } AnalyticClient { tx }
} }
pub async fn run_learning(&self) -> anyhow::Result<()> { pub async fn run_learning(&self) -> anyhow::Result<()> {
self.tx.send(AnalyticRequest::RunLearning).await?; self.tx.send(AnalyticServiceMessage::Request(RequestType::RunLearning)).await?;
Ok(()) Ok(())
} }

120
server/src/services/analytic_service/analytic_service.rs

@ -1,8 +1,4 @@
use super::{ use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, RequestType, ResponseType}};
analytic_client::AnalyticClient,
pattern_detector::{self, LearningResults, PatternDetector},
types::AnalyticRequest,
};
use crate::services::{ use crate::services::{
metric_service::MetricService, metric_service::MetricService,
@ -14,7 +10,7 @@ use subbeat::metric::Metric;
use anyhow; use anyhow;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use futures::future; use futures::future;
@ -34,13 +30,14 @@ enum LearningStatus {
Ready, Ready,
} }
// TODO: now it's basically single analytic unit, service will opreate many AU
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: SegmentsService, segments_service: SegmentsService,
learning_results: Option<LearningResults>, learning_results: Option<LearningResults>,
learning_status: LearningStatus, learning_status: LearningStatus,
tx: mpsc::Sender<AnalyticRequest>, tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticRequest>, rx: mpsc::Receiver<AnalyticServiceMessage>,
} }
impl AnalyticService { impl AnalyticService {
@ -48,7 +45,7 @@ impl AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: segments_service::SegmentsService, segments_service: segments_service::SegmentsService,
) -> AnalyticService { ) -> AnalyticService {
let (tx, rx) = mpsc::channel::<AnalyticRequest>(32); let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
AnalyticService { AnalyticService {
metric_service, metric_service,
@ -65,68 +62,79 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone()) AnalyticClient::new(self.tx.clone())
} }
fn consume_request(&mut self, req: types::RequestType) {
match req {
RequestType::RunLearning => tokio::spawn({
self.learning_status = LearningStatus::Starting;
let tx = self.tx.clone();
let ms = self.metric_service.clone();
let ss = self.segments_service.clone();
async move {
AnalyticService::run_learning(tx, ms, ss)
}})
};
}
fn consume_response(&mut self, res: types::ResponseType) {
match res {
// TODO: handle when learning panic
ResponseType::LearningStarted => self.learning_status = LearningStatus::Learning,
ResponseType::LearningFinished(results) => {
self.learning_results = Some(results)
}
}
}
pub async fn serve(&mut self) { pub async fn serve(&mut self) {
while let Some(request) = self.rx.recv().await { while let Some(message) = self.rx.recv().await {
match request { match message {
types::AnalyticRequest::RunLearning => { AnalyticServiceMessage::Request(req) => self.consume_request(req),
// TODO: not block and do logic when it's finished AnalyticServiceMessage::Response(res) => self.consume_response(res)
self.run_learning().await;
}
} }
} }
} }
// call this from api // call this from api
async fn run_learning(&mut self) { async fn run_learning(tx: mpsc::Sender<AnalyticServiceMessage>, ms: MetricService, ss : SegmentsService) {
self.learning_status = LearningStatus::Starting;
println!("Learning starting");
let prom = self.metric_service.get_prom();
let ss = self.segments_service.clone();
let (tx, rx) = oneshot::channel(); match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningStarted)).await {
Ok(_) => println!("Learning starting"),
Err(_e) => println!("Fail to send notification about learning start")
}
tokio::spawn(async move { let prom = ms.get_prom();
// TODO: logic for returning error
// be careful if decide to store detections in db // TODO: logic for returning error
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let fs = segments // be careful if decide to store detections in db
.iter() let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
.map(|s| prom.query(s.from, s.to, DETECTION_STEP));
let rs = future::join_all(fs).await;
// TODO: run this on label adding let fs = segments
// TODO: save learning results in cache .iter()
let mut learn_tss = Vec::new(); .map(|s| prom.query(s.from, s.to, DETECTION_STEP));
for r in rs { let rs = future::join_all(fs).await;
let mr = r.unwrap();
if mr.data.keys().len() == 0 { // TODO: run this on label adding
continue; // TODO: save learning results in cache
} let mut learn_tss = Vec::new();
let k = mr.data.keys().nth(0).unwrap(); for r in rs {
let ts = &mr.data[k]; let mr = r.unwrap();
// TODO: maybe not clone if mr.data.keys().len() == 0 {
learn_tss.push(ts.clone()); continue;
} }
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
// TODO: maybe not clone
learn_tss.push(ts.clone());
}
let lr = PatternDetector::learn(&learn_tss).await; let lr = PatternDetector::learn(&learn_tss).await;
if let Err(_) = tx.send(lr) {
println!("Error: receive of learning results dropped");
}
});
match rx.await { match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningFinished(lr))).await {
Ok(lr) => { Ok(_) => println!("Learning resuls sent"),
self.learning_results = Some(lr); Err(_e) => println!("Fail to send learning results")
self.learning_status = LearningStatus::Ready;
}
Err(_) => {
self.learning_status = LearningStatus::Error;
println!("learning dropped")
}
} }
} }
async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {

2
server/src/services/analytic_service/pattern_detector.rs

@ -1,7 +1,7 @@
use std::{thread, time}; use std::{thread, time};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
#[derive(Clone)] #[derive(Debug, Clone)]
pub struct LearningResults { pub struct LearningResults {
backet_size: usize, backet_size: usize,
} }

20
server/src/services/analytic_service/types.rs

@ -1,8 +1,20 @@
use tokio::sync::oneshot; use super::pattern_detector::LearningResults;
#[derive(Debug, PartialEq)] #[derive(Debug)]
pub enum AnalyticRequest { pub enum ResponseType {
LearningStarted,
LearningFinished(LearningResults)
}
#[derive(Debug)]
pub enum RequestType {
RunLearning
}
#[derive(Debug)]
pub enum AnalyticServiceMessage {
// Status, // Status,
RunLearning, Request(RequestType),
Response(ResponseType)
// Detect { from: u64, to: u64 }, // Detect { from: u64, to: u64 },
} }

Loading…
Cancel
Save