'use strict';
/**
* @module lib/protocol/retrieve
* @summary Whiteflag message retrieval module
* @description Module for Whiteflag message retrieval from datastores and blockchains
* @tutorial modules
* @tutorial protocol
*/
module.exports = {
getMessage,
getQuery,
getReferences,
getAuthMessages,
getSequence,
test: {
refRef: removeSupersededMessages,
semRef: removeSemanticReferences
}
};
/* Type defintitions */
/**
* @callback wfMessagesCb
* @param {error} err any error
* @param {wfMessage[]} message an array with the Whiteflag messages
*/
/* Common internal functions and classes */
const log = require('../_common/logger');
const arr = require('../_common/arrays');
const { ProcessingError } = require('../_common/errors');
/* Whiteflag modules */
const wfBlockchains = require('../blockchains');
const wfDatastores = require('../datastores');
const wfCodec = require('./codec');
const wfRxEvent = require('./events').rxEvent;
/* Module constants */
const MODULELOG = 'retrieve';
const AUTHMESSAGECODE = 'A';
/* MAIN MODULE FUNCTIONS */
/**
* Retrieves a message from database or blockchain
* @function getMessage
* @alias lib/protocol/retrieve.getMessage
* @param {string} transactionHash the hash of the transaction to look up
* @param {string} [blockchain] the blockchain on which the transaction is stored
* @param {wfMessagesCb} callback function called on completion
*/
function getMessage(transactionHash, blockchain = null, callback) {
if (!transactionHash) return callback(new ProcessingError('No transaction hash specified', null, 'WF_API_BAD_REQUEST'));
let wfQuery = {};
wfQuery['MetaHeader.transactionHash'] = transactionHash;
if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
return retrieveMessages(wfQuery, callback);
}
/**
* Retrieves a message from database or blockchain
* @function getQuery
* @alias module:lib/protocol/retrieve.getQuery
* @param {Object} wfQuery the query to be performed
* @param {wfMessagesCb} callback function called on completion
*/
function getQuery(wfQuery = {}, callback) {
return retrieveMessages(wfQuery, callback);
}
/**
* Retrieves referencing messages from database or blockchain
* @function getReferences
* @alias module:lib/protocol/retrieve.getReferences
* @param {string} transactionHash the transaction hash of the referenced message
* @param {string} [blockchain] the blockchain on which the transaction is stored
* @param {wfMessagesCb} callback function called on completion
*/
function getReferences(transactionHash, blockchain = null, callback) {
if (!transactionHash) return callback(new ProcessingError('No transaction hash specified', null, 'WF_API_BAD_REQUEST'));
let wfQuery = {};
wfQuery['MessageHeader.ReferencedMessage'] = transactionHash;
if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
return retrieveMessages(wfQuery, callback);
}
/**
* Retrieves authentication messages for a specific blockchain address from database or blockchain
* @todo Remove semantic refs and superseded messages from sequence
* @function getAuthMessages
* @alias module:lib/protocol/retrieve.getAuthMessages
* @param {string} originatorAddress the originator address of the authentication message
* @param {string} [blockchain] the blockchain on which the transaction is stored
* @param {wfMessagesCb} callback function called on completion
*/
function getAuthMessages(originatorAddress, blockchain = null, callback) {
if (!originatorAddress) return callback(new ProcessingError('No originator address specified', null, 'WF_API_BAD_REQUEST'));
let wfQuery = {};
wfQuery['MetaHeader.originatorAddress'] = originatorAddress;
wfQuery['MessageHeader.MessageCode'] = AUTHMESSAGECODE;
if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
// Retrieve and process messages
retrieveMessages(wfQuery, function retrieveAuthMessagesCb(err, authMessages) {
// Check for errors and if any authentication message was found
if (err) return callback(err);
if (authMessages.length === 0) {
return callback(new ProcessingError(`Could not find authentication messages for address ${originatorAddress}`, null, 'WF_API_NO_DATA'));
}
// WIP: Return result, removing semantic refs and superseded messages from sequence
// WIP: authSequence = removeSemanticReferences(authSequence);
// WIP: return callback(null, removeSupersededMessages(authSequence));
return callback(null, authMessages);
});
}
/**
* Retrieves and evaluates message sequence starting with the message identified by the transaction hash
* @todo Clean up retrieved sequence
* @function getSequence
* @alias module:lib/protocol/retrieve.getSequence
* @param {string} transactionHash the hash of the transaction to look up
* @param {string} blockchain the blockchain on which the transaction is stored
* @param {wfMessagesCb} callback function called on completion
*/
function getSequence(transactionHash, blockchain, callback) {
if (!blockchain) return callback(new ProcessingError('No blockchain specified', null, 'WF_API_BAD_REQUEST'));
if (!transactionHash) return callback(new ProcessingError('No transaction hash specified to find first message in sequence', null, 'WF_API_BAD_REQUEST'));
let wfQuery = {};
wfQuery['MetaHeader.blockchain'] = blockchain;
wfQuery['MetaHeader.transactionHash'] = transactionHash;
// Call function to retrieve first message of sequence
retrieveMessages(wfQuery, function retrieveSequenceCb(err, initMsgSequence) {
// Check if transaction hash of message was found
if (err) return callback(err);
if (initMsgSequence.length === 0) {
return callback(new ProcessingError('Could not find transaction hash of first message in sequence', null, 'WF_API_NO_DATA'));
}
// Variables needed to process sequence
let seqBeginLength;
let seqRefLookups = [];
let seqProcessedLookups = [];
// Iterate until full sequence has been retrieved
iterateSeqence(initMsgSequence);
/**
* Iterates through message sequence to retrieve referenced messages
* @private
* @param {*} msgSequence
*/
function iterateSeqence(msgSequence) {
// Get current length of sequence and transaction hashes that have not been looked up yet
seqBeginLength = msgSequence.length;
seqRefLookups = arr.plucksub(msgSequence, 'MetaHeader', 'transactionHash')
.filter(function retrieveSequenceFilterCb(transactionHash) {
return (seqProcessedLookups.indexOf(transactionHash) < 0);
});
// Loop through sequence to add referencing messages
iterateMessages(seqRefLookups.length);
/**
* Iterates through message to retrieve
* @private
* @param {*} i index
*/
function iterateMessages(i) {
// Decrease iteration counter
i -= 1;
// Construct new query
wfQuery = {};
wfQuery['MessageHeader.ReferencedMessage'] = seqRefLookups[i];
wfQuery['MetaHeader.blockchain'] = blockchain;
// Retrieve referencing messages
retrieveMessages(wfQuery, function retrieveSequenceRefMessagesCb(err, refMessages) {
if (err) log.error(MODULELOG, `Error retrieving mesages in sequence: ${err.message}`);
msgSequence = arr.addArray(msgSequence, refMessages);
// Remember messages for which references just have been retrieved
seqProcessedLookups.push(seqRefLookups[i]);
// Check if more references need to be looked up
if (i > 0) return iterateMessages(i);
// If sequence has grown, remove semantic reference not processed by API, and look for more messages
if (msgSequence.length > seqBeginLength) {
// WIP: return iterateSeqence(removeSemanticReferences(msgSequence));
return iterateSeqence(msgSequence);
}
// WIP: No more referencing messages; clean up sequence and return result
// WIP: log.trace(MODULELOG, `Found sequence of ${msgSequence.length} messages: removing superseded messages`);
// WIP: return callback(null, removeSupersededMessages(msgSequence));
return callback(null, msgSequence);
});
}
}
});
}
/* PRIVATE MODULE FUNCTIONS */
/**
* Retrieves an array of messages from datastore or blockchain
* @private
* @param {Object} wfQuery the query to be performed
* @param {string} blockchain the blockchain on which the transaction is stored
* @param {wfMessagesCb} callback function called on completion
*/
function retrieveMessages(wfQuery = {}, callback) {
// Lookup message in database first - returns an array
wfDatastores.getMessages(wfQuery, function retrieveMessagesDbCb(err, wfMessages, count) {
if (err) return callback(err);
if (count === 1) return callback(null, wfMessages);
if (count > 1) return callback(null, wfMessages);
// Cannot look on blockchain if no blockchain and transaction hash specified
if (!wfQuery['MetaHeader.blockchain']) return callback(null, wfMessages);
if (!wfQuery['MetaHeader.transactionHash']) return callback(null, wfMessages);
/**
* @callback bcGetMessageCb
* @param {Error} err any error
* @param {Object} wfMessage a Whiteflag message
*/
wfBlockchains.getMessage(wfQuery, function retrieveMessagesBcCb(err, wfMessage) {
if (err) return callback(err, null);
wfCodec.decode(wfMessage, function retrieveDecodeCb(err, wfMessage, ivMissing) {
let wfMessages = [ wfMessage ];
if (err) return callback(err, wfMessages);
if (ivMissing) return callback(new ProcessingError('Cannot decrypt message without initialisation vector', null, ''), wfMessages);
// Valid message: process further and return result
if (wfMessage.MetaHeader.formatValid) wfRxEvent.emit('messageDecoded', wfMessage);
return callback(null, wfMessages);
});
});
});
}
/**
* Removes superseded or unneeded messages from a message sequence array
* @todo This function needs to be evaluated
* @private
* @param {Array} msgSequence array containing a message sequence
* @returns {Array} array with superseded messages removed
*/
function removeSupersededMessages(msgSequence) {
// 1. Ignore all messages that are recalled
msgSequence = msgSequence.filter(function removeSuperseded1Filter(wfMessage) {
return (msgSequence.filter(function removeSuperseded1RefFilter(wfRefMessage) {
return (
wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
&& wfRefMessage.MessageHeader.ReferenceIndicator === '1'
);
}).length === 0);
});
// 2. Ignore all messages that are updated or expired
msgSequence = msgSequence.filter(function removeSuperseded24Filter(wfMessage) {
return (msgSequence.filter(function removeSuperseded24RefFilter(wfRefMessage) {
return (
wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
&& ['2', '4'].indexOf(wfRefMessage.MessageHeader.ReferenceIndicator) > -1
);
}).length === 0);
});
// 3. Ignore all referencing messages of which the referenced message is ignored, except for updates
msgSequence = msgSequence.filter(function removeSupersededOldRefsRefFilter(wfRefMessage) {
return (
wfRefMessage.MessageHeader.ReferenceIndicator === '0'
|| wfRefMessage.MessageHeader.ReferenceIndicator === '2'
|| msgSequence.filter(function removeSupersededOldRefsFilter(wfMessage) {
return (wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash);
}).length > 0
);
});
// Return resulting cleaned up message sequence array
return msgSequence;
}
/**
* Removes semantic reference messages (reference code >= 5) from a message sequence array
* @todo This function needs to be evaluated
* @private
* @param {Array} msgSequence array containing a message sequence
* @returns {Array} array with superseded messages removed
*/
function removeSemanticReferences(msgSequence) {
// 1. Reference codes 5-9 are ignored
msgSequence = msgSequence.filter(function removeSemantic56789Filter(wfMessage) {
return (['5', '6', '7', '8', '9'].indexOf(wfMessage.MessageHeader.ReferenceIndicator) < 0);
});
// 2. Ignore all referencing messages of which the referenced message is ignored
msgSequence = msgSequence.filter(function removeSemanticOldRefsRefFilter(wfRefMessage) {
return (msgSequence.filter(function removeSemanticOldRefsFilter(wfMessage) {
return (
wfRefMessage.MessageHeader.ReferenceIndicator === '0'
|| wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
);
}).length > 0);
});
// Return resulting cleaned up message sequence array
return msgSequence;
}