'use strict';
/**
* @module lib/datastores
* @summary Whiteflag API datastores module
* @description Module with the datastore abstraction layer to connect with multiple databases
* @tutorial installation
* @tutorial configuration
* @tutorial modules
*/
// Public interface of the module. The functions referenced below are
// function declarations later in this file, so they are hoisted and
// already defined when this object is built.
module.exports = {
// Datastore functions
init: initDatastores,
close: closeDatastores,
storeMessage,
getMessages,
storeState,
getState
};
// Node.js core and external modules //
const fs = require('fs');
const toml = require('toml');
const jsonValidate = require('jsonschema').validate;
// Whiteflag common functions and classes //
const array = require('./common/arrays');
const log = require('./common/logger');
const { ignore } = require('./common/processing');
// Whiteflag event emitters //
const wfRxEvent = require('./protocol/events').rxEvent;
const wfTxEvent = require('./protocol/events').txEvent;
// Module constants //
// Module name passed as first argument to every logger call in this file
const MODULELOG = 'datastores';
// Configuration directory; the WFCONFDIR environment variable overrides the default
const WFCONFDIR = process.env.WFCONFDIR || './config';
// Datastores configuration file (TOML) that is read by initDatastores
const DSCONFFILE = WFCONFDIR + '/datastores.toml';
// Path prefix prepended to the configured module name of a datastore when it is loaded with require
const DSMODULEDIR = './datastores/';
// Events that are automatically added to the rx and tx store events of a primary datastore
const requiredEvents = ['messageProcessed', 'messageUpdated'];
// JSON schema used to validate the datastores configuration; read synchronously when this module is loaded
// NOTE(review): this is a relative path, which fs resolves against the process working directory and not against this file - confirm that is intended
const wfDatastoresConfigSchema = JSON.parse(fs.readFileSync('./lib/datastores/static/datastores.config.schema.json'));
// Module variables //
// Names of all datastores found in the configuration file
let _datastores = [];
// Configuration objects of all datastores, taken from the `databases` property of the parsed configuration
let _datastoresConfig = [];
// Name of the enabled primary datastore (the last one if several are flagged primary)
let _primaryDatastore = '';
// Events for which a store listener has already been attached to the rx / tx event emitters
let _storeEventRegister = {
rx: [],
tx: []
};
// MAIN MODULE FUNCTIONS //
/**
 * Initialises configured datastores
 *
 * Reads and parses the datastores configuration file, and then calls the init
 * function of the module of each enabled datastore. Completion is signalled by
 * the primary datastore only: errors of other datastores are just logged.
 * If no enabled primary datastore is available, the callback is called with
 * an error and no datastore is initialised.
 * @function initDatastores
 * @alias module:lib/datastores.init
 * @param {datastoreInitCb} callback function to be called upon completion
 */
/**
 * Callback after initialising the datastores
 * @callback datastoreInitCb
 * @param {Error} err error object if any error
 * @param {string} primaryDatastore the name of the primary datastore, if no error
 */
function initDatastores(callback) {
    // Read the configuration file
    let datastoresConfig = {};
    try {
        datastoresConfig = toml.parse(fs.readFileSync(DSCONFFILE));
    } catch(err) {
        // Include the underlying cause, otherwise a missing file and a syntax error look the same
        return callback(new Error(`Could not read configuration in ${DSCONFFILE}: ${err.message}`));
    }
    // Validate config file and determine which datastores are enabled
    if (!parseConfig(datastoresConfig)) {
        return callback(new Error(`Could not parse configuration in ${DSCONFFILE}`));
    }
    _datastoresConfig = datastoresConfig.databases;

    // Get array of names of configured datastores
    _datastores = array.pluck(_datastoresConfig, 'name');
    log.info(MODULELOG, `Configured datastores in ${DSCONFFILE}: ` + JSON.stringify(_datastores));

    // Only the primary datastore invokes the callback, so without an enabled
    // primary datastore the initialisation would never complete
    const enabledDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled);
    if (!enabledDatastores.some(dbConfig => dbConfig.primary)) {
        return callback(new Error(`No primary datastore could be enabled based on configuration in ${DSCONFFILE}`));
    }
    // Call init function of each enabled datastore
    enabledDatastores.forEach(dbConfig => {
        if (dbConfig.primary) _primaryDatastore = dbConfig.name;
        dbConfig._moduleImpl.init(dbConfig, function datastoreInitCb(err) {
            if (err) {
                // Only call callback with error for primary datastore
                if (dbConfig.primary) return callback(err);
                log.error(MODULELOG, `Error connecting to ${dbConfig.name}: ${err.message}`);
            } else {
                dbConfig.initialised = true;
                log.info(MODULELOG, `Connected to datastore: ${dbConfig.name}`);
                if (dbConfig.primary) return callback(null, _primaryDatastore);
            }
        });
    });
}
/**
 * Closes configured datastores
 *
 * Calls the close function of the module of each enabled datastore. Completion
 * is signalled by the primary datastore only: errors of other datastores are
 * just logged. If there is no enabled primary datastore, e.g. because the
 * datastores were never successfully initialised, the callback is called
 * without error after the close functions have been invoked.
 * @function closeDatastores
 * @alias module:lib/datastores.close
 * @param {datastoreCloseCb} callback function to be called upon completion
 * @typedef {function(Error)} datastoreCloseCb
 */
