Browse Source

analytic learning (build is broken)

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
d7bf7f2a18
  1. 9
      server/src/api.rs
  2. 8
      server/src/api/analytics.rs
  3. 1
      server/src/api/metric.rs
  4. 134
      server/src/services/analytic_service.rs
  5. 31
      server/src/services/analytic_service/pattern_detector.rs

9
server/src/api.rs

@ -32,23 +32,20 @@ pub struct API<'a> {
user_service: Arc<RwLock<user_service::UserService>>,
metric_service: metric_service::MetricService,
data_service: segments_service::SegmentsService,
analytic_service: AnalyticService,
analytic_service: Arc<RwLock<AnalyticService>>,
}
impl API<'_> {
pub fn new(config: &Config) -> anyhow::Result<API<'_>> {
let ss = segments_service::SegmentsService::new()?;
let ms = metric_service::MetricService::new(
&config.prom_url,
&config.query,
);
let ms = metric_service::MetricService::new(&config.prom_url, &config.query);
Ok(API {
config: config,
user_service: Arc::new(RwLock::new(user_service::UserService::new())),
metric_service: ms.clone(),
data_service: ss.clone(),
analytic_service: AnalyticService::new(ms, ss),
analytic_service: Arc::new(RwLock::new(AnalyticService::new(ms, ss)))
})
}

8
server/src/api/analytics.rs

