'use strict';
/**
* @module lib/blockchains/ethereum/listener
* @summary Whiteflag API Ethereum listener module
* @description Module to connect to the Ethereum network and crawl/listen for transactions
*/
// Public interface of the listener module: initListener is exposed as `init`,
// scanBlocks under its own name. Both are function declarations further down
// in this file, so they are hoisted and can be referenced here.
module.exports = {
init: initListener,
scanBlocks
};
/* Common internal functions and classes */
const log = require('../../_common/logger');
const arr = require('../../_common/arrays');
/* Whiteflag modules */
const wfRxEvent = require('../../protocol/events').rxEvent;
const wfState = require('../../protocol/state');
/* Common blockchain functions */
const { determineStartingBlock,
logStartingBlock} = require('../_common/state');
/* Ethereum sub-modules */
const ethRpc = require('./rpc');
const ethTransactions = require('./transactions');
/* Module constants */
// Module name passed as first argument to every logger call in this file
const MODULELOG = 'ethereum';
// Upper bound compared against _blockStackSize in processBlocks; limits how many
// processBlocks passes are made within a single iteration
const BLOCKSTACKSIZE = 100;
// Timer delay in milliseconds used by scheduleBlockRetry
const BLOCKRETRYDELAY = 5000;
/* Module variables */
// Blockchain name (ethConfig.name) and state object, both set by initListener
let _ethChain;
let _ethState;
// Number of times executeBlockIteration has run; only used in log messages
let _iterationCount = 0;
// Highest block number reported by the node so far
let _discoveredBlock = 0;
// Number of the last block that was processed; processBlocks continues after it
let _blockCursor = 0;
// Timer delay in milliseconds between iterations; the configured
// blockRetrievalInterval only replaces it when greater than 500
let _blockInterval = 6000;
// Passed to determineStartingBlock; logged by initListener as the number of blocks to look back
let _blockRetrievalRestart = 100;
// Configured first and last block to retrieve; 0 means not configured
let _blockRetrievalStart = 0;
let _blockRetrievalEnd = 0;
// Counter of processBlocks passes; reset to 0 at the start of each block retrieval
let _blockStackSize = 0;
// Maximum retries before a block is skipped; blocks are only skipped when this is greater than 0
let _blockMaxRetries = 0;
// Number of consecutive failures for the block currently being retrieved
let _blockRetryCount = 0;
// Number of blocks skipped since the last successfully processed block
let _skippedBlocks = 0;
// Maximum number of transactions of one block that are processed in parallel
let _transactionBatchSize = 64;
/**
 * Initialises the Ethereum blockchain listener: copies the block processing
 * parameters from the configuration into the module variables, determines the
 * block to start from and schedules the first block retrieval iteration
 * @function initListener
 * @alias module:lib/blockchains/ethereum/listener.init
 * @param {Object} ethConfig the Ethereum blockchain configuration
 * @param {Object} ethState the Ethereum blockchain state
 * @returns {Promise} resolves if successfully initialised; rejects if the highest
 * block cannot be retrieved from the node or the starting block cannot be determined
 */
