在群集中发现阿卡演员

最近我一直试图围绕着阿卡和基于演员的系统的概念。 虽然现在我对Akka的基础知识已经有了很好的理解,但是在集群和远程angular色方面,我还是苦苦挣扎了一些。

我将尝试使用Play Framework 2.0附带的WebSocket聊天示例来说明问题:有一个演员拥有WebSockets,并保存当前连接的用户列表。 演员在技术上和逻辑上基本代表聊天室。 只要在一台服务器上运行一个聊天室,这种方式就完全正常了。

现在我试图理解,当我们谈论运行在服务器集群上的许多dynamic聊天室(可以随时打开/closures新的房间)时,这个例子将不得不被延长(单个节点被添加或删除根据目前的需求)。 在这样的情况下,用户A可以连接到服务器1,而用户B连接到服务器2.两者都可以在同一个聊天室中通话。 在每台服务器上,仍然会有一个actor(对于每个聊天室?),该actor保存WebSocket实例以接收和发布事件(消息)给正确的用户。 但从逻辑上讲,服务器1或服务器2上应该只有一个聊天室参与者,该主持人拥有当前连接的用户(或类似任务)的列表。

你将如何去实现这个目标,最好是在“纯粹的阿克卡”中,而不需要增加额外的消息系统,如ZeroMQ或RabbitMQ?

这是我到目前为止,请让我知道这是否有道理:

  1. 用户A连接到服务器1,并分配一个演员来保存他的WebSocket。
  2. 参与者检查(使用Router?EventBus?其他?)在任何连接的群集节点上是否存在活动聊天室的“聊天室演员”。 因为它不会要求创build一个新的聊天室演员,并会发送和接收这个演员未来的聊天消息。
  3. 用户B在服务器2上连接并且一个演员也被分配给他的WebSocket。
  4. 它还检查所请求的聊天室的演员是否存在并在服务器1上find它。
  5. 服务器1上的聊天室演员现在充当给定聊天室的中心,将消息发送给所有“连接”的聊天成员演员并分发传入的聊天室。

如果服务器2出现故障,聊天室的参与者将不得不重新创build/移动到服务器2,尽pipe这不是我现在主要关心的问题。 我最想知道如何dynamic发现演员如何传播各种基本上独立的机器可以使用Akka的工具集。

我一直在看阿卡的文档已经有相当长的一段时间了,所以也许我在这里忽略了这个问题。 如果是这样,请赐教:-)

我正在开发一个私人项目,这个项目基本上是聊天室示例的一个非常扩展的版本,我也遇到了有关akka和整个“分散”思维的启动问题。 所以我可以告诉你我如何“解决”我的扩展聊天室:

我想要一个服务器,可以很容易地部署多次,没有太多额外的configuration。 我使用redis作为所有打开的用户会话(ActorRefs的简单序列化)和所有聊天室的存储。

服务器有以下参与者:

  • WebsocketSession :它保存到一个用户的连接,并处理来自用户的请求并转发来自系统的消息。
  • ChatroomManager :这是中央广播公司,它被部署在服务器的每个实例上。 如果用户想要将消息发送到聊天室,则WebSocketSession-Actor将所有信息发送到ChatroomManager-Actor,ChatroomManager-Actor将该消息广播到聊天室的所有成员。

所以这里是我的程序:

  1. 用户A连接到分配新WebsocketSession的服务器1。 这个actor将这个actor的绝对path插入到redis中。
  2. 用户Ajoin聊天室X,该聊天室X也插入他的绝对path(我将其用作用户会话的唯一ID)到redis中(每个聊天室都有一个“连接”集合)
  3. 用户B连接到服务器2 – > redis
  4. 用户Bjoin聊天室X – > redis
  5. 用户B向聊天室X发送消息如下:用户B通过WebSocket将他的消息发送给他的会话演员,会话演员(在一些检查之后)发送演员消息给ChatroomManager。 这个actor实际上从redis(与akka的actorFor -method一起使用的绝对path)中检索到聊天室的用户列表,然后将消息发送给每个session-actor。 这些会话演员然后写入他们的websockets。

在每个ChatroomManager-演员,我做一些ActorRefcaching,这给了额外的速度。 我认为这与您的方法有所不同,特别是这些聊天室pipe理员处理所有聊天室的请求。 但是一个聊天室里只有一个演员是我想要避免的一个单点故障。 此外,这将导致更多的消息,例如:

  • 用户A和用户B在服务器1上。
  • Chatroom X在服务器2上。

如果用户A想要与用户B交谈,他们都必须通过服务器1上的聊天室主angular进行通信。

另外,我使用akka的function(比如(循环)路由)在每个系统上创build一个ChatroomManager-actor的多个实例来处理许多请求。

我花了一些时间来设置整个akka远程基础设施结合序列化和redis。 但是现在我能够创build任意数量的服务器应用程序实例,它们使用redis来共享那里的ActorRef (串行化为带有ip +端口的绝对path)。

这可能会帮助你更多一点,我可以提出新的问题(请不要关于我的英语)。

在多台机器上扩展的关键是尽可能保持可变状态。 尽pipe您可以使用分布式caching来协调所有节点的状态,但是在扩展到大量节点时,这会给您带来同步以及瓶颈问题。 理想情况下,应该有一个演员了解聊天室中的消息和参与者。

