Source: protocol/retrieve.js

'use strict';
/**
 * @module lib/protocol/retrieve
 * @summary Whiteflag message retrieval module
 * @description Module for Whiteflag message retrieval from datastores and blockchains
 * @tutorial modules
 * @tutorial protocol
 */
module.exports = {
    // Message retrieval functions
    getMessage,
    getQuery,
    getReferences,
    getAuthMessages,
    getSequence,
    test: {
        refRef: removeSupersededMessages,
        semRef: removeSemanticReferences
    }
};

// Whiteflag common functions and classes //
const log = require('../common/logger');
const array = require('../common/arrays');
const { ProcessingError } = require('../common/errors');

// Whiteflag modules //
const wfApiBlockchains = require('../blockchains');
const wfApiDatastores = require('../datastores');
const wfCodec = require('./codec');

 // Whiteflag event emitters //
const wfRxEvent = require('./events').rxEvent;

// Module constants //
const MODULELOG = 'retrieve';
const AUTHMESSAGECODE = 'A';

// MAIN MODULE FUNCTIONS //
/**
 * Retrieves a message from database or blockchain
 * @function getMessage
 * @alias lib/protocol/retrieve.getMessage
 * @param {string} transactionHash the hash of the transaction to look up
 * @param {string} blockchain the blockchain on which the transaction is stored
 * @param {function(Error, wfMessages)} callback function passed to retrieve function to be called upon completion
 */
function getMessage(transactionHash = '0', blockchain = null, callback) {
    let wfQuery = {};
    wfQuery['MetaHeader.transactionHash'] = transactionHash;
    if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
    return retrieveMessages(wfQuery, blockchain, callback);
}

/**
 * Retrieves a message from database or blockchain
 * @function getQuery
 * @alias module:lib/protocol/retrieve.getQuery
 * @param {Object} wfQuery the query to be performed
 * @param {function(Error, wfMessages)} callback function passed to retrieve function to be called upon completion
 */
function getQuery(wfQuery = {}, callback) {
    let blockchain = null;
    if (wfQuery['MetaHeader.blockchain']) blockchain = wfQuery['MetaHeader.blockchain'];
    return retrieveMessages(wfQuery, blockchain, callback);
}

/**
 * Retrieves referencing messages from database or blockchain
 * @function getReferences
 * @alias module:lib/protocol/retrieve.getReferences
 * @param {string} transactionHash the transaction hash of the referenced message
 * @param {string} blockchain the blockchain on which the transaction is stored
 * @param {function(Error, wfMessages)} callback function passed to retrieve function to be called upon completion
 */
function getReferences(transactionHash = '0', blockchain = null, callback) {
    let wfQuery = {};
    wfQuery['MessageHeader.ReferencedMessage'] = transactionHash;
    if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
    return retrieveMessages(wfQuery, blockchain, callback);
}

/**
 * Retrieves authentication messages for a specific blockchain address from database or blockchain
 * @function getAuthMessages
 * @alias module:lib/protocol/retrieve.getAuthMessages
 * @param {string} originatorAddress the originator address of the authentication message
 * @param {string} blockchain the blockchain on which the transaction is stored
 * @param {function(Error, wfMessages)} callback function called upon completion
 * @todo Remove semantic refs and superseded messages from sequence
 */
function getAuthMessages(originatorAddress = '0', blockchain = null, callback) {
    let wfQuery = {};

    // Construct query to get all authentication messages
    wfQuery['MetaHeader.originatorAddress'] = originatorAddress;
    wfQuery['MessageHeader.MessageCode'] = AUTHMESSAGECODE;
    if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;

    // Retrieve and process messages
    retrieveMessages(wfQuery, blockchain, function retrieveAuthMessagesCb(err, authMessageSequence) {
        // Check for errors and if any authentication message was found
        if (err) return callback(err);
        if (authMessageSequence.length === 0) {
            return callback(new ProcessingError('Could not find transaction hash of first message in sequence', null, 'WF_API_NO_DATA'));
        }
        // WIP: Return result, removing semantic refs and superseded messages from sequence
        // WIP: authMessageSequence = removeSemanticReferences(authMessageSequence);
        // WIP: return callback(null, removeSupersededMessages(authMessageSequence));
        return callback(null, authMessageSequence);
    });
}

