multithreadingC#应用程序与SQL Server数据库调用

我有一个SQL Server数据库,在表main有50万条logging。 还有另外三个表称为child3child3child3child3child3child3main之间的多对多关系通过三个关系表( main_child1_relationshipmain_child2_relationshipmain_child3_relationship 。 我需要读取main的logging,更新main ,还可以在关系表中插入新行,并在子表中插入新logging。 子表中的logging具有唯一性约束,因此实际计算(CalculateDetails)的伪代码将如下所示:

 for each record in main { find its child1 like qualities for each one of its child1 qualities { find the record in child1 that matches that quality if found { add a record to main_child1_relationship to connect the two records } else { create a new record in child1 for the quality mentioned add a record to main_child1_relationship to connect the two records } } ...repeat the above for child2 ...repeat the above for child3 } 

这适用于单线程应用程序。 但是太慢了。 C#中的处理非常繁重,需要很长时间。 我想把它变成一个multithreading的应用程序。

什么是最好的方法来做到这一点? 我们正在使用Linq到Sql。

到目前为止,我的方法是为main每批logging创build一个新的DataContext对象,并使用ThreadPool.QueueUserWorkItem来处理它。 然而,这些批次是踩在对方的脚趾,因为一个线程添加一条logging,然后下一个线程试图添加相同的一个…我得到各种有趣的SQL Server死锁。

这里是代码:

  int skip = 0; List<int> thisBatch; Queue<List<int>> allBatches = new Queue<List<int>>(); do { thisBatch = allIds .Skip(skip) .Take(numberOfRecordsToPullFromDBAtATime).ToList(); allBatches.Enqueue(thisBatch); skip += numberOfRecordsToPullFromDBAtATime; } while (thisBatch.Count() > 0); while (allBatches.Count() > 0) { RRDataContext rrdc = new RRDataContext(); var currentBatch = allBatches.Dequeue(); lock (locker) { runningTasks++; } System.Threading.ThreadPool.QueueUserWorkItem(x => ProcessBatch(currentBatch, rrdc)); lock (locker) { while (runningTasks > MAX_NUMBER_OF_THREADS) { Monitor.Wait(locker); UpdateGUI(); } } } 

这里是ProcessBatch:

  private static void ProcessBatch( List<int> currentBatch, RRDataContext rrdc) { var topRecords = GetTopRecords(rrdc, currentBatch); CalculateDetails(rrdc, topRecords); rrdc.Dispose(); lock (locker) { runningTasks--; Monitor.Pulse(locker); }; } 

  private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc, List<int> thisBatch) { List<Record> topRecords; topRecords = rrdc.Records .Where(x => thisBatch.Contains(x.Id)) .OrderBy(x => x.OrderByMe).ToList(); return topRecords; } 

CalculateDetails最好由顶部的伪代码解释。

我认为一定有更好的方法来做到这一点。 请帮忙。 非常感谢!

这是我对这个问题的看法:

  • 当使用多个线程在SQL Server或任何数据库中插入/更新/查询数据时,死锁是事实。 你必须假设他们会发生并妥善处理他们。

  • 这并不是说我们不应该试图限制僵局的发生。 但是,了解死锁的基本原因并采取措施防止它们很容易,但是SQL Server总会让您感到惊讶:-)

造成死锁的一些原因:

  • 太多的线程 – 尽量限制线程的数量尽可能less,但我们当然需要更多的线程来获得最大的性能。

  • 没有足够的索引。 如果select和更新不够有select性,SQL将取出比正常更大的范围锁。 尝试指定适当的索引。

  • 索引太多 更新索引导致死锁,所以尽量减less索引到最低要求。

  • 交易隔离级别太高。 使用.NET时的默认隔离级别是“Serializable”,而使用SQL Server的默认级别是“Read Committed”。 减less隔离级别可以帮助很多(当然,如果适当的话)。

