Compare commits

..

No commits in common. 'main' and 'active-analytic-unit-#66' have entirely different histories.

  1. 27
      Dockerfile
  2. 36
      README.md
  3. 6
      client/src/components/AnlyticsStatus.vue
  4. 1
      client/src/components/pods/pattern_pod.ts
  5. 31
      client/src/services/analytics.service.ts
  6. 17
      client/src/store/index.ts
  7. 88
      client/src/views/Home.vue
  8. 12
      docker-compose.yml
  9. 17
      server/config.example.toml
  10. 56
      server/src/config.rs
  11. 7
      server/src/main.rs
  12. 109
      server/src/services/analytic_service/analytic_service.rs
  13. 5
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  14. 1
      server/src/services/analytic_service/analytic_unit/mod.rs
  15. 4
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  16. 99
      server/src/services/analytic_service/analytic_unit/types.rs
  17. 49
      server/src/services/analytic_service/detection_runner.rs
  18. 1
      server/src/services/analytic_service/types.rs
  19. 121
      server/src/services/analytic_unit_service.rs
  20. 23
      server/src/services/data_service.rs
  21. 7
      server/src/services/metric_service.rs
  22. 3
      server/src/services/mod.rs
  23. 16
      server/src/services/segments_service.rs

27
Dockerfile

