diff --git a/server/src/api.rs b/server/src/api.rs index 2fe6839..3918cff 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -1,4 +1,5 @@ use hastic::config::Config; +use hastic::services::analytic_service::AnalyticService; use hastic::services::{metric_service, segments_service, user_service}; use warp::http::HeaderValue; use warp::hyper::{Body, StatusCode}; @@ -9,6 +10,7 @@ use warp::{http::Response, Filter}; mod auth; mod metric; mod segments; +mod analytics; use serde::Serialize; @@ -30,6 +32,7 @@ pub struct API<'a> { user_service: Arc>, metric_service: Arc>, data_service: Arc>, + analytic_service: AnalyticService } impl API<'_> { @@ -42,6 +45,7 @@ impl API<'_> { &config.query, ))), data_service: Arc::new(RwLock::new(segments_service::SegmentsService::new()?)), + analytic_service: AnalyticService::new(config) }) } @@ -77,12 +81,13 @@ impl API<'_> { let metrics = metric::get_route(self.metric_service.clone()); let login = auth::get_route(self.user_service.clone()); let segments = segments::filters::filters(self.data_service.clone()); + let analytics = analytics::filters::filters(self.analytic_service.clone()); let public = warp::fs::dir("public"); println!("Start server on {} port", self.config.port); // TODO: move it to "server" let routes = warp::path("api") - .and(login.or(metrics).or(segments).or(options)) + .and(login.or(metrics).or(segments).or(analytics).or(options)) .or(public) .or(not_found); warp::serve(routes) diff --git a/server/src/api/analytics.rs b/server/src/api/analytics.rs new file mode 100644 index 0000000..c723b1f --- /dev/null +++ b/server/src/api/analytics.rs @@ -0,0 +1,60 @@ +pub mod filters { + use super::handlers; + use super::models::{Srv, ListOptions}; + use warp::Filter; + + /// The 4 REST API filters combined. + pub fn filters( + srv: Srv, + ) -> impl Filter + Clone { + list(srv.clone()) + // .or(create(db.clone())) + // // .or(update(db.clone())) + // .or(delete(db.clone())) + } + + /// GET /analytics?from=3&to=5 + pub fn list( + db: Srv, + ) -> impl Filter + Clone { + warp::path!("analytics") + .and(warp::get()) + .and(warp::query::()) + .and(with_srv(db)) + .and_then(handlers::list) + } + + fn with_srv(srv: Srv) -> impl Filter + Clone { + warp::any().map(move || srv.clone()) + } +} + +mod handlers { + + use super::models::{ListOptions, Srv}; + use crate::api::{API, BadQuery}; + + pub async fn list(opts: ListOptions, srv: Srv) -> Result { + match srv.get_detections(opts.from, opts.to, 10).await { + Ok(segments) => Ok(API::json(&segments)), + Err(e) => Err(warp::reject::custom(BadQuery)), + } + } +} + +mod models { + use hastic::services::analytic_service; + use serde::{Deserialize, Serialize}; + + // use parking_lot::RwLock; + // use std::sync::Arc; + + pub type Srv = analytic_service::AnalyticService; + + // The query parameters for list_todos. + #[derive(Debug, Deserialize)] + pub struct ListOptions { + pub from: u64, + pub to: u64, + } +} \ No newline at end of file diff --git a/server/src/api/metric.rs b/server/src/api/metric.rs index 550ccef..c729439 100644 --- a/server/src/api/metric.rs +++ b/server/src/api/metric.rs @@ -17,8 +17,6 @@ use crate::api::{self, API}; use parking_lot::RwLock; use std::collections::HashMap; -use std::error::Error; -use std::fmt::Debug; use std::sync::Arc; use super::BadQuery; @@ -54,7 +52,6 @@ async fn query( p: HashMap, ms: Arc>, ) -> Result { - //Err(warp::reject::custom(BadQuery)); match get_query(p, ms).await { Ok(res) => Ok(API::json(&res)), // TODO: parse different error types diff --git a/server/src/services/analytic_service.rs b/server/src/services/analytic_service.rs index 1706ddc..8c01cda 100644 --- a/server/src/services/analytic_service.rs +++ b/server/src/services/analytic_service.rs @@ -7,21 +7,25 @@ use subbeat::metric::Metric; use anyhow; - -struct AnalyticService { +#[derive(Clone)] +pub struct AnalyticService { metric_service: MetricService, } impl AnalyticService { - fn new(config: &Config) -> AnalyticService { + pub fn new(config: &Config) -> AnalyticService { AnalyticService { metric_service: MetricService::new(&config.prom_url, &config.query), } } - pub async fn get_detections(&self, from: u64, to: u64) -> anyhow::Result> { + pub async fn get_detections(&self, from: u64, to: u64, step: u64) -> anyhow::Result> { let prom = self.metric_service.get_prom(); - let mr = prom.query(from, to, 10).await?; + let mr = prom.query(from, to, step).await?; + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } let key = mr.data.keys().nth(0).unwrap(); let ts = &mr.data[key]; @@ -29,7 +33,7 @@ impl AnalyticService { let mut result = Vec::::new(); let mut from: Option = None; for (t, v) in ts { - if *v > 100.0 { + if *v > 10_000.0 { if from.is_some() { continue; } else { @@ -48,6 +52,17 @@ impl AnalyticService { } } + if from.is_some() { + result.push(Segment { + id: None, + from: from.unwrap(), + to, + segment_type: SegmentType::Detection, + }); + } + + // TODO: decide what to do it from is Some() in the end + Ok(result) } } diff --git a/server/src/services/metric_service.rs b/server/src/services/metric_service.rs index f4b9765..be5dc68 100644 --- a/server/src/services/metric_service.rs +++ b/server/src/services/metric_service.rs @@ -1,5 +1,6 @@ use subbeat::datasources::prometheus::Prometheus; +#[derive(Clone)] pub struct MetricService { url: String, query: String, diff --git a/server/src/services/segments_service.rs b/server/src/services/segments_service.rs index e413ac7..6bb258e 100644 --- a/server/src/services/segments_service.rs +++ b/server/src/services/segments_service.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; use std::iter::repeat_with; + const ID_LENGTH: usize = 20; pub type SegmentId = String; @@ -55,8 +56,6 @@ impl Segment { } } -// TODO: find a way to remove this unsafe -unsafe impl Sync for SegmentsService {} pub struct SegmentsService { connection: Arc>,