async function initListener(ethConfig, ethState) {
    log.trace(MODULELOG, 'Initialising listener for Ethereum blockchain transactions');
    _ethChain = ethConfig.name;
    _ethState = ethState;

    // Block processing parameters
    if (Object.hasOwn(ethConfig, 'blockMaxRetries')) _blockMaxRetries = ethConfig.blockMaxRetries;
    if (_blockMaxRetries > 0) log.debug(MODULELOG, `Maximum retries for processing a block is set to ${_blockMaxRetries} for each block`);
    if (Object.hasOwn(ethConfig, 'transactionBatchSize')) _transactionBatchSize = ethConfig.transactionBatchSize;
    log.debug(MODULELOG, `Maximum number of transactions in a block that are processed in parallel is set to ${_transactionBatchSize}`);

    // Block interval time; values of 500 ms or less are ignored and the default is kept
    if (Object.hasOwn(ethConfig, 'blockRetrievalInterval') && ethConfig.blockRetrievalInterval > 500) {
        _blockInterval = ethConfig.blockRetrievalInterval;
    }
    log.info(MODULELOG, `Block retrieval interval: ${_blockInterval} ms`);

    // Determine block retrieval range
    if (Object.hasOwn(ethConfig, 'blockRetrievalRestart')) {
        // The value must be read from ethConfig: no other configuration object is in scope here
        log.debug(MODULELOG, `Number of blocks to look back specified in configuration: ${ethConfig.blockRetrievalRestart}`);
        _blockRetrievalRestart = ethConfig.blockRetrievalRestart;
    }
    if (Object.hasOwn(ethConfig, 'blockRetrievalStart') && ethConfig.blockRetrievalStart > 0) {
        log.info(MODULELOG, `Starting block specified in configuration: ${ethConfig.blockRetrievalStart}`);
        _blockRetrievalStart = ethConfig.blockRetrievalStart;
    }
    if (Object.hasOwn(ethConfig, 'blockRetrievalEnd') && ethConfig.blockRetrievalEnd > 0) {
        log.info(MODULELOG, `Ending block specified in configuration: ${ethConfig.blockRetrievalEnd}`);
        _blockRetrievalEnd = ethConfig.blockRetrievalEnd;
    }

    // Determine starting block
    try {
        _ethState.status.highestBlock = await ethRpc.getHighestBlock();
        _discoveredBlock = _ethState.status.highestBlock;
        _blockCursor = determineStartingBlock(
            _ethState.status.highestBlock,
            _ethState.status.currentBlock,
            _blockRetrievalStart,
            _blockRetrievalRestart
        );
        logStartingBlock(MODULELOG, _blockCursor, _ethState.status.highestBlock);
    } catch(err) {
        // Pass on the original error if there is one, otherwise provide a generic one
        if (err) throw err;
        throw new Error(`Could not connect to ${_ethChain} node and determine starting block`);
    }

    // Schedule iterative block retrieval
    scheduleBlockIteration();
}
/**
 * Scans a range of blocks for Whiteflag messages, one block after the other
 * @function scanBlocks
 * @alias module:lib/blockchains/ethereum/listener.scanBlocks
 * @param {number} cursor the first block to scan
 * @param {number} endBlock the last block to scan
 * @param {wfMessages[]} [wfMessages] Whiteflag messages processed in an earlier block
 * @returns {Promise} resolves to the collected messages if all blocks are successfully
 * processed; rejects with the error of the first block that could not be processed
 */
async function scanBlocks(cursor, endBlock, wfMessages = []) {
    let collected = wfMessages;
    let blockNumber = cursor;

    // The first block is always scanned, also if it lies beyond the end block
    do {
        const found = await processBlock(blockNumber);
        collected = arr.addArray(collected, found);
        ++blockNumber;
    } while (!(blockNumber > endBlock));

    return collected;
}
/* PRIVATE LISTENER FUNCTIONS */
/*
* The functions below retrieve Ethereum blockchain data, then the blocks and then for each
* block the transactions. To ensure the data is retrieved in a controlled and sequential
* manner, promises are used. When one block has successfully been processed, the next
* block will be processed. If an error occurs while retrieving a block, the block
* will be retrieved again until successfully processed or, if a maximum number of
* retries is configured, until it is skipped. To optimize retrieval of data,
* transactions for a single block will be retrieved in a batch, based on the batch size
* configuration parameter.
*/
/**
 * Saves the current blockchain state and sets the timer for the
 * next block retrieval iteration at the regular block interval
 * @private
 */
function scheduleBlockIteration() {
    // State is stored before the timer is armed
    wfState.updateBlockchainData(_ethChain, _ethState);

    const delay = _blockInterval;
    setTimeout(executeBlockIteration, delay);
}
/**
 * Saves the current blockchain state and sets the timer for a new block
 * retrieval iteration after the retry delay, to retry after an error
 * @private
 */