function closeDatastores(callback) {
    const enabledDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled);

    // Call close function of each enabled datastore
    enabledDatastores.forEach(dbConfig => {
        dbConfig._moduleImpl.close(function datastoreCloseCb(err) {
            dbConfig.initialised = false;
            if (err) {
                // Only call callback with error for primary datastore
                if (dbConfig.primary) return callback(err);
                log.error(MODULELOG, `Error closing ${dbConfig.name}: ${err.message}`);
            } else {
                if (dbConfig.primary) {
                    log.info(MODULELOG, `Closed primary datastore: ${dbConfig.name}`);
                    return callback(null);
                } else {
                    log.info(MODULELOG, `Closed datastore: ${dbConfig.name}`);
                }
            }
        });
    });
    // Only the primary datastore invokes the callback, so without an enabled
    // primary datastore the caller would otherwise wait forever
    if (!enabledDatastores.some(dbConfig => dbConfig.primary)) return callback(null);
}
/**
 * Stores Whiteflag message in active and enabled datastores
 *
 * The message is passed to every enabled datastore, but only the result of
 * the primary datastore is reported through the callback; errors of other
 * datastores are just logged. If no enabled primary datastore is available,
 * the callback is called with an error.
 * @function storeMessage
 * @alias module:lib/datastores.storeMessage
 * @param {wfMessage} wfMessage the Whiteflag message to be stored
 * @param {datastoreStoreMessageCb} callback function to be called upon completion
 */
/**
 * Callback after storing the Whiteflag message
 * @callback datastoreStoreMessageCb
 * @param {Error} err error object if any error
 * @param {*} result the result
 */
function storeMessage(wfMessage = {}, callback) {
    if (_datastoresConfig.length <= 0) return callback(new Error('No datastore configured'), null);

    const enabledDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled);
    enabledDatastores.forEach(dbConfig => {
        dbConfig._moduleImpl.storeMessage(wfMessage, function datastoreStoreMessageCb(err, result) {
            if (err) log.error(MODULELOG, `Could not store message in ${dbConfig.name}: ${err.message}`);

            // Only call callback for primary datastore
            if (dbConfig.primary) {
                if (err) return callback(err, null);
                return callback(null, result);
            }
        });
    });
    // Only the primary datastore invokes the callback, so without an enabled
    // primary datastore the caller would otherwise never get a response
    if (!enabledDatastores.some(dbConfig => dbConfig.primary)) {
        return callback(new Error('No primary datastore enabled'), null);
    }
}
/**
 * Get Whiteflag messages from the primary database
 *
 * If no enabled primary datastore is available, the callback is called with
 * an error.
 * @function getMessages
 * @alias module:lib/datastores.getMessages
 * @param {Object} wfQuery the properties of the messages to look up
 * @param {datastoreGetMessagesCb} callback function to be called upon completion
 */
function getMessages(wfQuery = {}, callback) {
    if (_datastoresConfig.length <= 0) return callback(new Error('No datastore configured'), null);

    // Without an enabled primary datastore nothing would ever invoke the callback
    const primaryDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled && dbConfig.primary);
    if (primaryDatastores.length === 0) return callback(new Error('No primary datastore enabled'), null);

    primaryDatastores.forEach(dbConfig => {
        dbConfig._moduleImpl.getMessages(wfQuery, datastoreGetMessagesCb);
    });
    /**
     * Callback after retrieving Whiteflag messages
     * @callback datastoreGetMessagesCb
     * @param {Error} err error object if any error
     * @param {Array} wfMessages the resulting Whiteflag messages
     * @param {number} count the number of messages found
     */
    function datastoreGetMessagesCb(err, wfMessages, count) {
        if (err) return callback(new Error(`Could not retrieve messages from ${_primaryDatastore}: ${err.message}`));
        if (Array.isArray(wfMessages)) {
            // The count reported by the datastore is not used: the length of the returned array is passed on instead
            ignore(count);
            return callback(null, wfMessages, wfMessages.length);
        }
        return callback(new Error(`The primary datastore ${_primaryDatastore} did not return an array`));
    }
}
/**
 * Stores Whiteflag state in the primary database
 *
 * If no enabled primary datastore is available, the callback is called with
 * an error.
 * @function storeState
 * @alias module:lib/datastores.storeState
 * @param {Object} stateObject state data enclosed in a storage / encryption container
 * @param {datastoreStoreStateCb} callback function to be called upon completion
 */
