'use strict';
/**
* @module lib/datastores
* @summary Whiteflag API datastores module
* @description Module with the datastore abstraction layer to connect with multiple databases
* @tutorial installation
* @tutorial configuration
* @tutorial modules
*/
module.exports = {
init: initDatastores,
close: closeDatastores,
storeMessage,
getMessages,
storeState,
getState
};
/* Node.js core and external modules */
const fs = require('fs');
const toml = require('toml');
const jsonValidate = require('jsonschema').validate;
/* Common internal functions and classes */
const arr = require('./_common/arrays');
const log = require('./_common/logger');
const { ignore } = require('./_common/processing');
/* Whiteflag modules */
const wfRxEvent = require('./protocol/events').rxEvent;
const wfTxEvent = require('./protocol/events').txEvent;
/* Module constants */
const NOTSET = '(uninitialsed)';
const MODULELOG = 'datastores';
const WFCONFDIR = process.env.WFCONFDIR || './config';
const DBCONFFILE = WFCONFDIR + '/datastores.toml';
const DBSCHEMADIR = './lib/datastores/_static/'
const DBSCHEMAFILE = DBSCHEMADIR + 'datastores.config.schema.json'
const DBMODULEDIR = './datastores/';
const dbRequiredEvents = ['messageProcessed', 'messageUpdated'];
/* Module variables */
let _datastores = [];
let _dbConfig = [];
let _dbConfigSchema = {};
let _dbPrimary = NOTSET;
let _dbEvents = {
rx: [],
tx: []
};
/* MAIN MODULE FUNCTIONS */
/**
* Initialises configured datastores
* @function initDatastores
* @alias module:lib/datastores.init
* @param {datastoreInitCb} callback function called on completion
*/
function initDatastores(callback) {
// Read the configuration file
let dbConfig = {};
try {
dbConfig = toml.parse(fs.readFileSync(DBCONFFILE));
} catch(err) {
return callback(new Error(`Could not read configuration from ${DBCONFFILE}: ${err.message}`));
}
try {
_dbConfigSchema = JSON.parse(fs.readFileSync(DBSCHEMAFILE));
} catch(err) {
return callback(new Error(`Could not read configuration schema from ${DBSCHEMAFILE}: ${err.message}`));
}
// Parse config file and initialise each enabled blockchain
if (!parseConfig(dbConfig)) {
return callback(new Error(`Could not parse configuration from ${DBCONFFILE}`));
}
_dbConfig = dbConfig.databases;
_datastores = arr.pluck(_dbConfig, 'name');
log.debug(MODULELOG, `Configured datastores in ${DBCONFFILE}: ` + JSON.stringify(_datastores));
// Pass result of initialization to callback
return initDbInstance(callback);
}
/**
* Closes configured datastores
* @function closeDatastores
* @alias module:lib/datastores.close
* @param {genericCb} callback function called on completion
*/
function closeDatastores(callback) {
_dbConfig.forEach(dbInstance => {
// Get configuration and call init function for this datastore
if (dbInstance._enabled) {
/**
* Callback after closing the datastore
* @callback datastoreInitCb
* @param {Error} err any error
*/
dbInstance._module.close(function datastoreCloseCb(err) {
dbInstance._initialised = false;
if (err) {
log.error(MODULELOG, `Error closing ${dbInstance.name}: ${err.message}`);
} else {
log.info(MODULELOG, `Closed datastore: ${dbInstance.name}`);
}
// Only callback for primary datastore
if (dbInstance.primary) return callback(err, _dbPrimary);
});
}
});
}
/**
* Stores Whiteflag messsage in active and enabled datastores
* @function storeMessage
* @alias module:lib/datastores.storeMessage
* @param {wfMessage} wfMessage the Whiteflag message to be stored
* @param {datastoreStoreMessageCb} callback function called on completion
*/
function storeMessage(wfMessage = {}, callback) {
if (_dbConfig.length <= 0) return callback(new Error('No datastore configured'), null);
_dbConfig.forEach(dbInstance => {
if (dbInstance._initialised) {
/**
* Callback after storing the Whiteflag message
* @callback datastoreStoreMessageCb
* @param {Error} err any error
* @param {*} result the result
*/
dbInstance._module.storeMessage(wfMessage, function datastoreStoreMessageCb(err, result) {
if (err) log.error(MODULELOG, `Could not store message in ${dbInstance.name}: ${err.message}`);
// Only call callback for primary datastore
if (dbInstance.primary) {
if (err) return callback(err, null);
return callback(null, result);
}
});
}
});
}
/**
* Get Whiteflag messages from the primary database
* @function getMessages
* @alias module:lib/datastores.getMessages
* @param {Object} wfQuery the properties of the messages to look up
* @param {datastoreGetMessagesCb} callback function called on completion
*/
function getMessages(wfQuery = {}, callback) {
if (_dbConfig.length <= 0) return callback(new Error('No datastore configured'), null);
return getPrimaryDatastore().getMessages(wfQuery, datastoreGetMessagesCb);
/**
* Callback after retrieving Whiteflag messages
* @callback datastoreGetMessagesCb
* @param {Error} err any error
* @param {Array} wfMessages the resulting Whiteflag messages
* @param {number} count the number of messages found
*/
function datastoreGetMessagesCb(err, wfMessages, count) {
if (err) return callback(new Error(`Could not retrieve messages from ${_dbPrimary}: ${err.message}`));
if (Array.isArray(wfMessages)) {
ignore(count);
return callback(null, wfMessages, wfMessages.length);
}
return callback(new Error(`The primary datastore ${_dbPrimary} did not return an array`));
}
}
/**
* Stores Whiteflag state in the primary database
* @function storeState
* @alias module:lib/datastores.storeState
* @param {Object} stateObject state data enclosed in a storage / encryption container
* @param {datastoreStoreStateCb} callback function called on completion
*/
function storeState(stateObject, callback) {
if (_dbConfig.length <= 0) return callback(new Error('No datastore configured'), null);
return getPrimaryDatastore().storeState(stateObject, datastoreStoreStateCb);
/**
* Callback after storing the Whiteflag state
* @callback datastoreStoreStateCb
* @param {Error} err any error
* @param {*} result the result
*/
function datastoreStoreStateCb(err, result) {
if (err) return callback(new Error(`Could not save state in ${_dbPrimary}: ${err.message}`));
ignore(result);
return callback(null, result);
}
}
/**
* Gets Whiteflag state from the primary database
* @function getState
* @alias module:lib/datastores.getState
* @param {datastoreGetStateCb} callback function called on completion
*/
function getState(callback) {
if (_dbConfig.length <= 0) return callback(new Error('No datastore configured'), null);
return getPrimaryDatastore().getState(datastoreGetStateCb);
/**
* Callback after getting the Whiteflag state
* @callback datastoreGetStateCb
* @param {Error} err any error
* @param {Object} stateObject state data enclosed in a storage / encryption container
*/
function datastoreGetStateCb(err, stateObject) {
if (err) return callback(new Error(`Could not retrieve state from ${_dbPrimary}: ${err.message}`));
return callback(null, stateObject);
}
}
/* PRIVATE MODULE FUNCTIONS
/**
* Returns the primary datastore module
* @private
*/
function getPrimaryDatastore() {
const dbInstance = _dbConfig.find(dbInstance => {
return (dbInstance._initialised && dbInstance.primary);
});
if (dbInstance) return dbInstance._module;
throw new Error(`No primary datastore initialized`);
}
/**
* Parses the base elements of the configuration before processing the configuration of each datastore
* @private
* @param {Object} dbConfig the datastores configuration object read from file
* @returns {boolean} true if configuration could be parsed, else false
*/
function parseConfig(dbConfig) {
// Check if any databases defined in datastores config
if (dbConfig?.databases) {
// Validate config file based on schema
let datastoresConfigErrors = validateConfig(dbConfig);
if (datastoresConfigErrors?.length > 0) {
log.error(MODULELOG, 'Configuration errors: ' + JSON.stringify(datastoresConfigErrors));
} else {
// Parse config of each datastore
dbConfig.databases.forEach(dbInstance => {
dbInstance._enabled = enableDbInstance(dbInstance);
});
return true;
}
}
return false;
}
/**
* Validates the datastore configuration against the datastore configuration schema
* @private
* @param {Object} dbConfig the datastore configuration to be validated
* @returns {Array} validation errors, empty if no errors
*/
function validateConfig(dbConfig) {
try {
return [].concat(arr.pluck(jsonValidate(dbConfig, _dbConfigSchema).errors, 'stack'));
} catch(err) {
return [].push(err.message);
}
}
/**
* Enables a specific datastore and loads module (but does not yet connect to the database)
* @private
* @param {string} event the tx event to be checked with each enabled datastore
* @param {Object} dbInstance the configuration of a specific database
* @returns {boolean} true if datastore could be activated and module could be loaded, else false
*/
function enableDbInstance(dbInstance) {
// CHeck if datastore is set to active
if (!dbInstance.active) return false;
// Assure parameters are configured
if (dbInstance.primary !== undefined) {
// Try loading the module to assure it exists
try {
dbInstance._module = require(DBMODULEDIR + dbInstance.module);
dbInstance._initialised = false;
} catch(err) {
log.error(MODULELOG, `Module ${dbInstance.module} cannot be loaded: ${err.message}`);
return false;
}
// Primary datastore requires 'messageProcessed' tx and rx events configured
if (dbInstance.rxStoreEvent.length > -1 && dbInstance.txStoreEvent.length > -1) {
if (dbInstance.primary) {
dbRequiredEvents.forEach(event => {
if (dbInstance.rxStoreEvent.indexOf(event) < 0) {
dbInstance.rxStoreEvent.push(event);
log.debug(MODULELOG, `Auto configured primary datastore ${dbInstance.name} to store message on '${event}' RX events`);
}
if (dbInstance.txStoreEvent.indexOf(event) < 0) {
dbInstance.txStoreEvent.push(event);
log.debug(MODULELOG, `Auto configured primary datastore ${dbInstance.name} to store message on '${event}' TX events`);
}
});
}
// Handle storage of messages based on configured events
dbInstance.rxStoreEvent.forEach(event => {
if (_dbEvents.rx.indexOf(event) < 0) {
wfRxEvent.on(event, function datastoreRxEventCb(wfMessage) {
rxStoreMessage(event, wfMessage);
});
_dbEvents.rx.push(event);
}
});
dbInstance.txStoreEvent.forEach(event => {
if (_dbEvents.tx.indexOf(event) < 0) {
wfTxEvent.on(event, function datastoreTxEventCb(wfMessage) {
txStoreMessage(event, wfMessage);
});
_dbEvents.tx.push(event);
}
});
} else {
// Ignore datastore because no rx and tx events defined
log.warn(MODULELOG, `Ignoring ${dbInstance.name}: Datastore requires at least one tx and one rx event`);
return false;
}
} else {
// Ignore datastore because of incomplete configuration
log.warn(MODULELOG, `Ignoring ${dbInstance.name}: Missing one of the following parameters in ${DBCONFFILE}: primary, module, active, txStoreEvent, rxStoreEvent`);
return false;
}
// Datastore enabled
return true;
}
/**
* Calls init function for each datastore
* @private
* @param {genericCb} callback
*/
function initDbInstance(callback) {
// Get configuration and call init function for each datastore
_dbConfig.forEach(dbInstance => {
if (dbInstance._enabled) {
/**
* Callback after initialising the datastore
* @callback datastoreInitCb
* @param {Error} err any error
*/
dbInstance._module.init(dbInstance, function datastoreInitCb(err) {
if (err) {
log.error(MODULELOG, `Error initialising ${dbInstance.name}: ${err.message}`);
} else {
dbInstance._initialised = true;
log.info(MODULELOG, `Initialised datastore: ${dbInstance.name}`);
}
// Ensure only one datastore is primary
if (_dbPrimary !== NOTSET) {
dbInstance.primary = false;
log.warn(MODULELOG, `Ignoring ${dbInstance.name} as primary, because ${_dbPrimary} already is; please update ${DBCONFFILE}`);
}
// Only invoke callback if primary
if (dbInstance.primary && _dbPrimary === NOTSET) {
_dbPrimary = dbInstance.name;
return callback(err, _dbPrimary);
}
});
}
});
}
/**
* Checks each enabled datastore whether message needs to be stored at tx event
* @private
* @param {string} event the tx event to be checked with each enabled datastore
* @param {wfMessage} wfMessage the sent whiteflag message
*/
function txStoreMessage(event, wfMessage) {
_dbConfig.forEach(dbInstance => {
if (dbInstance._initialised && dbInstance.txStoreEvent.indexOf(event) > -1) {
log.trace(MODULELOG, `Storing message in ${dbInstance.name} upon tx event '${event}': ${wfMessage.MetaHeader.transactionHash}`);
dbInstance._module.storeMessage(wfMessage, datastoreTxStoreMessageCb);
}
});
/**
* Transmits result of storage of sent message
* @callback txStoreMessageDbCb
* @param {Error} err any error
* @param {wfMessage} storedMessage the stored Whiteflag message
*/
function datastoreTxStoreMessageCb(err, storedMessage) {
if (err) return wfTxEvent.emit('error', err);
return wfTxEvent.emit('messageStored', storedMessage);
}
}
/**
* Checks each enabled datastore whether message needs to be stored at rx event
* @private
* @param {string} event the rx event to be checked with each enabled datastore
* @param {wfMessage} wfMessage the received whiteflag message
*/
function rxStoreMessage(event, wfMessage) {
_dbConfig.forEach(dbInstance => {
if (dbInstance._initialised && dbInstance.rxStoreEvent.indexOf(event) > -1) {
log.trace(MODULELOG, `Storing message in ${dbInstance.name} upon rx event '${event}': ${wfMessage.MetaHeader.transactionHash}`);
dbInstance._module.storeMessage(wfMessage, datastoreRxStoreMessageCb);
}
});
/**
* Transmits result of storage of received message
* @callback rxStoreMessageDbCb
* @param {Error} err any error
* @param {wfMessage} storedMessage the stored Whiteflag message
*/
function datastoreRxStoreMessageCb(err, storedMessage) {
if (err) return wfRxEvent.emit('error', err);
return wfRxEvent.emit('messageStored', storedMessage);
}
}