'use strict'; // FIXME: // replace this Transform mess with a method that pipes input argument to output argument const MessageParser = require('./message-parser'); const RelaxedBody = require('./relaxed-body'); const sign = require('./sign'); const PassThrough = require('stream').PassThrough; const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const DKIM_ALGO = 'sha256'; const MAX_MESSAGE_SIZE = 128 * 1024; // buffer messages larger than this to disk /* // Usage: let dkim = new DKIM({ domainName: 'example.com', keySelector: 'key-selector', privateKey, cacheDir: '/tmp' }); dkim.sign(input).pipe(process.stdout); // Where inputStream is a rfc822 message (either a stream, string or Buffer) // and outputStream is a DKIM signed rfc822 message */ class DKIMSigner { constructor(options, keys, input, output) { this.options = options || {}; this.keys = keys; this.cacheTreshold = Number(this.options.cacheTreshold) || MAX_MESSAGE_SIZE; this.hashAlgo = this.options.hashAlgo || DKIM_ALGO; this.cacheDir = this.options.cacheDir || false; this.chunks = []; this.chunklen = 0; this.readPos = 0; this.cachePath = this.cacheDir ? path.join(this.cacheDir, 'message.' + Date.now() + '-' + crypto.randomBytes(14).toString('hex')) : false; this.cache = false; this.headers = false; this.bodyHash = false; this.parser = false; this.relaxedBody = false; this.input = input; this.output = output; this.output.usingCache = false; this.errored = false; this.input.on('error', err => { this.errored = true; this.cleanup(); output.emit('error', err); }); } cleanup() { if (!this.cache || !this.cachePath) { return; } fs.unlink(this.cachePath, () => false); } createReadCache() { // pipe remainings to cache file this.cache = fs.createReadStream(this.cachePath); this.cache.once('error', err => { this.cleanup(); this.output.emit('error', err); }); this.cache.once('close', () => { this.cleanup(); }); this.cache.pipe(this.output); } sendNextChunk() { if (this.errored) { return; } if (this.readPos >= this.chunks.length) { if (!this.cache) { return this.output.end(); } return this.createReadCache(); } let chunk = this.chunks[this.readPos++]; if (this.output.write(chunk) === false) { return this.output.once('drain', () => { this.sendNextChunk(); }); } setImmediate(() => this.sendNextChunk()); } sendSignedOutput() { let keyPos = 0; let signNextKey = () => { if (keyPos >= this.keys.length) { this.output.write(this.parser.rawHeaders); return setImmediate(() => this.sendNextChunk()); } let key = this.keys[keyPos++]; let dkimField = sign(this.headers, this.hashAlgo, this.bodyHash, { domainName: key.domainName, keySelector: key.keySelector, privateKey: key.privateKey, headerFieldNames: this.options.headerFieldNames, skipFields: this.options.skipFields }); if (dkimField) { this.output.write(Buffer.from(dkimField + '\r\n')); } return setImmediate(signNextKey); }; if (this.bodyHash && this.headers) { return signNextKey(); } this.output.write(this.parser.rawHeaders); this.sendNextChunk(); } createWriteCache() { this.output.usingCache = true; // pipe remainings to cache file this.cache = fs.createWriteStream(this.cachePath); this.cache.once('error', err => { this.cleanup(); // drain input this.relaxedBody.unpipe(this.cache); this.relaxedBody.on('readable', () => { while (this.relaxedBody.read() !== null) { // do nothing } }); this.errored = true; // emit error this.output.emit('error', err); }); this.cache.once('close', () => { this.sendSignedOutput(); }); this.relaxedBody.removeAllListeners('readable'); this.relaxedBody.pipe(this.cache); } signStream() { this.parser = new MessageParser(); this.relaxedBody = new RelaxedBody({ hashAlgo: this.hashAlgo }); this.parser.on('headers', value => { this.headers = value; }); this.relaxedBody.on('hash', value => { this.bodyHash = value; }); this.relaxedBody.on('readable', () => { let chunk; if (this.cache) { return; } while ((chunk = this.relaxedBody.read()) !== null) { this.chunks.push(chunk); this.chunklen += chunk.length; if (this.chunklen >= this.cacheTreshold && this.cachePath) { return this.createWriteCache(); } } }); this.relaxedBody.on('end', () => { if (this.cache) { return; } this.sendSignedOutput(); }); this.parser.pipe(this.relaxedBody); setImmediate(() => this.input.pipe(this.parser)); } } class DKIM { constructor(options) { this.options = options || {}; this.keys = [].concat( this.options.keys || { domainName: options.domainName, keySelector: options.keySelector, privateKey: options.privateKey } ); } sign(input, extraOptions) { let output = new PassThrough(); let inputStream = input; let writeValue = false; if (Buffer.isBuffer(input)) { writeValue = input; inputStream = new PassThrough(); } else if (typeof input === 'string') { writeValue = Buffer.from(input); inputStream = new PassThrough(); } let options = this.options; if (extraOptions && Object.keys(extraOptions).length) { options = {}; Object.keys(this.options || {}).forEach(key => { options[key] = this.options[key]; }); Object.keys(extraOptions || {}).forEach(key => { if (!(key in options)) { options[key] = extraOptions[key]; } }); } let signer = new DKIMSigner(options, this.keys, inputStream, output); setImmediate(() => { signer.signStream(); if (writeValue) { setImmediate(() => { inputStream.end(writeValue); }); } }); return output; } } module.exports = DKIM;