Browse Source

models++

pull/1/head
Alexey Velikiy 6 years ago
parent
commit
1d35992384
  1. 2
      server/src/config.ts
  2. 6
      server/src/controllers/analytics_controller.ts
  3. 102
      server/src/models/analytic_unit_model.ts
  4. 78
      server/src/models/metric_model.ts
  5. 58
      server/src/models/segment_model.ts
  6. 6
      server/src/models/task_model.ts
  7. 464
      server/src/services/analytics_service.ts
  8. 247
      server/src/services/data_service.ts
  9. 86
      server/src/services/process_service.ts

2
server/src/config.ts

@ -12,9 +12,7 @@ export const ANALYTICS_PATH = path.join(__dirname, '../../analytics');
export const DATA_PATH = path.join(__dirname, '../../data'); export const DATA_PATH = path.join(__dirname, '../../data');
export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db');
export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db');
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null);

6
server/src/controllers/analytics_controller.ts

@ -123,8 +123,8 @@ export function isAnalyticReady(): boolean {
} }
export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUnit.AnalyticUnitId> { export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUnit.AnalyticUnitId> {
let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj); let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.AnalyticUnit.fromObject(obj);
AnalyticUnit.create(unit); let id = await AnalyticUnit.create(unit);
// runLearning(unit); // runLearning(unit);
return return id;
} }

102
server/src/models/analytic_unit_model.ts

@ -7,54 +7,66 @@ let db = makeDBQ(Collection.ANALYTIC_UNITS);
export type AnalyticUnitId = string; export type AnalyticUnitId = string;
export type AnalyticUnit = { export class AnalyticUnit {
id?: AnalyticUnitId, constructor(
name: string, public name: string,
panelUrl: string, public panelUrl: string,
type: string, public type: string,
metric: Metric public metric: Metric,
status: string, public id?: AnalyticUnitId,
error?: string, public lastPredictionTime?: number,
lastPredictionTime: number public status?: string,
} public error?: string,
) {
export function analyticUnitFromObj(obj: any): AnalyticUnit { if(name === undefined) {
if(obj === undefined) { throw new Error(`Missing field "name"`);
throw new Error('obj is undefined'); }
} if(panelUrl === undefined) {
if(obj.type === undefined) { throw new Error(`Missing field "panelUrl"`);
throw new Error(`Missing field "type"`); }
} if(type === undefined) {
if(obj.name === undefined) { throw new Error(`Missing field "type"`);
throw new Error(`Missing field "name"`); }
} if(metric === undefined) {
if(obj.panelUrl === undefined) { throw new Error(`Missing field "metric"`);
throw new Error(`Missing field "panelUrl"`); }
} }
if(obj.metric === undefined) {
throw new Error(`Missing field "datasource"`); public toObject() {
} return {
if(obj.metric.datasource === undefined) { _id: this.id,
throw new Error(`Missing field "metric.datasource"`); name: this.name,
panelUrl: this.panelUrl,
type: this.type,
metric: this.metric.toObject(),
lastPredictionTime: this.lastPredictionTime,
status: this.status,
error: this.error
};
} }
if(obj.metric.targets === undefined) {
throw new Error(`Missing field "metric.targets"`); static fromObject(obj: any): AnalyticUnit {
if(obj === undefined) {
throw new Error('obj is undefined');
}
return new AnalyticUnit(
obj.name,
obj.panelUrl,
obj.type,
Metric.fromObject(obj.metric),
obj.status,
obj.lastPredictionTime,
obj.id || obj._id,
obj.error,
);
} }
const unit: AnalyticUnit = {
name: obj.name,
panelUrl: obj.panelUrl,
type: obj.type,
metric: obj.metric,
status: 'LEARNING',
lastPredictionTime: 0
};
return unit;
} }
export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> { export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> {
return db.findOne(id); return AnalyticUnit.fromObject(db.findOne(id));
} }
/** /**
@ -64,22 +76,22 @@ export async function findById(id: AnalyticUnitId): Promise<AnalyticUnit> {
* @returns unit.id * @returns unit.id
*/ */
export async function create(unit: AnalyticUnit): Promise<AnalyticUnitId> { export async function create(unit: AnalyticUnit): Promise<AnalyticUnitId> {
return unit.id = await db.insert(unit); return unit.id = await db.insertOne(unit);
} }
export async function remove(id: AnalyticUnitId): Promise<void> { export async function remove(id: AnalyticUnitId): Promise<void> {
await db.remove(id); await db.removeOne(id);
return; return;
} }
export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { export async function update(id: AnalyticUnitId, unit: AnalyticUnit) {
return db.update(id, unit); return db.updateOne(id, unit);
} }
export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { export async function setStatus(id: AnalyticUnitId, status: string, error?: string) {
return db.update(id, { status, error }); return db.updateOne(id, { status, error });
} }
export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) { export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) {
return db.update(id, { lastPredictionTime }); return db.updateOne(id, { lastPredictionTime });
} }

