使用集群将Socket.IO缩放到多个Node.js进程

把这个头发拉出来…有人设法将Socket.IO扩展到由Node.js的集群模块产生的多个“工作者”进程?

比方说,我有以下四个工作进程(伪):

// on the server var express = require('express'); var server = express(); var socket = require('socket.io'); var io = socket.listen(server); // socket.io io.set('store', new socket.RedisStore); // set-up connections... io.sockets.on('connection', function(socket) { socket.on('join', function(rooms) { rooms.forEach(function(room) { socket.join(room); }); }); socket.on('leave', function(rooms) { rooms.forEach(function(room) { socket.leave(room); }); }); }); // Emit a message every second function send() { io.sockets.in('room').emit('data', 'howdy'); } setInterval(send, 1000); 

并在浏览器…

 // on the client socket = io.connect(); socket.emit('join', ['room']); socket.on('data', function(data){ console.log(data); }); 

问题:由于四个独立的工作进程发送消息,我每秒都收到四条消息。

如何确保邮件只发送一次?

编辑:在Socket.IO 1.0+中,现在可以使用更简单的Redis适配器模块,而不是使用多个Redis客户端来设置存储。

 var io = require('socket.io')(3000); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); 

下面显示的例子看起来更像这样:

 var cluster = require('cluster'); var os = require('os'); if (cluster.isMaster) { // we create a HTTP server, but we do not use listen // that way, we have a socket.io server that doesn't accept connections var server = require('http').createServer(); var io = require('socket.io').listen(server); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); setInterval(function() { // all workers will receive this in Redis, and emit io.emit('data', 'payload'); }, 1000); for (var i = 0; i < os.cpus().length; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } if (cluster.isWorker) { var express = require('express'); var app = express(); var http = require('http'); var server = http.createServer(app); var io = require('socket.io').listen(server); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); io.on('connection', function(socket) { socket.emit('data', 'connected to worker: ' + cluster.worker.id); }); app.listen(80); } 

如果您有一个需要发布到其他Socket.IO进程但不接受套接字连接的主节点,请使用socket.io-emitter而不是socket.io-redis 。

如果您在扩展时遇到问题,请使用DEBUG=*运行您的Node应用程序。 现在,Socket.IO实现了debugging ,也将打印出Redis适配器debugging消息。 示例输出:

 socket.io:server initializing namespace / +0ms socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms socket.io:server attaching client serving req handler +2ms socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms socket.io-redis ignore same uid +0ms 

如果您的主进程和subprocess都显示相同的parsing器消息,那么您的应用程序正确缩放。


如果你是从一个单独的工作人员排出,那么你的设置应该没有问题。 你所做的是从所有四名工作人员中发出,而由于Redis发布/订阅,消息不会被重复,而是会被写入四次,就像你要求应用程序那样。 以下是Redis所做的一个简单的示意图:

 Client <-- Worker 1 emit --> Redis Client <-- Worker 2 <----------| Client <-- Worker 3 <----------| Client <-- Worker 4 <----------| 

正如您所看到的,当您从工作人员发出时,它会将发布发布到Redis,并且将从其他已订阅Redis数据库的工作人员镜像。 这也意味着你可以使用连接同一个实例的多个套接字服务器,并且一个服务器上的一个发射将在所有连接的服务器上被触发。

对于集群,当客户端连接时,它将连接到你的四个工作者中的一个,而不是全部四个。 这也意味着你从该工作人员发出的任何东西只会向客户显示一次。 所以是的,应用程序正在扩展,但是你这样做的方式,你是从四名工作人员中发出的,而Redis数据库就像你在一个工作人员上调用了四次一样。 如果一个客户端实际连接到所有的四个套接字实例,他们将每秒接收16条消息,而不是四条。

套接字处理的types取决于您将要使用的应用程序的types。 如果您要单独处理客户,那么您应该没有问题,因为连接事件只会针对每个客户端的一个工作人员触发。 如果你需要一个全局的“心跳”,那么你可以在你的主进程中有一个套接字处理器。 由于工作人员在主进程死亡时死亡,您应该将主进程的连接负载抵消掉,并让subprocess处理连接。 这是一个例子:

 var cluster = require('cluster'); var os = require('os'); if (cluster.isMaster) { // we create a HTTP server, but we do not use listen // that way, we have a socket.io server that doesn't accept connections var server = require('http').createServer(); var io = require('socket.io').listen(server); var RedisStore = require('socket.io/lib/stores/redis'); var redis = require('socket.io/node_modules/redis'); io.set('store', new RedisStore({ redisPub: redis.createClient(), redisSub: redis.createClient(), redisClient: redis.createClient() })); setInterval(function() { // all workers will receive this in Redis, and emit io.sockets.emit('data', 'payload'); }, 1000); for (var i = 0; i < os.cpus().length; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } if (cluster.isWorker) { var express = require('express'); var app = express(); var http = require('http'); var server = http.createServer(app); var io = require('socket.io').listen(server); var RedisStore = require('socket.io/lib/stores/redis'); var redis = require('socket.io/node_modules/redis'); io.set('store', new RedisStore({ redisPub: redis.createClient(), redisSub: redis.createClient(), redisClient: redis.createClient() })); io.sockets.on('connection', function(socket) { socket.emit('data', 'connected to worker: ' + cluster.worker.id); }); app.listen(80); } 

在这个例子中,有五个Socket.IO实例,其中一个是主设备,四个是子设备。 主服务器从不调用listen()因此在该进程中没有连接开销。 但是,如果您在主进程中调用emit,它将发布到Redis,并且这四个工作进程将在其客户端上执行发射。 这将抵消连接负载的工人,如果一个工人死亡,你的主要应用程序逻辑将不会在主人。

请注意,使用Redis时,即使在命名空间或房间中,也会发出其他工作进程的处理,就像触发了该进程的发出一样。 换句话说,如果你有一个Redis实例的两个Socket.IO实例,在第一个worker上调用emit()函数将把数据发送给它的客户端,而worker 2会像你调用emit那个工人。

让主人处理你的心跳(下面的例子),或者在内部不同的端口启动多个进程,并用nginx(也支持V1.3以上版本的websockets)进行负载平衡。

集群与硕士

 // on the server var express = require('express'); var server = express(); var socket = require('socket.io'); var io = socket.listen(server); var cluster = require('cluster'); var numCPUs = require('os').cpus().length; // socket.io io.set('store', new socket.RedisStore); // set-up connections... io.sockets.on('connection', function(socket) { socket.on('join', function(rooms) { rooms.forEach(function(room) { socket.join(room); }); }); socket.on('leave', function(rooms) { rooms.forEach(function(room) { socket.leave(room); }); }); }); if (cluster.isMaster) { // Fork workers. for (var i = 0; i < numCPUs; i++) { cluster.fork(); } // Emit a message every second function send() { console.log('howdy'); io.sockets.in('room').emit('data', 'howdy'); } setInterval(send, 1000); cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } 

这实际上看起来像Socket.IO成功缩放。 您可能希望从一台服务器发送一条消息到该房间的所有套接字,而不pipe它们恰好连接到哪个服务器。

你最好的select是有一个主进程每秒发送一个消息。 例如,只有在运行cluster.isMaster才能执行此操作。

进程间通信不足以使socket.io 1.4.5与群集一起工作。 强制websocket模式也是必须的。 请参阅Node.JS中的WebSocket握手,Socket.IO和群集不起作用