rc-web@69: rc-web@69: /*! rc-web@69: * socket.io-node rc-web@69: * Copyright(c) 2011 LearnBoost rc-web@69: * MIT Licensed rc-web@69: */ rc-web@69: rc-web@69: /** rc-web@69: * Module dependencies. rc-web@69: */ rc-web@69: rc-web@69: var parser = require('./parser'); rc-web@69: rc-web@69: /** rc-web@69: * Expose the constructor. rc-web@69: */ rc-web@69: rc-web@69: exports = module.exports = Transport; rc-web@69: rc-web@69: /** rc-web@69: * Transport constructor. rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: function Transport (mng, data, req) { rc-web@69: this.manager = mng; rc-web@69: this.id = data.id; rc-web@69: this.disconnected = false; rc-web@69: this.drained = true; rc-web@69: this.handleRequest(req); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Access the logger. rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.__defineGetter__('log', function () { rc-web@69: return this.manager.log; rc-web@69: }); rc-web@69: rc-web@69: /** rc-web@69: * Access the store. rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.__defineGetter__('store', function () { rc-web@69: return this.manager.store; rc-web@69: }); rc-web@69: rc-web@69: /** rc-web@69: * Handles a request when it's set. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.handleRequest = function (req) { rc-web@69: this.log.debug('setting request', req.method, req.url); rc-web@69: this.req = req; rc-web@69: rc-web@69: if (req.method == 'GET') { rc-web@69: this.socket = req.socket; rc-web@69: this.open = true; rc-web@69: this.drained = true; rc-web@69: this.setHeartbeatInterval(); rc-web@69: rc-web@69: this.setHandlers(); rc-web@69: this.onSocketConnect(); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called when a connection is first set. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onSocketConnect = function () { }; rc-web@69: rc-web@69: /** rc-web@69: * Sets transport handlers rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.setHandlers = function () { rc-web@69: var self = this; rc-web@69: rc-web@69: // we need to do this in a pub/sub way since the client can POST the message rc-web@69: // over a different socket (ie: different Transport instance) rc-web@69: this.store.subscribe('heartbeat-clear:' + this.id, function () { rc-web@69: self.onHeartbeatClear(); rc-web@69: }); rc-web@69: rc-web@69: this.store.subscribe('disconnect-force:' + this.id, function () { rc-web@69: self.onForcedDisconnect(); rc-web@69: }); rc-web@69: rc-web@69: this.store.subscribe('dispatch:' + this.id, function (packet, volatile) { rc-web@69: self.onDispatch(packet, volatile); rc-web@69: }); rc-web@69: rc-web@69: this.bound = { rc-web@69: end: this.onSocketEnd.bind(this) rc-web@69: , close: this.onSocketClose.bind(this) rc-web@69: , error: this.onSocketError.bind(this) rc-web@69: , drain: this.onSocketDrain.bind(this) rc-web@69: }; rc-web@69: rc-web@69: this.socket.on('end', this.bound.end); rc-web@69: this.socket.on('close', this.bound.close); rc-web@69: this.socket.on('error', this.bound.error); rc-web@69: this.socket.on('drain', this.bound.drain); rc-web@69: rc-web@69: this.handlersSet = true; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Removes transport handlers rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.clearHandlers = function () { rc-web@69: if (this.handlersSet) { rc-web@69: this.store.unsubscribe('disconnect-force:' + this.id); rc-web@69: this.store.unsubscribe('heartbeat-clear:' + this.id); rc-web@69: this.store.unsubscribe('dispatch:' + this.id); rc-web@69: rc-web@69: this.socket.removeListener('end', this.bound.end); rc-web@69: this.socket.removeListener('close', this.bound.close); rc-web@69: this.socket.removeListener('error', this.bound.error); rc-web@69: this.socket.removeListener('drain', this.bound.drain); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called when the connection dies rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onSocketEnd = function () { rc-web@69: this.end('socket end'); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called when the connection dies rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onSocketClose = function (error) { rc-web@69: this.end(error ? 'socket error' : 'socket close'); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called when the connection has an error. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onSocketError = function (err) { rc-web@69: if (this.open) { rc-web@69: this.socket.destroy(); rc-web@69: this.onClose(); rc-web@69: } rc-web@69: rc-web@69: this.log.info('socket error ' + err.stack); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called when the connection is drained. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onSocketDrain = function () { rc-web@69: this.drained = true; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called upon receiving a heartbeat packet. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onHeartbeatClear = function () { rc-web@69: this.clearHeartbeatTimeout(); rc-web@69: this.setHeartbeatInterval(); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called upon a forced disconnection. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onForcedDisconnect = function () { rc-web@69: if (!this.disconnected) { rc-web@69: this.log.info('transport end by forced client disconnection'); rc-web@69: if (this.open) { rc-web@69: this.packet({ type: 'disconnect' }); rc-web@69: } rc-web@69: this.end('booted'); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Dispatches a packet. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onDispatch = function (packet, volatile) { rc-web@69: if (volatile) { rc-web@69: this.writeVolatile(packet); rc-web@69: } else { rc-web@69: this.write(packet); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Sets the close timeout. rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.setCloseTimeout = function () { rc-web@69: if (!this.closeTimeout) { rc-web@69: var self = this; rc-web@69: rc-web@69: this.closeTimeout = setTimeout(function () { rc-web@69: self.log.debug('fired close timeout for client', self.id); rc-web@69: self.closeTimeout = null; rc-web@69: self.end('close timeout'); rc-web@69: }, this.manager.get('close timeout') * 1000); rc-web@69: rc-web@69: this.log.debug('set close timeout for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Clears the close timeout. rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.clearCloseTimeout = function () { rc-web@69: if (this.closeTimeout) { rc-web@69: clearTimeout(this.closeTimeout); rc-web@69: this.closeTimeout = null; rc-web@69: rc-web@69: this.log.debug('cleared close timeout for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Sets the heartbeat timeout rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.setHeartbeatTimeout = function () { rc-web@69: if (!this.heartbeatTimeout && this.manager.enabled('heartbeats')) { rc-web@69: var self = this; rc-web@69: rc-web@69: this.heartbeatTimeout = setTimeout(function () { rc-web@69: self.log.debug('fired heartbeat timeout for client', self.id); rc-web@69: self.heartbeatTimeout = null; rc-web@69: self.end('heartbeat timeout'); rc-web@69: }, this.manager.get('heartbeat timeout') * 1000); rc-web@69: rc-web@69: this.log.debug('set heartbeat timeout for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Clears the heartbeat timeout rc-web@69: * rc-web@69: * @param text rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.clearHeartbeatTimeout = function () { rc-web@69: if (this.heartbeatTimeout && this.manager.enabled('heartbeats')) { rc-web@69: clearTimeout(this.heartbeatTimeout); rc-web@69: this.heartbeatTimeout = null; rc-web@69: this.log.debug('cleared heartbeat timeout for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Sets the heartbeat interval. To be called when a connection opens and when rc-web@69: * a heartbeat is received. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.setHeartbeatInterval = function () { rc-web@69: if (!this.heartbeatInterval && this.manager.enabled('heartbeats')) { rc-web@69: var self = this; rc-web@69: rc-web@69: this.heartbeatInterval = setTimeout(function () { rc-web@69: self.heartbeat(); rc-web@69: self.heartbeatInterval = null; rc-web@69: }, this.manager.get('heartbeat interval') * 1000); rc-web@69: rc-web@69: this.log.debug('set heartbeat interval for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Clears all timeouts. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.clearTimeouts = function () { rc-web@69: this.clearCloseTimeout(); rc-web@69: this.clearHeartbeatTimeout(); rc-web@69: this.clearHeartbeatInterval(); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Sends a heartbeat rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.heartbeat = function () { rc-web@69: if (this.open) { rc-web@69: this.log.debug('emitting heartbeat for client', this.id); rc-web@69: this.packet({ type: 'heartbeat' }); rc-web@69: this.setHeartbeatTimeout(); rc-web@69: } rc-web@69: rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Handles a message. rc-web@69: * rc-web@69: * @param {Object} packet object rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onMessage = function (packet) { rc-web@69: var current = this.manager.transports[this.id]; rc-web@69: rc-web@69: if ('heartbeat' == packet.type) { rc-web@69: this.log.debug('got heartbeat packet'); rc-web@69: rc-web@69: if (current && current.open) { rc-web@69: current.onHeartbeatClear(); rc-web@69: } else { rc-web@69: this.store.publish('heartbeat-clear:' + this.id); rc-web@69: } rc-web@69: } else { rc-web@69: if ('disconnect' == packet.type && packet.endpoint == '') { rc-web@69: this.log.debug('got disconnection packet'); rc-web@69: rc-web@69: if (current) { rc-web@69: current.onForcedDisconnect(); rc-web@69: } else { rc-web@69: this.store.publish('disconnect-force:' + this.id); rc-web@69: } rc-web@69: rc-web@69: return; rc-web@69: } rc-web@69: rc-web@69: if (packet.id && packet.ack != 'data') { rc-web@69: this.log.debug('acknowledging packet automatically'); rc-web@69: rc-web@69: var ack = parser.encodePacket({ rc-web@69: type: 'ack' rc-web@69: , ackId: packet.id rc-web@69: , endpoint: packet.endpoint || '' rc-web@69: }); rc-web@69: rc-web@69: if (current && current.open) { rc-web@69: current.onDispatch(ack); rc-web@69: } else { rc-web@69: this.manager.onClientDispatch(this.id, ack); rc-web@69: this.store.publish('dispatch:' + this.id, ack); rc-web@69: } rc-web@69: } rc-web@69: rc-web@69: // handle packet locally or publish it rc-web@69: if (current) { rc-web@69: this.manager.onClientMessage(this.id, packet); rc-web@69: } else { rc-web@69: this.store.publish('message:' + this.id, packet); rc-web@69: } rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Clears the heartbeat interval rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.clearHeartbeatInterval = function () { rc-web@69: if (this.heartbeatInterval && this.manager.enabled('heartbeats')) { rc-web@69: clearTimeout(this.heartbeatInterval); rc-web@69: this.heartbeatInterval = null; rc-web@69: this.log.debug('cleared heartbeat interval for client', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Finishes the connection and makes sure client doesn't reopen rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.disconnect = function (reason) { rc-web@69: this.packet({ type: 'disconnect' }); rc-web@69: this.end(reason); rc-web@69: rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Closes the connection. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.close = function () { rc-web@69: if (this.open) { rc-web@69: this.doClose(); rc-web@69: this.onClose(); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Called upon a connection close. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.onClose = function () { rc-web@69: if (this.open) { rc-web@69: this.setCloseTimeout(); rc-web@69: this.clearHandlers(); rc-web@69: this.open = false; rc-web@69: this.manager.onClose(this.id); rc-web@69: this.store.publish('close', this.id); rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Cleans up the connection, considers the client disconnected. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.end = function (reason) { rc-web@69: if (!this.disconnected) { rc-web@69: this.log.info('transport end (' + reason + ')'); rc-web@69: rc-web@69: var local = this.manager.transports[this.id]; rc-web@69: rc-web@69: this.close(); rc-web@69: this.clearTimeouts(); rc-web@69: this.disconnected = true; rc-web@69: rc-web@69: if (local) { rc-web@69: this.manager.onClientDisconnect(this.id, reason, true); rc-web@69: } else { rc-web@69: this.store.publish('disconnect:' + this.id, reason); rc-web@69: } rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Signals that the transport should pause and buffer data. rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.discard = function () { rc-web@69: this.log.debug('discarding transport'); rc-web@69: this.discarded = true; rc-web@69: this.clearTimeouts(); rc-web@69: this.clearHandlers(); rc-web@69: rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Writes an error packet with the specified reason and advice. rc-web@69: * rc-web@69: * @param {Number} advice rc-web@69: * @param {Number} reason rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.error = function (reason, advice) { rc-web@69: this.packet({ rc-web@69: type: 'error' rc-web@69: , reason: reason rc-web@69: , advice: advice rc-web@69: }); rc-web@69: rc-web@69: this.log.warn(reason, advice ? ('client should ' + advice) : ''); rc-web@69: this.end('error'); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Write a packet. rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.packet = function (obj) { rc-web@69: return this.write(parser.encodePacket(obj)); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Writes a volatile message. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Transport.prototype.writeVolatile = function (msg) { rc-web@69: if (this.open) { rc-web@69: if (this.drained) { rc-web@69: this.write(msg); rc-web@69: } else { rc-web@69: this.log.debug('ignoring volatile packet, buffer not drained'); rc-web@69: } rc-web@69: } else { rc-web@69: this.log.debug('ignoring volatile packet, transport not open'); rc-web@69: } rc-web@69: };