78
server/src/models/metric_model.ts

@ -1,46 +1,32 @@
import { Collection, makeDBQ } from '../services/data_service'; export class Metric {
constructor(public datasource: string, public targets: any[]) {
if(datasource === undefined) {
let db = makeDBQ(Collection.METRICS); throw new Error('datasource is undefined');
}
if(targets === undefined) {
export type Datasource = { throw new Error('targets is undefined');
method: string, }
data: Object, if(targets.length === 0) {
params: Object, throw new Error('targets is empty');
type: string, }
url: string }
}
public toObject() {
export type MetricId = string; return {
datasource: this.datasource,
export type Metric = { targets: this.targets
id?: MetricId, };
datasource: Datasource, }
targets: string[]
} static fromObject(obj: any): Metric {
if(obj === undefined) {
export function metricFromObj(obj: any): Metric { throw new Error('obj is undefined');
return { }
datasource: obj.datasource, return new Metric(
targets: obj.targets obj.datasource,
}; obj.targets
} );
}
// export async function saveTargets(targets: string[]) { }
// let metrics = [];
// for (let target of targets) {
// metrics.push(create(target));
// }
// return metrics;
// }
export async function create(metric: Metric): Promise<MetricId> {
return metric.id = await db.insert(metric);
}
export async function findMetric(id: MetricId): Promise<Metric> {
return db.findOne(id);
}

58
server/src/models/segment_model.ts

@ -2,35 +2,61 @@ import { AnalyticUnitId } from './analytic_unit_model';
import { Collection, makeDBQ } from '../services/data_service'; import { Collection, makeDBQ } from '../services/data_service';
let db = makeDBQ(Collection.SEGMENTS);
import * as _ from 'lodash';
type SegmentId = string; type SegmentId = string;
type Segment = { export class Segment {
id?: SegmentId, constructor(
from: number, public from: number,
to: number, public to: number,
labeled: boolean public labeled: boolean,
public id?: SegmentId
) {
if(from === undefined) {
throw new Error('from is undefined');
}
if(to === undefined) {
throw new Error('to is undefined');
}
if(labeled === undefined) {
throw new Error('labeled is undefined');
}
}
public toObject() {
return {
_id: this.id,
from: this.from,
to: this.to,
labeled: this.labeled
};
}
static fromObject(obj: any): Segment {
if(obj === undefined) {
throw new Error('obj is undefined');
}
return new Segment(
obj.from, obj.to,
obj.labeled, obj.id || obj._id
);
}
} }
let db = makeDBQ(Collection.SEGMENTS);
export function getLabeledSegments(id: AnalyticUnitId) { export function getLabeled(id: AnalyticUnitId) {
return //return db.
} }
export function getPredictedSegments(id: AnalyticUnitId) { export function getPredicted(id: AnalyticUnitId) {
} }
export function saveSegments(id: AnalyticUnitId, segments: Segment[]) { export async function create(segments: Segment[]) {
}
export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) {
} }
export function removeSegments(idsToRemove: SegmentId[]) { export function remove(idsToRemove: SegmentId[]) {
} }

6
server/src/models/task_model.ts

@ -1,3 +1,3 @@
class Task { export class Task {
} }

464
server/src/services/analytics_service.ts

