Failover with SignalR and a Redis message bus using BookSleeve's ConnectionUtils.Connect()

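I am trying to create a Redis message bus failover scenario with a SignalR application.

At first, we tried a simple hardware load balancer (HLB) failover that just monitored two Redis servers. The SignalR application pointed to the single HLB endpoint. I then failed one server, but could not get any messages through on the second Redis server without recycling the SignalR app pool. Presumably this is because it needs to issue the setup commands to the new Redis message bus.

As of SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus uses BookSleeve's RedisConnection() to connect to a single Redis server for pub/sub.

I created a new class, RedisMessageBusCluster(), that uses BookSleeve's ConnectionUtils.Connect() to connect to one server in a cluster of Redis servers.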

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using BookSleeve;
    using Microsoft.AspNet.SignalR.Infrastructure;

    namespace Microsoft.AspNet.SignalR.Redis
    {
        /// <summary>
        /// WIP: Getting scaleout for Redis working
        /// </summary>
        public class RedisMessageBusCluster : ScaleoutMessageBus
        {
            private readonly int _db;
            private readonly string[] _keys;
            private RedisConnection _connection;
            private RedisSubscriberConnection _channel;
            private Task _connectTask;

            private readonly TaskQueue _publishQueue = new TaskQueue();

            public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
                : base(resolver)
            {
                _db = db;
                _keys = keys.ToArray();

                // uses a list of connections
                _connection = ConnectionUtils.Connect(serverList);

                //_connection = new RedisConnection(host: server, port: port, password: password);

                _connection.Closed += OnConnectionClosed;
                _connection.Error += OnConnectionError;

                // Start the connection - TODO: can remove this Open as the connection is already opened, but there's the _connectTask is used later on
                _connectTask = _connection.Open().Then(() =>
                {
                    // Create a subscription channel in redis
                    _channel = _connection.GetOpenSubscriberChannel();

                    // Subscribe to the registered connections
                    _channel.Subscribe(_keys, OnMessage);

                    // Dirty hack but it seems like subscribe returns before the actual
                    // subscription is properly setup in some cases
                    while (_channel.SubscriptionCount == 0)
                    {
                        Thread.Sleep(500);
                    }
                });
            }

            protected override Task Send(Message[] messages)
            {
                return _connectTask.Then(msgs =>
                {
                    var taskCompletionSource = new TaskCompletionSource<object>();

                    // Group messages by source (connection id)
                    var messagesBySource = msgs.GroupBy(m => m.Source);

                    SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                    return taskCompletionSource.Task;
                },
                messages);
            }

            private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
            {
                if (!enumerator.MoveNext())
                {
                    taskCompletionSource.TrySetResult(null);
                }
                else
                {
                    IGrouping<string, Message> group = enumerator.Current;

                    // Get the channel index we're going to use for this message
                    int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                    string key = _keys[index];

                    // Increment the channel number
                    _connection.Strings.Increment(_db, key)
                        .Then((id, k) =>
                        {
                            var message = new RedisMessage(id, group.ToArray());

                            return _connection.Publish(k, message.GetBytes());
                        }, key)
                        .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                        .ContinueWithNotComplete(taskCompletionSource);
                }
            }

            private void OnConnectionClosed(object sender, EventArgs e)
            {
                // Should we auto reconnect?
                if (true)
                {
                    ;
                }
            }

            private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
            {
                // How do we bubble errors?
                if (true)
                {
                    ;
                }
            }

            private void OnMessage(string key, byte[] data)
            {
                // The key is the stream id (channel)
                var message = RedisMessage.Deserialize(data);

                _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
            }

            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    if (_channel != null)
                    {
                        _channel.Unsubscribe(_keys);
                        _channel.Close(abort: true);
                    }

                    if (_connection != null)
                    {
                        _connection.Close(abort: true);
                    }
                }

                base.Dispose(disposing);
            }
        }
    }

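BookSleeve has its own mechanism for determining a master and will automatically fail over to another server. I am now testing this with SignalR.Chat.

In web.config, I set the list of available servers: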

    <add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Then, in Application_Start():

    // Redis cluster server list
    string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

    List<string> eventKeys = new List<string>();
    eventKeys.Add("SignalR.Redis.FailoverTest");
    GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

I added two additional methods to Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

    public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
    {
        return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
    }

    public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
    {
        var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
        resolver.Register(typeof(IMessageBus), () => bus.Value);

        return resolver;
    }

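The problem now is that when I keep several breakpoints enabled until after a user name has been added, and then disable all breakpoints, the application works as expected. However, with the breakpoints disabled from the start, there seems to be a race condition that can make the connection process fail.

So, in RedisMessageBusCluster():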

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

I tried adding a Task.Wait, and even an additional Sleep() (not shown above), to wait for the connection, but I still got errors.
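For reference, the blocking Thread.Sleep loop could be replaced by an asynchronous poll with a timeout. This is only a sketch under assumptions: it targets .NET 4.5 or later (async/await), `SubscriptionGate` and `WaitUntilAsync` are hypothetical names that exist in neither BookSleeve nor SignalR, and it does nothing about a connection that has already been closed.

```csharp
using System;
using System.Threading.Tasks;

public static class SubscriptionGate
{
    // Hypothetical helper (not part of BookSleeve or SignalR).
    // Polls isReady until it returns true or the timeout elapses.
    // Returns true if the condition was met, false on timeout.
    public static async Task<bool> WaitUntilAsync(
        Func<bool> isReady, TimeSpan timeout, TimeSpan pollInterval)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (!isReady())
        {
            if (DateTime.UtcNow >= deadline)
            {
                return false;
            }

            // Release the thread between checks instead of blocking it.
            await Task.Delay(pollInterval).ConfigureAwait(false);
        }

        return true;
    }
}
```

In the class above, the condition would be something like `() => _channel.SubscriptionCount > 0`. A false result would let the caller surface a timeout instead of spinning forever.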

The recurring error seems to be at line 71 of BookSleeve's MessageQueue.cs:

    A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
    iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
       at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
       at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
       at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
       at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
       at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
       at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
       at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
       at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
       at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
       --- End of inner exception stack trace ---
    ---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
       at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
       at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
       at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
       at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
       at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
       at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
       at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
       at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
       at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---

    public void Enqueue(RedisMessage item, bool highPri)
    {
        lock (stdPriority)
        {
            if (closed)
            {
                throw new InvalidOperationException("The queue is closed");
            }

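This is where the closed-queue exception is thrown.

I foresee another issue: since the Redis connection is made in Application_Start(), there may be problems when "reconnecting" to another server. I think this approach is valid when using a single RedisConnection(), where there is only one connection to choose from. With the introduction of ConnectionUtils.Connect(), however, I'd like to hear from @dfowler or the other SignalR guys how this scenario is handled in SignalR.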
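One possible shape for the "reconnect" step is a generic retry wrapper with exponential backoff. The following is a sketch, not anything SignalR or BookSleeve provides: `Reconnect.WithBackoffAsync` is a hypothetical name, the code assumes C# 6 or later (exception filters, `nameof`), and a real handler would still have to re-attach the Closed/Error handlers and resubscribe on the new connection.

```csharp
using System;
using System.Threading.Tasks;

public static class Reconnect
{
    // Hypothetical helper: calls connect until it succeeds or maxAttempts
    // is reached, doubling the delay after each failed attempt.
    // The exception from the final attempt propagates to the caller.
    public static async Task<T> WithBackoffAsync<T>(
        Func<Task<T>> connect, int maxAttempts, TimeSpan initialDelay)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        TimeSpan delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await connect().ConfigureAwait(false);
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Swallow the failure and fall through to the delay below.
            }

            await Task.Delay(delay).ConfigureAwait(false);
            delay = TimeSpan.FromTicks(delay.Ticks * 2);
        }
    }
}
```

With BookSleeve, the `connect` delegate might wrap the call to ConnectionUtils.Connect(serverList) from the class above. Whether that call is safe to repeat from an OnConnectionClosed handler is an assumption that would need testing.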

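The SignalR team has now implemented support for a custom connection factory with StackExchange.Redis, the successor to BookSleeve, which supports redundant Redis connections via ConnectionMultiplexer.

The initial problem was that, in spite of creating my own extension methods in BookSleeve to accept a collection of servers, failover was not possible.

Now, with the evolution of BookSleeve into StackExchange.Redis, we can configure a collection of servers/ports directly in the Connect initialization.

The new implementation is much simpler than the road I was going down in creating a UseRedisCluster method, and the back-end plumbing now supports true failover: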

    var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis also allows for additional manual configuration, as outlined in the "Automatic and Manual Configuration" section of the documentation:

    ConfigurationOptions config = new ConfigurationOptions
    {
        EndPoints =
        {
            { "redis0", 6379 },
            { "redis1", 6380 }
        },
        CommandMap = CommandMap.Create(new HashSet<string>
        {
            // EXCLUDE a few commands
            "INFO", "CONFIG", "CLUSTER",
            "PING", "ECHO", "CLIENT"
        }, available: false),
        KeepAlive = 180,
        DefaultVersion = new Version(2, 8, 8),
        Password = "changeme"
    };

In essence, the ability to initialize the SignalR scale-out environment with a collection of servers now solves the initial problem.
