Browse Source

segment_service

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
819b0df63a
  1. 25
      server/src/analytic_unit.rs
  2. 6
      server/src/api.rs
  3. 22
      server/src/api/segments.rs
  4. 2
      server/src/main.rs
  5. 2
      server/src/services.rs
  6. 80
      server/src/services/segments_service.rs

25
server/src/analytic_unit.rs

@ -1,22 +1,18 @@
use hastic::config::Config;
use hastic::services::data_service::Segment;
use hastic::services::metric_service::MetricService;
use anyhow; use anyhow;
use hastic::services::metric_service::MetricService;
use hastic::services::segments_service::Segment;
use hastic::{config::Config, services::segments_service::SegmentType};
use subbeat::metric::{Metric, MetricResult}; use subbeat::metric::{Metric, MetricResult};
struct AnalyticUnit { struct AnalyticUnit {
metric_service: MetricService metric_service: MetricService,
} }
impl AnalyticUnit { impl AnalyticUnit {
fn new(config: &Config) -> AnalyticUnit { fn new(config: &Config) -> AnalyticUnit {
AnalyticUnit{ AnalyticUnit {
metric_service: MetricService::new( metric_service: MetricService::new(&config.prom_url, &config.query),
&config.prom_url,
&config.query
)
} }
} }
@ -38,7 +34,12 @@ impl AnalyticUnit {
} }
} else { } else {
if from.is_some() { if from.is_some() {
result.push(Segment{ id: None, from: from.unwrap(), to: *t }); result.push(Segment {
id: None,
from: from.unwrap(),
to: *t,
segment_type: SegmentType::Detection,
});
from = None; from = None;
} }
} }
@ -46,4 +47,4 @@ impl AnalyticUnit {
Ok(result) Ok(result)
} }
} }

6
server/src/api.rs

@ -1,5 +1,5 @@
use hastic::config::Config; use hastic::config::Config;
use hastic::services::{data_service, metric_service, user_service}; use hastic::services::{metric_service, segments_service, user_service};
use warp::http::HeaderValue; use warp::http::HeaderValue;
use warp::hyper::{Body, StatusCode}; use warp::hyper::{Body, StatusCode};
use warp::reject::Reject; use warp::reject::Reject;
@ -29,7 +29,7 @@ pub struct API<'a> {
config: &'a Config, config: &'a Config,
user_service: Arc<RwLock<user_service::UserService>>, user_service: Arc<RwLock<user_service::UserService>>,
metric_service: Arc<RwLock<metric_service::MetricService>>, metric_service: Arc<RwLock<metric_service::MetricService>>,
data_service: Arc<RwLock<data_service::DataService>>, data_service: Arc<RwLock<segments_service::SegmentsService>>,
} }
impl API<'_> { impl API<'_> {
@ -41,7 +41,7 @@ impl API<'_> {
&config.prom_url, &config.prom_url,
&config.query, &config.query,
))), ))),
data_service: Arc::new(RwLock::new(data_service::DataService::new()?)), data_service: Arc::new(RwLock::new(segments_service::SegmentsService::new()?)),
}) })
} }

22
server/src/api/segments.rs

@ -7,9 +7,10 @@ pub mod filters {
pub fn filters( pub fn filters(
db: Db, db: Db,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(db.clone()).or(create(db.clone())) list(db.clone())
// .or(update(db.clone())) .or(create(db.clone()))
.or(delete(db.clone())) // .or(update(db.clone()))
.or(delete(db.clone()))
} }
/// GET /segments?from=3&to=5 /// GET /segments?from=3&to=5
@ -51,8 +52,8 @@ pub mod filters {
} }
mod handlers { mod handlers {
use hastic::services::data_service; use hastic::services::segments_service;
use hastic::services::data_service::Segment; use hastic::services::segments_service::Segment;
use super::models::{CreateResponse, Db, ListOptions}; use super::models::{CreateResponse, Db, ListOptions};
use crate::api; use crate::api;
@ -67,7 +68,7 @@ mod handlers {
} }
pub async fn create( pub async fn create(
segment: data_service::Segment, segment: segments_service::Segment,
db: Db, db: Db,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
match db.write().insert_segment(&segment) { match db.write().insert_segment(&segment) {
@ -81,20 +82,21 @@ mod handlers {
pub async fn delete(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> { pub async fn delete(opts: ListOptions, db: Db) -> Result<impl warp::Reply, warp::Rejection> {
match db.read().delete_segments_in_range(opts.from, opts.to) { match db.read().delete_segments_in_range(opts.from, opts.to) {
Ok(count) => Ok(API::json(&api::Message{ message: count.to_string() })), Ok(count) => Ok(API::json(&api::Message {
message: count.to_string(),
})),
Err(e) => Err(warp::reject::custom(BadQuery)), Err(e) => Err(warp::reject::custom(BadQuery)),
} }
} }
} }
mod models { mod models {
use hastic::services::data_service::{self, SegmentId}; use hastic::services::segments_service::{self, SegmentId};
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
pub type Db = Arc<RwLock<data_service::DataService>>; pub type Db = Arc<RwLock<segments_service::SegmentsService>>;
// The query parameters for list_todos. // The query parameters for list_todos.
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]

2
server/src/main.rs

@ -1,7 +1,7 @@
use anyhow; use anyhow;
mod api;
mod analytic_unit; mod analytic_unit;
mod api;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {

2
server/src/services.rs

@ -1,3 +1,3 @@
pub mod data_service;
pub mod metric_service; pub mod metric_service;
pub mod segments_service;
pub mod user_service; pub mod user_service;

80
server/src/services/data_service.rs → server/src/services/segments_service.rs

@ -1,4 +1,4 @@
use rusqlite::{params, Connection, ToSql}; use rusqlite::{params, Connection, Row, ToSql};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -9,33 +9,62 @@ use std::iter::repeat_with;
const ID_LENGTH: usize = 20; const ID_LENGTH: usize = 20;
pub type SegmentId = String; pub type SegmentId = String;
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub enum SegmentType {
Label = 1,
Detection = 2,
}
impl SegmentType {
fn from(u: u64) -> SegmentType {
if u == 1 {
SegmentType::Label
} else {
SegmentType::Detection
}
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Segment { pub struct Segment {
pub id: Option<SegmentId>, pub id: Option<SegmentId>,
pub from: u64, pub from: u64,
pub to: u64, pub to: u64,
pub segment_type: SegmentType,
}
impl Segment {
fn from(row: &Row) -> anyhow::Result<Segment, rusqlite::Error> {
Ok(Segment {
id: row.get(0)?,
from: row.get(1)?,
to: row.get(2)?,
segment_type: SegmentType::from(row.get(3)?),
})
}
} }
// TODO: find a way to remove this unsafe // TODO: find a way to remove this unsafe
unsafe impl Sync for DataService {} unsafe impl Sync for SegmentsService {}
pub struct DataService { pub struct SegmentsService {
connection: Arc<Mutex<Connection>>, connection: Arc<Mutex<Connection>>,
} }
impl DataService { impl SegmentsService {
pub fn new() -> anyhow::Result<DataService> { pub fn new() -> anyhow::Result<SegmentsService> {
let conn = Connection::open("./data.db")?; let conn = Connection::open("./segments.db")?;
conn.execute( conn.execute(
"CREATE TABLE IF NOT EXISTS segment ( "CREATE TABLE IF NOT EXISTS segment (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
start INTEGER NOT NULL, start INTEGER NOT NULL,
end INTEGER NOT NULL end INTEGER NOT NULL,
segment_type INTEGER NOT NULL
)", )",
[], [],
)?; )?;
Ok(DataService { Ok(SegmentsService {
connection: Arc::new(Mutex::new(conn)), connection: Arc::new(Mutex::new(conn)),
}) })
} }
@ -61,29 +90,25 @@ impl DataService {
self.delete_segments(&ids_to_delete)?; self.delete_segments(&ids_to_delete)?;
self.connection.lock().unwrap().execute( self.connection.lock().unwrap().execute(
"INSERT INTO segment (id, start, end) VALUES (?1, ?2, ?3)", "INSERT INTO segment (id, start, end, segment_type) VALUES (?1, ?2, ?3, ?4)",
params![id, min, max], params![id, min, max],
)?; )?;
Ok(Segment { Ok(Segment {
id: Some(id), id: Some(id),
from: min, from: min,
to: max, to: max,
segment_type: segment.segment_type,
}) })
} }
pub fn get_segments_inside(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { pub fn get_segments_inside(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = let mut stmt = conn.prepare(
conn.prepare("SELECT id, start, end FROM segment WHERE ?1 < start AND end < ?2")?; "SELECT id, start, end, segment_type FROM segment WHERE ?1 < start AND end < ?2",
)?;
let res = stmt let res = stmt
.query_map(params![from, to], |row| { .query_map(params![from, to], Segment::from)?
Ok(Segment {
id: row.get(0)?,
from: row.get(1)?,
to: row.get(2)?,
})
})?
.map(|e| e.unwrap()) .map(|e| e.unwrap())
.collect(); .collect();
Ok(res) Ok(res)
@ -92,21 +117,16 @@ impl DataService {
pub fn get_segments_intersected(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { pub fn get_segments_intersected(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
"SELECT id, start, end FROM segment "SELECT id, start, end, segment_type
WHERE (start <= ?1 and ?1 <= end) OR FROM segment
(start <= ?2 AND ?2 <= end) OR WHERE (start <= ?1 and ?1 <= end) OR
(?1 <= start AND start <= ?2) OR (start <= ?2 AND ?2 <= end) OR
(?1 <= end AND end <= ?2) ", (?1 <= start AND start <= ?2) OR
(?1 <= end AND end <= ?2) ",
)?; )?;
let res = stmt let res = stmt
.query_map(params![from, to], |row| { .query_map(params![from, to], Segment::from)?
Ok(Segment {
id: row.get(0)?,
from: row.get(1)?,
to: row.get(2)?,
})
})?
.map(|e| e.unwrap()) .map(|e| e.unwrap())
.collect(); .collect();
Ok(res) Ok(res)
Loading…
Cancel
Save