3

У меня есть экземпляр класса, доступ к которому осуществляется из нескольких потоков. Этот класс принимает эти вызовы и добавляет кортеж в базу данных. Мне нужно, чтобы это выполнялось последовательным образом, поскольку из-за некоторых ограничений db параллельные потоки могут приводить к несогласованной базе данных.Async Producer/Consumer

Как я новичок в параллелизм и параллелизм в C#, я сделал это:

private BlockingCollection<Task> _tasks = new BlockingCollection<Task>(); 

public void AddDData(string info) 
{ 
    Task t = new Task(() => { InsertDataIntoBase(info); }); 
    _tasks.Add(t); 
} 

private void InsertWorker() 
{ 
    Task.Factory.StartNew(() => 
    { 
     while (!_tasks.IsCompleted) 
     { 
      Task t; 
      if (_tasks.TryTake(out t)) 
      { 
       t.Start(); 
       t.Wait(); 
      } 
     } 
    }); 
} 

AddDData это тот, кто вызывается несколькими потоками и InsertDataIntoBase очень простая вставка, которая должна занять несколько миллисекунд.

Проблема в том, что по какой-то причине моя нехватка знаний не позволяет мне разобраться, иногда задача называется дважды! Она всегда выглядит следующим образом:

T1 T2 T3 T1 < - ошибка PK. T4 ...

Я понял, что .Take() совершенно неправ, я что-то упустил, или моя реализация производителя/потребителя действительно плоха?

С наилучшими пожеланиями, Рафаэлем

UPDATE:

Как было предложено, я сделал быстрое выполнение тестовой песочницы с этой архитектурой и, как я подозревал, это не гарантирует, что задача не будет уволен до предыдущего.

enter image description here

Так что вопрос остается: как правильно очереди заданий и запустить их последовательно?

UPDATE 2:

я упростил код:

private BlockingCollection<Data> _tasks = new BlockingCollection<Data>(); 

public void AddDData(Data info) 
{ 
    _tasks.Add(info); 
} 

private void InsertWorker() 
{ 
    Task.Factory.StartNew(() => 
    { 
     while (!_tasks.IsCompleted) 
     { 
      Data info; 
      if (_tasks.TryTake(out info)) 
      { 
       InsertIntoDB(info); 
      } 
     } 
    }); 
} 

Обратите внимание, что я избавился от задач, как я опирающийся на синхронизированную InsertIntoDB вызов (как внутри цикла), но все равно не повезло ... Генерация в порядке, и я абсолютно уверен, что в очередь попадают только уникальные экземпляры. Но независимо от того, что я пытаюсь, иногда один и тот же объект используется дважды.

+1

Как вы генерируете первичный ключ? –

+0

На самом деле я упростил приведенный здесь код, поскольку данные не являются строкой, а чертовски сложным объектом. PK фактически представляют собой 2 поля объектов (строка имени и значение datetime). Я не контролирую базу данных. –

+0

Я думаю, что для блокировки вызовов достаточно простого 'lock'. – I4V

ответ

1

Я думаю, что это должно работать:

private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>(); 

    static void Main(string[] args) 
    { 
     InsertWorker(); 
     GenerateItems(10, 1000); 
     _itemsToProcess.CompleteAdding(); 
    } 

    private static void InsertWorker() 
    { 
     Task.Factory.StartNew(() => 
     { 
      while (!_itemsToProcess.IsCompleted) 
      { 
       string t; 
       if (_itemsToProcess.TryTake(out t)) 
       { 
        // Do whatever needs doing here 
        // Order should be guaranteed since BlockingCollection 
        // uses a ConcurrentQueue as a backing store by default. 
        // http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle 
        Console.WriteLine(t); 
       } 
      } 
     }); 
    } 

    private static void GenerateItems(int count, int maxDelayInMs) 
    { 
     Random r = new Random(); 
     string[] items = new string[count]; 

     for (int i = 0; i < count; i++) 
     { 
      items[i] = i.ToString(); 
     } 

     // Simulate many threads adding items to the collection 
     items 
      .AsParallel() 
      .WithDegreeOfParallelism(4) 
      .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 
      .Select((x) => 
      { 
       Thread.Sleep(r.Next(maxDelayInMs)); 
       _itemsToProcess.Add(x); 
       return x; 
      }).ToList(); 
    } 

Это означает, что потребитель однопоточный, но позволяет несколько потоков производителей.

+0

Не повезло ... проверьте обновление 2. –

+0

Я пробовал более 10000000 итераций, не получая дубликатов. Подумайте об этом еще немного. – sga101

+3

+1. Обратите внимание, что вы можете упростить своего работника, заменив 'while (! IsCompleted)' и 'TryTake' на один' foreach (строка t в _itemsToProcess.GetConsumingEnumerable()) '. См. [GetConsumingEnumerable] (http://msdn.microsoft.com/en-us/library/dd287186.aspx). –

0

Ваш комментарий

«Я упростил код, показанный здесь, так как данные не является строкой»

Я предполагаю, что info параметр, передаваемый в AddDData является изменяемый ссылочный тип. Убедитесь, что вызывающий абонент не использует один и тот же экземпляр info для многократных вызовов, поскольку эта ссылка зафиксирована в лямбда задания.

+0

@ I4V: см. Правки – alexm

+0

Да, это изменяемый ссылочный тип, и я дважды проверю его (хотя я думаю, что это не проблема). Состояние гонки может потенциально вызвать это поведение, но использование BlockingCollection в качестве буфера не приведет к снижению этого риска? –

+0

Rafa Borges: Я неправильно читаю код - нет условий гонки, так как он гарантирует, что элементы будут выполняться последовательно. Это не исключает возможности использования одного и того же предмета в разных задачах. – alexm

0

Основываясь на следе, который вы предоставили, единственной логической возможностью является то, что вы дважды вызывали InsertWorker (или более). Таким образом, есть два фоновых потока, ожидающих появления элементов в коллекции, и иногда им удается захватить элемент и начать его выполнение.

Смежные вопросы