Browse Source

empty segments case #26

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
2f0b7d7a4f
  1. 2
      client/src/utils.ts
  2. 47
      server/src/api/segments.rs
  3. 50
      server/src/services/analytic_service/analytic_service.rs
  4. 1
      server/src/services/analytic_service/types.rs

2
client/src/utils.ts

@ -4,7 +4,7 @@ export async function *getGenerator<T>(
...args ...args
): AsyncIterableIterator<T> { ): AsyncIterableIterator<T> {
let timeout = async () => new Promise( const timeout = async () => new Promise(
resolve => setTimeout(resolve, duration) resolve => setTimeout(resolve, duration)
); );

47
server/src/api/segments.rs

@ -1,56 +1,58 @@
pub mod filters { pub mod filters {
use super::handlers; use super::handlers;
use super::models::{Db, ListOptions}; use super::models::{Srv, ListOptions};
use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::analytic_service::analytic_client::AnalyticClient;
use warp::Filter; use warp::Filter;
/// The 4 REST API filters combined. /// The 4 REST API filters combined.
pub fn filters( pub fn filters(
db: Db, db: Srv,
ac: AnalyticClient, ac: AnalyticClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(db.clone()) list(db.clone())
.or(create(db.clone(), ac)) .or(create(db.clone(), ac.clone()))
// .or(update(db.clone())) // .or(update(db.clone()))
.or(delete(db.clone())) .or(delete(db.clone(), ac.clone()))
} }
/// GET /segments?from=3&to=5 /// GET /segments?from=3&to=5
pub fn list( pub fn list(
db: Db, db: Srv,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("segments") warp::path!("segments")
.and(warp::get()) .and(warp::get())
.and(warp::query::<ListOptions>()) .and(warp::query::<ListOptions>())
.and(with_db(db)) .and(with_srv(db))
.and_then(handlers::list) .and_then(handlers::list)
} }
/// POST /segments with JSON body /// POST /segments with JSON body
pub fn create( pub fn create(
db: Db, db: Srv,
ac: AnalyticClient, ac: AnalyticClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("segments") warp::path!("segments")
.and(warp::post()) .and(warp::post())
.and(warp::body::json()) .and(warp::body::json())
.and(with_db(db)) .and(with_srv(db))
.and(warp::any().map(move || ac.clone())) .and(warp::any().map(move || ac.clone()))
.and_then(handlers::create) .and_then(handlers::create)
} }
/// POST /segments with JSON body /// POST /segments with JSON body
pub fn delete( pub fn delete(
db: Db, db: Srv,
ac: AnalyticClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("segments") warp::path!("segments")
.and(warp::delete()) .and(warp::delete())
.and(warp::query::<ListOptions>()) .and(warp::query::<ListOptions>())
.and(with_db(db)) .and(with_srv(db))
.and(warp::any().map(move || ac.clone()))
.and_then(handlers::delete) .and_then(handlers::delete)
} }
fn with_db(db: Db) -> impl Filter<Extract = (Db,), Error = std::convert::Infallible> + Clone { fn with_srv(db: Srv) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || db.clone()) warp::any().map(move || db.clone())
} }
} }
@ -59,13 +61,13 @@ mod handlers {
use hastic::services::analytic_service::analytic_client::AnalyticClient; use hastic::services::analytic_service::analytic_client::AnalyticClient;
use hastic::services::segments_service; use hastic::services::segments_service;
use super::models::{Db, ListOptions}; use super::models::{Srv, ListOptions};
use crate::api; use crate::api;
use crate::api::BadQuery; use crate::api::BadQuery;
use crate::api::API; use crate::api::API;
pub async fn list(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> { pub async fn list(opts: ListOptions, src: Srv) -> Result<impl warp::Reply, warp::Rejection> {
match db.get_segments_intersected(opts.from, opts.to) { match src.get_segments_intersected(opts.from, opts.to) {
Ok(segments) => Ok(API::json(&segments)), Ok(segments) => Ok(API::json(&segments)),
// TODO: return proper http error // TODO: return proper http error
Err(_e) => Err(warp::reject::custom(BadQuery)), Err(_e) => Err(warp::reject::custom(BadQuery)),
@ -74,10 +76,10 @@ mod handlers {
pub async fn create( pub async fn create(
segment: segments_service::Segment, segment: segments_service::Segment,
db: Db, src: Srv,
ac: AnalyticClient, ac: AnalyticClient,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
match db.insert_segment(&segment) { match src.insert_segment(&segment) {
Ok(segment) => { Ok(segment) => {
ac.run_learning().await.unwrap(); ac.run_learning().await.unwrap();
Ok(API::json(&segment)) Ok(API::json(&segment))
@ -90,11 +92,14 @@ mod handlers {
} }
} }
pub async fn delete(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> { pub async fn delete(opts: ListOptions, db: Srv, ac: AnalyticClient,) -> Result<impl warp::Reply, warp::Rejection> {
match db.delete_segments_in_range(opts.from, opts.to) { match db.delete_segments_in_range(opts.from, opts.to) {
Ok(count) => Ok(API::json(&api::Message { Ok(count) => {
message: count.to_string(), ac.run_learning().await.unwrap();
})), Ok(API::json(&api::Message {
message: count.to_string(),
}))
},
// TODO: return proper http error // TODO: return proper http error
Err(_e) => Err(warp::reject::custom(BadQuery)), Err(_e) => Err(warp::reject::custom(BadQuery)),
} }
@ -105,7 +110,7 @@ mod models {
use hastic::services::segments_service::{self, SegmentId}; use hastic::services::segments_service::{self, SegmentId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub type Db = segments_service::SegmentsService; pub type Srv = segments_service::SegmentsService;
// The query parameters for list_todos. // The query parameters for list_todos.
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]

50
server/src/services/analytic_service/analytic_service.rs

@ -82,6 +82,10 @@ impl AnalyticService {
ResponseType::LearningFinished(results) => { ResponseType::LearningFinished(results) => {
self.learning_results = Some(results); self.learning_results = Some(results);
self.learning_status = LearningStatus::Ready; self.learning_status = LearningStatus::Ready;
},
ResponseType::LearningFinishedEmpty => {
self.learning_results = None;
self.learning_status = LearningStatus::Initialization;
} }
} }
} }
@ -95,7 +99,6 @@ impl AnalyticService {
} }
} }
// call this from api
async fn run_learning( async fn run_learning(
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
ms: MetricService, ms: MetricService,
@ -117,6 +120,20 @@ impl AnalyticService {
// be careful if decide to store detections in db // be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
if segments.len() == 0 {
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningFinishedEmpty,
))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send learning results"),
}
return;
}
let fs = segments let fs = segments
.iter() .iter()
@ -150,27 +167,32 @@ impl AnalyticService {
} }
} }
async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { async fn get_pattern_detection(tx: mpsc::Sender<AnalyticServiceMessage>, lr: LearningResults, ms: MetricService, from: u64, to: u64) {
let prom = self.metric_service.get_prom();
// TODO: move this loop to init closure
while self.learning_status == LearningStatus::Learning { // while let status = ac.get_status().await.unwrap() {
sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await; // if status == LearningStatus::Learning {
} // sleep(Duration::from_millis(LEARNING_WAITING_INTERVAL)).await;
// continue;
// }
// }
let lr = self.learning_results.as_ref().unwrap().clone(); let prom = ms.get_prom();
let pt = pattern_detector::PatternDetector::new(lr); let pt = pattern_detector::PatternDetector::new(lr);
let mr = prom.query(from, to, DETECTION_STEP).await?; let mr = prom.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 { // TODO: uncomment
return Ok(Vec::new()); // if mr.data.keys().len() == 0 {
} // return Ok(Vec::new());
// }
let k = mr.data.keys().nth(0).unwrap(); let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k]; let ts = &mr.data[k];
let result = pt.detect(ts); let result = pt.detect(ts);
let result_segments = result let result_segments: Vec<Segment> = result
.iter() .iter()
.map(|(p, q)| Segment { .map(|(p, q)| Segment {
from: *p, from: *p,
@ -182,7 +204,7 @@ impl AnalyticService {
// TODO: run detections // TODO: run detections
// TODO: convert detections to segments // TODO: convert detections to segments
Ok(result_segments) // Ok(result_segments)
} }
async fn get_threshold_detections( async fn get_threshold_detections(

1
server/src/services/analytic_service/types.rs

@ -15,6 +15,7 @@ pub enum LearningStatus {
pub enum ResponseType { pub enum ResponseType {
LearningStarted, LearningStarted,
LearningFinished(LearningResults), LearningFinished(LearningResults),
LearningFinishedEmpty
} }
#[derive(Debug)] #[derive(Debug)]

Loading…
Cancel
Save