'use strict';

const EventEmitter = require('events');
const PoolResource = require('./pool-resource');
const SMTPConnection = require('../smtp-connection');
const wellKnown = require('../well-known');
const shared = require('../shared');
const packageData = require('../../package.json');

/**
 * Creates a SMTP pool transport object for Nodemailer.
 *
 * The pool keeps a FIFO queue of pending messages and up to
 * `options.maxConnections` PoolResource objects. Emits 'idle' whenever the
 * queue has room for more messages.
 *
 * @constructor
 * @param {Object|String} options SMTP Connection options, or a connection URL string
 */
class SMTPPool extends EventEmitter {
    constructor(options) {
        super();

        options = options || {};
        if (typeof options === 'string') {
            // a bare string is treated as a connection url
            options = {
                url: options
            };
        }

        let urlData;
        let service = options.service;

        // allow the caller to override the proxy socket factory
        if (typeof options.getSocket === 'function') {
            this.getSocket = options.getSocket;
        }

        if (options.url) {
            urlData = shared.parseConnectionUrl(options.url);
            service = service || urlData.service;
        }

        this.options = shared.assign(
            false, // create new object
            options, // regular options
            urlData, // url options
            service && wellKnown(service) // wellknown options
        );

        // falsy values (including 0) fall back to the defaults
        this.options.maxConnections = this.options.maxConnections || 5;
        this.options.maxMessages = this.options.maxMessages || 100;

        this.logger = shared.getLogger(this.options, {
            component: this.options.component || 'smtp-pool'
        });

        // temporary object, only used to read the client version string
        let connection = new SMTPConnection(this.options);

        this.name = 'SMTP (pool)';
        this.version = packageData.version + '[client:' + connection.version + ']';

        // rate limiter state, see _checkRateLimit() and _clearRateLimit()
        this._rateLimit = {
            counter: 0,
            timeout: null,
            waiting: [],
            checkpoint: false,
            delta: Number(this.options.rateDelta) || 1000,
            limit: Number(this.options.rateLimit) || 0
        };
        this._closed = false;
        this._queue = [];
        this._connections = [];
        this._connectionCounter = 0;

        this.idling = true;

        // defer the first 'idle' so that listeners attached right after
        // construction still receive it
        setImmediate(() => {
            if (this.idling) {
                this.emit('idle');
            }
        });
    }

    /**
     * Placeholder function for creating proxy sockets. This method immediately returns
     * without a socket
     *
     * @param {Object} options Connection options
     * @param {Function} callback Callback function to run with the socket keys
     */
    getSocket(options, callback) {
        // return immediately
        return setImmediate(() => callback(null, false));
    }

    /**
     * Queues an e-mail to be sent using the selected settings
     *
     * @param {Object} mail Mail object
     * @param {Function} callback Callback function
     * @returns {Boolean} false if the pool is already closed (the callback is
     *   not invoked in that case), true if the message was queued
     */
    send(mail, callback) {
        if (this._closed) {
            return false;
        }

        this._queue.push({
            mail,
            requeueAttempts: 0,
            callback
        });

        // queue is now at least as long as the connection limit, stop
        // advertising free slots
        if (this.idling && this._queue.length >= this.options.maxConnections) {
            this.idling = false;
        }

        setImmediate(() => this._processMessages());

        return true;
    }