function scheduleBlockRetry() {
    // Uses the Ethereum chain name and state of this module, the same
    // variables scheduleBlockIteration stores
    wfState.updateBlockchainData(_ethChain, _ethState);
    setTimeout(executeBlockIteration, BLOCKRETRYDELAY);
}
/**
 * Runs a single block retrieval iteration: gets the highest block from the node,
 * processes the blocks that have not been processed yet, and schedules the next
 * iteration (or a retry if an error occurred)
 * @private
 */
function executeBlockIteration() {
    _iterationCount += 1;

    // Step 1: compare the node's highest block with the cursor and process new blocks
    const retrieveNewBlocks = (highestBlock) => {
        // Keep track of the highest block discovered on the node
        if (highestBlock > _discoveredBlock) {
            _discoveredBlock = highestBlock;
            _ethState.status.highestBlock = highestBlock;
            log.trace(MODULELOG, `Iteration ${_iterationCount}: New highest block discovered on node is ${_discoveredBlock}`);
        }
        // Nothing to do if the highest block has already been processed;
        // rejecting without an error ends this iteration and schedules the next
        if (highestBlock === _blockCursor) {
            log.trace(MODULELOG, `Iteration ${_iterationCount}: Highest block ${highestBlock} has already been processed`);
            return Promise.reject();
        }
        // The cursor may be ahead of the node when the node is resyncing;
        // this also ends the iteration without an error
        if (_blockCursor > highestBlock) {
            log.trace(MODULELOG, `Iteration ${_iterationCount}: Current block ${_blockCursor} is higher than highest block ${highestBlock} on node`);
            return Promise.reject();
        }
        // The last block to retrieve is the configured end block if it is
        // below the highest block, and the highest block otherwise
        const endsBeforeHighest = _blockRetrievalEnd > 0 && _blockRetrievalEnd < highestBlock;
        const endBlock = endsBeforeHighest ? _blockRetrievalEnd : highestBlock;

        // Process a new stack of blocks
        _blockStackSize = 0;
        return processBlocks(_blockCursor, endBlock);
    };

    // Step 2: blocks are processed; check whether the configured end block is reached
    const concludeIteration = () => {
        const endReached = _blockRetrievalEnd !== 0 && !(_blockCursor < _blockRetrievalEnd);
        if (endReached) {
            log.info(MODULELOG, `Iteration ${_iterationCount}: Reached configured block retrieval end: ${_blockCursor}`);
            _blockRetrievalEnd = 0;

            // Determine from where to proceed
            _blockCursor = determineStartingBlock(
                _ethState.status.highestBlock,
                _blockCursor,
                _blockRetrievalEnd,
                _blockRetrievalRestart
            );
            logStartingBlock(MODULELOG, _blockCursor, _ethState.status.highestBlock);
        }
        return scheduleBlockIteration();
    };

    // Step 3: an iteration that stopped without an error continues at the regular
    // interval; an iteration that stopped with an error is retried shortly
    const handleStop = (err) => {
        if (!err) return scheduleBlockIteration();
        log.warn(MODULELOG, `Iteration ${_iterationCount}: Could not complete retrieval of block ${_blockCursor}: ${err.message}`);
        return scheduleBlockRetry();
    };

    ethRpc.getHighestBlock()
        .then(retrieveNewBlocks)
        .then(concludeIteration)
        .catch(handleStop);
}
/**
 * Processes multiple Ethereum blocks one after the other, moving the block
 * cursor and storing the state after every successfully processed block
 * @private
 * @param {number} startBlock the block after which the next blocks are processed
 * @param {number} endBlock the block up to which blocks should be processed
 * @returns {Promise} resolves without a value when the end block or the maximum
 * stack size is reached; rejects with the error of a block that could not be processed
 */