@ -1,232 +1,232 @@
import * as config from '../config'; import * as config from '../config';
const zmq = require('zeromq'); const zmq = require('zeromq');
import * as childProcess from 'child_process' import * as childProcess from 'child_process'
import * as fs from 'fs'; import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
export class AnalyticsMessage { export class AnalyticsMessage {
public constructor(public method: string, public payload?: string, public requestId?: number) { public constructor(public method: string, public payload?: string, public requestId?: number) {
} }
static fromJSON(obj: any): AnalyticsMessage { static fromJSON(obj: any): AnalyticsMessage {
if(obj.method === undefined) { if(obj.method === undefined) {
throw new Error('No method in obj:' + obj); throw new Error('No method in obj:' + obj);
} }
return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); return new AnalyticsMessage(obj.method, obj.payload, obj.requestId);
} }
} }
function analyticsMessageFromJson(obj: any): AnalyticsMessage { function analyticsMessageFromJson(obj: any): AnalyticsMessage {
return new AnalyticsMessage(obj); return new AnalyticsMessage(obj);
} }
export class AnalyticsService { export class AnalyticsService {
private _requester: any; private _requester: any;
private _ready: boolean = false; private _ready: boolean = false;
private _pingResponded = false; private _pingResponded = false;
private _zmqConnectionString = null; private _zmqConnectionString = null;
private _ipcPath = null; private _ipcPath = null;
private _analyticsPinger: NodeJS.Timer = null; private _analyticsPinger: NodeJS.Timer = null;
private _isClosed = false; private _isClosed = false;
constructor(private _onMessage: (message: AnalyticsMessage) => void) { constructor(private _onMessage: (message: AnalyticsMessage) => void) {
this._init(); this._init();
} }
public async sendTask(taskObj: any): Promise<void> { public async sendTask(taskObj: any): Promise<void> {
if(!this._ready) { if(!this._ready) {
return Promise.reject("Analytics is not ready"); return Promise.reject("Analytics is not ready");
} }
let message = { let message = {
method: 'TASK', method: 'TASK',
payload: taskObj payload: taskObj
} }
return this.sendMessage(message); return this.sendMessage(message);
} }
public async sendMessage(message: AnalyticsMessage): Promise<void> { public async sendMessage(message: AnalyticsMessage): Promise<void> {
let strMessage = JSON.stringify(message); let strMessage = JSON.stringify(message);
if(message.method === 'PING') { if(message.method === 'PING') {
strMessage = 'PING'; strMessage = 'PING';
} }
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this._requester.send(strMessage, undefined, (err) => { this._requester.send(strMessage, undefined, (err) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(); resolve();
} }
}); });
}); });
} }
public close() { public close() {
this._isClosed = true; this._isClosed = true;
console.log('Terminating analytics service...'); console.log('Terminating analytics service...');
clearInterval(this._analyticsPinger); clearInterval(this._analyticsPinger);
if(this._ipcPath !== null) { if(this._ipcPath !== null) {
console.log('Remove ipc path: ' + this._ipcPath); console.log('Remove ipc path: ' + this._ipcPath);
fs.unlinkSync(this._ipcPath); fs.unlinkSync(this._ipcPath);
} }
this._requester.close(); this._requester.close();
console.log('Ok'); console.log('Ok');
} }
public get ready(): boolean { return this._ready; } public get ready(): boolean { return this._ready; }
private async _init() { private async _init() {
this._requester = zmq.socket('pair'); this._requester = zmq.socket('pair');
let productionMode = process.env.NODE_ENV !== 'development'; let productionMode = process.env.NODE_ENV !== 'development';
this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode
if(productionMode) { if(productionMode) {
this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; this._zmqConnectionString = config.ZMQ_CONNECTION_STRING;
if(this._zmqConnectionString === null) { if(this._zmqConnectionString === null) {
var createResult = await AnalyticsService.createIPCAddress(); var createResult = await AnalyticsService.createIPCAddress();
this._zmqConnectionString = createResult.address; this._zmqConnectionString = createResult.address;
this._ipcPath = createResult.file; this._ipcPath = createResult.file;
} }
} }
console.log("Binding to zmq... %s", this._zmqConnectionString); console.log("Binding to zmq... %s", this._zmqConnectionString);
this._requester.connect(this._zmqConnectionString); this._requester.connect(this._zmqConnectionString);
this._requester.on("message", this._onAnalyticsMessage.bind(this)); this._requester.on("message", this._onAnalyticsMessage.bind(this));
console.log('Ok'); console.log('Ok');
if(productionMode) { if(productionMode) {
console.log('Creating analytics process...'); console.log('Creating analytics process...');
try { try {
var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
} catch(error) { } catch(error) {
console.error('Can`t run analytics process: %s', error); console.error('Can`t run analytics process: %s', error);
return; return;
} }
console.log('Ok, pid: %s', cp.pid); console.log('Ok, pid: %s', cp.pid);
} }
console.log('Start analytics pinger...'); console.log('Start analytics pinger...');
this._runAlalyticsPinger(); this._runAlalyticsPinger();
console.log('Ok'); console.log('Ok');
} }
/** /**
* Spawns analytics process. Reads process stderr and fails if it isn`t empty. * Spawns analytics process. Reads process stderr and fails if it isn`t empty.
* No need to stop the process later. * No need to stop the process later.
* *
* @returns Creaded child process * @returns Creaded child process
* @throws Process start error or first exception during python start * @throws Process start error or first exception during python start
*/ */
private static async _runAnalyticsProcess(zmqConnectionString: string): Promise<childProcess.ChildProcess> { private static async _runAnalyticsProcess(zmqConnectionString: string): Promise<childProcess.ChildProcess> {
let cp: childProcess.ChildProcess; let cp: childProcess.ChildProcess;
let cpOptions = { let cpOptions = {
cwd: config.ANALYTICS_PATH, cwd: config.ANALYTICS_PATH,
env: { env: {
...process.env, ...process.env,
ZMQ_CONNECTION_STRING: zmqConnectionString ZMQ_CONNECTION_STRING: zmqConnectionString
} }
}; };
if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) {
console.log('dist/worker/worker'); console.log('dist/worker/worker');
cp = childProcess.spawn('dist/worker/worker', [], cpOptions); cp = childProcess.spawn('dist/worker/worker', [], cpOptions);
} else { } else {
console.log('python3 server.py'); console.log('python3 server.py');
// If compiled analytics script doesn't exist - fallback to regular python // If compiled analytics script doesn't exist - fallback to regular python
console.log(config.ANALYTICS_PATH); console.log(config.ANALYTICS_PATH);
cp = childProcess.spawn('python3', ['server.py'], cpOptions); cp = childProcess.spawn('python3', ['server.py'], cpOptions);
} }
if(cp.pid === undefined) { if(cp.pid === undefined) {
return new Promise<childProcess.ChildProcess>((resolve, reject) => { return new Promise<childProcess.ChildProcess>((resolve, reject) => {
cp.on('error', reject); cp.on('error', reject);
}); });
} }
return new Promise<childProcess.ChildProcess>((resolve, reject) => { return new Promise<childProcess.ChildProcess>((resolve, reject) => {
var resolved = false; var resolved = false;
cp.stdout.on('data', () => { cp.stdout.on('data', () => {
if(resolved) { if(resolved) {
return; return;
} else { } else {
resolved = true; resolved = true;
} }
resolve(cp); resolve(cp);
}); });
cp.stderr.on('data', function(data) { cp.stderr.on('data', function(data) {
if(resolved) { if(resolved) {
return; return;
} else { } else {
resolved = true; resolved = true;
} }
reject(data); reject(data);
}); });
}); });
} }
private _onAnalyticsUp() { private _onAnalyticsUp() {
console.log('Analytics is up'); console.log('Analytics is up');
} }
private async _onAnalyticsDown() { private async _onAnalyticsDown() {
console.log('Analytics is down'); console.log('Analytics is down');
if(process.env.NODE_ENV !== 'development') { if(process.env.NODE_ENV !== 'development') {
await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
} }
} }
private _onAnalyticsMessage(data: any, error) { private _onAnalyticsMessage(data: any, error) {
if(data.toString() === 'PONG') { if(data.toString() === 'PONG') {
this._pingResponded = true; this._pingResponded = true;
if(!this._ready) { if(!this._ready) {
this._ready = true; this._ready = true;
this._onAnalyticsUp(); this._onAnalyticsUp();
} }
return; return;
} }
let text = data.toString(); let text = data.toString();
let response; let response;
try { try {
response = JSON.parse(text); response = JSON.parse(text);
} catch (e) { } catch (e) {
console.error("Can`t parse response from analytics as json:"); console.error("Can`t parse response from analytics as json:");
console.error(text); console.error(text);
throw new Error('Unexpected response'); throw new Error('Unexpected response');
} }
this._onMessage(AnalyticsMessage.fromJSON(response)); this._onMessage(AnalyticsMessage.fromJSON(response));
} }
private async _runAlalyticsPinger() { private async _runAlalyticsPinger() {
this._analyticsPinger = setInterval(() => { this._analyticsPinger = setInterval(() => {
if(this._isClosed) { if(this._isClosed) {
return; return;
} }
if(!this._pingResponded && this._ready) { if(!this._pingResponded && this._ready) {
this._ready = false; this._ready = false;
this._onAnalyticsDown(); this._onAnalyticsDown();
} }
this._pingResponded = false; this._pingResponded = false;
// TODO: set life limit for this ping // TODO: set life limit for this ping
this.sendMessage({ method: 'PING' }); this.sendMessage({ method: 'PING' });
}, config.ANLYTICS_PING_INTERVAL); }, config.ANLYTICS_PING_INTERVAL);
} }
private static async createIPCAddress(): Promise<{ address: string, file: string }> { private static async createIPCAddress(): Promise<{ address: string, file: string }> {
let filename = `${process.pid}.ipc` let filename = `${process.pid}.ipc`
let p = path.join(config.ZMQ_IPC_PATH, filename); let p = path.join(config.ZMQ_IPC_PATH, filename);
fs.writeFileSync(p, ''); fs.writeFileSync(p, '');
return Promise.resolve({ address: 'ipc://' + p, file: p }); return Promise.resolve({ address: 'ipc://' + p, file: p });
} }
} }

