2010-05-28 4 views
0

Я начинаю писать свои первые параллельные приложения. Этот разделитель перечислит по IDataReader, потянув за собой chunkSize записей из источника данных.Является ли мой GetEnumerator причиной тупика?

TLDR; версия

private object _Lock = new object(); 
public IEnumerator GetEnumerator() 
{ 
    var infoSource = myInforSource.GetEnumerator(); 
        //Will this cause a deadlock if two threads 
    lock (_Lock) //use the enumator at the same time? 
    { 
     while (infoSource.MoveNext()) 
     { 
      yield return infoSource.Current; 
     } 
    } 
} 

полный код

protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]> 
{ 
    private readonly System.Data.IDataReader _Input; 
    private readonly int _ChunkSize; 
    public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000) 
     : base() 
    { 
     if (chunkSize < 1) 
      throw new ArgumentOutOfRangeException("chunkSize"); 
     _Input = input; 
     _ChunkSize = chunkSize; 
    } 

    public override bool SupportsDynamicPartitions { get { return true; } } 

    public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount) 
    { 

     var dynamicPartitions = GetDynamicPartitions(); 
     var partitions = 
      new IEnumerator<object[]>[partitionCount]; 

     for (int i = 0; i < partitionCount; i++) 
     { 
      partitions[i] = dynamicPartitions.GetEnumerator(); 
     } 
     return partitions; 


    } 

    public override IEnumerable<object[]> GetDynamicPartitions() 
    { 
     return new ListDynamicPartitions(_Input, _ChunkSize); 
    } 
    private class ListDynamicPartitions : IEnumerable<object[]> 
    { 
     private System.Data.IDataReader _Input; 
     int _ChunkSize; 
     private object _ChunkLock = new object(); 
     public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize) 
     { 
      _Input = input; 
      _ChunkSize = chunkSize; 
     } 

     public IEnumerator<object[]> GetEnumerator() 
     { 

      while (true) 
      { 
       List<object[]> chunk = new List<object[]>(_ChunkSize); 
       lock(_Input) 
       { 
        for (int i = 0; i < _ChunkSize; ++i) 
        { 
         if (!_Input.Read()) 
          break; 
         var values = new object[_Input.FieldCount]; 
         _Input.GetValues(values); 
         chunk.Add(values); 
        } 
        if (chunk.Count == 0) 
         yield break; 
       } 
       var chunkEnumerator = chunk.GetEnumerator(); 
       lock(_ChunkLock) //Will this cause a deadlock? 
       { 
        while (chunkEnumerator.MoveNext()) 
        { 
         yield return chunkEnumerator.Current; 
        } 
       } 
      } 
     } 

     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return ((IEnumerable<object[]>)this).GetEnumerator(); 
     } 
    } 
} 

Я хотел IEnumerable объект он передается обратно потокобезопасной (MSDN example был так я предполагаю, что PLINQ и TPL может понадобиться его) будет замок на _ChunkLock недалеко от нижняя помощь обеспечивает безопасность потока или вызывает ли тупик? Из документации я не мог сказать, будет ли блокировка выпущена на yeld return.

Также, если есть встроенная функциональность .net, которая будет делать то, что я пытаюсь сделать, я бы скорее использовал ее. И если вы найдете какие-либо другие проблемы с кодом, я был бы признателен.

ответ

1

Одним словом: возможно *.

Если вы всегда использовать этот код в контексте foreach цикла, то вы, вероятно, не ударил затор (если это не возможно, что ваш myInfoSource бесконечно, или что ваш foreach цикл имеет некоторый код это никогда не прекратится), хотя вы можете увидеть замедление.

Более вероятной причиной потенциала (на самом деле, гарантированный) тупике бы это:

var myObject = new YourObject(); 
var enumerator = myObject.GetEnumerator(); 

// if you do this, and then forget about it... 
enumerator.MoveNext(); 

// ...your lock will never be released 

* Я основывая этот ответ на свой начальный блок кода.

1

Я написал тестовую структуру, это не тупик, но вторая нить никогда не получит данные.

static void Main() 
{ 
    En en = new En(); 
    Task.Factory.StartNew(() => 
     { 
      foreach (int i in en) 
      { 
       Thread.Sleep(100); 
       Console.WriteLine("A:" + i.ToString()); 
      } 
     }); 
    Task.Factory.StartNew(() => 
    { 
     foreach (int i in en) 
     { 
      Thread.Sleep(10); 
      Console.WriteLine("B:" +i.ToString()); 
     } 
    }); 
    Console.ReadLine(); 
} 

public class En : IEnumerable 
{ 
    object _lock = new object(); 
    static int i = 0; 
    public IEnumerator GetEnumerator() 
    { 
     lock (_lock) 
     { 
      while (true) 
      { 
       if (i < 10) 
        yield return i++; 
       else 
        yield break; 
      } 
     } 
    } 
} 

Возвращает

A:0 
A:1 
A:2 
A:3 
A:4 
A:5 
A:6 
A:7 
A:8 
A:9 

Вот обновленная версия GetEnumerator, которая должна вести себя правильно.

public IEnumerator<object[]> GetEnumerator() 
{ 

    while (true) 
    { 
     List<object[]> chunk = new List<object[]>(_ChunkSize); 
     _ChunkPos = 0; 
     lock(_Input) 
     { 
      for (int i = 0; i < _ChunkSize; ++i) 
      { 
       if (!_Input.Read()) 
        break; 
       var values = new object[_Input.FieldCount]; 
       _Input.GetValues(values); 
       chunk.Add(values); 
      } 
      if (chunk.Count == 0) 
       yield break; 
     } 
     var chunkEnumerator = chunk.GetEnumerator(); 
     while (true) 
     { 
      object[] retVal; 
      lock (_ChunkLock) 
      { 
       if (chunkEnumerator.MoveNext()) 
       { 
        retVal = chunkEnumerator.Current; 
       } 
       else 
        break; //break out of chunk while loop. 
      } 
      yield return retVal; 
     } 
    } 
} 
Смежные вопросы