Compare commits

...

98 Commits

Author SHA1 Message Date
rozetko 77c5d3d3ce Merge pull request 'hotfix: `configuration property "influx.org_id" not found`' (#111) from influx-docker into main 2 years ago
rozetko d099998562 fix `configuration property "influx.org_id" not found` 2 years ago
rozetko abbadbc155 Merge pull request 'Make influx work in Docker' (#110) from influx-docker into main 2 years ago
rozetko fce0f30e9e docker-compose: rm "network-mode: host" option 2 years ago
rozetko f4226a5245 upd config example 2 years ago
rozetko 57854909f9 print config on start 2 years ago
rozetko e16410d407 make influx work in docker 2 years ago
Alexey Velikiy 5f16e006a8 rm range print in cleint 2 years ago
Alexey Velikiy 55c0fca371 debug error in config patch notification 2 years ago
glitch4347 aa4008662a
Merge pull request #108 from hastic/get-notified-in-analytic-service-about-new-detections-#51 2 years ago
Alexey Velikiy a9d2995281 detection runner detection notification 2 years ago
glitch4347 eee437d779
Merge pull request #106 from hastic/data-folder-creation-single-place-#102 2 years ago
Alexey Velikiy b7fb951590 data_service 2 years ago
Alexey Velikiy ad95c1e49f rm console log 2 years ago
glitch4347 f684d8ee6a
Merge pull request #100 from hastic/ui-not-editable-confidence-param-#99 2 years ago
Alexey Velikiy 0d756d3f42 use v-model instead of :value 2 years ago
Alexey Velikiy a4da3b5ea3 todo++ 2 years ago
Alexey Velikiy cafb50cce3 small refactorign 2 years ago
Alexey Velikiy c7284f4305 todos++ 2 years ago
glitch4347 cb87a98e39
Merge pull request #96 from hastic/env-vars-docs-#90 2 years ago
glitch4347 49c2e30acc
Merge pull request #94 from rusdacent/main 2 years ago
rozetko 8e4b29508b minor fixes 2 years ago
rozetko ed28530955 readme: initial configuration docs 2 years ago
rusdacent e0e4d93ebb Set always restart 2 years ago
rusdacent 8121292450 Set ports for Hastic 2 years ago
rusdacent 2d5786d639 Set image for docker-compose 2 years ago
rusdacent 0733a5d002 Add docker-compose 2 years ago
rusdacent e20a5affcc Add Dockerfile 2 years ago
glitch4347 545c932f6b
Merge pull request #91 from hastic/show-error-if-server-is-down-#48 2 years ago
rozetko 936d207a74 change default port: 8000 -> 4347 2 years ago
rozetko 4ed8ba1193 do not display anything below the Analytic Status if analytic is not available 2 years ago
rozetko 70c49fe91b change analytic status type && handle GET /status errors 2 years ago
glitch4347 052e5a5987
Merge pull request #89 from hastic/env-variables-configuration-#55 2 years ago
glitch4347 8ab0718326
Update server/src/config.rs 2 years ago
glitch4347 4f66b59b7e
Merge pull request #85 from hastic/backet-detection-window-for-pattern-analytic-unit-#83 2 years ago
rozetko 7d190c426a env variables configuration #55 2 years ago
glitch4347 637eef2105
Update server/src/services/analytic_service/detection_runner.rs 2 years ago
Alexey Velikiy 7731e2f56b detection window for pattern detector 2 years ago
Alexey Velikiy 6b44428586 detection step in matric service and update of t_from \ t_to 2 years ago
Alexey Velikiy 6f6f946b5b DetectionRunner todos 2 years ago
glitch4347 278ea473f2
Merge pull request #72 from hastic/analytic-unit-type-and-meta-in-db-#65 2 years ago
Alexey Velikiy 81069e9782 code format 2 years ago
Alexey Velikiy 21f4b6eaaa need_learning on patch case 2 years ago
Alexey Velikiy a200e3d450 basic seving of new analytic unit config 2 years ago
Alexey Velikiy d7b283f1ff basic save of analytic_unit 2 years ago
Alexey Velikiy 1af18e1118 rm 'same_type' in pathc config and patching continue 2 years ago
Alexey Velikiy df3be270b8 better patch begin 2 years ago
glitch4347 f1d31b9637
Merge pull request #67 from hastic/active-analytic-unit-#66 2 years ago
glitch4347 2498c40bed
Update server/src/services/analytic_unit_service.rs 2 years ago
Alexey Velikiy b5f960a6ea save config for patched 2 years ago
Alexey Velikiy 3313a92039 update active config begin 2 years ago
Alexey Velikiy a982f8f1f5 update_active_config begin 2 years ago
Alexey Velikiy 1f2d6f551e patch config: same type param 2 years ago
Alexey Velikiy 5bf13b171b get_active_config++ 2 years ago
Alexey Velikiy 090b9bf42e active config++ 2 years ago
Alexey Velikiy 86a1f7e3a6 get active with config 2 years ago
Alexey Velikiy b26d36fad2 serialize config in analytic_unit 2 years ago
Alexey Velikiy 4b6e95e784 Merge branch 'main' into active-analytic-unit-#66 2 years ago
glitch4347 5fa9cf8ba2
Merge pull request #69 from hastic/analytic-is-stuck-at-learning-#42 2 years ago
rozetko ef572afcaa prettify "too short timeserie to learn from" error message 2 years ago
rozetko 4774f5d6b5 LearningStatus::Error : error message argument 2 years ago
Alexey Velikiy 311e5a8041 get_active++ 2 years ago
Alexey Velikiy 1565c156e0 TODO++ 2 years ago
Alexey Velikiy d23af0482b TODOS and type field 2 years ago
Alexey Velikiy 187ec15f1b active analytic unit begin 2 years ago
glitch4347 0eff1204f2
Merge pull request #63 from hastic/basic-periodic-update-of-detection-runner-#62 2 years ago
Alexey Velikiy c79993bc52 rm todos 2 years ago
Alexey Velikiy a338b3339b detectoin println fix 2 years ago
Alexey Velikiy 9904e4d449 metric_service to detections runner + basic detections 2 years ago
Alexey Velikiy 0761e4d9b2 detections window begin 2 years ago
glitch4347 1b89075ad0
Merge pull request #60 from hastic/detection_runner_updade 2 years ago
Alexey Velikiy 5fa99d1421 update detections 2 years ago
Alexey Velikiy c61352422c save analytic unit into db on resolve 2 years ago
Alexey Velikiy f48a22e314 analytic unit id 2 years ago
Alexey Velikiy 9745083799 AnalyticUnitService++ 2 years ago
Alexey Velikiy bf0a684e5c AnalyticUnitService begin 2 years ago
Alexey Velikiy c74abb1bbd detection runner started notification 2 years ago
Alexey Velikiy 2126025abd tx to detection runner + formet 2 years ago
Alexey Velikiy 2c465fa5c6 better print for detection runner 2 years ago
Alexey Velikiy 926f49c168 basic detector runniung 2 years ago
glitch4347 193e7984c2
Merge pull request #47 from hastic/detection-waiter 2 years ago
Alexey Velikiy e527c54eed run_detection_runner 2 years ago
Alexey Velikiy 9473781c06 dete4ction runner begin 2 years ago
glitch4347 3cc85c289f
Merge pull request #45 from hastic/poc-improve-error-handling 2 years ago
rozetko f55d4a5a0d PoC: improve error handling 2 years ago
glitch4347 86f4ec4baa
Merge pull request #41 from rozetko/patch-1 2 years ago
rozetko 0897688c8a
Readme: update requirements 2 years ago
glitch4347 580438a614
Merge pull request #39 from hastic/anomly-detector-configurable-iteration-steps-#35 2 years ago
Alexey Velikiy b003f1ee17 seasonality iterations 2 years ago
Alexey Velikiy e92c3dc56f rerun detection in anomaly unit on confidence change #37 2 years ago
Alexey Velikiy 55c5e08ed6 code cleanupo 2 years ago
Alexey Velikiy 1bc3ce2037 alerting++ 2 years ago
Alexey Velikiy 07a58dde02 DetectionRunner++ 2 years ago
glitch4347 03b613d378
Merge pull request #34 from hastic/set-seasonality-with-mouse-#30 2 years ago
Alexey Velikiy 6463636b54 fx typos in README 2 years ago
Alexey Velikiy 15c9e88760 musl-tools 2 years ago
Alexey Velikiy d160978034 README++ 2 years ago
Alexey Velikiy 2899123ea7 README++ 2 years ago
  1. 27
      Dockerfile
  2. 49
      README.md
  3. 6
      client/src/components/AnlyticsStatus.vue
  4. 14
      client/src/components/Graph.vue
  5. 1
      client/src/components/pods/pattern_pod.ts
  6. 31
      client/src/services/analytics.service.ts
  7. 17
      client/src/store/index.ts
  8. 95
      client/src/views/Home.vue
  9. 12
      docker-compose.yml
  10. 4
      server/Cargo.lock
  11. 2
      server/Cargo.toml
  12. 17
      server/config.example.toml
  13. 62
      server/src/config.rs
  14. 7
      server/src/main.rs
  15. 245
      server/src/services/analytic_service/analytic_service.rs
  16. 87
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  17. 8
      server/src/services/analytic_service/analytic_unit/mod.rs
  18. 34
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  19. 19
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  20. 87
      server/src/services/analytic_service/analytic_unit/types.rs
  21. 103
      server/src/services/analytic_service/detection_runner.rs
  22. 19
      server/src/services/analytic_service/types.rs
  23. 195
      server/src/services/analytic_unit_service.rs
  24. 23
      server/src/services/data_service.rs
  25. 7
      server/src/services/metric_service.rs
  26. 2
      server/src/services/mod.rs
  27. 17
      server/src/services/segments_service.rs

27
Dockerfile

@ -0,0 +1,27 @@
FROM rust:1.57.0-bullseye as builder
RUN curl -sL https://deb.nodesource.com/setup_16.x | bash -
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \
nodejs \
gcc \
g++ \
make \
musl-tools \
&& rm -rf /var/lib/apt/lists/*
RUN npm install --global yarn
RUN rustup target add x86_64-unknown-linux-musl
ADD . ./
RUN make
FROM debian:bullseye-slim
COPY --from=builder /release/hastic /hastic
COPY --from=builder /release/config.toml /config.toml
COPY --from=builder /release/public /public
CMD ["./hastic"]

49
README.md

@ -1,11 +1,58 @@
# Hastic
## Build release
Hastic needs a [Prometheus](https://prometheus.io/) or [InfluxDB](https://www.influxdata.com/get-influxdb/)
instance to get metrics from.
## Build from source (Linux)
### Prerequisites
1. [Install cargo](https://doc.rust-lang.org/cargo/getting-started/installation.html) (required version: >=1.49)
2. Install [node.js >=10.x](https://nodejs.org/en/download/)
3. Install [yarn](https://classic.yarnpkg.com/lang/en/docs/install)
4. Install x86_64-unknown-linux-musl: `rustup target add x86_64-unknown-linux-musl`
5. musl-tools: `sudo apt install musl-tools`
### Build
```
make
```
### Configure
Hastic can be configured using config-file or environment variables.
At first, choose which datasource you'll be using: `prometheus` or `influx`. Only one can be used at a time.
#### Config-file
- copy the config example to the release directory:
```bash
cp config.example.toml release/config.toml
```
- edit the config file, e.g. using `nano`:
```bash
nano release/config.toml
```
#### Environment variables
All config fields are also available as environment variables with the `HASTIC_` prefix.
Variable name structure:
- for high-level fields: `HASTIC_<field_name>`, e.g. `HASTIC_PORT`
- for nested fields: `HASTIC_<category_name>__<field_name>`, e.g. `HASTIC_PROMETHEUS__URL`
Environment variables can be set either by exporting them (they'll remain in effect until the bash session is closed):
```bash
export HASTIC_PORT=8000
export HASTIC_PROMETHEUS__URL=http://localhost:9090
export HASTIC_PROMETHEUS__QUERY=rate(go_memstats_alloc_bytes_total[5m])
```
or by specifying them in the run command (they'll apply only to that run):
```bash
HASTIC_PORT=8000 HASTIC_PROMETHEUS__URL=http://localhost:9090 HASTIC_PROMETHEUS__QUERY=rate(go_memstats_alloc_bytes_total[5m]) ./release/hastic
```
### Run
```
cd release
./hastic

6
client/src/components/AnlyticsStatus.vue

@ -1,10 +1,12 @@
<template>
<div class="analytic-status">
analytic status: <strong> {{ status }} </strong>
analytic status: <strong> {{ status.message }} </strong>
</div>
</template>
<script lang="ts">
import { AnalyticStatus } from "@/store";
import { defineComponent } from 'vue';
@ -13,7 +15,7 @@ export default defineComponent({
components: {
},
computed: {
status() {
status(): AnalyticStatus {
return this.$store.state.analyticStatus;
}
}

14
client/src/components/Graph.vue

@ -10,18 +10,20 @@ import { AnomalyHSR, TimeRange } from "@/types";
import { PatternPod } from "./pods/pattern_pod";
import { ThresholdPod } from './pods/threshold_pod';
import { AnomalyPod } from './pods/anomaly_pod';
import { getMetrics } from '../services/metrics.service';
import { getSegments, postSegment, deleteSegment } from '../services/segments.service';
import { LineTimeSerie } from "@chartwerk/line-pod";
import { SegmentArray } from '@/types/segment_array';
import { Segment, SegmentId } from '@/types/segment';
import _ from "lodash";
import { AnalyticUnitType } from '@/types/analytic_units';
import { defineComponent, watch } from 'vue';
import { getHSRAnomaly } from "@/services/analytics.service";
import { getMetrics } from '@/services/metrics.service';
import { getSegments, postSegment, deleteSegment } from '@/services/segments.service';
import { defineComponent } from 'vue';
import _ from "lodash";
// TODO: move to store
async function resolveDataPatterns(range: TimeRange): Promise<{
@ -117,7 +119,6 @@ async function resolveDataAnomaly(range: TimeRange): Promise<{
}
}
// TODO: move to store
async function addSegment(segment: Segment): Promise<SegmentId> {
try {
@ -147,7 +148,6 @@ async function _deleteSegment(from: number, to: number): Promise<number> {
}
}
// TODO: convert to class component
export default defineComponent({
name: 'Graph',
@ -222,8 +222,6 @@ export default defineComponent({
let cfg = _.clone(this.analyticUnitConfig);
// TODO: get 10 (step) from API config
cfg.seasonality = Math.ceil(Math.abs(from - to) / 10) * 10;
console.log("cfg.seasonality: " + cfg.seasonality);
this.$store.dispatch('patchConfig', { Anomaly: cfg });
}
},

1
client/src/components/pods/pattern_pod.ts

@ -79,7 +79,6 @@ export class PatternPod extends HasticPod<UpdateDataCallback> {
} else {
console.log('took from range from default');
}
console.log(from + " ---- " + to);
this.udc({ from, to })
.then(resp => {

31
client/src/services/analytics.service.ts

@ -5,20 +5,33 @@ import axios from 'axios';
import { getGenerator } from '@/utils';
import _ from 'lodash';
import {
import {
AnalyticUnitType, AnlyticUnitConfig,
PatternConfig, ThresholdConfig, AnomalyConfig
} from "@/types/analytic_units";
import { AnomalyHSR } from "@/types";
import { AnalyticStatus } from "@/store";
import _ from 'lodash';
const ANALYTICS_API_URL = API_URL + "analytics/";
export async function getStatus(): Promise<string> {
export async function getStatus(): Promise<AnalyticStatus> {
const uri = ANALYTICS_API_URL + `status`;
const res = await axios.get(uri);
const data = res['data'] as any;
return data.status;
try {
const res = await axios.get<{ status: string }>(uri);
const data = res.data;
return {
available: true,
message: data.status
};
} catch (e) {
return {
available: false,
message: e.message
};
}
}
export async function getConfig(): Promise<[AnalyticUnitType, AnlyticUnitConfig]> {
@ -54,8 +67,8 @@ export async function patchConfig(patchObj: any) {
await axios.patch(uri, patchObj);
}
export function getStatusGenerator(): AsyncIterableIterator<string> {
return getGenerator<string>(100, getStatus);
export function getStatusGenerator(): AsyncIterableIterator<AnalyticStatus> {
return getGenerator<AnalyticStatus>(100, getStatus);
}
@ -68,6 +81,6 @@ export async function getHSRAnomaly(from: number, to: number): Promise<AnomalyHS
const res = await axios.get(uri);
const values = res["data"]["AnomalyHSR"];
return values as AnomalyHSR;
}

17
client/src/store/index.ts

@ -12,23 +12,30 @@ const _SET_STATUS_GENERATOR = '_SET_STATUS_GENERATOR';
// TODO: consts for actions
export type AnalyticStatus = {
available: boolean,
message: string,
}
type State = {
analyticStatus: string,
analyticStatus: AnalyticStatus,
analyticUnitType?: AnalyticUnitType,
analyticUnitConfig?: AnlyticUnitConfig,
_statusGenerator: AsyncIterableIterator<string>
_statusGenerator: AsyncIterableIterator<AnalyticStatus>
}
const store = createStore<State>({
state: {
analyticStatus: 'loading...',
analyticStatus: {
available: false,
message: 'loading...',
},
analyticUnitType: null,
analyticUnitConfig: null,
_statusGenerator: null
},
mutations: {
[SET_ANALYTICS_STATUS](state, status: string) {
[SET_ANALYTICS_STATUS](state, status: AnalyticStatus) {
state.analyticStatus = status;
},
[SET_DETECTOR_CONFIG](state, { analyticUnitType, analyticUnitConfig }) {
@ -38,7 +45,7 @@ const store = createStore<State>({
// [PATCH_CONFIG](state, patchObj) {
// patchConfig(patchConfig)
// }
[_SET_STATUS_GENERATOR](state, generator: AsyncIterableIterator<string>) {
[_SET_STATUS_GENERATOR](state, generator: AsyncIterableIterator<AnalyticStatus>) {
state._statusGenerator = generator;
}
},

95
client/src/views/Home.vue

@ -2,57 +2,63 @@
<div class="home">
<img alt="Vue logo" src="../assets/logo.png">
<graph ref="graph" />
<analytic-status />
<div>
Analytic unit type:
<select :value="analyticUnitType" @change="changeAnalyticUnitType">
<option disabled value="">Please Select</option>
<option v-bind:key="option" v-for="option in analyticUnitTypes" :value="option">{{option}}</option>
</select> <br/><br/>
</div>
<div id="controls">
<div v-if="analyticUnitType == analyticUnitTypes[0]">
Threshold:
<input :value="analyticUnitConfig.threshold" @change="thresholdChange" /> <br/><br/>
</div>
<div v-if="analyticUnitType == analyticUnitTypes[1]">
Hold <pre>S</pre> to label patterns;
Hold <pre>A</pre> to label anti patterns <br/>
Hold <pre>D</pre> to delete patterns
<br/>
<hr/>
Correlation score:
<input :value="analyticUnitConfig.correlation_score" @change="correlationScoreChange" /> <br/>
Anti correlation score:
<input :value="analyticUnitConfig.anti_correlation_score" @change="antiCorrelationScoreChange" /> <br/>
Model score:
<input :value="analyticUnitConfig.model_score" @change="modelScoreChange" /> <br/>
Threshold score:
<input :value="analyticUnitConfig.threshold_score" @change="thresholdScoreChange" /> <br/><br/>
<button @click="clearAllLabeling"> clear all labeling </button>
<template v-if="analyticStatus.available">
<div>
Analytic unit type:
<select :value="analyticUnitType" @change="changeAnalyticUnitType">
<option disabled value="">Please Select</option>
<option v-bind:key="option" v-for="option in analyticUnitTypes" :value="option">{{option}}</option>
</select> <br/><br/>
</div>
<div v-if="analyticUnitType == analyticUnitTypes[2]">
Hold <pre>Z</pre> to set seasonality timespan
<hr/>
<!-- Alpha:
<input :value="analyticUnitConfig.alpha" @change="alphaChange" /> <br/> -->
Confidence:
<input :value="analyticUnitConfig.confidence" @change="confidenceChange" /> <br/>
Seasonality:
<input :value="analyticUnitConfig.seasonality" @change="seasonalityChange" /> <br/>
<br/>
<div id="controls">
<div v-if="analyticUnitType == analyticUnitTypes[0]">
Threshold:
<input v-model="analyticUnitConfig.threshold" @change="thresholdChange" /> <br/><br/>
</div>
<div v-if="analyticUnitType == analyticUnitTypes[1]">
Hold <pre>S</pre> to label patterns;
Hold <pre>A</pre> to label anti patterns <br/>
Hold <pre>D</pre> to delete patterns
<br/>
<hr/>
Correlation score:
<input v-model="analyticUnitConfig.correlation_score" @change="correlationScoreChange" /> <br/>
Anti correlation score:
<input v-model="analyticUnitConfig.anti_correlation_score" @change="antiCorrelationScoreChange" /> <br/>
Model score:
<input v-model="analyticUnitConfig.model_score" @change="modelScoreChange" /> <br/>
Threshold score:
<input v-model="analyticUnitConfig.threshold_score" @change="thresholdScoreChange" /> <br/><br/>
<button @click="clearAllLabeling"> clear all labeling </button>
</div>
<div v-if="analyticUnitType == analyticUnitTypes[2]">
Hold <pre>Z</pre> to set seasonality timespan
<hr/>
<!-- Alpha:
<input :value="analyticUnitConfig.alpha" @change="alphaChange" /> <br/> -->
Confidence:
<input v-model="analyticUnitConfig.confidence" @change="confidenceChange" /> <br/>
Seasonality:
<input v-model="analyticUnitConfig.seasonality" @change="seasonalityChange" /> <br/>
Seasonality iterations:
<input v-model="analyticUnitConfig.seasonality_iterations" @change="seasonalityIterationsChange" /> <br/>
<br/>
</div>
</div>
</div>
</template>
</div>
</template>
<script lang="ts">
import { defineComponent } from 'vue';
import Graph from '@/components/Graph.vue';
import AnalyticStatus from '@/components/AnlyticsStatus.vue';
import { AnalyticUnitType } from '@/types/analytic_units';
import { defineComponent } from 'vue';
import * as _ from 'lodash';
@ -116,7 +122,11 @@ export default defineComponent({
cfg.seasonality = parseFloat(e.target.value);
this.$store.dispatch('patchConfig', { Anomaly: cfg });
},
seasonalityIterationsChange(e) {
let cfg = _.clone(this.analyticUnitConfig);
cfg.seasonality_iterations = Math.ceil(e.target.value);
this.$store.dispatch('patchConfig', { Anomaly: cfg });
},
},
data: function () {
return {
@ -133,6 +143,9 @@ export default defineComponent({
},
analyticUnitConfig() {
return this.$store.state.analyticUnitConfig;
},
analyticStatus() {
return this.$store.state.analyticStatus;
}
}
});

12
docker-compose.yml

@ -0,0 +1,12 @@
version: '3'
services:
app:
image: hastic/hastic:latest
restart: always
environment:
HASTIC_PORT: "4347"
HASTIC_PROMETHEUS__URL: "http://demo.robustperception.io:9090"
HASTIC_PROMETHEUS__QUERY: "rate(go_memstats_alloc_bytes_total[1m])"
ports:
- "4347:4347"

4
server/Cargo.lock generated

@ -1333,9 +1333,9 @@ dependencies = [
[[package]]
name = "rusqlite"
version = "0.26.1"
version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a82b0b91fad72160c56bf8da7a549b25d7c31109f52cc1437eac4c0ad2550a7"
checksum = "4ba4d3462c8b2e4d7f4fcfcf2b296dc6b65404fbbc7b63daa37fd485c149daf7"
dependencies = [
"bitflags",
"fallible-iterator",

2
server/Cargo.toml

@ -17,7 +17,7 @@ fastrand = "1.5.0"
subbeat = "0.0.15"
config = "0.11.0"
openssl = { version = "=0.10.33", features = ["vendored"] }
rusqlite = "0.26.1"
rusqlite = "0.26.2"
# https://github.com/rusqlite/rusqlite/issues/914
libsqlite3-sys = { version = "*", features = ["bundled"] }
futures = "0.3.17"

17
server/config.example.toml

@ -1,8 +1,10 @@
port = 4347
[prometheus]
url = "http://localhost:9090"
query = "rate(go_memstats_alloc_bytes_total[5m])"
# one of datasource sections (prometheus / influx) should be uncommented and edited corresponding to your environment
# [prometheus]
# url = "http://localhost:9090"
# query = "rate(go_memstats_alloc_bytes_total[5m])"
# [influx]
@ -16,8 +18,7 @@ query = "rate(go_memstats_alloc_bytes_total[5m])"
# |> yield(name: "mean")
# """
[alerting]
type = "webhook"
interval = 10 # in seconds
endpoint = "http://localhost:9092"
# [alerting]
# type = "webhook"
# interval = 10 # in seconds
# endpoint = "http://localhost:9092"

62
server/src/config.rs

@ -1,8 +1,10 @@
use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig};
use std::env;
#[derive(Clone)]
pub struct WebhookAlertingConfig {
endpoint: String,
pub endpoint: String,
}
#[derive(Clone)]
@ -12,8 +14,8 @@ pub enum AlertingType {
#[derive(Clone)]
pub struct AlertingConfig {
alerting_type: AlertingType,
interval: u64, // interval in seconds
pub alerting_type: AlertingType,
pub interval: u64, // interval in seconds
}
#[derive(Clone)]
@ -25,6 +27,7 @@ pub struct Config {
fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> {
if config.get::<String>("prometheus.url").is_ok() {
println!("using Prometheus");
return Ok(DatasourceConfig::Prometheus(PrometheusConfig {
url: config.get("prometheus.url")?,
query: config.get("prometheus.query")?,
@ -32,6 +35,7 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfi
}
if config.get::<String>("influx.url").is_ok() {
println!("using Influx");
return Ok(DatasourceConfig::Influx(InfluxConfig {
url: config.get("influx.url")?,
org_id: config.get("influx.org_id")?,
@ -40,7 +44,7 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfi
}));
}
return Err(anyhow::format_err!("no datasource found"));
return Err(anyhow::format_err!("please configure a datasource"));
}
fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingConfig>> {
@ -62,7 +66,7 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingCo
let analytic_type = config.get::<String>("alerting.type").unwrap();
if analytic_type != "webhook" {
return Err(anyhow::format_err!(
"unknown alerting typy {}",
"unknown alerting type: {}",
analytic_type
));
}
@ -75,6 +79,46 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingCo
}));
}
// config::Environment doesn't support nested configs, e.g. `alerting.type`,
// so I've copied this from:
// https://github.com/rust-lang/mdBook/blob/f3e5fce6bf5e290c713f4015947dc0f0ad172d20/src/config.rs#L132
// so that `__` can be used in env variables instead of `.`,
// e.g. `HASTIC_ALERTING__TYPE` -> alerting.type
pub fn update_from_env(config: &mut config::Config) {
let overrides =
env::vars().filter_map(|(key, value)| parse_env(&key).map(|index| (index, value)));
for (key, value) in overrides {
config.set(&key, value).unwrap();
}
}
pub fn print_config(config: config::Config) {
// TODO: support any nesting level
let sections = config.to_owned().cache.into_table().unwrap();
for (section_name, values) in sections {
match values.clone().into_table() {
Err(_) => println!("{} => {}", section_name, values),
Ok(section) => {
for (key, value) in section {
println!("{}.{} => {}", section_name, key, value);
}
}
}
}
}
fn parse_env(key: &str) -> Option<String> {
const PREFIX: &str = "HASTIC_";
if key.starts_with(PREFIX) {
let key = &key[PREFIX.len()..];
Some(key.to_lowercase().replace("__", "."))
} else {
None
}
}
impl Config {
pub fn new() -> anyhow::Result<Config> {
let mut config = config::Config::default();
@ -83,14 +127,14 @@ impl Config {
config.merge(config::File::with_name("config")).unwrap();
}
config
.merge(config::Environment::with_prefix("HASTIC"))
.unwrap();
update_from_env(&mut config);
if config.get::<u16>("port").is_err() {
config.set("port", "8000").unwrap();
config.set("port", 4347).unwrap();
}
print_config(config.clone());
Ok(Config {
port: config.get::<u16>("port").unwrap(),
datasource_config: resolve_datasource(&config)?,

7
server/src/main.rs

@ -1,6 +1,6 @@
mod api;
use hastic::services::{analytic_service, metric_service, segments_service};
use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service, data_service};
use anyhow;
@ -9,10 +9,13 @@ async fn main() -> anyhow::Result<()> {
let config = hastic::config::Config::new()?;
let cfg_clone = config.clone();
let data_service = data_service::DataService::new()?;
let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new(&data_service)?;
let metric_service = metric_service::MetricService::new(&config.datasource_config);
let segments_service = segments_service::SegmentsService::new()?;
let segments_service = segments_service::SegmentsService::new(&data_service)?;
let mut analytic_service = analytic_service::AnalyticService::new(
analytic_unit_service.clone(),
metric_service.clone(),
segments_service.clone(),
config.alerting,

245
server/src/services/analytic_service/analytic_service.rs

@ -2,14 +2,17 @@ use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig};
use super::detection_runner::DetectionRunner;
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR};
use super::types::{
self, AnalyticUnitRF, DetectionRunnerConfig, DetectionRunnerTask, LearningWaiter, HSR,
};
use super::{
analytic_client::AnalyticClient,
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
};
use crate::config::AlertingConfig;
use crate::services::analytic_service::analytic_unit::resolve;
use crate::config::{AlertingConfig, AlertingType};
use crate::services::analytic_unit_service::AnalyticUnitService;
use crate::services::{
metric_service::MetricService,
segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH},
@ -20,6 +23,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
use anyhow;
use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
// TODO: now it's basically single analytic unit, service will operate on many AU
@ -27,6 +31,7 @@ use tokio::sync::{mpsc, oneshot};
pub struct AnalyticService {
metric_service: MetricService,
segments_service: SegmentsService,
analytic_unit_service: AnalyticUnitService,
alerting: Option<AlertingConfig>,
@ -34,6 +39,7 @@ pub struct AnalyticService {
analytic_unit_config: AnalyticUnitConfig,
analytic_unit_learning_status: LearningStatus,
// TODO: add comment about how it's used
tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticServiceMessage>,
@ -48,21 +54,25 @@ pub struct AnalyticService {
impl AnalyticService {
pub fn new(
analytic_unit_service: AnalyticUnitService,
metric_service: MetricService,
segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>,
) -> AnalyticService {
// TODO: move buffer size to config
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
let aus = analytic_unit_service.clone();
AnalyticService {
analytic_unit_service: aus,
metric_service,
segments_service,
alerting,
// TODO: get it from persistance
analytic_unit: None,
analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()),
analytic_unit_config: analytic_unit_service.get_active_config().unwrap(),
analytic_unit_learning_status: LearningStatus::Initialization,
tx,
@ -82,28 +92,56 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone())
}
fn run_learning_waiter(&self, learning_waiter: LearningWaiter) {
fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) {
// TODO: save handler of the task
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
match learning_waiter {
LearningWaiter::Detection(task) => {
match learning_waiter {
LearningWaiter::Detection(task) => {
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_detections(task.sender, au, ms, task.from, task.to)
.await
}
LearningWaiter::HSR(task) => {
});
}
LearningWaiter::HSR(task) => {
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await
}
}
});
}
LearningWaiter::DetectionRunner(task) => {
self.run_detection_runner(task.from);
}
});
}
}
fn run_detection_runner(&mut self) {
// TODO: create DetectionRunnerConfig from alerting
// TODO: rerun detection runner on analytic unit change
fn run_detection_runner(&mut self, from: u64) {
// TODO: handle case or make it impossible to run_detection_runner second time
if self.analytic_unit_learning_status != LearningStatus::Ready {
let task = DetectionRunnerTask { from };
self.learning_waiters
.push(LearningWaiter::DetectionRunner(task));
return;
}
let AlertingType::Webhook(acfg) = self.alerting.as_ref().unwrap().alerting_type.clone();
let drcfg = DetectionRunnerConfig {
endpoint: acfg.endpoint.clone(),
interval: self.alerting.as_ref().unwrap().interval,
};
let tx = self.tx.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
let dr = DetectionRunner::new(tx,self.metric_service.clone(), drcfg, au);
self.detection_runner = Some(dr);
self.detection_runner.as_mut().unwrap().run(from);
// TODO: rerun detection runner on analytic unit change (by setting analytic unit)
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
@ -129,11 +167,12 @@ impl AnalyticService {
self.learning_handler = Some(tokio::spawn({
self.analytic_unit_learning_status = LearningStatus::Starting;
let tx = self.tx.clone();
let aus = self.analytic_unit_service.clone();
let ms = self.metric_service.clone();
let ss = self.segments_service.clone();
let cfg = self.analytic_unit_config.clone();
async move {
AnalyticService::run_learning(tx, cfg, ms, ss).await;
AnalyticService::run_learning(tx, cfg, aus, ms, ss).await;
}
}));
}
@ -194,80 +233,132 @@ impl AnalyticService {
}
// TODO: maybe make `consume_response` async
fn consume_response(&mut self, res: types::ResponseType) {
fn consume_response(&mut self, res: anyhow::Result<types::ResponseType>) {
match res {
// TODO: handle when learning panics
ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning
}
ResponseType::LearningFinished(results) => {
self.learning_handler = None;
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results)));
self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
self.run_learning_waiter(task);
Ok(response_type) => {
match response_type {
ResponseType::DetectionRunnerStarted(from) => {
println!("Detection runner started from {}", from)
}
ResponseType::DetectionRunnerUpdate(id, timestamp) => {
self.analytic_unit_service
.set_last_detection(id, timestamp)
.unwrap();
}
ResponseType::DetectionRunnerDetection(from, to) => {
println!("detection: {} {}", from, to);
}
ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning
}
ResponseType::LearningFinished(results) => {
self.learning_handler = None;
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results)));
self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
self.run_learning_waiter(task);
}
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
}
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results
// TODO: create custom DatasourceError error type
Err(err) => {
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
ResponseType::LearningDatasourceError => {
// TODO: drop all learning_waiters with error
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Error;
self.analytic_unit_learning_status = LearningStatus::Error(err.to_string());
}
}
}
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) {
let (new_conf, need_learning) = self.analytic_unit_config.patch(patch);
self.analytic_unit_config = new_conf;
if need_learning {
self.consume_request(RequestType::RunLearning);
// TODO: it's not fullu correct: we need to wait when the learning starts
match tx.send(()) {
Ok(_) => {}
Err(_e) => {
println!("Can`t send patch config notification");
}
}
} else {
let my_id = self
.analytic_unit_service
.get_config_id(&self.analytic_unit_config);
let patch_id = patch.get_type_id();
let same_type = my_id == patch_id;
// TODO: need_learning and same_type logic overlaps, there is a way to optimise this
let need_learning = self.analytic_unit_config.patch_needs_learning(&patch);
if same_type {
// TODO: check when learning should be started
let new_conf = patch.get_new_config();
self.analytic_unit_config = new_conf.clone();
self.analytic_unit_service
.update_config_by_id(&my_id, &new_conf)
.unwrap();
if self.analytic_unit.is_some() {
tokio::spawn({
let au = self.analytic_unit.clone();
let cfg = self.analytic_unit_config.clone();
async move {
au.unwrap().write().await.set_config(cfg);
match tx.send(()) {
Ok(_) => {}
Err(_e) => {
println!("Can`t send patch config notification");
}
if need_learning {
self.consume_request(RequestType::RunLearning);
match tx.send(()) {
Ok(_) => {}
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
}
});
return;
} else {
tokio::spawn({
let au = self.analytic_unit.clone();
let cfg = self.analytic_unit_config.clone();
async move {
au.unwrap().write().await.set_config(cfg);
match tx.send(()) {
Ok(_) => {}
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
}
}
});
}
} else {
// TODO: check if we need this else
match tx.send(()) {
Ok(_) => {}
Err(_e) => {
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
}
}
} else {
let new_conf = self
.analytic_unit_service
.get_config_by_id(&patch_id)
.unwrap();
self.analytic_unit_config = new_conf.clone();
self.consume_request(RequestType::RunLearning);
match tx.send(()) {
Ok(_) => {}
Err(e) => {
println!("Can`t send patch config notification");
println!("{:?}", e);
}
}
}
}
pub async fn serve(&mut self) {
// TODO: remove this hack
self.consume_request(RequestType::RunLearning);
// TODO: start detection runner if
if self.alerting.is_some() {
self.run_detection_runner();
// TODO: get it from persistance
let now: DateTime<Utc> = Utc::now();
let from = now.timestamp() as u64;
self.run_detection_runner(from);
}
while let Some(message) = self.rx.recv().await {
@ -281,15 +372,21 @@ impl AnalyticService {
async fn run_learning(
tx: mpsc::Sender<AnalyticServiceMessage>,
aucfg: AnalyticUnitConfig,
aus: AnalyticUnitService,
ms: MetricService,
ss: SegmentsService,
) {
let mut au = resolve(aucfg);
let mut au = match aus.resolve(&aucfg) {
Ok(a) => a,
Err(e) => {
panic!("{}", e);
}
};
match tx
.send(AnalyticServiceMessage::Response(
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::LearningStarted,
))
)))
.await
{
Ok(_) => {}
@ -298,9 +395,11 @@ impl AnalyticService {
// TODO: maybe to spawn_blocking here
let lr = match au.learn(ms, ss).await {
LearningResult::Finished => ResponseType::LearningFinished(au),
LearningResult::DatasourceError => ResponseType::LearningDatasourceError,
LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty,
Ok(res) => match res {
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)),
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty),
},
Err(e) => Err(e),
};
match tx.send(AnalyticServiceMessage::Response(lr)).await {

87
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -1,4 +1,8 @@
use crate::services::{analytic_service::types::{AnomalyHSRConfig, HSR}, metric_service::MetricService, segments_service::SegmentsService};
use crate::services::{
analytic_service::types::{AnomalyHSRConfig, HSR},
metric_service::MetricService,
segments_service::SegmentsService,
};
use super::types::{AnalyticUnit, AnalyticUnitConfig, AnomalyConfig, LearningResult};
@ -7,15 +11,13 @@ use subbeat::metric::MetricResult;
use chrono::prelude::*;
// TODO: move to config
const DETECTION_STEP: u64 = 10;
const SEASONALITY_ITERATIONS: u64 = 3; // TODO: better name
// timerange offset in seconds backwards from end of ts in assumption that ts has no gaps
fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)>{
fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)> {
// TODO: remove dependency to DETECTION_STEP
let indexes_offset = (offset / DETECTION_STEP) as usize;
let n = ts.len() - 1;
if n < indexes_offset {
@ -25,48 +27,55 @@ fn get_value_with_offset(ts: &Vec<(u64, f64)>, offset: u64) -> Option<(u64, f64)
return Some(ts[i]);
}
struct SARIMA {
pub ts: Vec<(u64, f64)>,
pub seasonality: u64,
pub confidence: f64,
pub seasonality_iterations: u64,
}
impl SARIMA {
pub fn new(seasonality: u64, confidence: f64) -> SARIMA {
pub fn new(seasonality: u64, confidence: f64, seasonality_iterations: u64) -> SARIMA {
return SARIMA {
ts: Vec::new(),
seasonality,
confidence
confidence,
seasonality_iterations,
};
}
pub fn learn(&mut self, ts: &Vec<(u64, f64)>) -> anyhow::Result<()> {
// TODO: don't count NaNs in model
// TODO: add exponental smooting to model
// TODO: add exponental smooting to the model
// TODO: trend detection
if ts.len() < 2 {
return Err(anyhow::format_err!("to short timeserie to learn from"));
return Err(anyhow::format_err!(
"too short timeserie to learn from, timeserie length: {}",
ts.len()
));
}
// TODO: ensure capacity with seasonality size
let mut res_ts = Vec::<(u64, f64)>::new();
let from = ts[0].0;
// TODO: unwrap -> ?
let to = ts.last().unwrap().0;
let iter_steps = (self.seasonality / DETECTION_STEP) as usize;
if to - from != SEASONALITY_ITERATIONS * self.seasonality {
return Err(anyhow::format_err!("timeserie to learn from should be {} * sasonality", SEASONALITY_ITERATIONS));
if to - from != self.seasonality_iterations * self.seasonality {
return Err(anyhow::format_err!(
"timeserie to learn from should be {} * sasonality",
self.seasonality_iterations
));
}
for k in 0..iter_steps {
let mut vts = Vec::new();
for si in 0..SEASONALITY_ITERATIONS {
for si in 0..self.seasonality_iterations {
vts.push(ts[k + iter_steps * si as usize].1);
}
let mut vt: f64 = vts.iter().sum();
vt /= SEASONALITY_ITERATIONS as f64;
vt /= self.seasonality_iterations as f64;
let t = ts[ts.len() - iter_steps + k].0;
res_ts.push((t, vt));
}
@ -89,7 +98,6 @@ impl SARIMA {
let len_from = timestamp - from;
// TODO: take avg if timestamp in between
let index_diff = (len_from / DETECTION_STEP) % self.ts.len() as u64;
let p = self.ts[index_diff as usize].1;
return (p, (p + self.confidence, p - self.confidence));
@ -98,18 +106,18 @@ impl SARIMA {
pub fn push_point() {
// TODO: inmplement
}
}
pub struct AnomalyAnalyticUnit {
id: String,
config: AnomalyConfig,
sarima: Option<SARIMA>,
}
impl AnomalyAnalyticUnit {
pub fn new(config: AnomalyConfig) -> AnomalyAnalyticUnit {
pub fn new(id: String, config: AnomalyConfig) -> AnomalyAnalyticUnit {
AnomalyAnalyticUnit {
id,
config,
sarima: None,
}
@ -124,7 +132,7 @@ impl AnomalyAnalyticUnit {
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: Vec::new()
ts: Vec::new(),
}));
}
@ -135,7 +143,7 @@ impl AnomalyAnalyticUnit {
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: Vec::new()
ts: Vec::new(),
}));
}
@ -143,41 +151,60 @@ impl AnomalyAnalyticUnit {
let sarima = self.sarima.as_ref().unwrap();
for vt in ts {
let x = sarima.predict(vt.0);
sts.push((vt.0, x.0, (x.1.0, x.1.1)));
sts.push((vt.0, x.0, (x.1 .0, x.1 .1)));
}
return Ok(HSR::AnomalyHSR(AnomalyHSRConfig {
seasonality: self.config.seasonality,
timestamp: self.sarima.as_ref().unwrap().ts.last().unwrap().0,
ts: sts
ts: sts,
}));
}
}
#[async_trait]
impl AnalyticUnit for AnomalyAnalyticUnit {
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
// TODO: return window based on real petterns info
return DETECTION_STEP;
}
fn set_config(&mut self, config: AnalyticUnitConfig) {
if let AnalyticUnitConfig::Anomaly(cfg) = config {
self.config = cfg;
if self.sarima.is_some() {
self.sarima.as_mut().unwrap().confidence = self.config.confidence;
}
} else {
panic!("Bad config!");
}
}
async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult {
let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence);
async fn learn(
&mut self,
ms: MetricService,
_ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
let mut sarima = SARIMA::new(
self.config.seasonality,
self.config.confidence,
self.config.seasonality_iterations,
);
let utc: DateTime<Utc> = Utc::now();
let to = utc.timestamp() as u64;
let from = to - self.config.seasonality * SEASONALITY_ITERATIONS;
let from = to - self.config.seasonality * self.config.seasonality_iterations;
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
let mr = ms.query(from, to, DETECTION_STEP).await?;
if mr.data.keys().len() == 0 {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
// TODO: unwrap -> ?
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
sarima.learn(ts).unwrap();
sarima.learn(ts)?;
self.sarima = Some(sarima);
@ -185,7 +212,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
// TODO: load data to learning
// TODO: update model to work online
return LearningResult::Finished;
return Ok(LearningResult::Finished);
}
async fn detect(
&self,

8
server/src/services/analytic_service/analytic_unit/mod.rs

@ -7,11 +7,3 @@ use self::{
anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig,
};
pub fn resolve(cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new(c.clone())),
}
}

34
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -74,6 +74,7 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re
});
}
// TODO: unwrap -> ?
let k = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.remove(&k).unwrap();
@ -197,14 +198,16 @@ fn max_corr_with_segments(xs: &VecDeque<f64>, yss: &Vec<Vec<f64>>) -> f32 {
}
pub struct PatternAnalyticUnit {
id: String,
config: PatternConfig,
learning_results: Option<LearningResults>,
}
// TODO: move this to loginc of analytic unit
impl PatternAnalyticUnit {
pub fn new(cfg: PatternConfig) -> PatternAnalyticUnit {
pub fn new(id: String, cfg: PatternConfig) -> PatternAnalyticUnit {
PatternAnalyticUnit {
id,
config: cfg,
learning_results: None,
}
@ -213,6 +216,13 @@ impl PatternAnalyticUnit {
#[async_trait]
impl AnalyticUnit for PatternAnalyticUnit {
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
let lr = self.learning_results.as_ref().unwrap();
return lr.avg_pattern_length as u64;
}
fn set_config(&mut self, config: AnalyticUnitConfig) {
if let AnalyticUnitConfig::Pattern(cfg) = config {
self.config = cfg;
@ -221,7 +231,11 @@ impl AnalyticUnit for PatternAnalyticUnit {
}
}
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult {
async fn learn(
&mut self,
ms: MetricService,
ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
// TODO: move to config
let mut cfg = Config::new();
cfg.set_feature_size(FEATURES_SIZE);
@ -235,14 +249,14 @@ impl AnalyticUnit for PatternAnalyticUnit {
cfg.set_training_optimization_level(2);
// be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let segments = ss.get_segments_inside(0, u64::MAX / 2)?;
let has_segments_label = segments
.iter()
.find(|s| s.segment_type == SegmentType::Label)
.is_some();
if !has_segments_label {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
let fs = segments.iter().map(|s| segment_to_segdata(&ms, s));
@ -253,11 +267,13 @@ impl AnalyticUnit for PatternAnalyticUnit {
for r in rs {
if r.is_err() {
println!("Error extracting metrics from datasource");
return LearningResult::DatasourceError;
// TODO: custom DatasourceError error type
return Err(anyhow::format_err!(
"Error extracting metrics from datasource"
));
}
let sd = r.unwrap();
let sd = r?;
if sd.data.is_empty() {
continue;
}
@ -269,7 +285,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
}
if learn_tss.len() == 0 {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
let mut patterns = Vec::<Vec<f64>>::new();
@ -332,7 +348,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
avg_pattern_length,
});
return LearningResult::Finished;
return Ok(LearningResult::Finished);
}
// TODO: get iterator instead of vector

19
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -10,19 +10,30 @@ use async_trait::async_trait;
const DETECTION_STEP: u64 = 10;
pub struct ThresholdAnalyticUnit {
id: String,
config: ThresholdConfig,
}
impl ThresholdAnalyticUnit {
pub fn new(config: ThresholdConfig) -> ThresholdAnalyticUnit {
ThresholdAnalyticUnit { config }
pub fn new(id: String, config: ThresholdConfig) -> ThresholdAnalyticUnit {
ThresholdAnalyticUnit { id, config }
}
}
#[async_trait]
impl AnalyticUnit for ThresholdAnalyticUnit {
async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult {
return LearningResult::Finished;
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
return DETECTION_STEP;
}
async fn learn(
&mut self,
_ms: MetricService,
_ss: SegmentsService,
) -> anyhow::Result<LearningResult> {
return Ok(LearningResult::Finished);
}
fn set_config(&mut self, config: AnalyticUnitConfig) {

87
server/src/services/analytic_service/analytic_unit/types.rs

@ -30,6 +30,7 @@ pub struct AnomalyConfig {
pub alpha: f64,
pub confidence: f64,
pub seasonality: u64, // step in seconds, can be zero
pub seasonality_iterations: u64,
}
impl Default for AnomalyConfig {
@ -38,6 +39,7 @@ impl Default for AnomalyConfig {
alpha: 0.5,
confidence: 10.0,
seasonality: 60 * 60,
seasonality_iterations: 3,
}
}
}
@ -55,13 +57,61 @@ impl Default for ThresholdConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AnalyticUnitConfig {
Pattern(PatternConfig),
Threshold(ThresholdConfig),
Pattern(PatternConfig),
Anomaly(AnomalyConfig),
}
impl AnalyticUnitConfig {
// return tru if patch is different type
pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig {
let iid = id.as_str();
match iid {
"1" => AnalyticUnitConfig::Threshold(Default::default()),
"2" => AnalyticUnitConfig::Pattern(Default::default()),
"3" => AnalyticUnitConfig::Anomaly(Default::default()),
_ => panic!("bad id for getting get_default_by_id"),
}
}
pub fn patch_needs_learning(&self, patch: &PatchConfig) -> bool {
// TODO: maybe use type id's to optimise code
match patch {
PatchConfig::Pattern(tcfg) => match self.clone() {
AnalyticUnitConfig::Pattern(_) => {
return false;
}
_ => return true,
},
PatchConfig::Anomaly(tcfg) => match self.clone() {
AnalyticUnitConfig::Anomaly(scfg) => {
if tcfg.is_some() {
let t = tcfg.as_ref().unwrap();
let mut need_learning = t.seasonality != scfg.seasonality;
need_learning |= t.seasonality_iterations != scfg.seasonality_iterations;
return need_learning;
} else {
return false;
}
}
_ => {
return true;
}
},
PatchConfig::Threshold(tcfg) => match self.clone() {
AnalyticUnitConfig::Threshold(_) => {
return false;
}
_ => {
return true;
}
},
}
}
// TODO: maybe this method depricated
// return true if need needs relearning
pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) {
match patch {
PatchConfig::Pattern(tcfg) => match self.clone() {
@ -69,6 +119,7 @@ impl AnalyticUnitConfig {
if tcfg.is_some() {
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false);
} else {
// TODO: it should be extraced from db
return (AnalyticUnitConfig::Pattern(Default::default()), false);
}
}
@ -85,7 +136,8 @@ impl AnalyticUnitConfig {
AnalyticUnitConfig::Anomaly(scfg) => {
if tcfg.is_some() {
let t = tcfg.as_ref().unwrap();
let need_learning = t.seasonality != scfg.seasonality;
let mut need_learning = t.seasonality != scfg.seasonality;
need_learning |= t.seasonality_iterations != scfg.seasonality_iterations;
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning);
} else {
return (AnalyticUnitConfig::Anomaly(Default::default()), false);
@ -123,12 +175,17 @@ impl AnalyticUnitConfig {
pub enum LearningResult {
Finished,
FinishedEmpty,
DatasourceError,
}
#[async_trait]
pub trait AnalyticUnit {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult;
fn get_id(&self) -> String;
fn get_detection_window(&self) -> u64;
async fn learn(
&mut self,
ms: MetricService,
ss: SegmentsService,
) -> anyhow::Result<LearningResult>;
async fn detect(
&self,
ms: MetricService,
@ -146,3 +203,23 @@ pub enum PatchConfig {
Threshold(Option<ThresholdConfig>),
Anomaly(Option<AnomalyConfig>),
}
impl PatchConfig {
pub fn get_type_id(&self) -> String {
match &self {
PatchConfig::Threshold(_) => "1".to_string(),
PatchConfig::Pattern(_) => "2".to_string(),
PatchConfig::Anomaly(_) => "3".to_string(),
}
}
pub fn get_new_config(&self) -> AnalyticUnitConfig {
match &self {
PatchConfig::Threshold(cfg) => {
AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone())
}
PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()),
PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()),
}
}
}

103
server/src/services/analytic_service/detection_runner.rs

@ -1,52 +1,115 @@
use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::mpsc;
use chrono::Utc;
use crate::services::metric_service::MetricService;
use tokio::sync::{mpsc, RwLock};
use super::types::{AnalyticUnitRF, DetectionRunnerConfig};
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType};
use tokio::time::{sleep, Duration};
pub struct DetectionRunner {
tx: mpsc::Sender<AnalyticServiceMessage>,
metric_service: MetricService,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
running_handler: Option<tokio::task::JoinHandle<()>>,
}
impl DetectionRunner {
pub fn new(config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF) -> DetectionRunner {
pub fn new(
tx: mpsc::Sender<AnalyticServiceMessage>,
metric_service: MetricService,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
) -> DetectionRunner {
DetectionRunner {
metric_service,
tx,
config,
analytic_unit,
running_handler: None,
}
}
pub async fn run(&mut self, from: u64) {
// TODO: get last detection timestamp from persistance
// TODO: set lst detection from "now"
pub fn run(&mut self, from: u64) {
// TODO: set last detection from "now"
if self.running_handler.is_some() {
self.running_handler.as_mut().unwrap().abort();
}
self.running_handler = Some(tokio::spawn({
// TODO: clone channel
let cfg = self.config.clone();
let ms = self.metric_service.clone();
let tx = self.tx.clone();
let au = self.analytic_unit.clone();
async move {
// AnalyticService::run_learning(tx, cfg, ms, ss).await;
// TODO: run detection "from"
// TODO: run detection "from" for big timespan
// TODO: parse detections to webhooks
// TODO: handle case when detection is in the end and continues after "now"
// it's better to make an issue on github
// TODO: find place to update analytic unit model
let window_size = au.as_ref().read().await.get_detection_window();
let detection_step = ms.get_detection_step();
let mut t_from = from - window_size;
let mut t_to = from;
match tx
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerStarted(from),
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"),
}
loop {
// TODO: run detection periodically
// TODO: use interval
sleep(Duration::from_secs(100)).await;
let a = au.as_ref().read().await;
let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();
for d in detections {
match tx
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerDetection(d.0, d.1),
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner detection notification"),
}
}
// TODO: send info about detections to tx
match tx
.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerUpdate(
au.as_ref().read().await.get_id(),
t_to,
),
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"),
}
sleep(Duration::from_secs(cfg.interval)).await;
t_from += detection_step;
t_to += detection_step;
}
}
}));
}
pub async fn set_analytic_unit(
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
) {
}
// pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF,
// ) {
// self.analytic_unit = analytic_unit;
// // TODO: stop running_handler
// // TODO: rerun detection with new anomaly units
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
// }
}

19
server/src/services/analytic_service/types.rs

@ -22,7 +22,7 @@ pub enum LearningStatus {
Initialization,
Starting,
Learning,
Error,
Error(String),
Ready,
}
@ -43,10 +43,12 @@ impl Default for LearningTrain {
}
pub enum ResponseType {
DetectionRunnerStarted(u64),
DetectionRunnerUpdate(String, u64), // analytic_unit id and timestamp
DetectionRunnerDetection(u64, u64), // TODO: add more into about analytic unit and more
LearningStarted,
LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
LearningFinishedEmpty,
LearningDatasourceError,
}
impl fmt::Debug for ResponseType {
@ -63,11 +65,16 @@ pub struct DetectionTask {
pub to: u64,
}
#[derive(Debug)]
pub struct DetectionRunnerTask {
pub from: u64,
}
#[derive(Debug, Serialize)]
pub struct AnomalyHSRConfig {
pub timestamp: u64,
pub seasonality: u64,
pub ts: Vec<(u64, f64, (f64, f64))>
pub ts: Vec<(u64, f64, (f64, f64))>,
}
// HSR Stands for Hastic Signal Representation,
// varies for different analytic units
@ -88,6 +95,7 @@ pub struct HSRTask {
#[derive(Debug)]
pub enum LearningWaiter {
Detection(DetectionTask),
DetectionRunner(DetectionRunnerTask),
HSR(HSRTask),
}
@ -96,7 +104,8 @@ pub enum LearningWaiter {
pub struct DetectionRunnerConfig {
// pub sender: mpsc::Sender<Result<Vec<Segment>>>,
pub endpoint: String,
pub from: u64,
// pub from: u64,
pub interval: u64,
}
#[derive(Debug)]
@ -116,5 +125,5 @@ pub enum RequestType {
pub enum AnalyticServiceMessage {
// Status,
Request(RequestType),
Response(ResponseType), // Detect { from: u64, to: u64 },
Response(anyhow::Result<ResponseType>), // Detect { from: u64, to: u64 },
}

195
server/src/services/analytic_unit_service.rs

@ -0,0 +1,195 @@
use std::sync::{Arc, Mutex};
use rusqlite::{params, Connection};
use super::analytic_service::analytic_unit::{
anomaly_analytic_unit::AnomalyAnalyticUnit,
pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit,
types::{self, AnalyticUnitConfig},
};
use super::data_service::DataService;
#[derive(Clone)]
pub struct AnalyticUnitService {
connection: Arc<Mutex<Connection>>,
}
// TODO: get DataService
impl AnalyticUnitService {
pub fn new(ds: &DataService) -> anyhow::Result<AnalyticUnitService> {
let conn = ds.analytic_units_connection.clone();
// TODO: add learning results field
conn.lock().unwrap().execute(
"CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY,
last_detection INTEGER,
active BOOLEAN,
type INTEGER,
config TEXT
)",
[],
)?;
Ok(AnalyticUnitService {
connection: conn
})
}
// TODO: optional id
pub fn resolve_au(
&self,
cfg: &AnalyticUnitConfig,
) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Threshold(c) => {
Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone()))
}
AnalyticUnitConfig::Pattern(c) => {
Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone()))
}
AnalyticUnitConfig::Anomaly(c) => {
Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone()))
}
}
}
// TODO: get id of analytic_unit which be used also as it's type
pub fn resolve(
&self,
cfg: &AnalyticUnitConfig,
) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg);
let id = au.as_ref().get_id();
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?;
let res = stmt.exists(params![id])?;
if res == false {
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)",
params![id, cfg_json],
)?;
}
conn.execute(
"UPDATE analytic_unit set active = FALSE where active = TRUE",
params![],
)?;
conn.execute(
"UPDATE analytic_unit set active = TRUE where id = ?1",
params![id],
)?;
return Ok(au);
}
pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap();
conn.execute(
"UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2",
params![last_detection, id],
)?;
Ok(())
}
pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: return default when there is no active
let conn = self.connection.lock().unwrap();
let mut stmt =
conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?;
let au = stmt.query_row([], |row| {
let c: String = row.get(2)?;
let cfg: AnalyticUnitConfig = serde_json::from_str(&c).unwrap();
Ok(self.resolve(&cfg))
})??;
return Ok(au);
}
pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> {
let exists = {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?;
stmt.exists([])?
};
if exists == false {
let c = AnalyticUnitConfig::Pattern(Default::default());
self.resolve(&c)?;
return Ok(c);
} else {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?;
let acfg = stmt.query_row([], |row| {
let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(cfg)
})?;
return Ok(acfg);
}
}
pub fn get_config_by_id(&self, id: &String) -> anyhow::Result<AnalyticUnitConfig> {
let exists = {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
stmt.exists([id])?
};
if exists == false {
let c = AnalyticUnitConfig::get_default_by_id(id);
self.resolve(&c)?;
return Ok(c);
} else {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
let acfg = stmt.query_row([id], |row| {
let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(cfg)
})?;
return Ok(acfg);
}
}
pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String {
match cfg {
AnalyticUnitConfig::Threshold(_) => "1".to_string(),
AnalyticUnitConfig::Pattern(_) => "2".to_string(),
AnalyticUnitConfig::Anomaly(_) => "3".to_string(),
}
}
pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
// TODO: it's possble that config doesn't exist, but we trying to update it
let conn = self.connection.lock().unwrap();
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE id = ?2",
params![cfg_json, id],
)?;
return Ok(());
}
pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap();
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE active = TRUE",
params![cfg_json],
)?;
return Ok(());
}
}

23
server/src/services/data_service.rs

@ -0,0 +1,23 @@
use std::sync::{Arc, Mutex};
use rusqlite::{Connection};
pub struct DataService {
pub analytic_units_connection: Arc<Mutex<Connection>>,
pub segments_connection: Arc<Mutex<Connection>>
}
impl DataService {
pub fn new() -> anyhow::Result<DataService> {
std::fs::create_dir_all("./data").unwrap();
let analytic_units_connection = Connection::open("./data/analytic_units.db")?;
let segments_connection = Connection::open("./data/segments.db")?;
Ok(DataService {
analytic_units_connection: Arc::new(Mutex::new(analytic_units_connection)),
segments_connection: Arc::new(Mutex::new(segments_connection))
})
}
}

7
server/src/services/metric_service.rs

@ -32,4 +32,11 @@ impl MetricService {
}
return Ok(mr);
}
// TODO: it a hack for DetectionRunner: it should vary for different analytic units
// and it's config
pub fn get_detection_step(&self) -> u64 {
return 10;
}
}

2
server/src/services/mod.rs

@ -1,4 +1,6 @@
pub mod analytic_service;
pub mod data_service;
pub mod analytic_unit_service;
pub mod metric_service;
pub mod segments_service;
pub mod user_service;

17
server/src/services/segments_service.rs

@ -6,9 +6,13 @@ use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use super::data_service::DataService;
pub const ID_LENGTH: usize = 20;
pub type SegmentId = String;
// TODO: make logic with this enum shorter
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
pub enum SegmentType {
@ -58,18 +62,19 @@ impl Segment {
}
}
// TODO: get DataService
#[derive(Clone)]
pub struct SegmentsService {
connection: Arc<Mutex<Connection>>,
}
impl SegmentsService {
pub fn new() -> anyhow::Result<SegmentsService> {
// TODO: move it to data service
std::fs::create_dir_all("./data").unwrap();
pub fn new(ds: &DataService) -> anyhow::Result<SegmentsService> {
let conn = Connection::open("./data/segments.db")?;
conn.execute(
// TODO: add unilytic_unit id as a new field
let conn = ds.segments_connection.clone();
conn.lock().unwrap().execute(
"CREATE TABLE IF NOT EXISTS segment (
id TEXT PRIMARY KEY,
start INTEGER NOT NULL,
@ -80,7 +85,7 @@ impl SegmentsService {
)?;
Ok(SegmentsService {
connection: Arc::new(Mutex::new(conn)),
connection: conn
})
}

Loading…
Cancel
Save