async function processBlocks(startBlock, endBlock) {
    let previousBlock = startBlock;

    for (;;) {
        // Limit the number of blocks handled in one go
        _blockStackSize += 1;
        if (_blockStackSize > BLOCKSTACKSIZE) {
            log.trace(MODULELOG, `Iteration ${_iterationCount}: Reached maximum block processing stack size: ${_blockStackSize}`);
            return;
        }
        // Nothing new to retrieve if the previous block is the last block
        if (previousBlock === endBlock) {
            log.trace(MODULELOG, `Iteration ${_iterationCount}: Completed iteration upon reaching block ${endBlock}`);
            return;
        }
        // The block to process follows the previous block and any skipped blocks
        if (_skippedBlocks > 0) log.debug(MODULELOG, `Iteration ${_iterationCount}: Skipped ${_skippedBlocks} blocks since block ${previousBlock}`);
        let targetBlock = previousBlock + _skippedBlocks + 1;

        // Move on to the next block if this one has been retried too often
        // NOTE(review): after a skip the target block may be higher than endBlock;
        // nothing here guards against that - confirm this is intended
        const retriesExhausted = _blockMaxRetries > 0 && _blockRetryCount > _blockMaxRetries;
        if (retriesExhausted) {
            log.warn(MODULELOG, `Iteration ${_iterationCount}: Skipping block ${targetBlock} after ${_blockMaxRetries} retries`);
            targetBlock += 1;
            _blockRetryCount = 0;
            _skippedBlocks += 1;
        }

        // Log where we are with blocks
        if (_blockRetryCount !== 0) log.debug(MODULELOG, `Iteration ${_iterationCount}: Retry ${_blockRetryCount} to process block: ${targetBlock}`);
        log.trace(MODULELOG, `Iteration ${_iterationCount}: Retrieving block ${_blockStackSize}: ${targetBlock}`);

        // Retrieve the block; a failure is counted so that the caller can retry
        try {
            await processBlock(targetBlock);
        } catch(err) {
            _blockRetryCount += 1;
            throw err;
        }

        // Block completed: move the cursor, clear the counters and store the state
        _blockCursor = targetBlock;
        _blockRetryCount = 0;
        _skippedBlocks = 0;
        _ethState.status.currentBlock = _blockCursor;
        wfState.updateBlockchainData(_ethChain, _ethState);

        // Stop when the last block is reached, otherwise continue after the cursor
        if (!(targetBlock < endBlock)) return;
        previousBlock = _blockCursor;
    }
}
/**
 * Processes a single Ethereum block: retrieves the block from the node and
 * processes all of its transactions to find Whiteflag messages
 * @private
 * @param {number} blockNumber the block to be processed
 * @returns {Promise} resolves if the block is successfully processed: to an array with
 * the Whiteflag messages found, or without a value if the block has no transactions;
 * rejects if the block cannot be retrieved or its transactions cannot be processed
 */