    /**
     * Closes all connections in the pool. If there is a message being sent, the connection
     * is closed later. Every message still waiting in the queue gets its callback
     * invoked with a 'Connection pool was closed' error.
     */
    close() {
        let len = this._connections.length;
        this._closed = true;

        // clear rate limit timer if it exists
        clearTimeout(this._rateLimit.timeout);

        if (!len && !this._queue.length) {
            return;
        }

        // remove all available connections; iterate backwards as closing a
        // connection may remove it from the list
        for (let i = len - 1; i >= 0; i--) {
            if (this._connections[i] && this._connections[i].available) {
                let connection = this._connections[i];
                connection.close();
                this.logger.info(
                    {
                        tnx: 'connection',
                        cid: connection.id,
                        action: 'removed'
                    },
                    'Connection #%s removed',
                    connection.id
                );
            }
        }

        if (len && !this._connections.length) {
            this.logger.debug(
                {
                    tnx: 'connection'
                },
                'All connections removed'
            );
        }

        if (!this._queue.length) {
            return;
        }

        // make sure that entire queue would be cleaned, one entry per tick
        let invokeCallbacks = () => {
            if (!this._queue.length) {
                this.logger.debug(
                    {
                        tnx: 'connection'
                    },
                    'Pending queue entries cleared'
                );
                return;
            }
            let entry = this._queue.shift();
            if (entry && typeof entry.callback === 'function') {
                try {
                    entry.callback(new Error('Connection pool was closed'));
                } catch (E) {
                    // A queued entry is not bound to any connection, so there is
                    // no connection id to report. Referencing one here could throw
                    // from inside the catch block and stop the queue from draining.
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            messageId: entry.messageId
                        },
                        'Callback error for queued message: %s',
                        E.message
                    );
                }
            }
            setImmediate(invokeCallbacks);
        };
        setImmediate(invokeCallbacks);
    }

    /**
     * Check the queue and available connections. If there is a message to be sent and there is
     * an available connection, then use this connection to send the mail
     */
    _processMessages() {
        let connection;
        let i, len;

        // do nothing if already closed
        if (this._closed) {
            return;
        }

        // do nothing if queue is empty
        if (!this._queue.length) {
            if (!this.idling) {
                // no pending jobs
                this.idling = true;
                this.emit('idle');
            }
            return;
        }

        // find first available connection
        for (i = 0, len = this._connections.length; i < len; i++) {
            if (this._connections[i].available) {
                connection = this._connections[i];
                break;
            }
        }

        // nothing free, open a new connection if the limit allows it
        if (!connection && this._connections.length < this.options.maxConnections) {
            connection = this._createConnection();
        }

        if (!connection) {
            // no more free connection slots available
            this.idling = false;
            return;
        }

        // check if there is free space in the processing queue
        if (!this.idling && this._queue.length < this.options.maxConnections) {
            this.idling = true;
            this.emit('idle');
        }

        let entry = (connection.queueEntry = this._queue.shift());
        entry.messageId = (connection.queueEntry.mail.message.getHeader('message-id') || '').replace(/[<>\s]/g, '');

        connection.available = false;

        this.logger.debug(
            {
                tnx: 'pool',
                cid: connection.id,
                messageId: entry.messageId,
                action: 'assign'
            },
            'Assigned message <%s> to #%s (%s)',
            entry.messageId,
            connection.id,
            connection.messages + 1
        );

        if (this._rateLimit.limit) {
            this._rateLimit.counter++;
            if (!this._rateLimit.checkpoint) {
                // first message of the current rate limit window
                this._rateLimit.checkpoint = Date.now();
            }
        }

        connection.send(entry.mail, (err, info) => {
            // only process callback if current handler is not changed
            // (the 'error'/'close' handlers may already have settled or re-queued it)
            if (entry === connection.queueEntry) {
                try {
                    entry.callback(err, info);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }
        });
    }

    /**
     * Creates a new pool resource, wires up its 'available', 'error' and
     * 'close' handlers and adds it to the connections list
     *
     * @returns {Object} The created PoolResource
     */
    _createConnection() {
        let connection = new PoolResource(this);

        connection.id = ++this._connectionCounter;

        this.logger.info(
            {
                tnx: 'pool',
                cid: connection.id,
                action: 'conection'
            },
            'Created new pool resource #%s',
            connection.id
        );

        // resource comes available
        connection.on('available', () => {
            this.logger.debug(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'available'
                },
                'Connection #%s became available',
                connection.id
            );

            if (this._closed) {
                // if already closed run close() that will remove this connection from connections list
                this.close();
            } else {
                // check if there's anything else to send
                this._processMessages();
            }
        });

        // resource is terminated with an error
        connection.once('error', err => {
            if (err.code !== 'EMAXLIMIT') {
                this.logger.error(
                    {
                        err,
                        tnx: 'pool',
                        cid: connection.id
                    },
                    'Pool Error for #%s: %s',
                    connection.id,
                    err.message
                );
            } else {
                this.logger.debug(
                    {
                        tnx: 'pool',
                        cid: connection.id,
                        action: 'maxlimit'
                    },
                    'Max messages limit exchausted for #%s',
                    connection.id
                );
            }

            // fail the message that was in flight on this connection
            if (connection.queueEntry) {
                try {
                    connection.queueEntry.callback(err);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }

            // remove the erroneous connection from connections list
            this._removeConnection(connection);

            this._continueProcessing();
        });

        connection.once('close', () => {
            this.logger.info(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'closed'
                },
                'Connection #%s was closed',
                connection.id
            );

            this._removeConnection(connection);

            if (connection.queueEntry) {
                // If the connection closed when sending, add the message to the queue again
                // if max number of requeues is not reached yet
                // Note that we must wait a bit.. because the callback of the 'error' handler might be called
                // in the next event loop
                setTimeout(() => {
                    if (connection.queueEntry) {
                        if (this._shouldRequeuOnConnectionClose(connection.queueEntry)) {
                            this._requeueEntryOnConnectionClose(connection);
                        } else {
                            this._failDeliveryOnConnectionClose(connection);
                        }
                    }
                    this._continueProcessing();
                }, 50);
            } else {
                this._continueProcessing();
            }
        });

        this._connections.push(connection);

        return connection;
    }

    /**
     * Decides whether a message whose connection closed mid-delivery may be
     * put back to the queue.
     *
     * @param {Object} queueEntry Queue entry created by send()
     * @returns {Boolean} true if `options.maxRequeues` is unset or negative
     *   (unlimited), or if the entry has been re-queued fewer than
     *   `options.maxRequeues` times
     */
    _shouldRequeuOnConnectionClose(queueEntry) {
        if (this.options.maxRequeues === undefined || this.options.maxRequeues < 0) {
            return true;
        }

        // requeueAttempts starts at 0, so it must not be used as a truthiness
        // guard here, otherwise the very first re-queue would never happen
        return queueEntry.requeueAttempts < this.options.maxRequeues;
    }

    /**
     * Fails the message assigned to a closed connection once no more
     * re-queue attempts are allowed, and detaches it from the connection
     *
     * @param {Object} connection The PoolResource that was closed
     */
    _failDeliveryOnConnectionClose(connection) {
        if (connection.queueEntry && connection.queueEntry.callback) {
            try {
                connection.queueEntry.callback(new Error('Reached maximum number of retries after connection was closed'));
            } catch (E) {
                this.logger.error(
                    {
                        err: E,
                        tnx: 'callback',
                        messageId: connection.queueEntry.messageId,
                        cid: connection.id
                    },
                    'Callback error for #%s: %s',
                    connection.id,
                    E.message
                );
            }
            connection.queueEntry = false;
        }
    }

    /**
     * Puts the message assigned to a closed connection back to the front of
     * the queue and increments its re-queue counter
     *
     * @param {Object} connection The PoolResource that was closed
     */
    _requeueEntryOnConnectionClose(connection) {
        connection.queueEntry.requeueAttempts = connection.queueEntry.requeueAttempts + 1;
        this.logger.debug(
            {
                tnx: 'pool',
                cid: connection.id,
                messageId: connection.queueEntry.messageId,
                action: 'requeue'
            },
            'Re-queued message <%s> for #%s. Attempt: #%s',
            connection.queueEntry.messageId,
            connection.id,
            connection.queueEntry.requeueAttempts
        );
        // unshift so that the interrupted message is retried before newer ones
        this._queue.unshift(connection.queueEntry);
        connection.queueEntry = false;
    }

    /**
     * Continue to process message if the pool hasn't closed
     */
    _continueProcessing() {
        if (this._closed) {
            this.close();
        } else {
            setTimeout(() => this._processMessages(), 100);
        }
    }

    /**
     * Remove resource from pool
     *
     * @param {Object} connection The PoolResource to remove
     */
    _removeConnection(connection) {
        let index = this._connections.indexOf(connection);

        if (index !== -1) {
            this._connections.splice(index, 1);
        }
    }

    /**
     * Checks if connections have hit current rate limit and if so, queues the availability callback
     *
     * @param {Function} callback Callback function to run once rate limiter has been cleared
     */
    _checkRateLimit(callback) {
        // rate limiting is disabled
        if (!this._rateLimit.limit) {
            return callback();
        }

        let now = Date.now();

        // still below the limit for the current window
        if (this._rateLimit.counter < this._rateLimit.limit) {
            return callback();
        }

        this._rateLimit.waiting.push(callback);

        if (this._rateLimit.checkpoint <= now - this._rateLimit.delta) {
            // the window has already elapsed, release everything right away
            return this._clearRateLimit();
        } else if (!this._rateLimit.timeout) {
            // wait for the remainder of the current window
            this._rateLimit.timeout = setTimeout(() => this._clearRateLimit(), this._rateLimit.delta - (now - this._rateLimit.checkpoint));
            this._rateLimit.checkpoint = now;
        }
    }

    /**
     * Clears current rate limit limitation and runs paused callback
     */
    _clearRateLimit() {
        clearTimeout(this._rateLimit.timeout);
        this._rateLimit.timeout = null;
        this._rateLimit.counter = 0;
        this._rateLimit.checkpoint = false;

        // resume all paused connections
        while (this._rateLimit.waiting.length) {
            let cb = this._rateLimit.waiting.shift();
            setImmediate(cb);
        }
    }

    /**
     * Returns true if there are free slots in the queue
     *
     * @returns {Boolean} Current value of the `idling` flag
     */
    isIdle() {
        return this.idling;
    }

    /**
     * Verifies SMTP configuration by opening a separate connection, logging in
     * when credentials are set and the server allows authentication, and then
     * quitting.
     *
     * @param {Function} [callback] Callback function, called with (err) on
     *   failure or (null, true) on success
     * @returns {Promise|undefined} A Promise if no callback was provided
     */
    verify(callback) {
        let promise;

        if (!callback) {
            promise = new Promise((resolve, reject) => {
                callback = shared.callbackPromise(resolve, reject);
            });
        }

        // a throwaway pool resource is used only to read the auth settings
        let auth = new PoolResource(this).auth;

        this.getSocket(this.options, (err, socketOptions) => {
            if (err) {
                return callback(err);
            }

            let options = this.options;
            if (socketOptions && socketOptions.connection) {
                this.logger.info(
                    {
                        tnx: 'proxy',
                        remoteAddress: socketOptions.connection.remoteAddress,
                        remotePort: socketOptions.connection.remotePort,
                        destHost: options.host || '',
                        destPort: options.port || '',
                        action: 'connected'
                    },
                    'Using proxied socket from %s:%s to %s:%s',
                    socketOptions.connection.remoteAddress,
                    socketOptions.connection.remotePort,
                    options.host || '',
                    options.port || ''
                );
                // copy the options so that socket keys do not leak into this.options
                options = shared.assign(false, options);
                Object.keys(socketOptions).forEach(key => {
                    options[key] = socketOptions[key];
                });
            }

            let connection = new SMTPConnection(options);
            // guards against calling the callback more than once
            let returned = false;

            connection.once('error', err => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.close();
                return callback(err);
            });

            connection.once('end', () => {
                if (returned) {
                    return;
                }
                returned = true;
                return callback(new Error('Connection closed'));
            });

            let finalize = () => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.quit();
                return callback(null, true);
            };

            connection.connect(() => {
                if (returned) {
                    return;
                }

                if (auth && connection.allowsAuth) {
                    connection.login(auth, err => {
                        if (returned) {
                            return;
                        }

                        if (err) {
                            returned = true;
                            connection.close();
                            return callback(err);
                        }

                        finalize();
                    });
                } else {
                    finalize();
                }
            });
        });

        return promise;
    }
}

// expose to the world
module.exports = SMTPPool;