这是我可以解决你的问题:

  • 我不会推出我自己的线程解决scheme,我会使用TaskParallel库。 我的主要方法看起来像这样:

     using (var dc = new TestDataContext()) { // Get all the ids of interest. // I assume you mark successfully updated rows in some way // in the update transaction. List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList(); var problematicIds = new List<ErrorType>(); // Either allow the TaskParallel library to select what it considers // as the optimum degree of parallelism by omitting the // ParallelOptions parameter, or specify what you want. Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8}, id => CalculateDetails(id, problematicIds)); } 
  • 执行CalculateDetails方法,重试死锁失败

     private static void CalculateDetails(int id, List<ErrorType> problematicIds) { try { // Handle deadlocks DeadlockRetryHelper.Execute(() => CalculateDetails(id)); } catch (Exception e) { // Too many deadlock retries (or other exception). // Record so we can diagnose problem or retry later problematicIds.Add(new ErrorType(id, e)); } } 
  • 核心CalculateDetails方法

     private static void CalculateDetails(int id) { // Creating a new DeviceContext is not expensive. // No need to create outside of this method. using (var dc = new TestDataContext()) { // TODO: adjust IsolationLevel to minimize deadlocks // If you don't need to change the isolation level // then you can remove the TransactionScope altogether using (var scope = new TransactionScope( TransactionScopeOption.Required, new TransactionOptions {IsolationLevel = IsolationLevel.Serializable})) { TestItem item = dc.TestItems.Single(i => i.Id == id); // work done here dc.SubmitChanges(); scope.Complete(); } } } 
  • 当然,我执行死锁重试助手

     public static class DeadlockRetryHelper { private const int MaxRetries = 4; private const int SqlDeadlock = 1205; public static void Execute(Action action, int maxRetries = MaxRetries) { if (HasAmbientTransaction()) { // Deadlock blows out containing transaction // so no point retrying if already in tx. action(); } int retries = 0; while (retries < maxRetries) { try { action(); return; } catch (Exception e) { if (IsSqlDeadlock(e)) { retries++; // Delay subsequent retries - not sure if this helps or not Thread.Sleep(100 * retries); } else { throw; } } } action(); } private static bool HasAmbientTransaction() { return Transaction.Current != null; } private static bool IsSqlDeadlock(Exception exception) { if (exception == null) { return false; } var sqlException = exception as SqlException; if (sqlException != null && sqlException.Number == SqlDeadlock) { return true; } if (exception.InnerException != null) { return IsSqlDeadlock(exception.InnerException); } return false; } } 
  • 另一种可能性是使用分区策略

如果您的表格自然可以分成几个不同的数据集,那么您可以使用SQL Server分区表和索引 ,也可以手动将现有表分成几组表。 我会build议使用SQL Server的分区,因为第二个选项是混乱的。 另外内置的分区只能在SQL企业版上使用。

如果你可以进行分区,你可以select一个分区scheme,打破你的数据,让我们说8个不同的集合。 现在你可以使用你原来的单线程代码,但有8个线程,每个目标单独的分区。 现在不会有任何(或至less是最less数量的)死锁。

我希望这是有道理的。

概观

问题的根源在于L2S DataContext与entity framework的ObjectContext一样不是线程安全的。 正如在MSDN论坛交stream中所解释的那样,在.NET 4.0中,对于.NET ORM解决scheme中的asynchronous操作的支持仍然悬而未决; 你将不得不推出你自己的解决scheme,正如你发现并不总是容易做的,当你的框架假设单线程。

我将借此机会注意到,L2Sbuild立在ADO.NET的基础之上,ADO.NET本身完全支持asynchronous操作 – 个人而言,我更愿意直接处理下层并自己编写SQL,只是为了确保我完全明白networking上发生了什么。

SQL Server解决scheme?

话虽如此,我必须问 – 这是一个C#解决scheme吗? 如果你可以用一组插入/更新语句来编写你的解决scheme,你可以直接发送SQL,并且你的线程和性能问题会消失。*在我看来,你的问题并不涉及到实际的数据转换但是以.NET为中心来performance它们。 如果从等式中删除了.NET,你的任务变得更简单。 毕竟,最好的解决scheme往往是你写的代码量最小的,对吧? ;)

即使您的更新/插入逻辑不能以严格的集合关系方式表示,SQL Server也有一个内置的机制来迭代logging和执行逻辑 – 虽然它们被公正地用于许多用例,游标可能会事实上适合你的任务。

如果这是一个必须重复发生的任务,那么将其编码为存储过程可能会大大受益。

*当然,长时间运行的SQL带来了自己的问题,如locking升级和索引使用,你将不得不面对。

C#解决scheme

当然,也许这样做在SQL中是不可能的 – 也许你的代码的决定取决于来自其他地方的数据,或者你的项目有一个严格的“不允许SQL”的约定。 你提到了一些典型的multithreading错误,但没有看到你的代码,我真的不能真正帮助他们具体。

用C#做这个事情显然是可行的,但是你需要处理这样一个事实:每一个你打的电话都会有一个固定的延迟。 通过使用池连接,启用多个活动结果集,并使用asynchronousBegin / End方法执行查询,可以减轻networking延迟的影响。 即使所有这些,你仍然不得不接受将数据从SQL Server传输到应用程序的成本。

保持代码免于跨越自身的最好方法之一是尽可能避免在线程之间共享可变数据。 这意味着不会在多个线程之间共享相同的DataContext。 下一个最好的办法是locking所有DataContext访问的共享数据lock代码的关键部分,从第一次读取到最后一次写入。 这种方法可能完全避免multithreading的好处; 你可能会使你的locking更加细化,但是你们应该警告这是一个痛苦的道路。

更好的办法是保持你的操作完全相互独立。 如果你可以在“主要”logging之间划分逻辑,那就很理想 – 也就是说,只要各种子表之间没有关系,只要“main”中的一条logging对另外,你可以像这样在多个线程中分割你的操作:

 private IList<int> GetMainIds() { using (var context = new MyDataContext()) return context.Main.Select(m => m.Id).ToList(); } private void FixUpSingleRecord(int mainRecordId) { using (var localContext = new MyDataContext()) { var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId); if (main == null) return; foreach (var childOneQuality in main.ChildOneQualities) { // If child one is not found, create it // Create the relationship if needed } // Repeat for ChildTwo and ChildThree localContext.SaveChanges(); } } public void FixUpMain() { var ids = GetMainIds(); foreach (var id in ids) { var localId = id; // Avoid closing over an iteration member ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) }); } } 

显然,这与你的问题中的伪代码一样是一个玩具的例子,但希望它能让你思考如何确定你的任务的范围,使得它们之间没有(或最小的)共享状态。 我认为,这将是正确的C#解决scheme的关键。

编辑响应更新和评论

如果你看到数据一致性问题,我build议强制执行事务语义 – 你可以通过使用System.Transactions.TransactionScope(添加对System.Transactions的引用)来完成。 或者,您可以通过访问内部连接并在其上调用BeginTransaction (或任何DataConnection方法被调用)在ADO.NET级别上执行此操作。

你也提到了僵局。 你对抗SQL Server死锁表明,实际的SQL查询正在彼此的脚趾。 如果不知道实际上通过networking发送了什么内容,则很难详细说明发生了什么以及如何解决问题。 只需说SQL死锁是由SQL查询产生的,而不一定是来自C#线程构造 – 你需要检查到底是什么东西。 我的直觉告诉我,如果每个“主要”logging都是真正独立于其他的,那么就不需要行和表锁,而且Linq to SQL可能是这里的罪魁祸首。

通过将DataContext.Log属性设置为Console.Out之类的东西,可以在代码中获得由L2S发出的原始SQL转储。 虽然我从来没有亲自使用它,但是我明白LINQPad提供了L2Sfunction,您也可以在那里获得SQL。

SQL Server Management Studio将为您带来其他方式 – 使用活动监视器,您可以实时观察locking升级情况。 使用查询分析器,您可以看到SQL Server如何执行查询。 有了这些,你应该能够得到你的代码在服务器端做的一个很好的概念,并反过来如何去解决它。

我build议将所有的XML处理移入SQL服务器。 你所有的僵局不仅会消失,而且你会看到这样的performance,你永远不会想回去。

最好用一个例子来解释。 在这个例子中,我假设XML blob已经进入你的主表(我称之为closet)。 我将假设以下模式:

 CREATE TABLE closet (id int PRIMARY KEY, xmldoc ntext) CREATE TABLE shoe(id int PRIMARY KEY IDENTITY, color nvarchar(20)) CREATE TABLE closet_shoe_relationship ( closet_id int REFERENCES closet(id), shoe_id int REFERENCES shoe(id) ) 

我期望你的数据(仅限于主表)看起来像这样:

 INSERT INTO closet(id, xmldoc) VALUES (1, '<ROOT><shoe><color>blue</color></shoe></ROOT>') INSERT INTO closet(id, xmldoc) VALUES (2, '<ROOT><shoe><color>red</color></shoe></ROOT>') 

那么你的整个任务就像下面这样简单:

 INSERT INTO shoe(color) SELECT DISTINCT CAST(CAST(xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) AS color from closet INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(CAST(closet.xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) = shoe.color 

但是考虑到你会做很多类似的处理,你可以通过将主Blob声明为XMLtypes来简化你的生活,并进一步简化为:

 INSERT INTO shoe(color) SELECT DISTINCT CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) FROM closet INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) = shoe.color 

还可以进行额外的性能优化,例如预先计算在一个临时表或永久表中重复调用Xpath结果,或者将主表的初始填充转换为BULK INSERT,但是我不认为您真的需要这些成功。

SQL服务器死锁是正常的&在这种情况下预计 – MS的build议是, 这些应该在应用程序端而不是数据库端处理。

但是,如果您确实需要确保只调用一次存储过程,则可以使用sp_getapplock使用sql互斥锁。 这是一个如何实现这个的例子

 BEGIN TRAN DECLARE @mutex_result int; EXEC @mutex_result = sp_getapplock @Resource = 'CheckSetFileTransferLock', @LockMode = 'Exclusive'; IF ( @mutex_result < 0) BEGIN ROLLBACK TRAN END -- do some stuff EXEC @mutex_result = sp_releaseapplock @Resource = 'CheckSetFileTransferLock' COMMIT TRAN 

此问题可以通过LimitedConcurrencyLevelTask​​Scheduler( https://msdn.microsoft.com/ru-ru/library/ee789351 ( v= vs.95).aspx)的帮助来解决。

 public class InOutMessagesController { 

private static LimitedConcurrencyLevelTask​​Scheduler scheduler = new LimitedConcurrencyLevelTask​​Scheduler(1);

  private TaskFactory taskFactory = new TaskFactory(scheduler); private TaskFactory<MyTask<Object[]>> taskFactoryWithResult = new TaskFactory<MyTask<Object[]>>(scheduler); private ConcurrentBag<Task> tasks = new ConcurrentBag<Task>(); private ConcurrentBag<MyTask<Object[]>> tasksWithResult = new ConcurrentBag<MyTask<Object[]>>(); private ConcurrentBag<int> endedTaskIds = new ConcurrentBag<int>(); private ConcurrentBag<int> endedTaskWithResultIds = new ConcurrentBag<int>(); private Task TaskForgetEndedTasks; private static object taskForgetLocker = new object(); #region Conveyor private async void AddTaskVoidToQueue(Task task) { try { tasks.Add(task); await taskFactory.StartNew(() => task.Start()); if (TaskForgetEndedTasks == null) { ForgetTasks(); } } catch (Exception ex) { NLogger.Error(ex); } } private async Task<Object[]> AddTaskWithResultToQueue(MyTask<Object[]> task) { ForgetTasks(); tasksWithResult.Add(task); return await taskFactoryWithResult.StartNew(() => { task.Start(); return task; }).Result; } private Object[] GetEqualTaskWithResult(string methodName) { var equalTask = tasksWithResult.FirstOrDefault(x => x.MethodName == methodName); if (equalTask == null) { return null; } else { return equalTask.Result; } } private void ForgetTasks() { Task.WaitAll(tasks.Where(x => x.Status == TaskStatus.Running || x.Status == TaskStatus.Created || x.Status == TaskStatus.WaitingToRun).ToArray()); lock (taskForgetLocker) { if (TaskForgetEndedTasks == null) { TaskForgetEndedTasks = new Task(ForgetEndedTasks); TaskForgetEndedTasks.Start(); } TaskForgetEndedTasks.Wait(); TaskForgetEndedTasks = null; } } private void ForgetEndedTasks() { try { var completedTasks = tasks.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled); var completedTasksWithResult = tasksWithResult.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled); if (completedTasks.Count() > 0) { foreach (var ts in completedTasks) { if (ts.Exception != null) { NLogger.Error(ts.Exception); if (ts.Exception.InnerException != null) { NLogger.Error(ts.Exception.InnerException); } } endedTaskIds.Add(ts.Id); } if (endedTaskIds.Count != 0) { foreach (var t in endedTaskIds) { Task ct = completedTasks.FirstOrDefault(x => x.Id == t); tasks.TryTake(out ct); } } endedTaskIds = new ConcurrentBag<int>(); } if (completedTasksWithResult.Count() > 0) { foreach (var ts in completedTasksWithResult) { if (ts.Exception != null) { NLogger.Error(ts.Exception); if (ts.Exception.InnerException != null) { NLogger.Error(ts.Exception.InnerException); } } endedTaskWithResultIds.Add(ts.Id); } foreach (var t in endedTaskWithResultIds) { var ct = tasksWithResult.FirstOrDefault(x => x.Id == t); tasksWithResult.TryTake(out ct); } endedTaskWithResultIds = new ConcurrentBag<int>(); } } catch(Exception ex) { NLogger.Error(ex); } } #endregion Conveyor internal void UpdateProduct(List<ProductData> products) { var updateProductDataTask = new Task(() => ADOWorker.UpdateProductData(products)); AddTaskVoidToQueue(updateProductDataTask); } internal async Task<IEnumerable<ProductData>> GetProduct() { string methodName = "GetProductData"; Product_Data[] result = GetEqualTaskWithResult(methodName) as Product_Data[]; if (result == null) { var task = new MyTask<Object[]>(ADOWorker.GetProductData, methodName); result = await AddTaskWithResultToQueue(task) as Product_Data[]; } return result; } } public class ADOWorker { public Object[] GetProductData() { entities = new DataContext(); return entities.Product_Data.ToArray(); } public void UpdateProductData(List<Product_Data> products) { entities = new DataContext(); foreach (Product_Data pr_data in products) { entities.sp_Product_Data_Upd(pr_data); } } } 

这可能是显而易见的,但是循环遍历每个元组,并在您的servlet容器中执行您的工作会涉及大量的每次logging开销。

如果可能,通过将逻辑重写为一个或多个存储过程,将部分或全部处理移动到SQL服务器。

如果

  • 你没有太多的时间花在这个问题上,现在需要它来解决这个问题
  • 您确定您的代码已完成,以便不同的线程不会修改相同的logging
  • 你不害怕

然后…你可以在查询中添加“WITH NO LOCK”,这样MSSQL就不会应用锁。

谨慎使用:)

但无论如何,你没有告诉我们在哪里丢失了时间(单线程版本)。 因为如果是在代码中,我build议你直接写入数据库中的所有内容,以避免连续的数据交换。 如果它在数据库中,我会build议检查索引(太多?),I / O,CPU等。