function storeState(stateObject, callback) {
    if (_datastoresConfig.length <= 0) return callback(new Error('No datastore configured'), null);

    // Without an enabled primary datastore nothing would ever invoke the callback
    const primaryDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled && dbConfig.primary);
    if (primaryDatastores.length === 0) return callback(new Error('No primary datastore enabled'), null);

    primaryDatastores.forEach(dbConfig => {
        dbConfig._moduleImpl.storeState(stateObject, datastoreStoreStateCb);
    });
    /**
     * Callback after storing the Whiteflag state
     * @callback datastoreStoreStateCb
     * @param {Error} err error object if any error
     * @param {*} result the result
     */
    function datastoreStoreStateCb(err, result) {
        if (err) return callback(new Error(`Could not save state in ${_primaryDatastore}: ${err.message}`));
        ignore(result);
        return callback(null, result);
    }
}
/**
 * Gets Whiteflag state from the primary database
 *
 * If no enabled primary datastore is available, the callback is called with
 * an error.
 * @function getState
 * @alias module:lib/datastores.getState
 * @param {datastoreGetStateCb} callback function to be called upon completion
 */
function getState(callback) {
    if (_datastoresConfig.length <= 0) return callback(new Error('No datastore configured'), null);

    // Without an enabled primary datastore nothing would ever invoke the callback
    const primaryDatastores = _datastoresConfig.filter(dbConfig => dbConfig && dbConfig._enabled && dbConfig.primary);
    if (primaryDatastores.length === 0) return callback(new Error('No primary datastore enabled'), null);

    primaryDatastores.forEach(dbConfig => {
        dbConfig._moduleImpl.getState(datastoreGetStateCb);
    });
    /**
     * Callback after getting the Whiteflag state
     * @callback datastoreGetStateCb
     * @param {Error} err error object if any error
     * @param {Object} stateObject state data enclosed in a storage / encryption container
     */
    function datastoreGetStateCb(err, stateObject) {
        if (err) return callback(new Error(`Could not retrieve state from ${_primaryDatastore}: ${err.message}`));
        return callback(null, stateObject);
    }
}
// PRIVATE MODULE FUNCTIONS //
/**
 * Checks the overall datastores configuration and determines for each
 * configured datastore whether it is enabled
 * @private
 * @param {Object} datastoresConfig the datastores configuration object read from file
 * @returns {boolean} true if configuration could be parsed, else false
 */
function parseConfig(datastoresConfig) {
    // Nothing to parse if no databases are defined in the configuration
    const hasDatabases = Boolean(datastoresConfig && datastoresConfig.databases);
    if (!hasDatabases) return false;

    // The configuration must comply with the schema before looking at individual datastores
    const validationErrors = validateConfig(datastoresConfig);
    if (validationErrors && validationErrors.length > 0) {
        log.error(MODULELOG, 'Configuration errors: ' + JSON.stringify(validationErrors));
        return false;
    }
    // Flag each datastore as enabled or not
    datastoresConfig.databases.forEach(function flagEnabled(database) {
        database._enabled = enableDbConfig(database);
    });
    return true;
}
/**
 * Validates the datastore configuration against the datastore configuration schema
 * @private
 * @param {Object} datastoresConfig the datastore configuration to be validated
 * @returns {Array} validation errors, empty if no errors; if the validation
 * itself fails, the array contains the message of the error that was thrown
 */