@ -1,27 +0,0 @@
FROM rust:1.57.0-bullseye as builder
RUN curl -sL https://deb.nodesource.com/setup_16.x | bash -
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \
nodejs \
gcc \
g++ \
make \
musl-tools \
&& rm -rf /var/lib/apt/lists/*
RUN npm install --global yarn
RUN rustup target add x86_64-unknown-linux-musl
ADD . ./
RUN make
FROM debian:bullseye-slim
COPY --from=builder /release/hastic /hastic
COPY --from=builder /release/config.toml /config.toml
COPY --from=builder /release/public /public
CMD ["./hastic"]

36
README.md

@ -17,42 +17,6 @@ instance for getting metrics.
make make
``` ```
### Configure
Hastic can be configured using config-file or environment variables.
At first, choose which datasource you'll be using: `prometheus` or `influx`. Only one can be used at a time.
#### Config-file
- copy the config example to the release directory:
```bash
cp config.example.toml release/config.toml
```
- edit the config file, e.g. using `nano`:
```bash
nano release/config.toml
```
#### Environment variables
All config fields are also available as environment variables with `HASTIC_` prefix
Variable name structure:
- for high-level fields: `HASTIC_<field_name>`, e.g. `HASTIC_PORT`
- for nested fields: `HASTIC_<category_name>__<field_name>`, e.g. `HASTIC_PROMETHEUS__URL`
Environment variables can be set either by exporting them (they'll be actual until a bash-session is closed):
```bash
export HASTIC_PORT=8000
export HASTIC_PROMETHEUS__URL=http://localhost:9090
export HASTIC_PROMETHEUS__QUERY=rate(go_memstats_alloc_bytes_total[5m])
```
or specifing them in a run command (they'll be actual only for one run):
```bash
HASTIC_PORT=8000 HASTIC_PROMETHEUS__URL=http://localhost:9090 HASTIC_PROMETHEUS__QUERY=rate(go_memstats_alloc_bytes_total[5m]) ./release/hastic
```
### Run
``` ```
cd release cd release
./hastic ./hastic

6
client/src/components/AnlyticsStatus.vue

@ -1,12 +1,10 @@
<template> <template>
<div class="analytic-status"> <div class="analytic-status">
analytic status: <strong> {{ status.message }} </strong> analytic status: <strong> {{ status }} </strong>
</div> </div>
</template> </template>
<script lang="ts"> <script lang="ts">
import { AnalyticStatus } from "@/store";
import { defineComponent } from 'vue'; import { defineComponent } from 'vue';
@ -15,7 +13,7 @@ export default defineComponent({
components: { components: {
}, },
computed: { computed: {
status(): AnalyticStatus { status() {
return this.$store.state.analyticStatus; return this.$store.state.analyticStatus;
} }
} }

1
client/src/components/pods/pattern_pod.ts

@ -79,6 +79,7 @@ export class PatternPod extends HasticPod<UpdateDataCallback> {
} else { } else {
console.log('took from range from default'); console.log('took from range from default');
} }
console.log(from + " ---- " + to);
this.udc({ from, to }) this.udc({ from, to })
.then(resp => { .then(resp => {

31
client/src/services/analytics.service.ts

@ -5,33 +5,20 @@ import axios from 'axios';
import { getGenerator } from '@/utils'; import { getGenerator } from '@/utils';
import { import _ from 'lodash';
import {
AnalyticUnitType, AnlyticUnitConfig, AnalyticUnitType, AnlyticUnitConfig,
PatternConfig, ThresholdConfig, AnomalyConfig PatternConfig, ThresholdConfig, AnomalyConfig
} from "@/types/analytic_units"; } from "@/types/analytic_units";
import { AnomalyHSR } from "@/types"; import { AnomalyHSR } from "@/types";
import { AnalyticStatus } from "@/store";
import _ from 'lodash';
const ANALYTICS_API_URL = API_URL + "analytics/"; const ANALYTICS_API_URL = API_URL + "analytics/";
export async function getStatus(): Promise<AnalyticStatus> { export async function getStatus(): Promise<string> {
const uri = ANALYTICS_API_URL + `status`; const uri = ANALYTICS_API_URL + `status`;
try { const res = await axios.get(uri);
const res = await axios.get<{ status: string }>(uri); const data = res['data'] as any;
const data = res.data; return data.status;
return {
available: true,
message: data.status
};
} catch (e) {
return {
available: false,
message: e.message
};
}
} }
export async function getConfig(): Promise<[AnalyticUnitType, AnlyticUnitConfig]> { export async function getConfig(): Promise<[AnalyticUnitType, AnlyticUnitConfig]> {
@ -67,8 +54,8 @@ export async function patchConfig(patchObj: any) {
await axios.patch(uri, patchObj); await axios.patch(uri, patchObj);
} }
export function getStatusGenerator(): AsyncIterableIterator<AnalyticStatus> { export function getStatusGenerator(): AsyncIterableIterator<string> {
return getGenerator<AnalyticStatus>(100, getStatus); return getGenerator<string>(100, getStatus);
} }
@ -81,6 +68,6 @@ export async function getHSRAnomaly(from: number, to: number): Promise<AnomalyHS
const res = await axios.get(uri); const res = await axios.get(uri);
const values = res["data"]["AnomalyHSR"]; const values = res["data"]["AnomalyHSR"];
return values as AnomalyHSR; return values as AnomalyHSR;
} }

17
client/src/store/index.ts

@ -12,30 +12,23 @@ const _SET_STATUS_GENERATOR = '_SET_STATUS_GENERATOR';
// TODO: consts for actions // TODO: consts for actions
export type AnalyticStatus = {
available: boolean,
message: string,
}
type State = { type State = {
analyticStatus: AnalyticStatus, analyticStatus: string,
analyticUnitType?: AnalyticUnitType, analyticUnitType?: AnalyticUnitType,
analyticUnitConfig?: AnlyticUnitConfig, analyticUnitConfig?: AnlyticUnitConfig,
_statusGenerator: AsyncIterableIterator<AnalyticStatus> _statusGenerator: AsyncIterableIterator<string>
} }
const store = createStore<State>({ const store = createStore<State>({
state: { state: {
analyticStatus: { analyticStatus: 'loading...',
available: false,
message: 'loading...',
},
analyticUnitType: null, analyticUnitType: null,
analyticUnitConfig: null, analyticUnitConfig: null,
_statusGenerator: null _statusGenerator: null
}, },
mutations: { mutations: {
[SET_ANALYTICS_STATUS](state, status: AnalyticStatus) { [SET_ANALYTICS_STATUS](state, status: string) {
state.analyticStatus = status; state.analyticStatus = status;
}, },
[SET_DETECTOR_CONFIG](state, { analyticUnitType, analyticUnitConfig }) { [SET_DETECTOR_CONFIG](state, { analyticUnitType, analyticUnitConfig }) {
@ -45,7 +38,7 @@ const store = createStore<State>({
// [PATCH_CONFIG](state, patchObj) { // [PATCH_CONFIG](state, patchObj) {
// patchConfig(patchConfig) // patchConfig(patchConfig)
// } // }
[_SET_STATUS_GENERATOR](state, generator: AsyncIterableIterator<AnalyticStatus>) { [_SET_STATUS_GENERATOR](state, generator: AsyncIterableIterator<string>) {
state._statusGenerator = generator; state._statusGenerator = generator;
} }
}, },

88
client/src/views/Home.vue

@ -2,53 +2,50 @@
<div class="home"> <div class="home">
<img alt="Vue logo" src="../assets/logo.png"> <img alt="Vue logo" src="../assets/logo.png">
<graph ref="graph" /> <graph ref="graph" />
<analytic-status /> <analytic-status />
<div>
<template v-if="analyticStatus.available"> Analytic unit type:
<div> <select :value="analyticUnitType" @change="changeAnalyticUnitType">
Analytic unit type: <option disabled value="">Please Select</option>
<select :value="analyticUnitType" @change="changeAnalyticUnitType"> <option v-bind:key="option" v-for="option in analyticUnitTypes" :value="option">{{option}}</option>
<option disabled value="">Please Select</option> </select> <br/><br/>
<option v-bind:key="option" v-for="option in analyticUnitTypes" :value="option">{{option}}</option> </div>
</select> <br/><br/> <div id="controls">
<div v-if="analyticUnitType == analyticUnitTypes[0]">
Threshold:
<input :value="analyticUnitConfig.threshold" @change="thresholdChange" /> <br/><br/>
</div> </div>
<div id="controls"> <div v-if="analyticUnitType == analyticUnitTypes[1]">
<div v-if="analyticUnitType == analyticUnitTypes[0]"> Hold <pre>S</pre> to label patterns;
Threshold: Hold <pre>A</pre> to label anti patterns <br/>
<input v-model="analyticUnitConfig.threshold" @change="thresholdChange" /> <br/><br/> Hold <pre>D</pre> to delete patterns
</div> <br/>
<div v-if="analyticUnitType == analyticUnitTypes[1]"> <hr/>
Hold <pre>S</pre> to label patterns; Correlation score:
Hold <pre>A</pre> to label anti patterns <br/> <input :value="analyticUnitConfig.correlation_score" @change="correlationScoreChange" /> <br/>
Hold <pre>D</pre> to delete patterns Anti correlation score:
<br/> <input :value="analyticUnitConfig.anti_correlation_score" @change="antiCorrelationScoreChange" /> <br/>
<hr/> Model score:
Correlation score: <input :value="analyticUnitConfig.model_score" @change="modelScoreChange" /> <br/>
<input v-model="analyticUnitConfig.correlation_score" @change="correlationScoreChange" /> <br/> Threshold score:
Anti correlation score: <input :value="analyticUnitConfig.threshold_score" @change="thresholdScoreChange" /> <br/><br/>
<input v-model="analyticUnitConfig.anti_correlation_score" @change="antiCorrelationScoreChange" /> <br/> <button @click="clearAllLabeling"> clear all labeling </button>
Model score:
<input v-model="analyticUnitConfig.model_score" @change="modelScoreChange" /> <br/>
Threshold score:
<input v-model="analyticUnitConfig.threshold_score" @change="thresholdScoreChange" /> <br/><br/>
<button @click="clearAllLabeling"> clear all labeling </button>
</div>
<div v-if="analyticUnitType == analyticUnitTypes[2]">
Hold <pre>Z</pre> to set seasonality timespan
<hr/>
<!-- Alpha:
<input :value="analyticUnitConfig.alpha" @change="alphaChange" /> <br/> -->
Confidence:
<input v-model="analyticUnitConfig.confidence" @change="confidenceChange" /> <br/>
Seasonality:
<input v-model="analyticUnitConfig.seasonality" @change="seasonalityChange" /> <br/>
Seasonality iterations:
<input v-model="analyticUnitConfig.seasonality_iterations" @change="seasonalityIterationsChange" /> <br/>
<br/>
</div>
</div> </div>
</template> <div v-if="analyticUnitType == analyticUnitTypes[2]">
Hold <pre>Z</pre> to set seasonality timespan
<hr/>
<!-- Alpha:
<input :value="analyticUnitConfig.alpha" @change="alphaChange" /> <br/> -->
Confidence:
<input :value="analyticUnitConfig.confidence" @change="confidenceChange" /> <br/>
Seasonality:
<input :value="analyticUnitConfig.seasonality" @change="seasonalityChange" /> <br/>
Seasonality iterations:
<input :value="analyticUnitConfig.seasonality_iterations" @change="seasonalityIterationsChange" /> <br/>
<br/>
</div>
</div>
</div> </div>
</template> </template>
@ -143,9 +140,6 @@ export default defineComponent({
}, },
analyticUnitConfig() { analyticUnitConfig() {
return this.$store.state.analyticUnitConfig; return this.$store.state.analyticUnitConfig;
},
analyticStatus() {
return this.$store.state.analyticStatus;
} }
} }
}); });

12
docker-compose.yml

@ -1,12 +0,0 @@
version: '3'
services:
app:
image: hastic/hastic:latest
restart: always
environment:
HASTIC_PORT: "4347"
HASTIC_PROMETHEUS__URL: "http://demo.robustperception.io:9090"
HASTIC_PROMETHEUS__QUERY: "rate(go_memstats_alloc_bytes_total[1m])"
ports:
- "4347:4347"

17
server/config.example.toml

@ -1,10 +1,8 @@
port = 4347 port = 4347
# one of datasource sections (prometheus / influx) should be uncommented and edited corresponding to your environment [prometheus]
url = "http://localhost:9090"
# [prometheus] query = "rate(go_memstats_alloc_bytes_total[5m])"
# url = "http://localhost:9090"
# query = "rate(go_memstats_alloc_bytes_total[5m])"
# [influx] # [influx]
@ -18,7 +16,8 @@ port = 4347
# |> yield(name: "mean") # |> yield(name: "mean")
# """ # """
# [alerting] [alerting]
# type = "webhook" type = "webhook"
# interval = 10 # in seconds interval = 10 # in seconds
# endpoint = "http://localhost:9092" endpoint = "http://localhost:9092"

56
server/src/config.rs

@ -1,7 +1,5 @@
use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig}; use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig};
use std::env;
#[derive(Clone)] #[derive(Clone)]
pub struct WebhookAlertingConfig { pub struct WebhookAlertingConfig {
pub endpoint: String, pub endpoint: String,
@ -27,7 +25,6 @@ pub struct Config {
fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> { fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> {
if config.get::<String>("prometheus.url").is_ok() { if config.get::<String>("prometheus.url").is_ok() {
println!("using Prometheus");
return Ok(DatasourceConfig::Prometheus(PrometheusConfig { return Ok(DatasourceConfig::Prometheus(PrometheusConfig {
url: config.get("prometheus.url")?, url: config.get("prometheus.url")?,
query: config.get("prometheus.query")?, query: config.get("prometheus.query")?,
@ -35,7 +32,6 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfi
} }
if config.get::<String>("influx.url").is_ok() { if config.get::<String>("influx.url").is_ok() {
println!("using Influx");
return Ok(DatasourceConfig::Influx(InfluxConfig { return Ok(DatasourceConfig::Influx(InfluxConfig {
url: config.get("influx.url")?, url: config.get("influx.url")?,
org_id: config.get("influx.org_id")?, org_id: config.get("influx.org_id")?,
@ -44,7 +40,7 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfi
})); }));
} }
return Err(anyhow::format_err!("please configure a datasource")); return Err(anyhow::format_err!("no datasource found"));
} }
fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingConfig>> { fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingConfig>> {
@ -66,7 +62,7 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingCo
let analytic_type = config.get::<String>("alerting.type").unwrap(); let analytic_type = config.get::<String>("alerting.type").unwrap();
if analytic_type != "webhook" { if analytic_type != "webhook" {
return Err(anyhow::format_err!( return Err(anyhow::format_err!(
"unknown alerting type: {}", "unknown alerting typy {}",
analytic_type analytic_type
)); ));
} }
@ -79,46 +75,6 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingCo
})); }));
} }
// config::Environment doesn't support nested configs, e.g. `alerting.type`,
// so I've copied this from:
// https://github.com/rust-lang/mdBook/blob/f3e5fce6bf5e290c713f4015947dc0f0ad172d20/src/config.rs#L132
// so that `__` can be used in env variables instead of `.`,
// e.g. `HASTIC_ALERTING__TYPE` -> alerting.type
pub fn update_from_env(config: &mut config::Config) {
let overrides =
env::vars().filter_map(|(key, value)| parse_env(&key).map(|index| (index, value)));
for (key, value) in overrides {
config.set(&key, value).unwrap();
}
}
pub fn print_config(config: config::Config) {
// TODO: support any nesting level
let sections = config.to_owned().cache.into_table().unwrap();
for (section_name, values) in sections {
match values.clone().into_table() {
Err(_) => println!("{} => {}", section_name, values),
Ok(section) => {
for (key, value) in section {
println!("{}.{} => {}", section_name, key, value);
}
}
}
}
}
fn parse_env(key: &str) -> Option<String> {
const PREFIX: &str = "HASTIC_";
if key.starts_with(PREFIX) {
let key = &key[PREFIX.len()..];
Some(key.to_lowercase().replace("__", "."))
} else {
None
}
}
impl Config { impl Config {
pub fn new() -> anyhow::Result<Config> { pub fn new() -> anyhow::Result<Config> {
let mut config = config::Config::default(); let mut config = config::Config::default();
@ -127,14 +83,14 @@ impl Config {
config.merge(config::File::with_name("config")).unwrap(); config.merge(config::File::with_name("config")).unwrap();
} }
update_from_env(&mut config); config
.merge(config::Environment::with_prefix("HASTIC"))
.unwrap();
if config.get::<u16>("port").is_err() { if config.get::<u16>("port").is_err() {
config.set("port", 4347).unwrap(); config.set("port", "8000").unwrap();
} }
print_config(config.clone());
Ok(Config { Ok(Config {
port: config.get::<u16>("port").unwrap(), port: config.get::<u16>("port").unwrap(),
datasource_config: resolve_datasource(&config)?, datasource_config: resolve_datasource(&config)?,

7
server/src/main.rs

@ -1,6 +1,6 @@
mod api; mod api;
use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service, data_service}; use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service};
use anyhow; use anyhow;
@ -9,10 +9,9 @@ async fn main() -> anyhow::Result<()> {
let config = hastic::config::Config::new()?; let config = hastic::config::Config::new()?;
let cfg_clone = config.clone(); let cfg_clone = config.clone();
let data_service = data_service::DataService::new()?; let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new()?;
let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new(&data_service)?;
let metric_service = metric_service::MetricService::new(&config.datasource_config); let metric_service = metric_service::MetricService::new(&config.datasource_config);
let segments_service = segments_service::SegmentsService::new(&data_service)?; let segments_service = segments_service::SegmentsService::new()?;
let mut analytic_service = analytic_service::AnalyticService::new( let mut analytic_service = analytic_service::AnalyticService::new(
analytic_unit_service.clone(), analytic_unit_service.clone(),

109
server/src/services/analytic_service/analytic_service.rs

@ -59,6 +59,7 @@ impl AnalyticService {
segments_service: segments_service::SegmentsService, segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>, alerting: Option<AlertingConfig>,
) -> AnalyticService { ) -> AnalyticService {
// TODO: move buffer size to config // TODO: move buffer size to config
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32); let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
@ -137,7 +138,7 @@ impl AnalyticService {
}; };
let tx = self.tx.clone(); let tx = self.tx.clone();
let au = self.analytic_unit.as_ref().unwrap().clone(); let au = self.analytic_unit.as_ref().unwrap().clone();
let dr = DetectionRunner::new(tx,self.metric_service.clone(), drcfg, au); let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au);
self.detection_runner = Some(dr); self.detection_runner = Some(dr);
self.detection_runner.as_mut().unwrap().run(from); self.detection_runner.as_mut().unwrap().run(from);
@ -241,12 +242,7 @@ impl AnalyticService {
println!("Detection runner started from {}", from) println!("Detection runner started from {}", from)
} }
ResponseType::DetectionRunnerUpdate(id, timestamp) => { ResponseType::DetectionRunnerUpdate(id, timestamp) => {
self.analytic_unit_service self.analytic_unit_service.set_last_detection(id, timestamp).unwrap();
.set_last_detection(id, timestamp)
.unwrap();
}
ResponseType::DetectionRunnerDetection(from, to) => {
println!("detection: {} {}", from, to);
} }
ResponseType::LearningStarted => { ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning self.analytic_unit_learning_status = LearningStatus::Learning
@ -278,77 +274,54 @@ impl AnalyticService {
} }
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) {
let my_id = self
.analytic_unit_service let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch);
.get_config_id(&self.analytic_unit_config); self.analytic_unit_config = new_conf.clone();
if need_learning {
let patch_id = patch.get_type_id(); self.consume_request(RequestType::RunLearning);
// TODO: it's not fully correct: we need to wait when the learning starts
let same_type = my_id == patch_id; match tx.send(()) {
Ok(_) => {}
// TODO: need_learning and same_type logic overlaps, there is a way to optimise this Err(_e) => {
let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); println!("Can`t send patch config notification");
}
if same_type { }
// TODO: check when learning should be started } else {
let new_conf = patch.get_new_config();
self.analytic_unit_config = new_conf.clone();
self.analytic_unit_service
.update_config_by_id(&my_id, &new_conf)
.unwrap();
if self.analytic_unit.is_some() { if self.analytic_unit.is_some() {
if need_learning { tokio::spawn({
self.consume_request(RequestType::RunLearning); let au = self.analytic_unit.clone();
match tx.send(()) { let cfg = self.analytic_unit_config.clone();
Ok(_) => {} async move {
Err(e) => { au.unwrap().write().await.set_config(cfg);
println!("Can`t send patch config notification"); match tx.send(()) {
println!("{:?}", e); Ok(_) => {}
} Err(_e) => {
} println!("Can`t send patch config notification");
return;
} else {
tokio::spawn({
let au = self.analytic_unit.clone();
let cfg = self.analytic_unit_config.clone();
async move {
au.unwrap().write().await.set_config(cfg);
match tx.send(()) {
Ok(_) => {}
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
} }
} }
}); }
} });
} else { } else {
// TODO: check if we need this else
match tx.send(()) { match tx.send(()) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(_e) => {
println!("Can`t send patch config notification"); println!("Can`t send patch config notification");
println!("{:?}", e);
} }
} }
} }
}
if same_type {
// TODO: avoid using `unwrap`
self.analytic_unit_service.update_active_config(&new_conf).unwrap();
} else { } else {
let new_conf = self // TODO: it's a hack, make it a better way
.analytic_unit_service // TODO: avoid using unwrap
.get_config_by_id(&patch_id) self.analytic_unit_service.resolve(&new_conf).unwrap();
.unwrap(); self.analytic_unit_service.update_active_config(&new_conf).unwrap();
self.analytic_unit_config = new_conf.clone();
self.consume_request(RequestType::RunLearning);
match tx.send(()) {
Ok(_) => {}
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
}
} }
} }
pub async fn serve(&mut self) { pub async fn serve(&mut self) {
@ -378,9 +351,7 @@ impl AnalyticService {
) { ) {
let mut au = match aus.resolve(&aucfg) { let mut au = match aus.resolve(&aucfg) {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => { panic!("{}", e); }
panic!("{}", e);
}
}; };
match tx match tx

5
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -50,10 +50,7 @@ impl SARIMA {
// TODO: trend detection // TODO: trend detection
if ts.len() < 2 { if ts.len() < 2 {
return Err(anyhow::format_err!( return Err(anyhow::format_err!("too short timeserie to learn from, timeserie length: {}", ts.len()));
"too short timeserie to learn from, timeserie length: {}",
ts.len()
));
} }
// TODO: ensure capacity with seasonality size // TODO: ensure capacity with seasonality size
let mut res_ts = Vec::<(u64, f64)>::new(); let mut res_ts = Vec::<(u64, f64)>::new();

1
server/src/services/analytic_service/analytic_unit/mod.rs

@ -7,3 +7,4 @@ use self::{
anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig,
}; };

4
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -220,8 +220,8 @@ impl AnalyticUnit for PatternAnalyticUnit {
return self.id.to_owned(); return self.id.to_owned();
} }
fn get_detection_window(&self) -> u64 { fn get_detection_window(&self) -> u64 {
let lr = self.learning_results.as_ref().unwrap(); // TODO: return window based on real petterns info
return lr.avg_pattern_length as u64; return DETECTION_STEP;
} }
fn set_config(&mut self, config: AnalyticUnitConfig) { fn set_config(&mut self, config: AnalyticUnitConfig) {
if let AnalyticUnitConfig::Pattern(cfg) = config { if let AnalyticUnitConfig::Pattern(cfg) = config {

99
server/src/services/analytic_service/analytic_unit/types.rs

@ -57,77 +57,28 @@ impl Default for ThresholdConfig {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AnalyticUnitConfig { pub enum AnalyticUnitConfig {
Threshold(ThresholdConfig),
Pattern(PatternConfig), Pattern(PatternConfig),
Threshold(ThresholdConfig),
Anomaly(AnomalyConfig), Anomaly(AnomalyConfig),
} }
impl AnalyticUnitConfig { impl AnalyticUnitConfig {
pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { // return true if need needs relearning and true if the config of the same type
let iid = id.as_str(); pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) {
match iid {
"1" => AnalyticUnitConfig::Threshold(Default::default()),
"2" => AnalyticUnitConfig::Pattern(Default::default()),
"3" => AnalyticUnitConfig::Anomaly(Default::default()),
_ => panic!("bad id for getting get_default_by_id"),
}
}
pub fn patch_needs_learning(&self, patch: &PatchConfig) -> bool {
// TODO: maybe use type id's to optimise code
match patch {
PatchConfig::Pattern(tcfg) => match self.clone() {
AnalyticUnitConfig::Pattern(_) => {
return false;
}
_ => return true,
},
PatchConfig::Anomaly(tcfg) => match self.clone() {
AnalyticUnitConfig::Anomaly(scfg) => {
if tcfg.is_some() {
let t = tcfg.as_ref().unwrap();
let mut need_learning = t.seasonality != scfg.seasonality;
need_learning |= t.seasonality_iterations != scfg.seasonality_iterations;
return need_learning;
} else {
return false;
}
}
_ => {
return true;
}
},
PatchConfig::Threshold(tcfg) => match self.clone() {
AnalyticUnitConfig::Threshold(_) => {
return false;
}
_ => {
return true;
}
},
}
}
// TODO: maybe this method depricated
// return true if need needs relearning
pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) {
match patch { match patch {
PatchConfig::Pattern(tcfg) => match self.clone() { PatchConfig::Pattern(tcfg) => match self.clone() {
AnalyticUnitConfig::Pattern(_) => { AnalyticUnitConfig::Pattern(_) => {
if tcfg.is_some() { if tcfg.is_some() {
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true);
} else { } else {
// TODO: it should be extraced from db return (AnalyticUnitConfig::Pattern(Default::default()), false, true);
return (AnalyticUnitConfig::Pattern(Default::default()), false);
} }
} }
_ => { _ => {
if tcfg.is_some() { if tcfg.is_some() {
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false);
} else { } else {
return (AnalyticUnitConfig::Pattern(Default::default()), true); return (AnalyticUnitConfig::Pattern(Default::default()), true, false);
} }
} }
}, },
@ -138,16 +89,16 @@ impl AnalyticUnitConfig {
let t = tcfg.as_ref().unwrap(); let t = tcfg.as_ref().unwrap();
let mut need_learning = t.seasonality != scfg.seasonality; let mut need_learning = t.seasonality != scfg.seasonality;
need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; need_learning |= t.seasonality_iterations != scfg.seasonality_iterations;
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning); return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true);
} else { } else {
return (AnalyticUnitConfig::Anomaly(Default::default()), false); return (AnalyticUnitConfig::Anomaly(Default::default()), false, true);
} }
} }
_ => { _ => {
if tcfg.is_some() { if tcfg.is_some() {
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false);
} else { } else {
return (AnalyticUnitConfig::Anomaly(Default::default()), true); return (AnalyticUnitConfig::Anomaly(Default::default()), true, false);
} }
} }
}, },
@ -155,16 +106,16 @@ impl AnalyticUnitConfig {
PatchConfig::Threshold(tcfg) => match self.clone() { PatchConfig::Threshold(tcfg) => match self.clone() {
AnalyticUnitConfig::Threshold(_) => { AnalyticUnitConfig::Threshold(_) => {
if tcfg.is_some() { if tcfg.is_some() {
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true);
} else { } else {
return (AnalyticUnitConfig::Threshold(Default::default()), false); return (AnalyticUnitConfig::Threshold(Default::default()), false, true);
} }
} }
_ => { _ => {
if tcfg.is_some() { if tcfg.is_some() {
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false);
} else { } else {
return (AnalyticUnitConfig::Threshold(Default::default()), true); return (AnalyticUnitConfig::Threshold(Default::default()), true, false);
} }
} }
}, },
@ -203,23 +154,3 @@ pub enum PatchConfig {
Threshold(Option<ThresholdConfig>), Threshold(Option<ThresholdConfig>),
Anomaly(Option<AnomalyConfig>), Anomaly(Option<AnomalyConfig>),
} }
impl PatchConfig {
pub fn get_type_id(&self) -> String {
match &self {
PatchConfig::Threshold(_) => "1".to_string(),
PatchConfig::Pattern(_) => "2".to_string(),
PatchConfig::Anomaly(_) => "3".to_string(),
}
}
pub fn get_new_config(&self) -> AnalyticUnitConfig {
match &self {
PatchConfig::Threshold(cfg) => {
AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone())
}
PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()),
PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()),
}
}
}

49
server/src/services/analytic_service/detection_runner.rs

@ -1,6 +1,6 @@
use chrono::{DateTime, Utc}; use chrono::{Utc, DateTime};
use tokio::sync::mpsc; use tokio::sync::{mpsc};
use crate::services::metric_service::MetricService; use crate::services::metric_service::MetricService;
@ -8,9 +8,12 @@ use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
const DETECTION_STEP: u64 = 10;
pub struct DetectionRunner { pub struct DetectionRunner {
tx: mpsc::Sender<AnalyticServiceMessage>,
metric_service: MetricService, metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig, config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF, analytic_unit: AnalyticUnitRF,
running_handler: Option<tokio::task::JoinHandle<()>>, running_handler: Option<tokio::task::JoinHandle<()>>,
@ -18,8 +21,8 @@ pub struct DetectionRunner {
impl DetectionRunner { impl DetectionRunner {
pub fn new( pub fn new(
tx: mpsc::Sender<AnalyticServiceMessage>,
metric_service: MetricService, metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig, config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF, analytic_unit: AnalyticUnitRF,
) -> DetectionRunner { ) -> DetectionRunner {
@ -33,6 +36,7 @@ impl DetectionRunner {
} }
pub fn run(&mut self, from: u64) { pub fn run(&mut self, from: u64) {
// TODO: get last detection timestamp from persistance
// TODO: set last detection from "now" // TODO: set last detection from "now"
if self.running_handler.is_some() { if self.running_handler.is_some() {
self.running_handler.as_mut().unwrap().abort(); self.running_handler.as_mut().unwrap().abort();
@ -45,12 +49,9 @@ impl DetectionRunner {
async move { async move {
// TODO: run detection "from" for big timespan // TODO: run detection "from" for big timespan
// TODO: parse detections to webhooks // TODO: parse detections to webhooks
// TODO: define window for detection
// TODO: handle case when detection is in the end and continues after "now" // TODO: handle case when detection is in the end and continues after "now"
// it's better to make an issue on github
// TODO: find place to update analytic unit model
let window_size = au.as_ref().read().await.get_detection_window(); let window_size = au.as_ref().read().await.get_detection_window();
let detection_step = ms.get_detection_step();
let mut t_from = from - window_size; let mut t_from = from - window_size;
let mut t_to = from; let mut t_to = from;
@ -69,35 +70,21 @@ impl DetectionRunner {
let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap(); let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();
for d in detections { for d in detections {
match tx println!("detection: {} {}", d.0, d.1);
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerDetection(d.0, d.1),
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner detection notification"),
}
} }
// TODO: send info about detections to tx // TODO: run detection periodically
// TODO: set info about detections to tx
match tx
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerUpdate( match tx.send(AnalyticServiceMessage::Response(Ok(
au.as_ref().read().await.get_id(), ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to)
t_to, ))).await {
), Ok(_) => {},
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"), Err(_e) => println!("Fail to send detection runner started notification"),
} }
sleep(Duration::from_secs(cfg.interval)).await; sleep(Duration::from_secs(cfg.interval)).await;
t_from += detection_step;
t_to += detection_step;
} }
} }
})); }));

1
server/src/services/analytic_service/types.rs

@ -45,7 +45,6 @@ impl Default for LearningTrain {
pub enum ResponseType { pub enum ResponseType {
DetectionRunnerStarted(u64), DetectionRunnerStarted(u64),
DetectionRunnerUpdate(String, u64), // analytic_unit id and timestamp DetectionRunnerUpdate(String, u64), // analytic_unit id and timestamp
DetectionRunnerDetection(u64, u64), // TODO: add more into about analytic unit and more
LearningStarted, LearningStarted,
LearningFinished(Box<dyn AnalyticUnit + Send + Sync>), LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
LearningFinishedEmpty, LearningFinishedEmpty,

121
server/src/services/analytic_unit_service.rs

@ -1,28 +1,24 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use serde_json::{Result, Value};
use serde::{Deserialize, Serialize};
use rusqlite::{params, Connection}; use rusqlite::{params, Connection};
use super::analytic_service::analytic_unit::{ use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit};
anomaly_analytic_unit::AnomalyAnalyticUnit,
pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit,
types::{self, AnalyticUnitConfig},
};
use super::data_service::DataService;
#[derive(Clone)] #[derive(Clone)]
pub struct AnalyticUnitService { pub struct AnalyticUnitService {
connection: Arc<Mutex<Connection>>, connection: Arc<Mutex<Connection>>
} }
// TODO: get DataService
impl AnalyticUnitService { impl AnalyticUnitService {
pub fn new(ds: &DataService) -> anyhow::Result<AnalyticUnitService> { pub fn new() -> anyhow::Result<AnalyticUnitService> {
let conn = ds.analytic_units_connection.clone(); // TODO: remove repetitoin with segment_service
std::fs::create_dir_all("./data").unwrap();
let conn = Connection::open("./data/analytic_units.db")?;
// TODO: add learning results field // TODO: add learning results field
conn.lock().unwrap().execute( conn.execute(
"CREATE TABLE IF NOT EXISTS analytic_unit ( "CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
last_detection INTEGER, last_detection INTEGER,
@ -34,55 +30,45 @@ impl AnalyticUnitService {
)?; )?;
Ok(AnalyticUnitService { Ok(AnalyticUnitService {
connection: conn connection: Arc::new(Mutex::new(conn)),
}) })
} }
// TODO: optional id // TODO: optional id
pub fn resolve_au( pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
&self,
cfg: &AnalyticUnitConfig,
) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg { match cfg {
AnalyticUnitConfig::Threshold(c) => { AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())),
Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())) AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())),
} AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())),
AnalyticUnitConfig::Pattern(c) => {
Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone()))
}
AnalyticUnitConfig::Anomaly(c) => {
Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone()))
}
} }
} }
// TODO: get id of analytic_unit which be used also as it's type // TODO: get id of analytic_unit which be used also as it's type
pub fn resolve( pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
&self,
cfg: &AnalyticUnitConfig,
) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg); let au = self.resolve_au(cfg);
let id = au.as_ref().get_id(); let id = au.as_ref().get_id();
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?; let mut stmt = conn.prepare(
"SELECT id from analytic_unit WHERE id = ?1",
)?;
let res = stmt.exists(params![id])?; let res = stmt.exists(params![id])?;
if res == false { if res == false {
let cfg_json = serde_json::to_string(&cfg)?; let cfg_json = serde_json::to_string(&cfg)?;
conn.execute( conn.execute(
"INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)",
params![id, cfg_json], params![id, cfg_json]
)?; )?;
} }
conn.execute( conn.execute(
"UPDATE analytic_unit set active = FALSE where active = TRUE", "UPDATE analytic_unit set active = FALSE where active = TRUE",
params![], params![]
)?; )?;
conn.execute( conn.execute(
"UPDATE analytic_unit set active = TRUE where id = ?1", "UPDATE analytic_unit set active = TRUE where id = ?1",
params![id], params![id]
)?; )?;
return Ok(au); return Ok(au);
@ -92,7 +78,7 @@ impl AnalyticUnitService {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
conn.execute( conn.execute(
"UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2",
params![last_detection, id], params![last_detection, id]
)?; )?;
Ok(()) Ok(())
} }
@ -100,8 +86,9 @@ impl AnalyticUnitService {
pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> { pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: return default when there is no active // TODO: return default when there is no active
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = let mut stmt = conn.prepare(
conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?; "SELECT id, type, config from analytic_unit WHERE active = TRUE"
)?;
let au = stmt.query_row([], |row| { let au = stmt.query_row([], |row| {
let c: String = row.get(2)?; let c: String = row.get(2)?;
@ -110,12 +97,15 @@ impl AnalyticUnitService {
})??; })??;
return Ok(au); return Ok(au);
} }
pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> { pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> {
let exists = { let exists = {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; let mut stmt = conn.prepare(
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
stmt.exists([])? stmt.exists([])?
}; };
@ -125,7 +115,9 @@ impl AnalyticUnitService {
return Ok(c); return Ok(c);
} else { } else {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; let mut stmt = conn.prepare(
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
let acfg = stmt.query_row([], |row| { let acfg = stmt.query_row([], |row| {
let c: String = row.get(0)?; let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap(); let cfg = serde_json::from_str(&c).unwrap();
@ -135,51 +127,6 @@ impl AnalyticUnitService {
} }
} }
pub fn get_config_by_id(&self, id: &String) -> anyhow::Result<AnalyticUnitConfig> {
let exists = {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
stmt.exists([id])?
};
if exists == false {
let c = AnalyticUnitConfig::get_default_by_id(id);
self.resolve(&c)?;
return Ok(c);
} else {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
let acfg = stmt.query_row([id], |row| {
let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(cfg)
})?;
return Ok(acfg);
}
}
pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String {
match cfg {
AnalyticUnitConfig::Threshold(_) => "1".to_string(),
AnalyticUnitConfig::Pattern(_) => "2".to_string(),
AnalyticUnitConfig::Anomaly(_) => "3".to_string(),
}
}
pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
// TODO: it's possble that config doesn't exist, but we trying to update it
let conn = self.connection.lock().unwrap();
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE id = ?2",
params![cfg_json, id],
)?;
return Ok(());
}
pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
@ -187,7 +134,7 @@ impl AnalyticUnitService {
conn.execute( conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE",
params![cfg_json], params![cfg_json]
)?; )?;
return Ok(()); return Ok(());

23
server/src/services/data_service.rs

@ -1,23 +0,0 @@
use std::sync::{Arc, Mutex};
use rusqlite::{Connection};
pub struct DataService {
pub analytic_units_connection: Arc<Mutex<Connection>>,
pub segments_connection: Arc<Mutex<Connection>>
}
impl DataService {
pub fn new() -> anyhow::Result<DataService> {
std::fs::create_dir_all("./data").unwrap();
let analytic_units_connection = Connection::open("./data/analytic_units.db")?;
let segments_connection = Connection::open("./data/segments.db")?;
Ok(DataService {
analytic_units_connection: Arc::new(Mutex::new(analytic_units_connection)),
segments_connection: Arc::new(Mutex::new(segments_connection))
})
}
}

7
server/src/services/metric_service.rs

@ -32,11 +32,4 @@ impl MetricService {
} }
return Ok(mr); return Ok(mr);
} }
// TODO: it a hack for DetectionRunner: it should vary for different analytic units
// and it's config
pub fn get_detection_step(&self) -> u64 {
return 10;
}
} }

3
server/src/services/mod.rs

@ -1,6 +1,5 @@
pub mod analytic_service; pub mod analytic_service;
pub mod data_service;
pub mod analytic_unit_service;
pub mod metric_service; pub mod metric_service;
pub mod segments_service; pub mod segments_service;
pub mod user_service; pub mod user_service;
pub mod analytic_unit_service;

16
server/src/services/segments_service.rs

@ -6,13 +6,9 @@ use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use super::data_service::DataService;
pub const ID_LENGTH: usize = 20; pub const ID_LENGTH: usize = 20;
pub type SegmentId = String; pub type SegmentId = String;
// TODO: make logic with this enum shorter // TODO: make logic with this enum shorter
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
pub enum SegmentType { pub enum SegmentType {
@ -62,19 +58,19 @@ impl Segment {
} }
} }
// TODO: get DataService
#[derive(Clone)] #[derive(Clone)]
pub struct SegmentsService { pub struct SegmentsService {
connection: Arc<Mutex<Connection>>, connection: Arc<Mutex<Connection>>,
} }
impl SegmentsService { impl SegmentsService {
pub fn new(ds: &DataService) -> anyhow::Result<SegmentsService> { pub fn new() -> anyhow::Result<SegmentsService> {
// TODO: move it to data service
std::fs::create_dir_all("./data").unwrap();
// TODO: add unilytic_unit id as a new field // TODO: add unilytic_unit id as a new field
let conn = ds.segments_connection.clone(); let conn = Connection::open("./data/segments.db")?;
conn.lock().unwrap().execute( conn.execute(
"CREATE TABLE IF NOT EXISTS segment ( "CREATE TABLE IF NOT EXISTS segment (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
start INTEGER NOT NULL, start INTEGER NOT NULL,
@ -85,7 +81,7 @@ impl SegmentsService {
)?; )?;
Ok(SegmentsService { Ok(SegmentsService {
connection: conn connection: Arc::new(Mutex::new(conn)),
}) })
} }

Loading…
Cancel
Save