/**
 * Retrieves and evaluates message sequence starting with the message identified by the transaction hash
 * @function getSequence
 * @alias module:lib/protocol/retrieve.getSequence
 * @param {string} transactionHash the hash of the transaction to look up
 * @param {string} blockchain the blockchain on which the transaction is stored
 * @param {function(Error, wfMessages)} callback function called upon completion
 * @todo Clean up retrieved sequence
 */
function getSequence(transactionHash = '0', blockchain = null, callback) {
    let wfQuery = {};

    // Construct query to get sequence
    wfQuery['MetaHeader.transactionHash'] = transactionHash;
    if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;

    // Call function to retrieve first message of sequence
    retrieveMessages(wfQuery, blockchain, function retrieveSequenceCb(err, wfMessageInitSequence) {
        // Check if transaction hash of message was found
        if (err) return callback(err);
        if (wfMessageInitSequence.length === 0) {
            return callback(new ProcessingError('Could not find transaction hash of first message in sequence', null, 'WF_API_NO_DATA'));
        }
        // Variables needed to process sequence
        let sequenceBeginLength;
        let sequenceRefLookups = [];
        let sequenceProcessedRefLookups = [];

        // Iterate until full sequence has been retrieved
        function iterateSeqence(wfMessageSequence) {
            // Get current length of sequence and transaction hashed that have not been looked up yet
            sequenceBeginLength = wfMessageSequence.length;
            sequenceRefLookups = array.plucksub(wfMessageSequence, 'MetaHeader', 'transactionHash').filter(function retrieveSequenceFilterCb(transactionHash) {
                return (sequenceProcessedRefLookups.indexOf(transactionHash) < 0);
            });
            // Loop through sequence to add referencing messages
            function iterateMessages(i) {
                // Decrease iteration counter
                i -= 1;

                // Construct new query
                wfQuery = {};
                wfQuery['MessageHeader.ReferencedMessage'] = sequenceRefLookups[i];
                if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;

                // Retrieve referencing messages
                retrieveMessages(wfQuery, blockchain, function retrieveSequenceRefMessagesCb(err, refMessages) {
                    if (err) log.error(MODULELOG, `Error retrieving mesages in sequence: ${err.message}`);
                    wfMessageSequence = array.addArray(wfMessageSequence, refMessages);

                    // Remember messages for which references just have been retrieved
                    sequenceProcessedRefLookups.push(sequenceRefLookups[i]);

                    // Check if more references need to be looked up
                    if (i > 0) return iterateMessages(i);

                    // If sequence has grown, remove semantic reference not processed by API, and look for more messages
                    if (wfMessageSequence.length > sequenceBeginLength) {
                        // WIP: return iterateSeqence(removeSemanticReferences(wfMessageSequence));
                        return iterateSeqence(wfMessageSequence);
                    }
                    // WIP: No more referencing messages; clean up sequence and return result
                    // WIP: log.trace(MODULELOG, `Found sequence of ${wfMessageSequence.length} messages: removing superseded messages`);
                    // WIP: return callback(null, removeSupersededMessages(wfMessageSequence));
                    return callback(null, wfMessageSequence);
                });
            }
            iterateMessages(sequenceRefLookups.length);
        }
        iterateSeqence(wfMessageInitSequence);
    });
}

// PRIVATE MODULE FUNCTIONS //
/**
 * Retrieves an array of messages from datastore or blockchain
 * @private
 * @param {Object} wfQuery the query to be performed
 * @param {string} blockchain the blockchain on which the transaction is stored
 * @param {function(Error, wfMessages)} callback function called upon completion
 * @typedef {Array} wfMessages retrieved Whiteflag messages
 */