247
server/src/services/data_service.ts

@ -1,124 +1,123 @@
import * as config from '../config'; import * as config from '../config';
import * as nedb from 'nedb'; import * as nedb from 'nedb';
import * as fs from 'fs'; import * as fs from 'fs';
export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; export enum Collection { ANALYTIC_UNITS, SEGMENTS };
/** /**
* Class which helps to make queries to your collection * Class which helps to make queries to your collection
* *
* @param { string | object } query: a key as a string or mongodb-style query * @param { string | object } query: a key as a string or mongodb-style query
*/ */
export type DBQ = { export type DBQ = {
insert: (document: object) => string, insertOne: (document: object) => string,
insertMany: (documents: object[]) => string[], insertMany: (documents: object[]) => string[],
update: (query: string | object, updateQuery: any) => void, updateOne: (query: string | object, updateQuery: any) => void,
findOne: (query: string | object) => any, findOne: (query: string | object) => any,
remove: (query: string | object) => number removeOne: (query: string | object) => number
} }
export function makeDBQ(collection: Collection): DBQ { export function makeDBQ(collection: Collection): DBQ {
return { return {
insert: dbInsert.bind(null, collection), insertOne: dbInsert.bind(null, collection),
insertMany: dbInsertMany.bind(null, collection), insertMany: dbInsertMany.bind(null, collection),
update: dbUpdate.bind(null, collection), updateOne: dbUpdate.bind(null, collection),
findOne: dbFindOne.bind(null, collection), findOne: dbFindOne.bind(null, collection),
remove: dbRemove.bind(null, collection) removeOne: dbRemove.bind(null, collection)
} }
} }
function wrapIdToQuery(query: string | object) { function wrapIdToQuery(query: string | object) {
if(typeof query === 'string') { if(typeof query === 'string') {
return { _id: query }; return { _id: query };
} }
return query; return query;
} }
const db = new Map<Collection, nedb>(); const db = new Map<Collection, nedb>();
let dbInsert = (collection: Collection, doc: object) => { let dbInsert = (collection: Collection, doc: object) => {
return new Promise<string>((resolve, reject) => { return new Promise<string>((resolve, reject) => {
db[collection].insert(doc, (err, newDoc) => { db[collection].insert(doc, (err, newDoc) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(newDoc._id); resolve(newDoc._id);
} }
}); });
}); });
} }
let dbInsertMany = (collection: Collection, docs: object[]) => { let dbInsertMany = (collection: Collection, docs: object[]) => {
return new Promise<string[]>((resolve, reject) => { return new Promise<string[]>((resolve, reject) => {
db[collection].insert(docs, (err, newDocs: any[]) => { db[collection].insert(docs, (err, newDocs: any[]) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(newDocs.map(d => d._id)); resolve(newDocs.map(d => d._id));
} }
}); });
}); });
} }
let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => {
query = wrapIdToQuery(query); query = wrapIdToQuery(query);
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(); resolve();
} }
}); });
}); });
} }
let dbFindOne = (collection: Collection, query: string | object) => { let dbFindOne = (collection: Collection, query: string | object) => {
query = wrapIdToQuery(query); query = wrapIdToQuery(query);
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
db[collection].findOne(query, (err, doc) => { db[collection].findOne(query, (err, doc) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(doc); resolve(doc);
} }
}); });
}); });
} }
let dbRemove = (collection: Collection, query: string | object) => { let dbRemove = (collection: Collection, query: string | object) => {
query = wrapIdToQuery(query); query = wrapIdToQuery(query);
return new Promise<number>((resolve, reject) => { return new Promise<number>((resolve, reject) => {
db[collection].remove(query, (err, numRemoved) => { db[collection].remove(query, (err, numRemoved) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
resolve(numRemoved); resolve(numRemoved);
} }
}); });
}); });
} }
function maybeCreateDir(path: string): void { function maybeCreateDir(path: string): void {
if(fs.existsSync(path)) { if(fs.existsSync(path)) {
return; return;
} }
console.log('mkdir: ' + path); console.log('mkdir: ' + path);
fs.mkdirSync(path); fs.mkdirSync(path);
} }
function checkDataFolders(): void { function checkDataFolders(): void {
[ [
config.DATA_PATH, config.DATA_PATH,
config.ZMQ_IPC_PATH config.ZMQ_IPC_PATH
].forEach(maybeCreateDir); ].forEach(maybeCreateDir);
} }
checkDataFolders(); checkDataFolders();
// TODO: it's better if models request db which we create if it`s needed // TODO: it's better if models request db which we create if it`s needed
db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true });
db[Collection.METRICS] = new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }); db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true });
db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true });

