Browse Source

Mongodb persistence #570 part 1 (#771)

pull/1/head
Evgeny Smyshlyaev 5 years ago committed by rozetko
parent
commit
bb1854e1b9
  1. 4
      server/package.json
  2. 31
      server/src/config.ts
  3. 102
      server/src/services/data_service.ts

4
server/package.json

@ -21,13 +21,16 @@
"homepage": "https://github.com/hastic/hastic-server#readme",
"dependencies": {},
"devDependencies": {
"@types/deasync": "^0.1.0",
"@types/jest": "^23.3.14",
"@types/koa": "^2.0.46",
"@types/koa-bodyparser": "^4.2.0",
"@types/koa-router": "^7.0.31",
"@types/lodash": "^4.14.116",
"@types/mongodb": "^3.3.1",
"@types/nedb": "^1.8.0",
"@types/zeromq": "^4.6.0",
"deasync": "^0.1.15",
"axios": "^0.18.0",
"babel-core": "^6.26.3",
"babel-jest": "^23.4.2",
@ -44,6 +47,7 @@
"koa-bodyparser": "^4.2.0",
"koa-router": "^7.0.31",
"lodash": "^4.17.10",
"mongodb": "^3.3.2",
"nedb": "^1.8.0",
"node-loader": "^0.6.0",
"nodemon": "^1.17.5",

31
server/src/config.ts

@ -9,12 +9,27 @@ import * as os from 'os';
let configFile = path.join(__dirname, '../../config.json');
let configExists = fs.existsSync(configFile);
export type DataBaseConfig = {
user: string,
password: string,
url: string,
db_name: string
}
export const ANALYTICS_PATH = path.join(__dirname, '../../analytics');
export const HASTIC_DB_IN_MEMORY = getConfigField('HASTIC_IN_MEMORY_PERSISTANCE', false);
export const HASTIC_DB_CONNECTION_TYPE = getConfigField('HASTIC_DB_CONNECTION_TYPE', 'nedb'); //nedb or mongodb
export const DATA_PATH = path.join(__dirname, '../../data');
//connection string syntax: <db_user>:<db_password>@<db_url>/<db_name>
export const HASTIC_DB_CONNECTION_STRING = getConfigField(
'HASTIC_DB_CONNECTION_STRING',
'hastic:password@mongodb:27017/hastic'
);
export const HASTIC_DB_CONFIG = getDbConfig(HASTIC_DB_CONNECTION_STRING);
export const DATA_PATH = path.join(__dirname, '../../data');
export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analytic_units.db');
export const ANALYTIC_UNIT_CACHES_DATABASE_PATH = path.join(DATA_PATH, 'analytic_unit_caches.db');
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
@ -114,3 +129,17 @@ function createZMQConnectionString() {
}
return zmq;
}
function getDbConfig(connectionStr: string): DataBaseConfig {
const [user, password] = connectionStr.split('@')[0].split(':');
const [db_name, ...urlParts] = connectionStr.split('@')[1].split('/').reverse();
const url = urlParts.reverse().join('/');
const config = {
user,
password,
url,
db_name
};
return config;
}

102
server/src/services/data_service.ts

