rc-web@69: rc-web@69: /*! rc-web@69: * socket.io-node rc-web@69: * Copyright(c) 2011 LearnBoost rc-web@69: * MIT Licensed rc-web@69: */ rc-web@69: rc-web@69: /** rc-web@69: * Module dependencies. rc-web@69: */ rc-web@69: rc-web@69: var crypto = require('crypto') rc-web@69: , Store = require('../store') rc-web@69: , assert = require('assert'); rc-web@69: rc-web@69: /** rc-web@69: * Exports the constructor. rc-web@69: */ rc-web@69: rc-web@69: exports = module.exports = Redis; rc-web@69: Redis.Client = Client; rc-web@69: rc-web@69: /** rc-web@69: * Redis store. rc-web@69: * Options: rc-web@69: * - nodeId (fn) gets an id that uniquely identifies this node rc-web@69: * - redis (fn) redis constructor, defaults to redis rc-web@69: * - redisPub (object) options to pass to the pub redis client rc-web@69: * - redisSub (object) options to pass to the sub redis client rc-web@69: * - redisClient (object) options to pass to the general redis client rc-web@69: * - pack (fn) custom packing, defaults to JSON or msgpack if installed rc-web@69: * - unpack (fn) custom packing, defaults to JSON or msgpack if installed rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: function Redis (opts) { rc-web@69: opts = opts || {}; rc-web@69: rc-web@69: // node id to uniquely identify this node rc-web@69: var nodeId = opts.nodeId || function () { rc-web@69: // by default, we generate a random id rc-web@69: return Math.abs(Math.random() * Math.random() * Date.now() | 0); rc-web@69: }; rc-web@69: rc-web@69: this.nodeId = nodeId(); rc-web@69: rc-web@69: // packing / unpacking mechanism rc-web@69: if (opts.pack) { rc-web@69: this.pack = opts.pack; rc-web@69: this.unpack = opts.unpack; rc-web@69: } else { rc-web@69: try { rc-web@69: var msgpack = require('msgpack'); rc-web@69: this.pack = msgpack.pack; rc-web@69: this.unpack = msgpack.unpack; rc-web@69: } catch (e) { rc-web@69: this.pack = JSON.stringify; rc-web@69: this.unpack = JSON.parse; rc-web@69: } rc-web@69: } rc-web@69: rc-web@69: var redis = opts.redis || require('redis') rc-web@69: , RedisClient = redis.RedisClient; rc-web@69: rc-web@69: // initialize a pubsub client and a regular client rc-web@69: if (opts.redisPub instanceof RedisClient) { rc-web@69: this.pub = opts.redisPub; rc-web@69: } else { rc-web@69: opts.redisPub || (opts.redisPub = {}); rc-web@69: this.pub = redis.createClient(opts.redisPub.port, opts.redisPub.host, opts.redisPub); rc-web@69: } rc-web@69: if (opts.redisSub instanceof RedisClient) { rc-web@69: this.sub = opts.redisSub; rc-web@69: } else { rc-web@69: opts.redisSub || (opts.redisSub = {}); rc-web@69: this.sub = redis.createClient(opts.redisSub.port, opts.redisSub.host, opts.redisSub); rc-web@69: } rc-web@69: if (opts.redisClient instanceof RedisClient) { rc-web@69: this.cmd = opts.redisClient; rc-web@69: } else { rc-web@69: opts.redisClient || (opts.redisClient = {}); rc-web@69: this.cmd = redis.createClient(opts.redisClient.port, opts.redisClient.host, opts.redisClient); rc-web@69: } rc-web@69: rc-web@69: Store.call(this, opts); rc-web@69: rc-web@69: this.sub.setMaxListeners(0); rc-web@69: this.setMaxListeners(0); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Inherits from Store. rc-web@69: */ rc-web@69: rc-web@69: Redis.prototype.__proto__ = Store.prototype; rc-web@69: rc-web@69: /** rc-web@69: * Publishes a message. rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Redis.prototype.publish = function (name) { rc-web@69: var args = Array.prototype.slice.call(arguments, 1); rc-web@69: this.pub.publish(name, this.pack({ nodeId: this.nodeId, args: args })); rc-web@69: this.emit.apply(this, ['publish', name].concat(args)); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Subscribes to a channel rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Redis.prototype.subscribe = function (name, consumer, fn) { rc-web@69: this.sub.subscribe(name); rc-web@69: rc-web@69: if (consumer || fn) { rc-web@69: var self = this; rc-web@69: rc-web@69: self.sub.on('subscribe', function subscribe (ch) { rc-web@69: if (name == ch) { rc-web@69: function message (ch, msg) { rc-web@69: if (name == ch) { rc-web@69: msg = self.unpack(msg); rc-web@69: rc-web@69: // we check that the message consumed wasnt emitted by this node rc-web@69: if (self.nodeId != msg.nodeId) { rc-web@69: consumer.apply(null, msg.args); rc-web@69: } rc-web@69: } rc-web@69: }; rc-web@69: rc-web@69: self.sub.on('message', message); rc-web@69: rc-web@69: self.on('unsubscribe', function unsubscribe (ch) { rc-web@69: if (name == ch) { rc-web@69: self.sub.removeListener('message', message); rc-web@69: self.removeListener('unsubscribe', unsubscribe); rc-web@69: } rc-web@69: }); rc-web@69: rc-web@69: self.sub.removeListener('subscribe', subscribe); rc-web@69: rc-web@69: fn && fn(); rc-web@69: } rc-web@69: }); rc-web@69: } rc-web@69: rc-web@69: this.emit('subscribe', name, consumer, fn); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Unsubscribes rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Redis.prototype.unsubscribe = function (name, fn) { rc-web@69: this.sub.unsubscribe(name); rc-web@69: rc-web@69: if (fn) { rc-web@69: var client = this.sub; rc-web@69: rc-web@69: client.on('unsubscribe', function unsubscribe (ch) { rc-web@69: if (name == ch) { rc-web@69: fn(); rc-web@69: client.removeListener('unsubscribe', unsubscribe); rc-web@69: } rc-web@69: }); rc-web@69: } rc-web@69: rc-web@69: this.emit('unsubscribe', name, fn); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Destroys the store rc-web@69: * rc-web@69: * @api public rc-web@69: */ rc-web@69: rc-web@69: Redis.prototype.destroy = function () { rc-web@69: Store.prototype.destroy.call(this); rc-web@69: rc-web@69: this.pub.end(); rc-web@69: this.sub.end(); rc-web@69: this.cmd.end(); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Client constructor rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: function Client (store, id) { rc-web@69: Store.Client.call(this, store, id); rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Inherits from Store.Client rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.__proto__ = Store.Client; rc-web@69: rc-web@69: /** rc-web@69: * Redis hash get rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.get = function (key, fn) { rc-web@69: this.store.cmd.hget(this.id, key, fn); rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Redis hash set rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.set = function (key, value, fn) { rc-web@69: this.store.cmd.hset(this.id, key, value, fn); rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Redis hash del rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.del = function (key, fn) { rc-web@69: this.store.cmd.hdel(this.id, key, fn); rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Redis hash has rc-web@69: * rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.has = function (key, fn) { rc-web@69: this.store.cmd.hexists(this.id, key, function (err, has) { rc-web@69: if (err) return fn(err); rc-web@69: fn(null, !!has); rc-web@69: }); rc-web@69: return this; rc-web@69: }; rc-web@69: rc-web@69: /** rc-web@69: * Destroys client rc-web@69: * rc-web@69: * @param {Number} number of seconds to expire data rc-web@69: * @api private rc-web@69: */ rc-web@69: rc-web@69: Client.prototype.destroy = function (expiration) { rc-web@69: if ('number' != typeof expiration) { rc-web@69: this.store.cmd.del(this.id); rc-web@69: } else { rc-web@69: this.store.cmd.expire(this.id, expiration); rc-web@69: } rc-web@69: rc-web@69: return this; rc-web@69: };