2013-08-24 3 views
0

Добрый день.Правильный способ фильтрации BlockBuffer.RecieveAsync

У меня есть TPL Dataflow сетка для RPC вызовов

Он имеет два unkinked потоки, которые в упрощенном виде выглядит следующим образом:

Выходной поток:

  • BlockBuffer для хранения OUTPUT
  • ActionBLock для отправки вывода на сервер и отправки отправленного идентификатора

И входной поток:

  • в то время как цикл, чтобы получать ДАННЫЕ
  • TransformBlock обрабатывать данные
  • BlockBuffer, чтобы сохранить ответ с sentid

есть проблема: когда я делать звонки с отдельной потоки я могу возиться с ответами, поэтому мне нужно отфильтровать его.

мой вызов RPC:

public async Task<RpcAnswer> PerformRpcCall(Call rpccall) 
{ 
    ... 
    _outputRpcCalls.Post(rpccall); 
    long uniqueId = GetUniq(); // call unique id 
    ... 
    var sent = new Tuple<long, long>(uniqueId, 0); 
    while (_sentRpcCalls.TryReceive(u => u.Item1 == uniqueId, out sent)) ; // get generated id from send function 

    return await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(30)); 
} 

, как вы можете видеть, у меня есть UniqueID, которые могут помочь мне определить ответ на этот вызов, , но как я могу фильтровать его и ждать его?

Это хороший способ иметь некоторый массив буферов (возможно, WriteOnceBlock?), Который будет создан в вызове rpc и LinkedTo с фильтром?

ответ

0

Хорошо, я не нашел ни надлежащим образом, так что я сделал грязный обходной

while (true) 
{ 
    answer = await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(5)); 

    if (answer.Success) 
    { 
     if (answer.Answer.Combinator.ValueType.Equals(rpccall.Combinator.ValueType)) 
     { 
      break; 
     } 
     else 
     { 
      // wrong answer - post it back 
      _inputAnswers.Post(answer.Answer); 
     } 

    } 
    else 
    { 
     // answer fail - return it 
     break; 
    } 
} 
0

Один из способов сделать это было бы создать новый блок для каждого идентификатора, и связать его с блоком ответов с предикатом, проверяющим идентификатор и MaxMessages, установленным на 1:

Task<Answer> ReceiveAnswerAsync(int uniqueId) 
{ 
    var block = new BufferBlock<Answer>(); 

    _inputAnswers.LinkTo(
     block, 
     new DataflowLinkOptions { MaxMessages = 1, PropagateCompletion = true }, 
     answer => answer.Id == uniqueId); 

    return block.ReceiveAsync(); 
} 
Смежные вопросы