86
server/src/services/process_service.ts

@ -1,44 +1,44 @@
var exitHandlers = [] var exitHandlers = []
var exitHandled = false; var exitHandled = false;
/** /**
* Add a callback for closing programm bacause of any reason * Add a callback for closing programm bacause of any reason
* *
* @param callback a sync function * @param callback a sync function
*/ */
export function registerExitHandler(callback: () => void) { export function registerExitHandler(callback: () => void) {
exitHandlers.push(callback); exitHandlers.push(callback);
} }
function exitHandler(options, err?) { function exitHandler(options, err?) {
if(exitHandled) { if(exitHandled) {
return; return;
} }
exitHandled = true; exitHandled = true;
for(let i = 0; i < exitHandlers.length; i++) { for(let i = 0; i < exitHandlers.length; i++) {
exitHandlers[i](); exitHandlers[i]();
} }
console.log('process exit'); console.log('process exit');
process.exit(); process.exit();
} }
function catchException(options, err) { function catchException(options, err) {
console.log('Server exception:'); console.log('Server exception:');
console.log(err); console.log(err);
exitHandler({ exit: true }); exitHandler({ exit: true });
} }
//do something when app is closing //do something when app is closing
process.on('exit', exitHandler.bind(null, { cleanup:true })); process.on('exit', exitHandler.bind(null, { cleanup:true }));
//catches ctrl+c event //catches ctrl+c event
process.on('SIGINT', exitHandler.bind(null, { exit:true })); process.on('SIGINT', exitHandler.bind(null, { exit:true }));
// catches "kill pid" (for example: nodemon restart) // catches "kill pid" (for example: nodemon restart)
process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); process.on('SIGUSR1', exitHandler.bind(null, { exit:true }));
process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); process.on('SIGUSR2', exitHandler.bind(null, { exit:true }));
//catches uncaught exceptions //catches uncaught exceptions
process.on('uncaughtException', catchException.bind(null, { exit:true })); process.on('uncaughtException', catchException.bind(null, { exit:true }));
Loading…
Cancel
Save