diff --git a/server/src/config.ts b/server/src/config.ts index 2b75778..41fc9db 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -12,9 +12,7 @@ export const ANALYTICS_PATH = path.join(__dirname, '../../analytics'); export const DATA_PATH = path.join(__dirname, '../../data'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db'); -export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); -export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index f707ef0..00afba3 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -123,8 +123,8 @@ export function isAnalyticReady(): boolean { } export async function createAnalyticUnitFromObject(obj: any): Promise { - let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.analyticUnitFromObj(obj); - AnalyticUnit.create(unit); + let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.AnalyticUnit.fromObject(obj); + let id = await AnalyticUnit.create(unit); // runLearning(unit); - return + return id; } \ No newline at end of file diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index a4e6162..8203625 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -7,54 +7,66 @@ let db = makeDBQ(Collection.ANALYTIC_UNITS); export type AnalyticUnitId = string; -export type AnalyticUnit = { - id?: AnalyticUnitId, - name: string, - panelUrl: string, - type: string, - metric: Metric - status: string, - error?: string, - lastPredictionTime: number -} - -export function analyticUnitFromObj(obj: any): AnalyticUnit { - if(obj === undefined) { - throw new Error('obj is undefined'); - } - if(obj.type === undefined) { - throw new Error(`Missing field "type"`); - } - if(obj.name === undefined) { - throw new Error(`Missing field "name"`); - } - if(obj.panelUrl === undefined) { - throw new Error(`Missing field "panelUrl"`); +export class AnalyticUnit { + constructor( + public name: string, + public panelUrl: string, + public type: string, + public metric: Metric, + public id?: AnalyticUnitId, + public lastPredictionTime?: number, + public status?: string, + public error?: string, + ) { + if(name === undefined) { + throw new Error(`Missing field "name"`); + } + if(panelUrl === undefined) { + throw new Error(`Missing field "panelUrl"`); + } + if(type === undefined) { + throw new Error(`Missing field "type"`); + } + if(metric === undefined) { + throw new Error(`Missing field "metric"`); + } } - if(obj.metric === undefined) { - throw new Error(`Missing field "datasource"`); - } - if(obj.metric.datasource === undefined) { - throw new Error(`Missing field "metric.datasource"`); + + public toObject() { + return { + _id: this.id, + name: this.name, + panelUrl: this.panelUrl, + type: this.type, + metric: this.metric.toObject(), + lastPredictionTime: this.lastPredictionTime, + status: this.status, + error: this.error + }; } - if(obj.metric.targets === undefined) { - throw new Error(`Missing field "metric.targets"`); + + static fromObject(obj: any): AnalyticUnit { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new AnalyticUnit( + obj.name, + obj.panelUrl, + obj.type, + Metric.fromObject(obj.metric), + obj.status, + obj.lastPredictionTime, + obj.id || obj._id, + obj.error, + ); } - const unit: AnalyticUnit = { - name: obj.name, - panelUrl: obj.panelUrl, - type: obj.type, - metric: obj.metric, - status: 'LEARNING', - lastPredictionTime: 0 - }; - return unit; } + export async function findById(id: AnalyticUnitId): Promise { - return db.findOne(id); + return AnalyticUnit.fromObject(db.findOne(id)); } /** @@ -64,22 +76,22 @@ export async function findById(id: AnalyticUnitId): Promise { * @returns unit.id */ export async function create(unit: AnalyticUnit): Promise { - return unit.id = await db.insert(unit); + return unit.id = await db.insertOne(unit); } export async function remove(id: AnalyticUnitId): Promise { - await db.remove(id); + await db.removeOne(id); return; } export async function update(id: AnalyticUnitId, unit: AnalyticUnit) { - return db.update(id, unit); + return db.updateOne(id, unit); } export async function setStatus(id: AnalyticUnitId, status: string, error?: string) { - return db.update(id, { status, error }); + return db.updateOne(id, { status, error }); } export async function setPredictionTime(id: AnalyticUnitId, lastPredictionTime: number) { - return db.update(id, { lastPredictionTime }); + return db.updateOne(id, { lastPredictionTime }); } diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts index b4062d3..2d97b2d 100644 --- a/server/src/models/metric_model.ts +++ b/server/src/models/metric_model.ts @@ -1,46 +1,32 @@ -import { Collection, makeDBQ } from '../services/data_service'; - - -let db = makeDBQ(Collection.METRICS); - - -export type Datasource = { - method: string, - data: Object, - params: Object, - type: string, - url: string -} - -export type MetricId = string; - -export type Metric = { - id?: MetricId, - datasource: Datasource, - targets: string[] -} - -export function metricFromObj(obj: any): Metric { - return { - datasource: obj.datasource, - targets: obj.targets - }; -} - -// export async function saveTargets(targets: string[]) { -// let metrics = []; -// for (let target of targets) { -// metrics.push(create(target)); -// } -// return metrics; -// } - -export async function create(metric: Metric): Promise { - return metric.id = await db.insert(metric); -} - -export async function findMetric(id: MetricId): Promise { - return db.findOne(id); -} - - +export class Metric { + constructor(public datasource: string, public targets: any[]) { + if(datasource === undefined) { + throw new Error('datasource is undefined'); + } + if(targets === undefined) { + throw new Error('targets is undefined'); + } + if(targets.length === 0) { + throw new Error('targets is empty'); + } + } + + public toObject() { + return { + datasource: this.datasource, + targets: this.targets + }; + } + + static fromObject(obj: any): Metric { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Metric( + obj.datasource, + obj.targets + ); + } +} + + diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 28b0867..a2cced2 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -2,35 +2,61 @@ import { AnalyticUnitId } from './analytic_unit_model'; import { Collection, makeDBQ } from '../services/data_service'; +let db = makeDBQ(Collection.SEGMENTS); -import * as _ from 'lodash'; type SegmentId = string; -type Segment = { - id?: SegmentId, - from: number, - to: number, - labeled: boolean +export class Segment { + constructor( + public from: number, + public to: number, + public labeled: boolean, + public id?: SegmentId + ) { + if(from === undefined) { + throw new Error('from is undefined'); + } + if(to === undefined) { + throw new Error('to is undefined'); + } + if(labeled === undefined) { + throw new Error('labeled is undefined'); + } + } + + public toObject() { + return { + _id: this.id, + from: this.from, + to: this.to, + labeled: this.labeled + }; + } + + static fromObject(obj: any): Segment { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Segment( + obj.from, obj.to, + obj.labeled, obj.id || obj._id + ); + } } -let db = makeDBQ(Collection.SEGMENTS); -export function getLabeledSegments(id: AnalyticUnitId) { - return +export function getLabeled(id: AnalyticUnitId) { + //return db. } -export function getPredictedSegments(id: AnalyticUnitId) { +export function getPredicted(id: AnalyticUnitId) { } -export function saveSegments(id: AnalyticUnitId, segments: Segment[]) { - -} - -export async function insertSegments(id: AnalyticUnitId, addedSegments: Segment[], labeled: boolean) { +export async function create(segments: Segment[]) { } -export function removeSegments(idsToRemove: SegmentId[]) { +export function remove(idsToRemove: SegmentId[]) { } diff --git a/server/src/models/task_model.ts b/server/src/models/task_model.ts index 8992bc1..880519c 100644 --- a/server/src/models/task_model.ts +++ b/server/src/models/task_model.ts @@ -1,3 +1,3 @@ -class Task { - -} \ No newline at end of file +export class Task { + +} diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index eb25bb6..7686135 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,232 +1,232 @@ -import * as config from '../config'; - -const zmq = require('zeromq'); - -import * as childProcess from 'child_process' -import * as fs from 'fs'; -import * as path from 'path'; - - -export class AnalyticsMessage { - public constructor(public method: string, public payload?: string, public requestId?: number) { - - } - - static fromJSON(obj: any): AnalyticsMessage { - if(obj.method === undefined) { - throw new Error('No method in obj:' + obj); - } - return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); - } -} - -function analyticsMessageFromJson(obj: any): AnalyticsMessage { - return new AnalyticsMessage(obj); -} - -export class AnalyticsService { - - private _requester: any; - private _ready: boolean = false; - private _pingResponded = false; - private _zmqConnectionString = null; - private _ipcPath = null; - private _analyticsPinger: NodeJS.Timer = null; - private _isClosed = false; - - constructor(private _onMessage: (message: AnalyticsMessage) => void) { - this._init(); - } - - public async sendTask(taskObj: any): Promise { - if(!this._ready) { - return Promise.reject("Analytics is not ready"); - } - let message = { - method: 'TASK', - payload: taskObj - } - return this.sendMessage(message); - } - - public async sendMessage(message: AnalyticsMessage): Promise { - let strMessage = JSON.stringify(message); - if(message.method === 'PING') { - strMessage = 'PING'; - } - return new Promise((resolve, reject) => { - this._requester.send(strMessage, undefined, (err) => { - if(err) { - reject(err); - } else { - resolve(); - } - }); - }); - } - - public close() { - this._isClosed = true; - console.log('Terminating analytics service...'); - clearInterval(this._analyticsPinger); - if(this._ipcPath !== null) { - console.log('Remove ipc path: ' + this._ipcPath); - fs.unlinkSync(this._ipcPath); - } - this._requester.close(); - console.log('Ok'); - } - - public get ready(): boolean { return this._ready; } - - private async _init() { - this._requester = zmq.socket('pair'); - let productionMode = process.env.NODE_ENV !== 'development'; - - this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode - if(productionMode) { - this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; - if(this._zmqConnectionString === null) { - var createResult = await AnalyticsService.createIPCAddress(); - this._zmqConnectionString = createResult.address; - this._ipcPath = createResult.file; - } - } - - console.log("Binding to zmq... %s", this._zmqConnectionString); - this._requester.connect(this._zmqConnectionString); - this._requester.on("message", this._onAnalyticsMessage.bind(this)); - console.log('Ok'); - - if(productionMode) { - console.log('Creating analytics process...'); - try { - var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); - } catch(error) { - console.error('Can`t run analytics process: %s', error); - return; - } - console.log('Ok, pid: %s', cp.pid); - } - - console.log('Start analytics pinger...'); - this._runAlalyticsPinger(); - console.log('Ok'); - - } - - /** - * Spawns analytics process. Reads process stderr and fails if it isn`t empty. - * No need to stop the process later. - * - * @returns Creaded child process - * @throws Process start error or first exception during python start - */ - private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { - let cp: childProcess.ChildProcess; - let cpOptions = { - cwd: config.ANALYTICS_PATH, - env: { - ...process.env, - ZMQ_CONNECTION_STRING: zmqConnectionString - } - }; - - if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { - console.log('dist/worker/worker'); - cp = childProcess.spawn('dist/worker/worker', [], cpOptions); - } else { - console.log('python3 server.py'); - // If compiled analytics script doesn't exist - fallback to regular python - console.log(config.ANALYTICS_PATH); - cp = childProcess.spawn('python3', ['server.py'], cpOptions); - } - - if(cp.pid === undefined) { - return new Promise((resolve, reject) => { - cp.on('error', reject); - }); - } - - return new Promise((resolve, reject) => { - var resolved = false; - - cp.stdout.on('data', () => { - if(resolved) { - return; - } else { - resolved = true; - } - resolve(cp); - }); - - cp.stderr.on('data', function(data) { - if(resolved) { - return; - } else { - resolved = true; - } - reject(data); - }); - - }); - - } - - private _onAnalyticsUp() { - console.log('Analytics is up'); - } - - private async _onAnalyticsDown() { - console.log('Analytics is down'); - if(process.env.NODE_ENV !== 'development') { - await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); - } - } - - private _onAnalyticsMessage(data: any, error) { - if(data.toString() === 'PONG') { - this._pingResponded = true; - if(!this._ready) { - this._ready = true; - this._onAnalyticsUp(); - } - return; - } - - - let text = data.toString(); - let response; - try { - response = JSON.parse(text); - } catch (e) { - console.error("Can`t parse response from analytics as json:"); - console.error(text); - throw new Error('Unexpected response'); - } - this._onMessage(AnalyticsMessage.fromJSON(response)); - } - - private async _runAlalyticsPinger() { - this._analyticsPinger = setInterval(() => { - if(this._isClosed) { - return; - } - if(!this._pingResponded && this._ready) { - this._ready = false; - this._onAnalyticsDown(); - } - this._pingResponded = false; - // TODO: set life limit for this ping - this.sendMessage({ method: 'PING' }); - }, config.ANLYTICS_PING_INTERVAL); - } - - private static async createIPCAddress(): Promise<{ address: string, file: string }> { - let filename = `${process.pid}.ipc` - let p = path.join(config.ZMQ_IPC_PATH, filename); - fs.writeFileSync(p, ''); - return Promise.resolve({ address: 'ipc://' + p, file: p }); - } - -} +import * as config from '../config'; + +const zmq = require('zeromq'); + +import * as childProcess from 'child_process' +import * as fs from 'fs'; +import * as path from 'path'; + + +export class AnalyticsMessage { + public constructor(public method: string, public payload?: string, public requestId?: number) { + + } + + static fromJSON(obj: any): AnalyticsMessage { + if(obj.method === undefined) { + throw new Error('No method in obj:' + obj); + } + return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); + } +} + +function analyticsMessageFromJson(obj: any): AnalyticsMessage { + return new AnalyticsMessage(obj); +} + +export class AnalyticsService { + + private _requester: any; + private _ready: boolean = false; + private _pingResponded = false; + private _zmqConnectionString = null; + private _ipcPath = null; + private _analyticsPinger: NodeJS.Timer = null; + private _isClosed = false; + + constructor(private _onMessage: (message: AnalyticsMessage) => void) { + this._init(); + } + + public async sendTask(taskObj: any): Promise { + if(!this._ready) { + return Promise.reject("Analytics is not ready"); + } + let message = { + method: 'TASK', + payload: taskObj + } + return this.sendMessage(message); + } + + public async sendMessage(message: AnalyticsMessage): Promise { + let strMessage = JSON.stringify(message); + if(message.method === 'PING') { + strMessage = 'PING'; + } + return new Promise((resolve, reject) => { + this._requester.send(strMessage, undefined, (err) => { + if(err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + public close() { + this._isClosed = true; + console.log('Terminating analytics service...'); + clearInterval(this._analyticsPinger); + if(this._ipcPath !== null) { + console.log('Remove ipc path: ' + this._ipcPath); + fs.unlinkSync(this._ipcPath); + } + this._requester.close(); + console.log('Ok'); + } + + public get ready(): boolean { return this._ready; } + + private async _init() { + this._requester = zmq.socket('pair'); + let productionMode = process.env.NODE_ENV !== 'development'; + + this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode + if(productionMode) { + this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; + if(this._zmqConnectionString === null) { + var createResult = await AnalyticsService.createIPCAddress(); + this._zmqConnectionString = createResult.address; + this._ipcPath = createResult.file; + } + } + + console.log("Binding to zmq... %s", this._zmqConnectionString); + this._requester.connect(this._zmqConnectionString); + this._requester.on("message", this._onAnalyticsMessage.bind(this)); + console.log('Ok'); + + if(productionMode) { + console.log('Creating analytics process...'); + try { + var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); + } catch(error) { + console.error('Can`t run analytics process: %s', error); + return; + } + console.log('Ok, pid: %s', cp.pid); + } + + console.log('Start analytics pinger...'); + this._runAlalyticsPinger(); + console.log('Ok'); + + } + + /** + * Spawns analytics process. Reads process stderr and fails if it isn`t empty. + * No need to stop the process later. + * + * @returns Creaded child process + * @throws Process start error or first exception during python start + */ + private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { + let cp: childProcess.ChildProcess; + let cpOptions = { + cwd: config.ANALYTICS_PATH, + env: { + ...process.env, + ZMQ_CONNECTION_STRING: zmqConnectionString + } + }; + + if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { + console.log('dist/worker/worker'); + cp = childProcess.spawn('dist/worker/worker', [], cpOptions); + } else { + console.log('python3 server.py'); + // If compiled analytics script doesn't exist - fallback to regular python + console.log(config.ANALYTICS_PATH); + cp = childProcess.spawn('python3', ['server.py'], cpOptions); + } + + if(cp.pid === undefined) { + return new Promise((resolve, reject) => { + cp.on('error', reject); + }); + } + + return new Promise((resolve, reject) => { + var resolved = false; + + cp.stdout.on('data', () => { + if(resolved) { + return; + } else { + resolved = true; + } + resolve(cp); + }); + + cp.stderr.on('data', function(data) { + if(resolved) { + return; + } else { + resolved = true; + } + reject(data); + }); + + }); + + } + + private _onAnalyticsUp() { + console.log('Analytics is up'); + } + + private async _onAnalyticsDown() { + console.log('Analytics is down'); + if(process.env.NODE_ENV !== 'development') { + await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); + } + } + + private _onAnalyticsMessage(data: any, error) { + if(data.toString() === 'PONG') { + this._pingResponded = true; + if(!this._ready) { + this._ready = true; + this._onAnalyticsUp(); + } + return; + } + + + let text = data.toString(); + let response; + try { + response = JSON.parse(text); + } catch (e) { + console.error("Can`t parse response from analytics as json:"); + console.error(text); + throw new Error('Unexpected response'); + } + this._onMessage(AnalyticsMessage.fromJSON(response)); + } + + private async _runAlalyticsPinger() { + this._analyticsPinger = setInterval(() => { + if(this._isClosed) { + return; + } + if(!this._pingResponded && this._ready) { + this._ready = false; + this._onAnalyticsDown(); + } + this._pingResponded = false; + // TODO: set life limit for this ping + this.sendMessage({ method: 'PING' }); + }, config.ANLYTICS_PING_INTERVAL); + } + + private static async createIPCAddress(): Promise<{ address: string, file: string }> { + let filename = `${process.pid}.ipc` + let p = path.join(config.ZMQ_IPC_PATH, filename); + fs.writeFileSync(p, ''); + return Promise.resolve({ address: 'ipc://' + p, file: p }); + } + +} diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 62a0555..ae0ee09 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -1,124 +1,123 @@ -import * as config from '../config'; - -import * as nedb from 'nedb'; -import * as fs from 'fs'; - - -export enum Collection { ANALYTIC_UNITS, METRICS, SEGMENTS }; - - -/** - * Class which helps to make queries to your collection - * - * @param { string | object } query: a key as a string or mongodb-style query - */ -export type DBQ = { - insert: (document: object) => string, - insertMany: (documents: object[]) => string[], - update: (query: string | object, updateQuery: any) => void, - findOne: (query: string | object) => any, - remove: (query: string | object) => number -} - -export function makeDBQ(collection: Collection): DBQ { - return { - insert: dbInsert.bind(null, collection), - insertMany: dbInsertMany.bind(null, collection), - update: dbUpdate.bind(null, collection), - findOne: dbFindOne.bind(null, collection), - remove: dbRemove.bind(null, collection) - } -} - -function wrapIdToQuery(query: string | object) { - if(typeof query === 'string') { - return { _id: query }; - } - return query; -} - -const db = new Map(); - -let dbInsert = (collection: Collection, doc: object) => { - return new Promise((resolve, reject) => { - db[collection].insert(doc, (err, newDoc) => { - if(err) { - reject(err); - } else { - resolve(newDoc._id); - } - }); - }); -} - -let dbInsertMany = (collection: Collection, docs: object[]) => { - return new Promise((resolve, reject) => { - db[collection].insert(docs, (err, newDocs: any[]) => { - if(err) { - reject(err); - } else { - resolve(newDocs.map(d => d._id)); - } - }); - }); -} - -let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { - if(err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - -let dbFindOne = (collection: Collection, query: string | object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].findOne(query, (err, doc) => { - if(err) { - reject(err); - } else { - resolve(doc); - } - }); - }); -} - -let dbRemove = (collection: Collection, query: string | object) => { - query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db[collection].remove(query, (err, numRemoved) => { - if(err) { - reject(err); - } else { - resolve(numRemoved); - } - }); - }); -} - -function maybeCreateDir(path: string): void { - if(fs.existsSync(path)) { - return; - } - console.log('mkdir: ' + path); - fs.mkdirSync(path); -} - -function checkDataFolders(): void { - [ - config.DATA_PATH, - config.ZMQ_IPC_PATH - ].forEach(maybeCreateDir); -} -checkDataFolders(); - -// TODO: it's better if models request db which we create if it`s needed -db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); -db[Collection.METRICS] = new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }); -db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); +import * as config from '../config'; + +import * as nedb from 'nedb'; +import * as fs from 'fs'; + + +export enum Collection { ANALYTIC_UNITS, SEGMENTS }; + + +/** + * Class which helps to make queries to your collection + * + * @param { string | object } query: a key as a string or mongodb-style query + */ +export type DBQ = { + insertOne: (document: object) => string, + insertMany: (documents: object[]) => string[], + updateOne: (query: string | object, updateQuery: any) => void, + findOne: (query: string | object) => any, + removeOne: (query: string | object) => number +} + +export function makeDBQ(collection: Collection): DBQ { + return { + insertOne: dbInsert.bind(null, collection), + insertMany: dbInsertMany.bind(null, collection), + updateOne: dbUpdate.bind(null, collection), + findOne: dbFindOne.bind(null, collection), + removeOne: dbRemove.bind(null, collection) + } +} + +function wrapIdToQuery(query: string | object) { + if(typeof query === 'string') { + return { _id: query }; + } + return query; +} + +const db = new Map(); + +let dbInsert = (collection: Collection, doc: object) => { + return new Promise((resolve, reject) => { + db[collection].insert(doc, (err, newDoc) => { + if(err) { + reject(err); + } else { + resolve(newDoc._id); + } + }); + }); +} + +let dbInsertMany = (collection: Collection, docs: object[]) => { + return new Promise((resolve, reject) => { + db[collection].insert(docs, (err, newDocs: any[]) => { + if(err) { + reject(err); + } else { + resolve(newDocs.map(d => d._id)); + } + }); + }); +} + +let dbUpdate = (collection: Collection, query: string | object, updateQuery: object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].update(query, updateQuery, { /* options */ }, (err: Error) => { + if(err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +let dbFindOne = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].findOne(query, (err, doc) => { + if(err) { + reject(err); + } else { + resolve(doc); + } + }); + }); +} + +let dbRemove = (collection: Collection, query: string | object) => { + query = wrapIdToQuery(query); + return new Promise((resolve, reject) => { + db[collection].remove(query, (err, numRemoved) => { + if(err) { + reject(err); + } else { + resolve(numRemoved); + } + }); + }); +} + +function maybeCreateDir(path: string): void { + if(fs.existsSync(path)) { + return; + } + console.log('mkdir: ' + path); + fs.mkdirSync(path); +} + +function checkDataFolders(): void { + [ + config.DATA_PATH, + config.ZMQ_IPC_PATH + ].forEach(maybeCreateDir); +} +checkDataFolders(); + +// TODO: it's better if models request db which we create if it`s needed +db[Collection.ANALYTIC_UNITS] = new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }); +db[Collection.SEGMENTS] = new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }); diff --git a/server/src/services/process_service.ts b/server/src/services/process_service.ts index b4bee36..6d0a237 100644 --- a/server/src/services/process_service.ts +++ b/server/src/services/process_service.ts @@ -1,44 +1,44 @@ - - -var exitHandlers = [] -var exitHandled = false; - -/** - * Add a callback for closing programm bacause of any reason - * - * @param callback a sync function - */ -export function registerExitHandler(callback: () => void) { - exitHandlers.push(callback); -} - -function exitHandler(options, err?) { - if(exitHandled) { - return; - } - exitHandled = true; - for(let i = 0; i < exitHandlers.length; i++) { - exitHandlers[i](); - } - console.log('process exit'); - process.exit(); -} - -function catchException(options, err) { - console.log('Server exception:'); - console.log(err); - exitHandler({ exit: true }); -} - -//do something when app is closing -process.on('exit', exitHandler.bind(null, { cleanup:true })); - -//catches ctrl+c event -process.on('SIGINT', exitHandler.bind(null, { exit:true })); - -// catches "kill pid" (for example: nodemon restart) -process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); -process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); - -//catches uncaught exceptions + + +var exitHandlers = [] +var exitHandled = false; + +/** + * Add a callback for closing programm bacause of any reason + * + * @param callback a sync function + */ +export function registerExitHandler(callback: () => void) { + exitHandlers.push(callback); +} + +function exitHandler(options, err?) { + if(exitHandled) { + return; + } + exitHandled = true; + for(let i = 0; i < exitHandlers.length; i++) { + exitHandlers[i](); + } + console.log('process exit'); + process.exit(); +} + +function catchException(options, err) { + console.log('Server exception:'); + console.log(err); + exitHandler({ exit: true }); +} + +//do something when app is closing +process.on('exit', exitHandler.bind(null, { cleanup:true })); + +//catches ctrl+c event +process.on('SIGINT', exitHandler.bind(null, { exit:true })); + +// catches "kill pid" (for example: nodemon restart) +process.on('SIGUSR1', exitHandler.bind(null, { exit:true })); +process.on('SIGUSR2', exitHandler.bind(null, { exit:true })); + +//catches uncaught exceptions process.on('uncaughtException', catchException.bind(null, { exit:true })); \ No newline at end of file