@ -2,6 +2,8 @@ import * as config from '../config';
import * as nedb from 'nedb';
import * as fs from 'fs';
import * as mongodb from 'mongodb';
import * as deasync from 'deasync';
export enum Collection {
@ -13,6 +15,15 @@ export enum Collection {
DB_META
};
const COLLECTION_TO_NAME_MAPPING = new Map<Collection, string>([
[Collection.ANALYTIC_UNITS, 'analytic_units'],
[Collection.ANALYTIC_UNIT_CACHES, 'analytic_unit_caches'],
[Collection.SEGMENTS, 'segments'],
[Collection.THRESHOLD, 'threshold'],
[Collection.DETECTION_SPANS, 'detection_spans'],
[Collection.DB_META, 'db_meta']
])
export enum SortingOrder { ASCENDING = 1, DESCENDING = -1 };
/**
@ -31,24 +42,24 @@ export type DBQ = {
removeMany: (query: string[] | object) => Promise<number>
}
function nedbCollectionFromCollection(collection: Collection): nedb {
let nedbCollection = db.get(collection);
if(nedbCollection === undefined) {
function dbCollectionFromCollection(collection: Collection): nedb | mongodb.Collection<any> {
let dbCollection = db.get(collection);
if(dbCollection === undefined) {
throw new Error('Can`t find collection ' + collection);
}
return nedbCollection;
return dbCollection;
}
export function makeDBQ(collection: Collection): DBQ {
return {
findOne: dbFindOne.bind(null, nedbCollectionFromCollection(collection)),
findMany: dbFindMany.bind(null, nedbCollectionFromCollection(collection)),
insertOne: dbInsertOne.bind(null, nedbCollectionFromCollection(collection)),
insertMany: dbInsertMany.bind(null, nedbCollectionFromCollection(collection)),
updateOne: dbUpdateOne.bind(null, nedbCollectionFromCollection(collection)),
updateMany: dbUpdateMany.bind(null, nedbCollectionFromCollection(collection)),
removeOne: dbRemoveOne.bind(null, nedbCollectionFromCollection(collection)),
removeMany: dbRemoveMany.bind(null, nedbCollectionFromCollection(collection))
findOne: dbFindOne.bind(null, dbCollectionFromCollection(collection)),
findMany: dbFindMany.bind(null, dbCollectionFromCollection(collection)),
insertOne: dbInsertOne.bind(null, dbCollectionFromCollection(collection)),
insertMany: dbInsertMany.bind(null, dbCollectionFromCollection(collection)),
updateOne: dbUpdateOne.bind(null, dbCollectionFromCollection(collection)),
updateMany: dbUpdateMany.bind(null, dbCollectionFromCollection(collection)),
removeOne: dbRemoveOne.bind(null, dbCollectionFromCollection(collection)),
removeMany: dbRemoveMany.bind(null, dbCollectionFromCollection(collection))
}
}
@ -74,7 +85,8 @@ function isEmptyArray(obj: any): boolean {
return obj.length == 0;
}
const db = new Map<Collection, nedb>();
const db = new Map<Collection, nedb | mongodb.Collection<any>>();
let mongoClient: mongodb.MongoClient;
async function dbInsertOne(nd: nedb, doc: object): Promise<string> {
@ -223,14 +235,60 @@ function checkDataFolders(): void {
config.ZMQ_IPC_PATH
].forEach(maybeCreateDir);
}
checkDataFolders();
const inMemoryOnly = config.HASTIC_DB_IN_MEMORY;
async function connectToDb() {
if(config.HASTIC_DB_CONNECTION_TYPE === 'nedb') {
checkDataFolders();
const inMemoryOnly = config.HASTIC_DB_IN_MEMORY;
console.log('NeDB used as storage');
// TODO: it's better if models request db which we create if it`s needed
db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true, timestampData: true, inMemoryOnly}));
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DETECTION_SPANS, new nedb({ filename: config.DETECTION_SPANS_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DB_META, new nedb({ filename: config.DB_META_PATH, autoload: true, inMemoryOnly}));
} else {
console.log('MongoDB used as storage');
const dbConfig = config.HASTIC_DB_CONFIG;
const uri = `mongodb://${dbConfig.user}:${dbConfig.password}@${dbConfig.url}`;
const auth = {
user: dbConfig.user,
password: dbConfig.password
};
mongoClient = new mongodb.MongoClient(uri, {
useNewUrlParser: true,
auth,
autoReconnect: true,
useUnifiedTopology: true,
authMechanism: 'SCRAM-SHA-1',
authSource: dbConfig.db_name
});
try {
const client: mongodb.MongoClient = await mongoClient.connect();
const hasticDb: mongodb.Db = client.db(dbConfig.db_name);
COLLECTION_TO_NAME_MAPPING.forEach((name, collection) => {
db.set(collection, hasticDb.collection(name));
});
} catch(err) {
console.log(`got error while connect to MongoDB ${err}`);
throw err;
}
}
}
export async function closeDb() {
if(mongoClient !== undefined && mongoClient.isConnected) {
await mongoClient.close();
}
}
// TODO: it's better if models request db which we create if it`s needed
db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true, timestampData: true, inMemoryOnly}));
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.SEGMENTS, new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DETECTION_SPANS, new nedb({ filename: config.DETECTION_SPANS_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DB_META, new nedb({ filename: config.DB_META_PATH, autoload: true, inMemoryOnly}));
let done = false;
connectToDb().then(() => {
done = true;
}).catch((err) => {
console.log(`data service got error while connect to data base ${err}`);
//TODO: choose best practice for error handling
throw err;
});
deasync.loopWhile(() => !done);

Loading…
Cancel
Save