Browse Source

rm arc wrapper for segment_service

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
fbff52fea1
  1. 2
      server/src/analytic_unit.rs
  2. 10
      server/src/api.rs
  3. 20
      server/src/api/analytics.rs
  4. 20
      server/src/api/segments.rs
  5. 2
      server/src/services.rs
  6. 3
      server/src/services/README.md
  7. 15
      server/src/services/analytic_service.rs
  8. 8
      server/src/services/segments_service.rs
  9. 3
      server/src/utils.rs

2
server/src/analytic_unit.rs

@ -1,3 +1,3 @@
struct AnalyticUnit { struct AnalyticUnit {
// TODO: fields // TODO: field
} }

10
server/src/api.rs

@ -7,10 +7,10 @@ use warp::reject::Reject;
use warp::{body, options, Rejection, Reply}; use warp::{body, options, Rejection, Reply};
use warp::{http::Response, Filter}; use warp::{http::Response, Filter};
mod analytics;
mod auth; mod auth;
mod metric; mod metric;
mod segments; mod segments;
mod analytics;
use serde::Serialize; use serde::Serialize;
@ -31,8 +31,8 @@ pub struct API<'a> {
config: &'a Config, config: &'a Config,
user_service: Arc<RwLock<user_service::UserService>>, user_service: Arc<RwLock<user_service::UserService>>,
metric_service: Arc<RwLock<metric_service::MetricService>>, metric_service: Arc<RwLock<metric_service::MetricService>>,
data_service: Arc<RwLock<segments_service::SegmentsService>>, data_service: segments_service::SegmentsService,
analytic_service: AnalyticService analytic_service: AnalyticService,
} }
impl API<'_> { impl API<'_> {
@ -44,8 +44,8 @@ impl API<'_> {
&config.prom_url, &config.prom_url,
&config.query, &config.query,
))), ))),
data_service: Arc::new(RwLock::new(segments_service::SegmentsService::new()?)), data_service: segments_service::SegmentsService::new()?,
analytic_service: AnalyticService::new(config) analytic_service: AnalyticService::new(config),
}) })
} }

20
server/src/api/analytics.rs

