'use strict';
/**
* @module lib/blockchains/fennel/listener
* @summary Whiteflag API Fennel listener module
* @description Module to connect to the Fennel parachain and crawl/listen for transactions
* @todo Improve performance
*/
module.exports = {
init: initListener,
scanBlocks
};
/* Common internal functions and classes */
const log = require('../../_common/logger');
const arr = require('../../_common/arrays');
const { noHexPrefix } = require('../../_common/format');
/* Whiteflag modules */
const wfRxEvent = require('../../protocol/events').rxEvent;
const wfState = require('../../protocol/state');
/* Common blockchain functions */
const { determineStartingBlock,
logStartingBlock } = require('../_common/state');
/* Fennel sub-modules */
const fnlRpc = require('./rpc');
const fnlTransactions = require('./transactions');
/* Module constants */
const MODULELOG = 'fennel';
const BATCHDELAY = 50;
const MAXBATCHSIZE = 100;
const BLOCKRETRYDELAY = 10000;
const TIMESECTION = 'timestamp';
const TIMEMETHOD = 'set';
const WFMSGSECTION = 'signal';
const WFMSGMETHOD = 'sendSignal';
/* Module variables */
let _fnlChain;
let _fnlState;
let _traceRaw = false;
let _iterationCount = 0;
let _blockCursor = 0;
let _endBlock = 0;
let _blockInterval = 6000;
let _blockRetrievalRestart = 100;
let _blockRetrievalStart = 0;
let _blockRetrievalEnd = 0;
let _blockMaxRetries = 0;
let _blockBatchSize = 3;
let _batchRetryCount = 0;
/**
* Initiates the listener for Fennel blockchain transactions
* @function initListener
* @alias module:lib/blockchains/fennel/listener.init
* @param {Object} fnlConfig the Fennel blockchain configuration
* @param {Object} fnlState the Fennel blockchain state
* @param {Object} [fnlApi] the Fennel API
* @returns {Promise} resolve if succesfully initialised
*/
async function initListener(fnlConfig, fnlState, fnlApi = null) {
log.trace(MODULELOG, 'Initialising listener for Fennel blockchain transactions');
_fnlChain = fnlConfig.name;
_fnlState = fnlState;
// Trace all transactions if configured
if (fnlConfig.traceRawTransaction) _traceRaw = fnlConfig.traceRawTransaction;
// Block processing parameters
if (Object.hasOwn(fnlConfig, 'blockBatchSize')) _blockBatchSize = fnlConfig.blockBatchSize;
if (_blockBatchSize > MAXBATCHSIZE) _blockBatchSize = MAXBATCHSIZE;
log.debug(MODULELOG, `Maximum number of blocks that can be processed in parallel is set to ${_blockBatchSize}`);
if (Object.hasOwn(fnlConfig, 'blockMaxRetries')) _blockMaxRetries = fnlConfig.blockMaxRetries;
if (_blockMaxRetries > 0) log.debug(MODULELOG, `Maximum retries for processing a block is set to ${_blockMaxRetries} for each block`);
// Block interval time
if (Object.hasOwn(fnlConfig, 'blockRetrievalInterval') && fnlConfig.blockRetrievalInterval > 500) {
_blockInterval = fnlConfig.blockRetrievalInterval;
}
log.info(MODULELOG, `Block retrieval interval is set to ${_blockInterval} ms`);
// Determine block retrieval range
if (Object.hasOwn(fnlConfig, 'blockRetrievalRestart')) {
log.debug(MODULELOG, `Number of blocks to look back specified in configuration: ${fnlConfig.blockRetrievalRestart}`);
_blockRetrievalRestart = fnlConfig.blockRetrievalRestart;
}
if (Object.hasOwn(fnlConfig, 'blockRetrievalStart') && fnlConfig.blockRetrievalStart > 0) {
log.info(MODULELOG, `Starting block specified in configuration: ${fnlConfig.blockRetrievalStart}`);
_blockRetrievalStart = fnlConfig.blockRetrievalStart;
}
if (Object.hasOwn(fnlConfig, 'blockRetrievalEnd') && fnlConfig.blockRetrievalEnd > 0) {
log.info(MODULELOG, `Ending block specified in configuration: ${fnlConfig.blockRetrievalEnd}`);
_blockRetrievalEnd = fnlConfig.blockRetrievalEnd;
}
// Check if connected via RPC or API
if (!fnlApi) {
log.warn(MODULELOG, 'Cannot start block listener because only limited web RPC functions available');
return Promise.resolve();
}
// Determine starting block
try {
_fnlState.status.highestBlock = await fnlRpc.getHighestBlock();
_blockCursor = determineStartingBlock(
_fnlState.status.highestBlock,
_fnlState.status.currentBlock,
_blockRetrievalStart,
_blockRetrievalRestart
);
logStartingBlock(MODULELOG, _blockCursor, _fnlState.status.highestBlock);
} catch(err) {
if (err) return Promise.reject(err);
return Promise.reject(new Error(`Could not connect to ${_fnlChain} node and determine starting block`));
}
// Schedule iterative block retrieval
scheduleNextIteration();
return Promise.resolve();
}
/**
* Scans a block for Whiteflag messages, and triggers to scan next block
* @function scanBlocks
* @alias module:lib/blockchains/fennel/listener.scanBlocks
* @param {number} cursor the block to scan
* @param {number} endBlock the last block to scan block
* @param {wfMessages[]} [wfMessages] Whiteflag messages processed in an earlier block
* @returns {Promise} resolves if all blocks are successfully processed
*/
function scanBlocks(cursor, endBlock, wfMessages = []) {
return processBlock(cursor)
.then(messages => {
wfMessages = arr.addArray(wfMessages, messages);
let nextBlock = ++cursor;
if (nextBlock > endBlock) return Promise.resolve(wfMessages);
return scanBlocks(nextBlock, endBlock, wfMessages);
})
.catch(err => {
return Promise.reject(err);
});
}
/* PRIVATE BLOCKCHAIN LISTENER FUNCTIONS */
/**
* Schedules next block retrieval iteration
* @private
*/
function scheduleNextIteration() {
wfState.updateBlockchainData(_fnlChain, _fnlState);
log.trace(MODULELOG, `Scheduling block iteration ${(_iterationCount + 1)} in ${_blockInterval} ms`)
setTimeout(executeBlockIteration, _blockInterval);
}
/**
* Schedules next block iteration for retry
* @private
*/
function scheduleImmediateRetry() {
wfState.updateBlockchainData(_fnlChain, _fnlState);
log.trace(MODULELOG, `Scheduling block iteration ${(_iterationCount + 1)} in ${BLOCKRETRYDELAY} ms`)
setTimeout(executeBlockIteration, BLOCKRETRYDELAY, true);
}
/**
* Schedules next block iteration for retry
* @private
*/
function scheduleImmediateIteration() {
wfState.updateBlockchainData(_fnlChain, _fnlState);
log.trace(MODULELOG, `Scheduling block iteration ${(_iterationCount + 1)} in ${BATCHDELAY} ms`)
setTimeout(executeBlockIteration, BATCHDELAY, true);
}
/**
* Executes block retrieval iteration and re-schedules itself when completed
* @param {boolean} [immediate] proces next block batch immediately
*/
function executeBlockIteration(immediate = false) {
_iterationCount += 1;
return nextBlockIteration(immediate)
.then((immediate) => {
// Prcesses next batch immediately
if (immediate) {
return scheduleImmediateIteration(immediate);
}
// Done processing blocks, continue with the next
if (_blockRetrievalEnd === 0 || _blockCursor < _blockRetrievalEnd) {
return scheduleNextIteration();
}
// Provided end block is reached
log.info(MODULELOG, `Iteration ${_iterationCount}: Reached configured block retrieval end: ${_blockCursor}`);
_blockRetrievalEnd = 0;
// Dtermine from where to proceed
_blockCursor = determineStartingBlock(
_fnlState.status.highestBlock,
_blockCursor,
_blockRetrievalEnd,
_blockRetrievalRestart
);
logStartingBlock(MODULELOG, _blockCursor, _fnlState.status.highestBlock);
return scheduleNextIteration();
})
.catch(err => {
if (err) {
log.warn(MODULELOG, `Iteration ${_iterationCount}: ${err.message}`)
return scheduleImmediateRetry();
}
return scheduleNextIteration();
});
}
/**
* Performs the block iteration by starting the batch processing of blocks
* @param {boolean} [immediate] proces next block batch immediately
* @private
*/
function nextBlockIteration(immediate = false) {
// Prcesses batch immediately
if (immediate) {
return processBlocks(_blockCursor, _endBlock);
}
// Get the actual Fennel blockchain height
return fnlRpc.getHighestBlock()
.then(highestBlock => {
// Check if one more new block exists
if (highestBlock > _fnlState.status.highestBlock) {
log.trace(MODULELOG, `Iteration ${_iterationCount}: New highest block discovered on node is ${highestBlock}`);
_fnlState.status.highestBlock = highestBlock;
}
// Current block may be higher than highest block when node is resyncing
if (_blockCursor >= highestBlock) {
log.trace(MODULELOG, `Iteration ${_iterationCount}: Waiting for new highest block`);
return Promise.reject();
}
// Retrieve until highest block or, if provided, end block
if (_blockRetrievalEnd > 0 && _blockRetrievalEnd < highestBlock) {
_endBlock = _blockRetrievalEnd;
} else {
_endBlock = highestBlock - 1;
}
// Process new block batch
return processBlocks(_blockCursor, _endBlock);
})
.catch(err => {
return Promise.reject(err);
})
}
/**
* Processes multiple Fennel blocks
* @private
* @returns {Promise} resolves when batch is completed
*/
function processBlocks() {
// Check start and end block for this batch
if (_blockCursor > _endBlock) return Promise.reject();
// Get batch exact batch
const batchStart = _blockCursor;
const batchEnd = Math.min(batchStart + _blockBatchSize - 1, _endBlock);
const batchSize = (batchEnd - batchStart + 1);
// Create and execute batch
let blockBatch = [];
for (let b = batchStart; b <= batchEnd; b++) {
blockBatch.push(processBlock(b));
}
return Promise.all(blockBatch)
.then(() => {
if (batchSize === 1) {
log.debug(MODULELOG, `Iteration ${_iterationCount}: Processed ${batchSize} block: ${batchEnd}`);
} else {
log.debug(MODULELOG, `Iteration ${_iterationCount}: Processed ${batchSize} blocks: ${batchStart} through ${batchEnd}`);
}
// Update state
_fnlState.status.currentBlock = _blockCursor;
_blockCursor = batchEnd + 1;
return Promise.resolve(true); // Process next batch immediately
})
.catch(err => {
if (_blockMaxRetries > 0 && _batchRetryCount > _blockMaxRetries) {
log.warn(MODULELOG, `Iteration ${_iterationCount}: Skipping blocks ${batchStart} through ${batchEnd} after ${_blockMaxRetries} retries`);
_batchRetryCount = 0;
_blockCursor = batchEnd + 1;
return Promise.resolve(true); // Process next batch immediately
}
_batchRetryCount += 1;
return Promise.reject(new Error(`Error processing blocks ${batchStart} through ${batchEnd}: ${err.message}`));
});
}
/**
* Processes a single Fennel block
* @private
* @param {number} blockNumber the block to be processed
* @returns {Promise} resolves if the block is succesfully processed
*/
async function processBlock(blockNumber) {
// Get block from node
let block;
try {
block = await fnlRpc.getBlockByNumber(blockNumber, true); // Get full block including extrinsics
} catch(err) {
return Promise.reject(new Error(`Could not retrieve block ${blockNumber}: ${err.message}`));
}
// Check extrinsics in block
let extrinsicCount = block?.extrinsics?.length;
if (!block || extrinsicCount < 1) {
// Rare case, but a block may be empty
log.info(MODULELOG, `No extrinsics in block: ${blockNumber}`);
return Promise.resolve(); // Completed this block
}
log.trace(MODULELOG, `Extrinsics discovered in block ${blockNumber}: ${extrinsicCount}`);
// Process transactions from block
return processTransactions(blockNumber, block)
.then((wfMessages) => {
return Promise.resolve(wfMessages.filter(message => {
return message !== null &&
message !== '' &&
message !== undefined;
}
));
})
.then((wfMessages) => {
log.info(MODULELOG, `Found ${wfMessages.length} Whiteflag messages in ${extrinsicCount} extrinsics in block ${blockNumber}`);
return Promise.resolve(wfMessages); // Completed this block
})
.catch(err => {
if (!err) return Promise.reject(new Error(`Could not process block ${blockNumber}`));
return Promise.reject(new Error(`Could not process block ${blockNumber}: ${err.message}`));
});
}
/**
* Processes all transactions of a Fennel block
* @private
* @param {number} blockNumber the block number
* @param {Object} block the full block including extrinsics
* @returns {Promise} resolves if all extrinsics are successfully processed
*/
function processTransactions(blockNumber, block) {
let blockTime;
let transactionBatch = [];
block.extrinsics.forEach(( ex, index ) => {
if (_traceRaw) log.trace(MODULELOG, `Processing extrinsic ${blockNumber}/${index}: ${JSON.stringify(ex.toHuman())}`)
if (ex.method.method === TIMEMETHOD && ex.method.section === TIMESECTION) {
blockTime = new Date(ex.method.args[0].unwrap().toNumber());
if (_traceRaw) log.trace(MODULELOG, `Found timestamp in extrinsic ${blockNumber}/${index}: ${blockTime}`);
}
if (ex.method.method == WFMSGMETHOD && ex.method.section == WFMSGSECTION) {
if (_traceRaw) log.trace(MODULELOG, `Found signal in extrinsic ${blockNumber}/${index}`);
transactionBatch.push(extractTransaction(ex, [], index, blockNumber, blockTime));
}
});
if (transactionBatch.length < 1) return Promise.resolve([]);
return Promise.all(transactionBatch);
}
/**
* Creates transaction from extrinsic and events
* @private
* @param {Object} ex the extrinsic
* @param {Array} events the events related to the block
* @param {number} index the index of the extrinsic in the block
* @param {number} blockNumber the block number of the transaction
* @param {Date} blockTime the timestamp of the block
* @returns {Promise} transaction details from extrinsic and events
*/
function extractTransaction(ex, events, index, blockNumber, blockTime) {
return new Promise((resolve, reject) => {
let exHash, exData, exEvents;
try {
exHash = ex.hash.toHex();
exData = JSON.parse(ex);
// exEvents = Array
// .from(new Set([...events].map(JSON.stringify)))
// .map(JSON.parse)
// .filter(event => event.phase.applyExtrinsic === index)
// .map(event => {return event.event});
} catch(err) {
return reject(new Error(`Could not get transaction data from extrinsic: ${err.message}`))
}
fnlTransactions.extractMessage({
hash: noHexPrefix(exHash),
block: blockNumber,
index: index,
timestamp: blockTime.toISOString(),
originator: exData.signature.signer.id,
signal: noHexPrefix(exData.method.args.signal),
})
.then(wfMessage => {
log.trace(MODULELOG, `Received Whiteflag message: ${JSON.stringify(wfMessage.MetaHeader)}`);
wfRxEvent.emit('messageReceived', wfMessage);
return resolve(wfMessage);
})
.catch(err => {
if (err.code === 'WF_API_NO_DATA') return resolve(); // No Whiteflag message in transaction
return reject(new Error(`Could not process extrinsic ${blockNumber}/${index}: ${err.message}`)); // Other error
});
});
}