async function processBlock(blockNumber) {
    // Get block from node
    let block;
    try {
        block = await ethRpc.getBlockByNumber(blockNumber);
    } catch(err) {
        throw new Error(`Could not retrieve block ${blockNumber}: ${err?.message ?? err}`);
    }
    // The block must be checked before its transactions are accessed. A missing
    // block is reported as a retrieval error, so that the caller retries it
    // instead of marking a block that was never seen as completed
    if (!block) {
        throw new Error(`Could not retrieve block ${blockNumber}: no block data received`);
    }
    // Check transactions in block
    const transactionCount = block.transactions?.length ?? 0;
    if (transactionCount < 1) {
        // Rare case, but in certain instances there could be no transactions in the block
        log.info(MODULELOG, `No transactions in block: ${blockNumber}`);
        return undefined; // Completed this block
    }
    // Retrieve transactions from block
    log.trace(MODULELOG, `Transactions discovered in block ${blockNumber}: ${transactionCount}`);
    try {
        const results = await processTransactions(block.timestamp, block.transactions);

        // Transactions without a Whiteflag message leave empty entries in the results
        const wfMessages = (results ?? []).filter(message => {
            return message !== null
                && message !== ''
                && message !== undefined;
        });
        log.info(MODULELOG, `Found ${wfMessages.length} Whiteflag messages in ${transactionCount} transactions in block ${blockNumber}`);
        return wfMessages; // Completed this block
    } catch(err) {
        if (!err) throw new Error(`Could not process block ${blockNumber}`);
        throw new Error(`Could not process block ${blockNumber}: ${err.message}`);
    }
}
/**
 * Processes the transactions of an Ethereum block in consecutive batches; the
 * transactions within a batch are processed in parallel
 * @private
 * @param {number} blockTime the block timestamp
 * @param {Array} transactions the transactions to process
 * @param {number} [index] the first transaction in the array to process
 * @param {Array} [messages] messages processed in an earlier batch
 * @returns {Promise} resolves to the array with the results of all processed transactions
 * added to the provided messages; rejects if a transaction could not be processed
 */
async function processTransactions(blockTime, transactions, index = 0, messages = []) {
    let collected = messages;

    for (let batchStart = index; ; batchStart += _transactionBatchSize) {
        // Get transaction batch of Promises in an array
        const transactionBatch = createTransactionBatch(blockTime, transactions, batchStart);

        // Without transactions to process, the result is what has been collected
        // so far, so that the caller always receives an array
        if (transactionBatch.length < 1) return collected;

        // Resolve all transaction promises in the batch
        const wfMessages = await Promise.all(transactionBatch);
        collected = arr.addArray(collected, wfMessages);

        // Done if the next batch would start beyond the last transaction
        if (batchStart + _transactionBatchSize >= transactions.length) return collected;
    }
}
/**
 * Combines multiple transactions from an Ethereum block as promises in an array for batch processing
 * @private
 * @param {number} blockTime the block timestamp
 * @param {Array} transactions the hashes of the transactions to process
 * @param {number} index the first transaction in the array to process
 * @returns {Array} Array with one Promise for each transaction in the batch; the
 * batch contains at most the configured batch size number of transactions
 */
function createTransactionBatch(blockTime, transactions, index) {
    const batchEnd = Math.min(index + _transactionBatchSize, transactions.length);
    const transactionBatch = [];
    for (let i = index; i < batchEnd; i++) {
        // Whiteflag message is extracted by resolving the promise
        transactionBatch.push(extractTransactionMessage(transactions[i], blockTime));
    }
    return transactionBatch;
}

/**
 * Retrieves a single transaction and extracts the Whiteflag message from it
 * @private
 * @param {string} transactionHash the hash of the transaction to retrieve
 * @param {number} blockTime the block timestamp
 * @returns {Promise} resolves to the Whiteflag message, or without a value if no
 * transaction data was received or the transaction contains no Whiteflag message;
 * rejects if the transaction could not be processed
 */
async function extractTransactionMessage(transactionHash, blockTime) {
    try {
        const transaction = await ethRpc.getRawTransaction(transactionHash);
        if (!transaction) {
            // Without data there is nothing to extract: warn and continue with the other transactions
            log.warn(MODULELOG, `No data received for transaction: ${transactionHash}`);
            return undefined;
        }
        const wfMessage = await ethTransactions.extractMessage(transaction, blockTime);
        log.trace(MODULELOG, `Received Whiteflag message: ${JSON.stringify(wfMessage.MetaHeader)}`);
        wfRxEvent.emit('messageReceived', wfMessage);
        return wfMessage;
    } catch(err) {
        // No Whiteflag message in transaction
        if (err?.code === 'WF_API_NO_DATA') return undefined;
        // Other error
        throw new Error(`Could not process transaction ${transactionHash}: ${err?.message ?? err}`);
    }
}