function retrieveMessages(wfQuery = {}, blockchain = null, callback) {
    // Lookup message in database first - returns an array
    wfApiDatastores.getMessages(wfQuery, function retrieveMessagesDbCb(err, wfMessages, count) {
        if (err) return callback(err, wfMessages);
        if (count === 1) return callback(null, wfMessages);
        if (count > 1) return callback(null, wfMessages);

        // Cannot look on blockchain if no blockchain and transaction hash specified
        if (!blockchain && !wfQuery['MetaHeader.blockchain']) return callback(null, wfMessages);
        if (!wfQuery['MetaHeader.transactionHash']) return callback(null, wfMessages);

        // Lookup message on blockchain and decode - returns single message
        if (blockchain) wfQuery['MetaHeader.blockchain'] = blockchain;
        wfApiBlockchains.getMessage(wfQuery, function retrieveMessagesBcCb(err, wfMessage) {
            if (err) return callback(err, null);
            wfCodec.decode(wfMessage, function retrieveDecodeCb(err, wfMessage, ivMissing) {
                let wfMessages = [ wfMessage ];
                if (err) return callback(err, wfMessages);
                if (ivMissing) return callback(new ProcessingError('Cannot decrypt message without initialisation vector', null, ''), wfMessages);

                // Valid message: process further and return result
                if (wfMessage.MetaHeader.formatValid) wfRxEvent.emit('messageDecoded', wfMessage);
                return callback(null, wfMessages);
            });
        });
    });
}

/**
 * Removes superseded or unneeded messages from a message sequence array
 * @private
 * @param {Array} wfMessageSequence array containing a message sequence
 * @returns {Array} array with superseded messages removed
 * @todo This function needs to be evaluated
 */
function removeSupersededMessages(wfMessageSequence) {
    // 1. Ignore all messages that are recalled
    wfMessageSequence = wfMessageSequence.filter(function removeSuperseded1Filter(wfMessage) {
        return (wfMessageSequence.filter(function removeSuperseded1RefFilter(wfRefMessage) {
            return (
                wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
                && wfRefMessage.MessageHeader.ReferenceIndicator === '1'
            );
        }).length === 0);
    });
    // 2. Ignore all messages that are updated or expired
    wfMessageSequence = wfMessageSequence.filter(function removeSuperseded24Filter(wfMessage) {
        return (wfMessageSequence.filter(function removeSuperseded24RefFilter(wfRefMessage) {
            return (
                wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
                && ['2', '4'].indexOf(wfRefMessage.MessageHeader.ReferenceIndicator) > -1
            );
        }).length === 0);
    });
    // 3. Ignore all referencing messages of which the referenced message is ignored, except for updates
    wfMessageSequence = wfMessageSequence.filter(function removeSupersededOldRefsRefFilter(wfRefMessage) {
        return (
            wfRefMessage.MessageHeader.ReferenceIndicator === '0'
            || wfRefMessage.MessageHeader.ReferenceIndicator === '2'
            || wfMessageSequence.filter(function removeSupersededOldRefsFilter(wfMessage) {
                return (wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash);
            }).length > 0
        );
    });
    // Return resulting cleaned up message sequence array
    return wfMessageSequence;
}

/**
 * Removes semantic reference messages (reference code >= 5) from a message sequence array
 * @private
 * @param {Array} wfMessageSequence array containing a message sequence
 * @returns {Array} array with superseded messages removed
 * @todo This function needs to be evaluated
 */
function removeSemanticReferences(wfMessageSequence) {
    // 1. Reference codes 5-9 are ignored
    wfMessageSequence = wfMessageSequence.filter(function removeSemantic56789Filter(wfMessage) {
        return (['5', '6', '7', '8', '9'].indexOf(wfMessage.MessageHeader.ReferenceIndicator) < 0);
    });
    // 2. Ignore all referencing messages of which the referenced message is ignored
    wfMessageSequence = wfMessageSequence.filter(function removeSemanticOldRefsRefFilter(wfRefMessage) {
        return (wfMessageSequence.filter(function removeSemanticOldRefsFilter(wfMessage) {
            return (
                wfRefMessage.MessageHeader.ReferenceIndicator === '0'
                || wfRefMessage.MessageHeader.ReferencedMessage === wfMessage.MetaHeader.transactionHash
            );
        }).length > 0);
    });
    // Return resulting cleaned up message sequence array
    return wfMessageSequence;
}