/*! * ws: a node.js websocket client * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> * MIT Licensed */ var util = require('util') , Validation = require('./Validation').Validation , ErrorCodes = require('./ErrorCodes') , BufferPool = require('./BufferPool') , bufferUtil = require('./BufferUtil').BufferUtil , PerMessageDeflate = require('./PerMessageDeflate'); /** * HyBi Receiver implementation */ function Receiver (extensions) { if (this instanceof Receiver === false) { throw new TypeError("Classes can't be function-called"); } // memory pool for fragmented messages var fragmentedPoolPrevUsed = -1; this.fragmentedBufferPool = new BufferPool(1024, function(db, length) { return db.used + length; }, function(db) { return fragmentedPoolPrevUsed = fragmentedPoolPrevUsed >= 0 ? Math.ceil((fragmentedPoolPrevUsed + db.used) / 2) : db.used; }); // memory pool for unfragmented messages var unfragmentedPoolPrevUsed = -1; this.unfragmentedBufferPool = new BufferPool(1024, function(db, length) { return db.used + length; }, function(db) { return unfragmentedPoolPrevUsed = unfragmentedPoolPrevUsed >= 0 ? Math.ceil((unfragmentedPoolPrevUsed + db.used) / 2) : db.used; }); this.extensions = extensions || {}; this.state = { activeFragmentedOperation: null, lastFragment: false, masked: false, opcode: 0, fragmentedOperation: false }; this.overflow = []; this.headerBuffer = new Buffer(10); this.expectOffset = 0; this.expectBuffer = null; this.expectHandler = null; this.currentMessage = []; this.messageHandlers = []; this.expectHeader(2, this.processPacket); this.dead = false; this.processing = false; this.onerror = function() {}; this.ontext = function() {}; this.onbinary = function() {}; this.onclose = function() {}; this.onping = function() {}; this.onpong = function() {}; } module.exports = Receiver; /** * Add new data to the parser. * * @api public */ Receiver.prototype.add = function(data) { var dataLength = data.length; if (dataLength == 0) return; if (this.expectBuffer == null) { this.overflow.push(data); return; } var toRead = Math.min(dataLength, this.expectBuffer.length - this.expectOffset); fastCopy(toRead, data, this.expectBuffer, this.expectOffset); this.expectOffset += toRead; if (toRead < dataLength) { this.overflow.push(data.slice(toRead)); } while (this.expectBuffer && this.expectOffset == this.expectBuffer.length) { var bufferForHandler = this.expectBuffer; this.expectBuffer = null; this.expectOffset = 0; this.expectHandler.call(this, bufferForHandler); } }; /** * Releases all resources used by the receiver. * * @api public */ Receiver.prototype.cleanup = function() { this.dead = true; this.overflow = null; this.headerBuffer = null; this.expectBuffer = null; this.expectHandler = null; this.unfragmentedBufferPool = null; this.fragmentedBufferPool = null; this.state = null; this.currentMessage = null; this.onerror = null; this.ontext = null; this.onbinary = null; this.onclose = null; this.onping = null; this.onpong = null; }; /** * Waits for a certain amount of header bytes to be available, then fires a callback. * * @api private */ Receiver.prototype.expectHeader = function(length, handler) { if (length == 0) { handler(null); return; } this.expectBuffer = this.headerBuffer.slice(this.expectOffset, this.expectOffset + length); this.expectHandler = handler; var toRead = length; while (toRead > 0 && this.overflow.length > 0) { var fromOverflow = this.overflow.pop(); if (toRead < fromOverflow.length) this.overflow.push(fromOverflow.slice(toRead)); var read = Math.min(fromOverflow.length, toRead); fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset); this.expectOffset += read; toRead -= read; } }; /** * Waits for a certain amount of data bytes to be available, then fires a callback. * * @api private */ Receiver.prototype.expectData = function(length, handler) { if (length == 0) { handler(null); return; } this.expectBuffer = this.allocateFromPool(length, this.state.fragmentedOperation); this.expectHandler = handler; var toRead = length; while (toRead > 0 && this.overflow.length > 0) { var fromOverflow = this.overflow.pop(); if (toRead < fromOverflow.length) this.overflow.push(fromOverflow.slice(toRead)); var read = Math.min(fromOverflow.length, toRead); fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset); this.expectOffset += read; toRead -= read; } }; /** * Allocates memory from the buffer pool. * * @api private */ Receiver.prototype.allocateFromPool = function(length, isFragmented) { return (isFragmented ? this.fragmentedBufferPool : this.unfragmentedBufferPool).get(length); }; /** * Start processing a new packet. * * @api private */ Receiver.prototype.processPacket = function (data) { if (this.extensions[PerMessageDeflate.extensionName]) { if ((data[0] & 0x30) != 0) { this.error('reserved fields (2, 3) must be empty', 1002); return; } } else { if ((data[0] & 0x70) != 0) { this.error('reserved fields must be empty', 1002); return; } } this.state.lastFragment = (data[0] & 0x80) == 0x80; this.state.masked = (data[1] & 0x80) == 0x80; var compressed = (data[0] & 0x40) == 0x40; var opcode = data[0] & 0xf; if (opcode === 0) { if (compressed) { this.error('continuation frame cannot have the Per-message Compressed bits', 1002); return; } // continuation frame this.state.fragmentedOperation = true; this.state.opcode = this.state.activeFragmentedOperation; if (!(this.state.opcode == 1 || this.state.opcode == 2)) { this.error('continuation frame cannot follow current opcode', 1002); return; } } else { if (opcode < 3 && this.state.activeFragmentedOperation != null) { this.error('data frames after the initial data frame must have opcode 0', 1002); return; } if (opcode >= 8 && compressed) { this.error('control frames cannot have the Per-message Compressed bits', 1002); return; } this.state.compressed = compressed; this.state.opcode = opcode; if (this.state.lastFragment === false) { this.state.fragmentedOperation = true; this.state.activeFragmentedOperation = opcode; } else this.state.fragmentedOperation = false; } var handler = opcodes[this.state.opcode]; if (typeof handler == 'undefined') this.error('no handler for opcode ' + this.state.opcode, 1002); else { handler.start.call(this, data); } }; /** * Endprocessing a packet. * * @api private */ Receiver.prototype.endPacket = function() { if (!this.state.fragmentedOperation) this.unfragmentedBufferPool.reset(true); else if (this.state.lastFragment) this.fragmentedBufferPool.reset(true); this.expectOffset = 0; this.expectBuffer = null; this.expectHandler = null; if (this.state.lastFragment && this.state.opcode === this.state.activeFragmentedOperation) { // end current fragmented operation this.state.activeFragmentedOperation = null; } this.state.lastFragment = false; this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0; this.state.masked = false; this.expectHeader(2, this.processPacket); }; /** * Reset the parser state. * * @api private */ Receiver.prototype.reset = function() { if (this.dead) return; this.state = { activeFragmentedOperation: null, lastFragment: false, masked: false, opcode: 0, fragmentedOperation: false }; this.fragmentedBufferPool.reset(true); this.unfragmentedBufferPool.reset(true); this.expectOffset = 0; this.expectBuffer = null; this.expectHandler = null; this.overflow = []; this.currentMessage = []; this.messageHandlers = []; }; /** * Unmask received data. * * @api private */ Receiver.prototype.unmask = function (mask, buf, binary) { if (mask != null && buf != null) bufferUtil.unmask(buf, mask); if (binary) return buf; return buf != null ? buf.toString('utf8') : ''; }; /** * Concatenates a list of buffers. * * @api private */ Receiver.prototype.concatBuffers = function(buffers) { var length = 0; for (var i = 0, l = buffers.length; i < l; ++i) length += buffers[i].length; var mergedBuffer = new Buffer(length); bufferUtil.merge(mergedBuffer, buffers); return mergedBuffer; }; /** * Handles an error * * @api private */ Receiver.prototype.error = function (reason, protocolErrorCode) { this.reset(); this.onerror(reason, protocolErrorCode); return this; }; /** * Execute message handler buffers * * @api private */ Receiver.prototype.flush = function() { if (this.processing || this.dead) return; var handler = this.messageHandlers.shift(); if (!handler) return; this.processing = true; var self = this; handler(function() { self.processing = false; self.flush(); }); }; /** * Apply extensions to message * * @api private */ Receiver.prototype.applyExtensions = function(messageBuffer, fin, compressed, callback) { var self = this; if (compressed) { this.extensions[PerMessageDeflate.extensionName].decompress(messageBuffer, fin, function(err, buffer) { if (self.dead) return; if (err) { callback(new Error('invalid compressed data')); return; } callback(null, buffer); }); } else { callback(null, messageBuffer); } }; /** * Buffer utilities */ function readUInt16BE(start) { return (this[start]<<8) + this[start+1]; } function readUInt32BE(start) { return (this[start]<<24) + (this[start+1]<<16) + (this[start+2]<<8) + this[start+3]; } function fastCopy(length, srcBuffer, dstBuffer, dstOffset) { switch (length) { default: srcBuffer.copy(dstBuffer, dstOffset, 0, length); break; case 16: dstBuffer[dstOffset+15] = srcBuffer[15]; case 15: dstBuffer[dstOffset+14] = srcBuffer[14]; case 14: dstBuffer[dstOffset+13] = srcBuffer[13]; case 13: dstBuffer[dstOffset+12] = srcBuffer[12]; case 12: dstBuffer[dstOffset+11] = srcBuffer[11]; case 11: dstBuffer[dstOffset+10] = srcBuffer[10]; case 10: dstBuffer[dstOffset+9] = srcBuffer[9]; case 9: dstBuffer[dstOffset+8] = srcBuffer[8]; case 8: dstBuffer[dstOffset+7] = srcBuffer[7]; case 7: dstBuffer[dstOffset+6] = srcBuffer[6]; case 6: dstBuffer[dstOffset+5] = srcBuffer[5]; case 5: dstBuffer[dstOffset+4] = srcBuffer[4]; case 4: dstBuffer[dstOffset+3] = srcBuffer[3]; case 3: dstBuffer[dstOffset+2] = srcBuffer[2]; case 2: dstBuffer[dstOffset+1] = srcBuffer[1]; case 1: dstBuffer[dstOffset] = srcBuffer[0]; } } function clone(obj) { var cloned = {}; for (var k in obj) { if (obj.hasOwnProperty(k)) { cloned[k] = obj[k]; } } return cloned; } /** * Opcode handlers */ var opcodes = { // text '1': { start: function(data) { var self = this; // decode length var firstLength = data[1] & 0x7f; if (firstLength < 126) { opcodes['1'].getData.call(self, firstLength); } else if (firstLength == 126) { self.expectHeader(2, function(data) { opcodes['1'].getData.call(self, readUInt16BE.call(data, 0)); }); } else if (firstLength == 127) { self.expectHeader(8, function(data) { if (readUInt32BE.call(data, 0) != 0) { self.error('packets with length spanning more than 32 bit is currently not supported', 1008); return; } opcodes['1'].getData.call(self, readUInt32BE.call(data, 4)); }); } }, getData: function(length) { var self = this; if (self.state.masked) { self.expectHeader(4, function(data) { var mask = data; self.expectData(length, function(data) { opcodes['1'].finish.call(self, mask, data); }); }); } else { self.expectData(length, function(data) { opcodes['1'].finish.call(self, null, data); }); } }, finish: function(mask, data) { var self = this; var packet = this.unmask(mask, data, true) || new Buffer(0); var state = clone(this.state); this.messageHandlers.push(function(callback) { self.applyExtensions(packet, state.lastFragment, state.compressed, function(err, buffer) { if (err) return self.error(err.message, 1007); if (buffer != null) self.currentMessage.push(buffer); if (state.lastFragment) { var messageBuffer = self.concatBuffers(self.currentMessage); self.currentMessage = []; if (!Validation.isValidUTF8(messageBuffer)) { self.error('invalid utf8 sequence', 1007); return; } self.ontext(messageBuffer.toString('utf8'), {masked: state.masked, buffer: messageBuffer}); } callback(); }); }); this.flush(); this.endPacket(); } }, // binary '2': { start: function(data) { var self = this; // decode length var firstLength = data[1] & 0x7f; if (firstLength < 126) { opcodes['2'].getData.call(self, firstLength); } else if (firstLength == 126) { self.expectHeader(2, function(data) { opcodes['2'].getData.call(self, readUInt16BE.call(data, 0)); }); } else if (firstLength == 127) { self.expectHeader(8, function(data) { if (readUInt32BE.call(data, 0) != 0) { self.error('packets with length spanning more than 32 bit is currently not supported', 1008); return; } opcodes['2'].getData.call(self, readUInt32BE.call(data, 4, true)); }); } }, getData: function(length) { var self = this; if (self.state.masked) { self.expectHeader(4, function(data) { var mask = data; self.expectData(length, function(data) { opcodes['2'].finish.call(self, mask, data); }); }); } else { self.expectData(length, function(data) { opcodes['2'].finish.call(self, null, data); }); } }, finish: function(mask, data) { var self = this; var packet = this.unmask(mask, data, true) || new Buffer(0); var state = clone(this.state); this.messageHandlers.push(function(callback) { self.applyExtensions(packet, state.lastFragment, state.compressed, function(err, buffer) { if (err) return self.error(err.message, 1007); if (buffer != null) self.currentMessage.push(buffer); if (state.lastFragment) { var messageBuffer = self.concatBuffers(self.currentMessage); self.currentMessage = []; self.onbinary(messageBuffer, {masked: state.masked, buffer: messageBuffer}); } callback(); }); }); this.flush(); this.endPacket(); } }, // close '8': { start: function(data) { var self = this; if (self.state.lastFragment == false) { self.error('fragmented close is not supported', 1002); return; } // decode length var firstLength = data[1] & 0x7f; if (firstLength < 126) { opcodes['8'].getData.call(self, firstLength); } else { self.error('control frames cannot have more than 125 bytes of data', 1002); } }, getData: function(length) { var self = this; if (self.state.masked) { self.expectHeader(4, function(data) { var mask = data; self.expectData(length, function(data) { opcodes['8'].finish.call(self, mask, data); }); }); } else { self.expectData(length, function(data) { opcodes['8'].finish.call(self, null, data); }); } }, finish: function(mask, data) { var self = this; data = self.unmask(mask, data, true); var state = clone(this.state); this.messageHandlers.push(function() { if (data && data.length == 1) { self.error('close packets with data must be at least two bytes long', 1002); return; } var code = data && data.length > 1 ? readUInt16BE.call(data, 0) : 1000; if (!ErrorCodes.isValidErrorCode(code)) { self.error('invalid error code', 1002); return; } var message = ''; if (data && data.length > 2) { var messageBuffer = data.slice(2); if (!Validation.isValidUTF8(messageBuffer)) { self.error('invalid utf8 sequence', 1007); return; } message = messageBuffer.toString('utf8'); } self.onclose(code, message, {masked: state.masked}); self.reset(); }); this.flush(); }, }, // ping '9': { start: function(data) { var self = this; if (self.state.lastFragment == false) { self.error('fragmented ping is not supported', 1002); return; } // decode length var firstLength = data[1] & 0x7f; if (firstLength < 126) { opcodes['9'].getData.call(self, firstLength); } else { self.error('control frames cannot have more than 125 bytes of data', 1002); } }, getData: function(length) { var self = this; if (self.state.masked) { self.expectHeader(4, function(data) { var mask = data; self.expectData(length, function(data) { opcodes['9'].finish.call(self, mask, data); }); }); } else { self.expectData(length, function(data) { opcodes['9'].finish.call(self, null, data); }); } }, finish: function(mask, data) { var self = this; data = this.unmask(mask, data, true); var state = clone(this.state); this.messageHandlers.push(function(callback) { self.onping(data, {masked: state.masked, binary: true}); callback(); }); this.flush(); this.endPacket(); } }, // pong '10': { start: function(data) { var self = this; if (self.state.lastFragment == false) { self.error('fragmented pong is not supported', 1002); return; } // decode length var firstLength = data[1] & 0x7f; if (firstLength < 126) { opcodes['10'].getData.call(self, firstLength); } else { self.error('control frames cannot have more than 125 bytes of data', 1002); } }, getData: function(length) { var self = this; if (this.state.masked) { this.expectHeader(4, function(data) { var mask = data; self.expectData(length, function(data) { opcodes['10'].finish.call(self, mask, data); }); }); } else { this.expectData(length, function(data) { opcodes['10'].finish.call(self, null, data); }); } }, finish: function(mask, data) { var self = this; data = self.unmask(mask, data, true); var state = clone(this.state); this.messageHandlers.push(function(callback) { self.onpong(data, {masked: state.masked, binary: true}); callback(); }); this.flush(); this.endPacket(); } } }