У меня есть экземпляр класса, доступ к которому осуществляется из нескольких потоков. Этот класс принимает эти вызовы и добавляет кортеж в базу данных. Мне нужно, чтобы это выполнялось последовательным образом, поскольку из-за некоторых ограничений db параллельные потоки могут приводить к несогласованной базе данных.Async Producer/Consumer
Как я новичок в параллелизм и параллелизм в C#, я сделал это:
private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
public void AddDData(string info)
{
Task t = new Task(() => { InsertDataIntoBase(info); });
_tasks.Add(t);
}
private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Task t;
if (_tasks.TryTake(out t))
{
t.Start();
t.Wait();
}
}
});
}
AddDData
это тот, кто вызывается несколькими потоками и InsertDataIntoBase
очень простая вставка, которая должна занять несколько миллисекунд.
Проблема в том, что по какой-то причине моя нехватка знаний не позволяет мне разобраться, иногда задача называется дважды! Она всегда выглядит следующим образом:
T1 T2 T3 T1 < - ошибка PK. T4 ...
Я понял, что .Take()
совершенно неправ, я что-то упустил, или моя реализация производителя/потребителя действительно плоха?
С наилучшими пожеланиями, Рафаэлем
UPDATE:
Как было предложено, я сделал быстрое выполнение тестовой песочницы с этой архитектурой и, как я подозревал, это не гарантирует, что задача не будет уволен до предыдущего.
Так что вопрос остается: как правильно очереди заданий и запустить их последовательно?
UPDATE 2:
я упростил код:
private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();
public void AddDData(Data info)
{
_tasks.Add(info);
}
private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Data info;
if (_tasks.TryTake(out info))
{
InsertIntoDB(info);
}
}
});
}
Обратите внимание, что я избавился от задач, как я опирающийся на синхронизированную InsertIntoDB вызов (как внутри цикла), но все равно не повезло ... Генерация в порядке, и я абсолютно уверен, что в очередь попадают только уникальные экземпляры. Но независимо от того, что я пытаюсь, иногда один и тот же объект используется дважды.
Как вы генерируете первичный ключ? –
На самом деле я упростил приведенный здесь код, поскольку данные не являются строкой, а чертовски сложным объектом. PK фактически представляют собой 2 поля объектов (строка имени и значение datetime). Я не контролирую базу данных. –
Я думаю, что для блокировки вызовов достаточно простого 'lock'. – I4V