
Mongodb persistence #570 part 2 (#774)

Evgeny Smyshlyaev authored 5 years ago, committed by rozetko
commit 71f053623a
13 changed files:
  1. server/jest.config.js (2 lines changed)
  2. server/package.json (2 lines changed)
  3. server/spec/setup_tests.ts (10 lines changed)
  4. server/src/config.ts (12 lines changed)
  5. server/src/controllers/analytics_controller.ts (2 lines changed)
  6. server/src/migrations.ts (2 lines changed)
  7. server/src/routes/analytic_units_router.ts (5 lines changed)
  8. server/src/services/data_layer/basedb.ts (15 lines changed)
  9. server/src/services/data_layer/index.ts (25 lines changed)
  10. server/src/services/data_layer/mongodb.ts (117 lines changed)
  11. server/src/services/data_layer/nedb.ts (139 lines changed)
  12. server/src/services/data_layer/utils.ts (41 lines changed)
  13. server/src/services/data_service.ts (200 lines changed)

server/jest.config.js (2 lines changed)

@@ -19,5 +19,3 @@ module.exports = {
"<rootDir>/spec/setup_tests.ts"
]
};
process.env.HASTIC_DB_IN_MEMORY = 'true';

server/package.json (2 lines changed)

@@ -30,7 +30,6 @@
"@types/mongodb": "^3.3.1",
"@types/nedb": "^1.8.0",
"@types/zeromq": "^4.6.0",
"deasync": "^0.1.15",
"axios": "^0.18.0",
"babel-core": "^6.26.3",
"babel-jest": "^23.4.2",
@@ -38,6 +37,7 @@
"babel-polyfill": "^6.26.0",
"babel-preset-env": "^1.7.0",
"babel-preset-es2015": "^6.24.1",
"deasync": "^0.1.15",
"es6-promise": "^4.2.4",
"event-stream": "3.3.4",
"file-loader": "^1.1.11",

server/spec/setup_tests.ts (10 lines changed)

@@ -1,6 +1,4 @@
import * as AnalyticUnit from '../src/models/analytic_units';
import * as AnalyticUnitCache from '../src/models/analytic_unit_cache_model';
import { TEST_ANALYTIC_UNIT_ID, createTestDB } from './utils_for_tests/analytic_units';
import { createTestDB } from './utils_for_tests/analytic_units';
import { clearSegmentsDB } from './utils_for_tests/segments';
console.log = jest.fn();
@@ -9,8 +7,12 @@ console.error = jest.fn();
jest.mock('../src/config.ts', () => ({
DATA_PATH: 'fake-data-path',
HASTIC_API_KEY: 'fake-key',
ZMQ_IPC_PATH: 'fake-zmq-path'
ZMQ_IPC_PATH: 'fake-zmq-path',
HASTIC_DB_CONNECTION_TYPE: 'nedb',
HASTIC_IN_MEMORY_PERSISTANCE: true
}));
jest.mock('deasync', () => ({ loopWhile: jest.fn() }));
clearSegmentsDB();
createTestDB();

server/src/config.ts (12 lines changed)

@@ -9,11 +9,12 @@ import * as os from 'os';
let configFile = path.join(__dirname, '../../config.json');
let configExists = fs.existsSync(configFile);
export type DataBaseConfig = {
// TODO: move to data_layer
export type DBConfig = {
user: string,
password: string,
url: string,
db_name: string
dbName: string
}
export const ANALYTICS_PATH = path.join(__dirname, '../../analytics');
@@ -130,16 +131,17 @@ function createZMQConnectionString() {
return zmq;
}
function getDbConfig(connectionStr: string): DataBaseConfig {
// TODO: move to data_layer
function getDbConfig(connectionStr: string): DBConfig {
const [user, password] = connectionStr.split('@')[0].split(':');
const [db_name, ...urlParts] = connectionStr.split('@')[1].split('/').reverse();
const [dbName, ...urlParts] = connectionStr.split('@')[1].split('/').reverse();
const url = urlParts.reverse().join('/');
const config = {
user,
password,
url,
db_name
dbName
};
return config;
}
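For reference, a minimal sketch of how the renamed getDbConfig parses a connection string of the form user:password@host:port/dbName; the example value below is made up, not taken from the commit:

// hypothetical connection string
const connectionStr = 'hastic:secret@localhost:27017/hastic';
const [user, password] = connectionStr.split('@')[0].split(':');                 // 'hastic', 'secret'
const [dbName, ...urlParts] = connectionStr.split('@')[1].split('/').reverse();  // dbName = 'hastic'
const url = urlParts.reverse().join('/');                                        // 'localhost:27017'
// resulting DBConfig: { user: 'hastic', password: 'secret', url: 'localhost:27017', dbName: 'hastic' }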

server/src/controllers/analytics_controller.ts (2 lines changed)

@@ -592,7 +592,7 @@ export async function getDetectionSpans(
async function getPayloadData(
analyticUnit: AnalyticUnit.AnalyticUnit,
from: number,
to:number
to: number
) {
let range: TimeRange;
if(from !== undefined && to !== undefined) {

server/src/migrations.ts (2 lines changed)

@@ -17,7 +17,7 @@ const analyticUnitsDB = makeDBQ(Collection.ANALYTIC_UNITS);
const analyticUnitCachesDB = makeDBQ(Collection.ANALYTIC_UNIT_CACHES);
const thresholdsDB = makeDBQ(Collection.THRESHOLD);
const DB_META_ID = '0';
const DB_META_ID = '000000000000000000000001'; //24 symbols for mongodb
type DbMeta = {
revision: number

server/src/routes/analytic_units_router.ts (5 lines changed)

@@ -61,8 +61,9 @@ async function updateUnit(ctx: Router.IRouterContext) {
throw new Error('Cannot update undefined id');
}
const updatedObj = await AnalyticUnit.update(analyticUnitObj.id, analyticUnitObj);
const analyticUnit = AnalyticUnit.createAnalyticUnitFromObject(updatedObj);
await AnalyticUnit.update(analyticUnitObj.id, analyticUnitObj);
// TODO: check if learning is necessary without database query
const analyticUnit = await AnalyticUnit.findById(analyticUnitObj.id);
if(analyticUnit.learningAfterUpdateRequired) {
await AnalyticsController.runLearning(analyticUnitObj.id);

server/src/services/data_layer/basedb.ts (15 lines changed)

@@ -0,0 +1,15 @@
import * as nedb from 'nedb';
import * as mongodb from 'mongodb';
export type dbCollection = nedb | mongodb.Collection;
export interface DbQueryWrapper {
dbInsertOne(collection: dbCollection, doc: object): Promise<string>;
dbInsertMany(collection: dbCollection, docs: object[]): Promise<string[]>;
dbUpdateOne(collection: dbCollection, query: string | object, updateQuery: object): Promise<void>;
dbUpdateMany(collection: dbCollection, query: string[] | object, updateQuery: object): Promise<void>;
dbFindOne(collection: dbCollection, query: string | object): Promise<any>;
dbFindMany(collection: dbCollection, query: string[] | object, sortQuery: object): Promise<any[]>;
dbRemoveOne(collection: dbCollection, query: string | object): Promise<boolean>;
dbRemoveMany(collection: dbCollection, query: string[] | object): Promise<number>;
}

server/src/services/data_layer/index.ts (25 lines changed)

@@ -0,0 +1,25 @@
import { DbQueryWrapper, dbCollection } from './basedb';
import { NeDbQueryWrapper } from './nedb';
import { MongoDbQueryWrapper } from './mongodb';
import { HASTIC_DB_CONNECTION_TYPE } from '../../config';
export enum DBType {
nedb = 'nedb',
mongodb = 'mongodb'
};
export { NeDbQueryWrapper, MongoDbQueryWrapper, DbQueryWrapper, dbCollection };
export function getDbQueryWrapper(): DbQueryWrapper {
if(HASTIC_DB_CONNECTION_TYPE === DBType.nedb) {
return new NeDbQueryWrapper();
}
if(HASTIC_DB_CONNECTION_TYPE === DBType.mongodb) {
return new MongoDbQueryWrapper();
}
throw new Error(
`"${HASTIC_DB_CONNECTION_TYPE}" HASTIC_DB_CONNECTION_TYPE is not supported. Possible values: "nedb", "mongodb"`
);
}
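A rough usage sketch of the new factory, assuming a collection handle is obtained elsewhere (the insertExample function and its arguments are hypothetical, added only for illustration):

import { getDbQueryWrapper, dbCollection } from './data_layer';

// The wrapper is picked once from HASTIC_DB_CONNECTION_TYPE, so callers stay storage-agnostic.
const queryWrapper = getDbQueryWrapper();

async function insertExample(collection: dbCollection): Promise<string> {
  // Same call shape for both the NeDB and MongoDB backends.
  return queryWrapper.dbInsertOne(collection, { name: 'example' });
}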

server/src/services/data_layer/mongodb.ts (117 lines changed)

@@ -0,0 +1,117 @@
import { DbQueryWrapper } from './basedb';
import { Collection, FilterQuery, ObjectID } from 'mongodb';
import { wrapIdToMongoDbQuery, wrapIdsToMongoDbQuery, isEmptyArray } from './utils';
import * as _ from 'lodash';
export class MongoDbQueryWrapper implements DbQueryWrapper {
async dbInsertOne(collection: Collection, doc: any): Promise<string> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#insertOne
// TODO: move to utils
if(doc._id !== undefined) {
doc._id = new ObjectID(doc._id);
}
const newDoc = await collection.insertOne(doc);
return newDoc.insertedId.toString();
}
async dbInsertMany(collection: Collection, docs: any[]): Promise<string[]> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#insertMany
if(docs.length === 0) {
return [];
}
// TODO: move to utils
docs.forEach(doc => {
if(doc._id !== undefined) {
doc._id = new ObjectID(doc._id);
}
});
const newDocs = await collection.insertMany(docs);
return _.map(newDocs.insertedIds, (id: ObjectID) => id.toString());
}
async dbUpdateOne(collection: Collection, query: FilterQuery<string | object>, updateQuery: any): Promise<void> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#updateOne
// "_id" is immutable. Mongo throws an exception if updateQuery contains "_id" field.
if(updateQuery._id !== undefined) {
delete updateQuery._id;
}
let mongodbUpdateQuery = { $set: updateQuery }
query = wrapIdToMongoDbQuery(query);
await collection.updateOne(
query,
mongodbUpdateQuery
);
}
async dbUpdateMany(collection: Collection, query: string[] | object, updateQuery: any): Promise<void> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#updateMany
if(isEmptyArray(query)) {
return;
}
// "_id" is immutable. Mongo throws an exception if updateQuery contains "_id" field.
if(updateQuery._id !== undefined) {
delete updateQuery._id;
}
let mongodbUpdateQuery = { $set: updateQuery };
query = wrapIdsToMongoDbQuery(query);
await collection.updateMany(
query,
mongodbUpdateQuery
);
}
async dbFindOne(collection: Collection, query: FilterQuery<string | object>): Promise<any> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#findOne
query = wrapIdToMongoDbQuery(query);
let doc = await collection.findOne(query);
// TODO: move to utils
if(doc !== null) {
doc._id = doc._id.toString();
}
return doc;
}
async dbFindMany(collection: Collection, query: string[] | object, sortQuery: object = {}): Promise<any[]> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#find
if(isEmptyArray(query)) {
return [];
}
query = wrapIdsToMongoDbQuery(query);
const docs = await collection.find(query).sort(sortQuery).toArray();
// TODO: move to utils
docs.forEach(doc => {
if(doc !== null) {
doc._id = doc._id.toString();
}
});
return docs;
}
async dbRemoveOne(collection: Collection, query: FilterQuery<string | object>): Promise<boolean> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#deleteOne
query = wrapIdToMongoDbQuery(query);
const deleted = await collection.deleteOne(query);
if(deleted.deletedCount > 1) {
throw new Error(`Removed ${deleted.deletedCount} elements with query: ${JSON.stringify(query)}. Only one is Ok.`);
}
return deleted.deletedCount === 1;
}
async dbRemoveMany(collection: Collection, query: string[] | object): Promise<number> {
// http://mongodb.github.io/node-mongodb-native/3.1/api/Collection.html#deleteMany
if(isEmptyArray(query)) {
return 0;
}
query = wrapIdsToMongoDbQuery(query);
const deleted = await collection.deleteMany(query);
return deleted.deletedCount;
}
}
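Note that this wrapper converts string ids to ObjectID on the way in and back to plain strings on the way out, so callers keep working with string ids exactly as they did with NeDB. A small round-trip sketch (the collection variable and the id value are hypothetical):

const wrapper = new MongoDbQueryWrapper();
// collection is a mongodb.Collection obtained from the connected client
// const id = await wrapper.dbInsertOne(collection, { value: 42 });  // e.g. '5d76c23a1c9d440000a1b2c3' (string)
// const doc = await wrapper.dbFindOne(collection, id);              // doc._id === id, again a plain string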

server/src/services/data_layer/nedb.ts (139 lines changed)

@@ -0,0 +1,139 @@
import { DbQueryWrapper } from './basedb';
import { wrapIdToQuery, wrapIdsToQuery, isEmptyArray } from './utils';
import * as nedb from 'nedb';
export class NeDbQueryWrapper implements DbQueryWrapper {
async dbInsertOne(nd: nedb, doc: object): Promise<string> {
return new Promise<string>((resolve, reject) => {
nd.insert(doc, (err, newDoc: any) => {
if(err) {
reject(err);
} else {
resolve(newDoc._id);
}
})
});
}
async dbInsertMany(nd: nedb, docs: object[]): Promise<string[]> {
if(docs.length === 0) {
return Promise.resolve([]);
}
return new Promise<string[]>((resolve, reject) => {
nd.insert(docs, (err, newDocs: any[]) => {
if(err) {
reject(err);
} else {
resolve(newDocs.map(d => d._id));
}
});
});
}
async dbUpdateOne(nd: nedb, query: string | object, updateQuery: object): Promise<void> {
// https://github.com/louischatriot/nedb#updating-documents
let nedbUpdateQuery = { $set: updateQuery }
query = wrapIdToQuery(query);
return new Promise<any>((resolve, reject) => {
nd.update(
query,
nedbUpdateQuery,
{ returnUpdatedDocs: true },
(err: Error, numAffected: number, affectedDocument: any) => {
if(err) {
reject(err);
} else {
resolve();
}
}
);
});
}
async dbUpdateMany(nd: nedb, query: string[] | object, updateQuery: object): Promise<any> {
// https://github.com/louischatriot/nedb#updating-documents
if(isEmptyArray(query)) {
return;
}
let nedbUpdateQuery = { $set: updateQuery };
query = wrapIdsToQuery(query);
return new Promise<void>((resolve, reject) => {
nd.update(
query,
nedbUpdateQuery,
{ returnUpdatedDocs: true, multi: true },
(err: Error, numAffected: number, affectedDocuments: any[]) => {
if(err) {
reject(err);
} else {
resolve();
}
}
);
});
}
async dbFindOne(nd: nedb, query: string | object): Promise<any> {
query = wrapIdToQuery(query);
return new Promise<any | null>((resolve, reject) => {
nd.findOne(query, (err, doc) => {
if(err) {
reject(err);
} else {
resolve(doc);
}
});
});
}
async dbFindMany(nd: nedb, query: string[] | object, sortQuery: object = {}): Promise<any[]> {
if(isEmptyArray(query)) {
return Promise.resolve([]);
}
query = wrapIdsToQuery(query);
return new Promise<any[]>((resolve, reject) => {
nd.find(query).sort(sortQuery).exec((err, docs: any[]) => {
if(err) {
reject(err);
} else {
resolve(docs);
}
});
});
}
async dbRemoveOne(nd: nedb, query: string | object): Promise<boolean> {
query = wrapIdToQuery(query);
return new Promise<boolean>((resolve, reject) => {
nd.remove(query, { /* options */ }, (err, numRemoved) => {
if(err) {
reject(err);
} else {
if(numRemoved > 1) {
throw new Error(`Removed ${numRemoved} elements with query: ${JSON.stringify(query)}. Only one is Ok.`);
} else {
resolve(numRemoved == 1);
}
}
});
});
}
async dbRemoveMany(nd: nedb, query: string[] | object): Promise<number> {
if(isEmptyArray(query)) {
return Promise.resolve(0);
}
query = wrapIdsToQuery(query);
return new Promise<number>((resolve, reject) => {
nd.remove(query, { multi: true }, (err, numRemoved) => {
if(err) {
reject(err);
} else {
resolve(numRemoved);
}
});
});
}
}

server/src/services/data_layer/utils.ts (41 lines changed)

@@ -0,0 +1,41 @@
import { ObjectID } from 'mongodb';
//TODO: move to DbQueryWrapper
export function wrapIdToQuery(query: string | object): object {
if(typeof query === 'string') {
return { _id: query };
}
return query;
}
export function wrapIdToMongoDbQuery(query: string | object): object {
if(typeof query === 'string') {
return { _id: new ObjectID(query) };
}
return query;
}
export function wrapIdsToQuery(query: string[] | object): object {
if(Array.isArray(query)) {
return { _id: { $in: query } };
}
return query;
}
// mongodb uses ObjectIds to store _id
// we should wrap ids into ObjectID to generate correct query
export function wrapIdsToMongoDbQuery(query: string[] | object): object {
if(Array.isArray(query)) {
query = query.map(id => new ObjectID(id));
return { _id: { $in: query } };
}
return query;
}
export function isEmptyArray(obj: any): boolean {
if(!Array.isArray(obj)) {
return false;
}
return obj.length == 0;
}
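A quick illustration of what the id-wrapping helpers produce (the id values are invented for the example):

wrapIdToQuery('abc123');                             // NeDB: { _id: 'abc123' }
wrapIdToMongoDbQuery('507f1f77bcf86cd799439011');    // MongoDB: { _id: new ObjectID('507f1f77bcf86cd799439011') }
wrapIdsToQuery(['a', 'b']);                          // { _id: { $in: ['a', 'b'] } }
wrapIdsToMongoDbQuery(['507f1f77bcf86cd799439011']); // { _id: { $in: [ObjectID(...)] } }
// isEmptyArray guards the *Many helpers so an empty id list short-circuits instead of hitting the database.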

server/src/services/data_service.ts (200 lines changed)

@@ -1,3 +1,4 @@
import { getDbQueryWrapper, dbCollection, DBType } from './data_layer';
import * as config from '../config';
import * as nedb from 'nedb';
@@ -22,7 +23,7 @@ const COLLECTION_TO_NAME_MAPPING = new Map<Collection, string>([
[Collection.THRESHOLD, 'threshold'],
[Collection.DETECTION_SPANS, 'detection_spans'],
[Collection.DB_META, 'db_meta']
])
]);
export enum SortingOrder { ASCENDING = 1, DESCENDING = -1 };
@@ -36,13 +37,17 @@ export type DBQ = {
findMany: (query: string[] | object, sortQuery?: object) => Promise<any[]>,
insertOne: (document: object) => Promise<string>,
insertMany: (documents: object[]) => Promise<string[]>,
updateOne: (query: string | object, updateQuery: any) => Promise<any>,
updateMany: (query: string[] | object, updateQuery: any) => Promise<any[]>,
updateOne: (query: string | object, updateQuery: any) => Promise<void>,
updateMany: (query: string[] | object, updateQuery: any) => Promise<void>,
removeOne: (query: string) => Promise<boolean>
removeMany: (query: string[] | object) => Promise<number>
}
function dbCollectionFromCollection(collection: Collection): nedb | mongodb.Collection<any> {
const queryWrapper = getDbQueryWrapper();
const db = new Map<Collection, dbCollection>();
let mongoClient: mongodb.MongoClient;
function dbCollectionFromCollection(collection: Collection): dbCollection {
let dbCollection = db.get(collection);
if(dbCollection === undefined) {
throw new Error('Can`t find collection ' + collection);
@@ -52,175 +57,20 @@ function dbCollectionFromCollection(collection: Collection): nedb | mongodb.Coll
export function makeDBQ(collection: Collection): DBQ {
return {
findOne: dbFindOne.bind(null, dbCollectionFromCollection(collection)),
findMany: dbFindMany.bind(null, dbCollectionFromCollection(collection)),
insertOne: dbInsertOne.bind(null, dbCollectionFromCollection(collection)),
insertMany: dbInsertMany.bind(null, dbCollectionFromCollection(collection)),
updateOne: dbUpdateOne.bind(null, dbCollectionFromCollection(collection)),
updateMany: dbUpdateMany.bind(null, dbCollectionFromCollection(collection)),
removeOne: dbRemoveOne.bind(null, dbCollectionFromCollection(collection)),
removeMany: dbRemoveMany.bind(null, dbCollectionFromCollection(collection))
findOne: queryWrapper.dbFindOne.bind(null, dbCollectionFromCollection(collection)),
findMany: queryWrapper.dbFindMany.bind(null, dbCollectionFromCollection(collection)),
insertOne: queryWrapper.dbInsertOne.bind(null, dbCollectionFromCollection(collection)),
insertMany: queryWrapper.dbInsertMany.bind(null, dbCollectionFromCollection(collection)),
updateOne: queryWrapper.dbUpdateOne.bind(null, dbCollectionFromCollection(collection)),
updateMany: queryWrapper.dbUpdateMany.bind(null, dbCollectionFromCollection(collection)),
removeOne: queryWrapper.dbRemoveOne.bind(null, dbCollectionFromCollection(collection)),
removeMany: queryWrapper.dbRemoveMany.bind(null, dbCollectionFromCollection(collection))
}
}
function wrapIdToQuery(query: string | object): any {
if(typeof query === 'string') {
return { _id: query };
}
return query;
}
function wrapIdsToQuery(query: string[] | object): any {
if(Array.isArray(query)) {
return { _id: { $in: query } };
}
return query;
}
// TODO: move to utils
function isEmptyArray(obj: any): boolean {
if(!Array.isArray(obj)) {
return false;
}
return obj.length == 0;
}
const db = new Map<Collection, nedb | mongodb.Collection<any>>();
let mongoClient: mongodb.MongoClient;
async function dbInsertOne(nd: nedb, doc: object): Promise<string> {
return new Promise<string>((resolve, reject) => {
nd.insert(doc, (err, newDoc: any) => {
if(err) {
reject(err);
} else {
resolve(newDoc._id);
}
})
});
}
async function dbInsertMany(nd: nedb, docs: object[]): Promise<string[]> {
if(docs.length === 0) {
return Promise.resolve([]);
}
return new Promise<string[]>((resolve, reject) => {
nd.insert(docs, (err, newDocs: any[]) => {
if(err) {
reject(err);
} else {
resolve(newDocs.map(d => d._id));
}
});
});
}
async function dbUpdateOne(nd: nedb, query: string | object, updateQuery: object): Promise<any> {
// https://github.com/louischatriot/nedb#updating-documents
let nedbUpdateQuery = { $set: updateQuery }
query = wrapIdToQuery(query);
return new Promise<any>((resolve, reject) => {
nd.update(
query,
nedbUpdateQuery,
{ returnUpdatedDocs: true },
(err: Error, numAffected: number, affectedDocument: any) => {
if(err) {
reject(err);
} else {
resolve(affectedDocument);
}
}
);
});
}
async function dbUpdateMany(nd: nedb, query: string[] | object, updateQuery: object): Promise<any[]> {
// https://github.com/louischatriot/nedb#updating-documents
if(isEmptyArray(query)) {
return Promise.resolve([]);
}
let nedbUpdateQuery = { $set: updateQuery };
query = wrapIdsToQuery(query);
return new Promise<any[]>((resolve, reject) => {
nd.update(
query,
nedbUpdateQuery,
{ returnUpdatedDocs: true, multi: true },
(err: Error, numAffected: number, affectedDocuments: any[]) => {
if(err) {
reject(err);
} else {
resolve(affectedDocuments);
}
}
);
});
}
async function dbFindOne(nd: nedb, query: string | object): Promise<any> {
query = wrapIdToQuery(query);
return new Promise<any | null>((resolve, reject) => {
nd.findOne(query, (err, doc) => {
if(err) {
reject(err);
} else {
resolve(doc);
}
});
});
}
async function dbFindMany(nd: nedb, query: string[] | object, sortQuery: object = {}): Promise<any[]> {
if(isEmptyArray(query)) {
return Promise.resolve([]);
}
query = wrapIdsToQuery(query);
return new Promise<any[]>((resolve, reject) => {
nd.find(query).sort(sortQuery).exec((err, docs: any[]) => {
if(err) {
reject(err);
} else {
resolve(docs);
}
});
});
}
async function dbRemoveOne(nd: nedb, query: string | object): Promise<boolean> {
query = wrapIdToQuery(query);
return new Promise<boolean>((resolve, reject) => {
nd.remove(query, { /* options */ }, (err, numRemoved) => {
if(err) {
reject(err);
} else {
if(numRemoved > 1) {
throw new Error(`Removed ${numRemoved} elements with query: ${JSON.stringify(query)}. Only one is Ok.`);
} else {
resolve(numRemoved == 1);
}
}
});
});
}
async function dbRemoveMany(nd: nedb, query: string[] | object): Promise<number> {
if(isEmptyArray(query)) {
return Promise.resolve(0);
}
query = wrapIdsToQuery(query);
return new Promise<number>((resolve, reject) => {
nd.remove(query, { multi: true }, (err, numRemoved) => {
if(err) {
reject(err);
} else {
resolve(numRemoved);
}
});
});
}
function maybeCreateDir(path: string): void {
if(fs.existsSync(path)) {
return;
@@ -237,10 +87,10 @@ function checkDataFolders(): void {
}
async function connectToDb() {
if(config.HASTIC_DB_CONNECTION_TYPE === 'nedb') {
if(config.HASTIC_DB_CONNECTION_TYPE === DBType.nedb) {
checkDataFolders();
const inMemoryOnly = config.HASTIC_DB_IN_MEMORY;
console.log('NeDB used as storage');
console.log('NeDB is used as the storage');
// TODO: it's better if models request db which we create if it`s needed
db.set(Collection.ANALYTIC_UNITS, new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true, timestampData: true, inMemoryOnly}));
db.set(Collection.ANALYTIC_UNIT_CACHES, new nedb({ filename: config.ANALYTIC_UNIT_CACHES_DATABASE_PATH, autoload: true, inMemoryOnly}));
@@ -248,8 +98,8 @@ async function connectToDb() {
db.set(Collection.THRESHOLD, new nedb({ filename: config.THRESHOLD_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DETECTION_SPANS, new nedb({ filename: config.DETECTION_SPANS_DATABASE_PATH, autoload: true, inMemoryOnly}));
db.set(Collection.DB_META, new nedb({ filename: config.DB_META_PATH, autoload: true, inMemoryOnly}));
} else {
console.log('MongoDB used as storage');
} else if(config.HASTIC_DB_CONNECTION_TYPE === DBType.mongodb) {
console.log('MongoDB is used as the storage');
const dbConfig = config.HASTIC_DB_CONFIG;
const uri = `mongodb://${dbConfig.user}:${dbConfig.password}@${dbConfig.url}`;
const auth = {
@@ -262,11 +112,11 @@ async function connectToDb() {
autoReconnect: true,
useUnifiedTopology: true,
authMechanism: 'SCRAM-SHA-1',
authSource: dbConfig.db_name
authSource: dbConfig.dbName
});
try {
const client: mongodb.MongoClient = await mongoClient.connect();
const hasticDb: mongodb.Db = client.db(dbConfig.db_name);
const hasticDb: mongodb.Db = client.db(dbConfig.dbName);
COLLECTION_TO_NAME_MAPPING.forEach((name, collection) => {
db.set(collection, hasticDb.collection(name));
});
@@ -274,6 +124,10 @@ async function connectToDb() {
console.log(`got error while connect to MongoDB ${err}`);
throw err;
}
} else {
throw new Error(
`"${config.HASTIC_DB_CONNECTION_TYPE}" HASTIC_DB_CONNECTION_TYPE is not supported. Possible values: "nedb", "mongodb"`
);
}
}