@ -8,6 +8,7 @@ pub mod filters {
srv: Srv,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(srv.clone())
// TODO: /status endpoint
// .or(create(db.clone()))
// // .or(update(db.clone()))
// .or(delete(db.clone()))
@ -38,7 +39,7 @@ mod handlers {
pub async fn list(opts: ListOptions, srv: Srv) -> Result<impl warp::Reply, warp::Rejection> {
// match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await {
match srv.get_pattern_detection(opts.from, opts.to).await {
match srv.read().get_pattern_detection(opts.from, opts.to).await {
Ok(segments) => Ok(API::json(&segments)),
Err(e) => {
println!("{:?}", e);
@ -49,13 +50,16 @@ mod handlers {
}
mod models {
use std::sync::Arc;
use hastic::services::analytic_service;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
// use parking_lot::RwLock;
// use std::sync::Arc;
pub type Srv = analytic_service::AnalyticService;
pub type Srv = Arc<RwLock<analytic_service::AnalyticService>>;
// The query parameters for list_todos.
#[derive(Debug, Deserialize)]

1
server/src/api/metric.rs

@ -17,7 +17,6 @@ use crate::api::{self, API};
use std::collections::HashMap;
use super::BadQuery;
#[derive(Serialize)]

134
server/src/services/analytic_service.rs

@ -1,7 +1,11 @@
use crate::utils::{self, get_random_str};
use crate::{utils::get_random_str};
use self::pattern_detector::{LearningResults, PatternDetector};
use super::{metric_service::MetricService, segments_service::{self, ID_LENGTH, Segment, SegmentType, SegmentsService}};
use super::{
metric_service::MetricService,
segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH},
};
use subbeat::metric::Metric;
@ -10,58 +14,126 @@ use anyhow;
mod pattern_detector;
use futures::future;
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};
const DETECTION_STEP: u64 = 10;
const LEARNING_WAITING_INTERVAL: u64 = 100;
#[derive(Clone, PartialEq)]
enum LearningStatus {
Initialization,
Learning,
Error,
Ready,
}
#[derive(Clone)]
pub struct AnalyticService {
metric_service: MetricService,
segments_service: SegmentsService
segments_service: SegmentsService,
learning_results: Option<LearningResults>,
learning_status: LearningStatus,
}
impl AnalyticService {
pub fn new(metric_service: MetricService, segments_service: segments_service::SegmentsService) -> AnalyticService {
pub fn new(
metric_service: MetricService,
segments_service: segments_service::SegmentsService,
) -> AnalyticService {
AnalyticService {
metric_service,
segments_service
segments_service,
// TODO: get it from persistance
learning_results: None,
learning_status: LearningStatus::Initialization,
}
}
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
// call this from api
pub async fn run_learning(&mut self) {
let prom = self.metric_service.get_prom();
let ss = self.segments_service.clone();
let innter_step = 10u64;
let segments = self.segments_service.get_segments_inside(0, u64::MAX / 2)?;
let (tx, rx) = oneshot::channel();
let prom = self.metric_service.get_prom();
let fs = segments.iter().map(|s| prom.query(s.from, s.to, innter_step));
let rs = future::join_all(fs).await;
let mut pt = pattern_detector::PatternDetector::new();
// TODO: run this on label adding
// TODO: save learning results in cache
let mut learn_tss = Vec::new();
for r in rs {
let mr = r.unwrap();
if mr.data.keys().len() == 0 {
continue;
tokio::spawn(async move {
// TODO: logic for returning error
// be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let fs = segments
.iter()
.map(|s| prom.query(s.from, s.to, DETECTION_STEP));
let rs = future::join_all(fs).await;
// TODO: run this on label adding
// TODO: save learning results in cache
let mut learn_tss = Vec::new();
for r in rs {
let mr = r.unwrap();
if mr.data.keys().len() == 0 {
continue;
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
// TODO: maybe not clone
learn_tss.push(ts.clone());
}
let lr = PatternDetector::learn(&learn_tss).await;
if let Err(_) = tx.send(lr) {
println!("Error: receive of learning results dropped");
}
});
match rx.await {
Ok(lr) => {
self.learning_results = Some(lr);
self.learning_status = LearningStatus::Ready;
}
Err(_) => {
self.learning_status = LearningStatus::Error;
println!("learning dropped")
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
// TODO: maybe not clone
learn_tss.push(ts.clone());
}
}
pt.learn(&learn_tss);
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
let prom = self.metric_service.get_prom();
let ts = prom.query(from, to, innter_step).await?;
while self.learning_status == LearningStatus::Learning {
sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await;
}
let lr = self.learning_results.as_ref().unwrap().clone();
let pt = pattern_detector::PatternDetector::new(lr);
let mr = prom.query(from, to, DETECTION_STEP).await?;
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
// pt.detect(ts);
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
let result = pt.detect(ts);
let result_segments = result
.iter()
.map(|(p, q)| Segment {
from: *p,
to: *q,
id: Some(utils::get_random_str(ID_LENGTH)),
segment_type: SegmentType::Detection,
})
.collect();
// TODO: run detections
// TODO: convert detections to segments
Ok(Vec::new())
Ok(result_segments)
}
pub async fn get_threshold_detections(
@ -69,7 +141,7 @@ impl AnalyticService {
from: u64,
to: u64,
step: u64,
threashold: f64
threashold: f64,
) -> anyhow::Result<Vec<Segment>> {
let prom = self.metric_service.get_prom();
let mr = prom.query(from, to, step).await?;

31
server/src/services/analytic_service/pattern_detector.rs

@ -1,20 +1,22 @@
use std::{thread, time};
use tokio::time::{sleep, Duration};
struct LearningResults {
backet_size: usize
#[derive(Clone)]
pub struct LearningResults {
backet_size: usize,
}
#[derive(Clone)]
pub struct PatternDetector {
learning_results: Option<LearningResults>
learning_results: LearningResults,
}
impl PatternDetector {
pub fn new() -> PatternDetector {
PatternDetector{
learning_results: None
}
pub fn new(learning_results: LearningResults) -> PatternDetector {
PatternDetector { learning_results }
}
pub fn learn(&mut self, reads: &Vec<Vec<(u64, f64)>>) {
pub async fn learn(reads: &Vec<Vec<(u64, f64)>>) -> LearningResults {
// TODO: implement
let mut min_size = usize::MAX;
let mut max_size = 0usize;
@ -23,13 +25,18 @@ impl PatternDetector {
max_size = max_size.max(r.len());
}
self.learning_results = Some(LearningResults{
backet_size: (min_size + max_size) / 2
});
let ten_millis = time::Duration::from_millis(1000);
thread::sleep(ten_millis);
sleep(Duration::from_millis(1000)).await;
LearningResults {
backet_size: (min_size + max_size) / 2,
}
}
pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
// fill backet
return Vec::new();
}
}
}

Loading…
Cancel
Save