У меня есть простая программа, которая выполняет итерации по бесконечному перечислимому, реализованному в качестве счетчика обратной связи. Я реализовал это как в TPL, так и в PLINQ. Оба примера блокируются после предсказуемого количества итераций: 8 для PLINQ и 3 для TPL. Это код выполняется без использования TPL/PLINQ, он работает нормально. Я внедрил перечислитель небезопасным способом, а также потокобезопасным способом. Первое может быть использовано, если степень параллелизма ограничена одним (как в примерах). Нечеловеческий перечислитель очень прост и не полагается на какие-либо «причудливые» классы библиотеки .NET. Если я увеличиваю степень параллелизма, число итераций, которые выполняются до тупика, увеличивается, например, для PLINQ число итераций составляет 8 * степень параллелизма.PLINQ итерация цикла перечислителя вызывает тупик
Вот итераторы:
перечислитель (не поточно)
public class SimpleEnumerable<T>: IEnumerable<T>
{
private T _value;
private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false);
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public IEnumerator<T> GetEnumerator()
{
while(true)
{
_releaseValueEvent.WaitOne();
yield return _value;
}
}
public void OnNext(T value)
{
_value = value;
_releaseValueEvent.Set();
}
}
Перечислитель (поточно)
public class SimpleEnumerable<T>: IEnumerable<T>
{
private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>();
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public IEnumerator<T> GetEnumerator()
{
while(true)
{
yield return _blockingCollection.Take();
}
}
public void OnNext(T value)
{
_blockingCollection.Add(value);
}
}
PLINQ Пример:
public static void Main(string[] args)
{
var enumerable = new SimpleEnumerable<int>();
enumerable.OnNext(0);
enumerable
.Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"))
.AsParallel()
.WithDegreeOfParallelism(1)
.ForEach
(
i =>
{
Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
enumerable.OnNext(i+1);
}
);
}
TPL Пример:
public static void Main(string[] args)
{
var enumerable = new SimpleEnumerable<int>();
enumerable.OnNext(0);
Parallel.ForEach
(
enumerable,
new ParallelOptions { MaxDegreeOfParallelism = 1},
i =>
{
Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
enumerable.OnNext(i+1);
}
);
}
База на моем анализе стека вызовов, оказывается, что существует тупиковый, что происходит в методе связанного управления разделами, как в PLINQ и TPL, но я не являюсь Конечно, как это интерпретировать.
Через пробную версию и ошибку я нашел обертывание PLINQ enumerable
в Partitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering)
исправляет проблему, но я не знаю, почему возникает тупик.
Мне было бы очень интересно узнать основную причину ошибки.
Обратите внимание, что это надуманный пример. Я не ищу критики кода, а почему - это тупик. В частности, в примере PLINQ, если строки .AsParallel()
и .WithDegreeOfParallelism(1)
закомментированы, код работает очень хорошо.
PLINQ и Parallel не вызывают взаимоблокировки, они используют текущую нить вместе с N другими для параллельного обработки данных. –
@PanagiotisKanavos. Очевидно, что тупик находится в его итераторе. На первый взгляд, я не удивлен, это, конечно, не выглядит абсолютно безопасным. – Servy
@Servy Я начал с самой очевидной проблемы. Итератор ... очень, очень странная конструкция. Простой 10K-массив из ints будет более чем достаточно, чтобы протестировать параллельное выполнение. 'Interlocked.Increment' будет очень хорошим способом подсчета. Этот итератор, просто блокирует –