function validateConfig(datastoresConfig) {
    try {
        // Reduce the validation errors to their textual description
        return [].concat(array.pluck(jsonValidate(datastoresConfig, wfDatastoresConfigSchema).errors, 'stack'));
    } catch(err) {
        // Array.prototype.push returns the new length and not the array,
        // so the array with the error message is created directly
        return [err.message];
    }
}
/**
* Enables a specific datastore and loads module (but does not yet connect to the database)
* @private
* @param {string} event the tx event to be checked with each enabled datastore
* @param {Object} dbConfig the configuration of a specific database
* @returns {boolean} true if datastore could be activated and module could be loaded, else false
*/
function enableDbConfig(dbConfig) {
// CHeck if datastore is set to active
if (!dbConfig.active) return false;
// Assure parameters are configured
if (dbConfig.primary !== undefined) {
// Try loading the module to assure it exists
try {
dbConfig._moduleImpl = require(DSMODULEDIR + dbConfig.module);
dbConfig.initialised = false;
} catch(err) {
log.error(MODULELOG, `Module ${dbConfig.module} cannot be loaded: ${err.message}`);
return false;
}
// Primary datastore requires 'messageProcessed' tx and rx events configured
if (dbConfig.rxStoreEvent.length > -1 && dbConfig.txStoreEvent.length > -1) {
if (dbConfig.primary) {
requiredEvents.forEach(event => {
if (dbConfig.rxStoreEvent.indexOf(event) < 0) {
dbConfig.rxStoreEvent.push(event);
log.info(MODULELOG, `Auto configured primary datastore ${dbConfig.name} to store message on '${event}' RX events`);
}
if (dbConfig.txStoreEvent.indexOf(event) < 0) {
dbConfig.txStoreEvent.push(event);
log.info(MODULELOG, `Auto configured primary datastore ${dbConfig.name} to store message on '${event}' TX events`);
}
});
}
// Handle storage of messages based on configured events.
dbConfig.rxStoreEvent.forEach(event => {
if (_storeEventRegister.rx.indexOf(event) < 0) {
wfRxEvent.on(event, function datastoreRxEventCb(wfMessage) {
rxStoreMessage(event, wfMessage);
});
_storeEventRegister.rx.push(event);
}
});
dbConfig.txStoreEvent.forEach(event => {
if (_storeEventRegister.tx.indexOf(event) < 0) {
wfTxEvent.on(event, function datastoreTxEventCb(wfMessage) {
txStoreMessage(event, wfMessage);
});
_storeEventRegister.tx.push(event);
}
});
} else {
// Ignore datastore because no rx and tx events defined
log.warn(MODULELOG, `Ignoring ${dbConfig.name}: Datastore requires at least one tx and one rx event`);
return false;
}
} else {
// Ignore datastore because of incomplete configuration
log.warn(MODULELOG, `Ignoring ${dbConfig.name}: Missing one of the following parameters in ${DSCONFFILE}: primary, module, active, txStoreEvent, rxStoreEvent`);
return false;
}
// Datastore enabled
return true;
}
/**
 * Passes a sent message to every enabled datastore that is configured to
 * store messages upon the given tx event, and emits the outcome of each
 * storage on the tx event emitter
 * @private
 * @param {string} event the tx event to be checked with each enabled datastore
 * @param {wfMessage} wfMessage the sent whiteflag message
 */
function txStoreMessage(event, wfMessage) {
    /**
     * Transmits result of storage of sent message
     * @callback txStoreMessageDbCb
     * @param {Error} err error object if any error
     * @param {wfMessage} storedMessage the stored Whiteflag message
     */
    const reportTxStorage = (err, storedMessage) => {
        return err
            ? wfTxEvent.emit('error', err)
            : wfTxEvent.emit('messageStored', storedMessage);
    };
    _datastoresConfig.forEach(datastore => {
        // Skip datastores that are not enabled or do not store on this event
        if (!datastore || !datastore._enabled) return;
        if (datastore.txStoreEvent.indexOf(event) < 0) return;

        log.trace(MODULELOG, `Storing message in ${datastore.name} upon tx event '${event}': ${wfMessage.MetaHeader.transactionHash}`);
        datastore._moduleImpl.storeMessage(wfMessage, reportTxStorage);
    });
}
/**
 * Passes a received message to every enabled datastore that is configured to
 * store messages upon the given rx event, and emits the outcome of each
 * storage on the rx event emitter
 * @private
 * @param {string} event the rx event to be checked with each enabled datastore
 * @param {wfMessage} wfMessage the received whiteflag message
 */
function rxStoreMessage(event, wfMessage) {
    /**
     * Transmits result of storage of received message
     * @callback rxStoreMessageDbCb
     * @param {Error} err error object if any error
     * @param {wfMessage} storedMessage the stored Whiteflag message
     */
    const reportRxStorage = (err, storedMessage) => {
        return err
            ? wfRxEvent.emit('error', err)
            : wfRxEvent.emit('messageStored', storedMessage);
    };
    _datastoresConfig.forEach(datastore => {
        // Skip datastores that are not enabled or do not store on this event
        if (!datastore || !datastore._enabled) return;
        if (datastore.rxStoreEvent.indexOf(event) < 0) return;

        log.trace(MODULELOG, `Storing message in ${datastore.name} upon rx event '${event}': ${wfMessage.MetaHeader.transactionHash}`);
        datastore._moduleImpl.storeMessage(wfMessage, reportRxStorage);
    });
}