@ -1,6 +1,6 @@
pub mod filters { pub mod filters {
use super::handlers; use super::handlers;
use super::models::{Srv, ListOptions}; use super::models::{ListOptions, Srv};
use warp::Filter; use warp::Filter;
/// The 4 REST API filters combined. /// The 4 REST API filters combined.
@ -8,11 +8,11 @@ pub mod filters {
srv: Srv, srv: Srv,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(srv.clone()) list(srv.clone())
// .or(create(db.clone())) // .or(create(db.clone()))
// // .or(update(db.clone())) // // .or(update(db.clone()))
// .or(delete(db.clone())) // .or(delete(db.clone()))
} }
/// GET /analytics?from=3&to=5 /// GET /analytics?from=3&to=5
pub fn list( pub fn list(
db: Srv, db: Srv,
@ -24,7 +24,9 @@ pub mod filters {
.and_then(handlers::list) .and_then(handlers::list)
} }
fn with_srv(srv: Srv) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone { fn with_srv(
srv: Srv,
) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || srv.clone()) warp::any().map(move || srv.clone())
} }
} }
@ -32,8 +34,8 @@ pub mod filters {
mod handlers { mod handlers {
use super::models::{ListOptions, Srv}; use super::models::{ListOptions, Srv};
use crate::api::{API, BadQuery}; use crate::api::{BadQuery, API};
pub async fn list(opts: ListOptions, srv: Srv) -> Result<impl warp::Reply, warp::Rejection> { pub async fn list(opts: ListOptions, srv: Srv) -> Result<impl warp::Reply, warp::Rejection> {
match srv.get_detections(opts.from, opts.to, 10).await { match srv.get_detections(opts.from, opts.to, 10).await {
Ok(segments) => Ok(API::json(&segments)), Ok(segments) => Ok(API::json(&segments)),
@ -57,4 +59,4 @@ mod models {
pub from: u64, pub from: u64,
pub to: u64, pub to: u64,
} }
} }

20
server/src/api/segments.rs

@ -53,17 +53,17 @@ pub mod filters {
mod handlers { mod handlers {
use hastic::services::segments_service; use hastic::services::segments_service;
use hastic::services::segments_service::Segment;
use super::models::{CreateResponse, Db, ListOptions}; use super::models::{Db, ListOptions};
use crate::api; use crate::api;
use crate::api::BadQuery; use crate::api::BadQuery;
use crate::api::API; use crate::api::API;
pub async fn list(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> { pub async fn list(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> {
match db.read().get_segments_intersected(opts.from, opts.to) { match db.get_segments_intersected(opts.from, opts.to) {
Ok(segments) => Ok(API::json(&segments)), Ok(segments) => Ok(API::json(&segments)),
Err(e) => Err(warp::reject::custom(BadQuery)), // TODO: return proper http error
Err(_e) => Err(warp::reject::custom(BadQuery)),
} }
} }
@ -71,32 +71,32 @@ mod handlers {
segment: segments_service::Segment, segment: segments_service::Segment,
db: Db, db: Db,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
match db.write().insert_segment(&segment) { match db.insert_segment(&segment) {
Ok(segment) => Ok(API::json(&segment)), Ok(segment) => Ok(API::json(&segment)),
Err(e) => { Err(e) => {
println!("{:?}", e); println!("{:?}", e);
// TODO: return proper http error
Err(warp::reject::custom(BadQuery)) Err(warp::reject::custom(BadQuery))
} }
} }
} }
pub async fn delete(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> { pub async fn delete(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> {
match db.read().delete_segments_in_range(opts.from, opts.to) { match db.delete_segments_in_range(opts.from, opts.to) {
Ok(count) => Ok(API::json(&api::Message { Ok(count) => Ok(API::json(&api::Message {
message: count.to_string(), message: count.to_string(),
})), })),
Err(e) => Err(warp::reject::custom(BadQuery)), // TODO: return proper http error
Err(_e) => Err(warp::reject::custom(BadQuery)),
} }
} }
} }
mod models { mod models {
use hastic::services::segments_service::{self, SegmentId}; use hastic::services::segments_service::{self, SegmentId};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub type Db = Arc<RwLock<segments_service::SegmentsService>>; pub type Db = segments_service::SegmentsService;
// The query parameters for list_todos. // The query parameters for list_todos.
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]

2
server/src/services.rs

@ -1,4 +1,4 @@
pub mod analytic_service;
pub mod metric_service; pub mod metric_service;
pub mod segments_service; pub mod segments_service;
pub mod user_service; pub mod user_service;
pub mod analytic_service;

3
server/src/services/README.md

@ -0,0 +1,3 @@
## Services
Services tend to be Clonable because it's the way warp works (mulithreaded)

15
server/src/services/analytic_service.rs

@ -1,12 +1,14 @@
use crate::{config::Config, utils::get_random_str}; use crate::{config::Config, utils::get_random_str};
use super::{metric_service::MetricService, segments_service::{ID_LENGTH, Segment, SegmentType}}; use super::{
metric_service::MetricService,
segments_service::{Segment, SegmentType, ID_LENGTH},
};
use subbeat::metric::Metric; use subbeat::metric::Metric;
use anyhow; use anyhow;
#[derive(Clone)] #[derive(Clone)]
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
@ -19,7 +21,12 @@ impl AnalyticService {
} }
} }
pub async fn get_detections(&self, from: u64, to: u64, step: u64) -> anyhow::Result<Vec<Segment>> { pub async fn get_detections(
&self,
from: u64,
to: u64,
step: u64,
) -> anyhow::Result<Vec<Segment>> {
let prom = self.metric_service.get_prom(); let prom = self.metric_service.get_prom();
let mr = prom.query(from, to, step).await?; let mr = prom.query(from, to, step).await?;
@ -33,7 +40,7 @@ impl AnalyticService {
let mut result = Vec::<Segment>::new(); let mut result = Vec::<Segment>::new();
let mut from: Option<u64> = None; let mut from: Option<u64> = None;
for (t, v) in ts { for (t, v) in ts {
if *v > 10_000.0 { if *v > 100_000.0 {
if from.is_some() { if from.is_some() {
continue; continue;
} else { } else {

8
server/src/services/segments_service.rs

@ -1,4 +1,4 @@
use rusqlite::{params, Connection, Row, ToSql}; use rusqlite::{params, Connection, Row};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -6,11 +6,9 @@ use std::sync::{Arc, Mutex};
use crate::utils::get_random_str; use crate::utils::get_random_str;
pub const ID_LENGTH: usize = 20; pub const ID_LENGTH: usize = 20;
pub type SegmentId = String; pub type SegmentId = String;
// TODO: make logic with this enum shorter // TODO: make logic with this enum shorter
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
pub enum SegmentType { pub enum SegmentType {
@ -36,7 +34,6 @@ impl SegmentType {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Segment { pub struct Segment {
pub id: Option<SegmentId>, pub id: Option<SegmentId>,
@ -56,14 +53,13 @@ impl Segment {
} }
} }
#[derive(Clone)]
pub struct SegmentsService { pub struct SegmentsService {
connection: Arc<Mutex<Connection>>, connection: Arc<Mutex<Connection>>,
} }
impl SegmentsService { impl SegmentsService {
pub fn new() -> anyhow::Result<SegmentsService> { pub fn new() -> anyhow::Result<SegmentsService> {
// TODO: move it to data service // TODO: move it to data service
std::fs::create_dir_all("./data").unwrap(); std::fs::create_dir_all("./data").unwrap();

3
server/src/utils.rs

@ -1,6 +1,5 @@
use std::iter::repeat_with; use std::iter::repeat_with;
pub fn get_random_str(len: usize) -> String { pub fn get_random_str(len: usize) -> String {
return repeat_with(fastrand::alphanumeric).take(len) return repeat_with(fastrand::alphanumeric).take(len).collect();
.collect();
} }

Loading…
Cancel
Save