你的问题的核心是,如果一个聊天室是由一台机器上运行的单个angular色来代表的,或者实际上是这样一个房间存在的话。 诀窍是使用标识符(例如聊天室的名称)路由与给定聊天室相关的请求。 计算名称的散列,并根据数字从n个框中select一个。 该节点将知道其当前聊天室,并可以安全地为您find或创build正确的聊天室演员。

您可以看看以下博客文章,讨论Akka中的集群和扩展:

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-1/

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-2/

我会使用Zookeeper + Norbert来了解哪些主机正在上下移动:

http://www.ibm.com/developerworks/library/j-zookeeper/

现在我的聊天室服务器场中的每个节点都可以知道逻辑集群中的所有主机。 当一个节点脱机(或联机)时,它们会得到一个callback。 任何节点现在都可以保存当前集群成员的sorting列表,散列聊天室标识,并按列表大小调整mod,以获得应该托pipe任何给定聊天室的节点列表中的索引。 我们可以加1和重新散列来select第二个索引(需要一个循环,直到获得一个新的索引)来计算第二个主机来保存聊天室的第二个副本以获得冗余。 两个聊天室的主持人都是一个聊天室的演员,只是把所有的聊天消息转发给每个聊天室成员的Websocket演员。

现在我们可以通过两个活跃的聊天室演员通过自定义的Akka路由器发送聊天消息。 客户端只发送一次消息,路由器将执行哈希模块并发送给两个远程聊天室参与者。 我将使用twitter雪花algorithm为正在发送的消息生成唯一的64位ID。 在下面的链接中查看代码的nextId()方法中的algorithm。 可以使用norbert属性来设置datacenterId和workerId,以确保不会在不同的服务器上生成冲突ID:

https://github.com/twitter/snowflake/blob/master/src/main/scala/com/twitter/service/snowflake/IdWorker.scala

现在,每个消息的两个副本将通过两个活动聊天室演员中的每一个前往每个客户端端点。 在每个Websocket客户端angular色中,我将取消掩饰雪花ID以获取发送消息的datacenterId + workerId编号,并跟踪从集群中每个主机看到的最高聊天消息编号。 然后,我会忽略任何不高于给定发送者主机在给定客户端已经看到的消息。 这将消除通过两个活跃的聊天室演员进入的一对消息。

到现在为止还挺好; 如果有任何节点死亡,我们就不会失去一个幸存的聊天室副本。 消息将自动通过第二个聊天室不间断地stream动。

接下来,我们需要处理从群集中退出的节点或被添加回群集中的节点。 我们将在每个节点中获得一个norbertcallback,以通知我们有关集群成员资格的更改。 在这个callback中,我们可以通过自定义路由器发送一个akka消息,说明新的成员列表和当前的主机名。 当前主机上的自定义路由器将看到该消息并更新其状态,以了解新的集群成员资格以计算新的节点对以通过发送任何给定的聊天室stream量。 这个确认新的集群成员资格将由路由器发送到所有节点,以便每个服务器可以跟踪所有服务器何时赶上了成员变更,并且现在正在正确地发送消息。

会员变更后,幸存的聊天室可能仍然活跃。 在这种情况下,所有节点上的所有路由器都将继续正常发送,但也会向新的第二个聊天室主机发送推测消息。 那第二个聊天室可能还没有起来,但这不是一个问题,因为消息将通过幸存者stream动。 如果幸存的聊天室在成员资格更改后不再处于活动状态,则所有主机上的所有路由器都将首先发送给三台主机; 幸存者和两个新的节点。 可以使用akka死亡监视机制,以便所有节点最终可以看到幸存的聊天室closures以通过两个主机回到路由聊天stream量。

接下来,我们需要根据情况将幸存服务器上的聊天室迁移到一台或两台新主机上。 潮水般的聊天室演员会在某个时候收到一条消息,告诉它有关新的群集成员。 首先将聊天室成员的副本发送到新节点。 此消息将在新节点上创build具有正确成员身份的聊天室演员的新副本。 如果幸存者不再是应该容纳聊天室的两个节点之一,它将进入退役模式。 在退役模式下,它只会将任何消息转发到新的主节点和辅助节点,而不是任何聊天室成员。 阿卡消息转发是完美的。

退役聊天室将听取来自每个节点的诺贝尔集群成员资格确认消息。 最终会看到集群内的所有节点都已经确认了新的集群成员资格。 然后知道它将不再收到任何进一步的消息转发。 它然后可以自杀。 阿卡热交换是完美的退役行为。

到现在为止还挺好; 我们有一个弹性的消息传递设置不会丢失节点崩溃的消息。 在群集成员资格发生变化的地方,我们将得到一个尖峰的intranodestream量,将聊天室复制到新的节点。 我们还有一个节点转发消息的余波,直到所有的服务器都赶上哪个聊天室移动了两个服务器。 如果我们想扩大系统,我们可以等到用户stream量的低点,然后打开一个新的节点。 聊天室会自动重新分配到新的节点。

以上描述是基于阅读以下论文并将其翻译成阿卡概念:

https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf