2010-04-09 4 views
0

Мне нужно получить несколько объектов из внешней системы. Внешняя система поддерживает несколько одновременных запросов (т. Е. Потоки), но можно затопить внешнюю систему - поэтому я хочу иметь возможность извлекать несколько объектов асинхронно, но я хочу иметь возможность дросселировать количество одновременных асинхронных запросов. т. е. мне нужно получить 100 элементов, но не хочу быть , получая более 25 из них сразу. Когда каждый запрос из 25 завершается, я хочу вызвать другое извлечение, и как только они будут завершены, я хочу вернуть все результаты в том порядке, в котором они были запрошены (т. Е. Нет смысла возвращать результаты до тех пор, пока весь вызов не будет возвращен). Есть ли рекомендованные шаблоны для такого рода вещей?Шаблон для ограничения числа одновременных асинхронных вызовов

Будет ли что-то вроде этого подходящим (псевдокод, очевидно)?

private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>; 

    public List<externalSystemObjects> GetObjects(List<string> ids) 
    { 
     int callCount = 0; 
     int maxCallCount = 25; 
     WaitHandle[] handles; 

     foreach(id in itemIds to get) 
     { 
      if(callCount < maxCallCount) 
      { 
       WaitHandle handle = executeCall(id, callback); 
       addWaitHandleToWaitArray(handle) 
      } 
     else 
     { 
      int returnedCallId = WaitHandle.WaitAny(handles); 
      removeReturnedCallFromWaitHandles(handles); 
     } 
    } 

    WaitHandle.WaitAll(handles); 

    return returnedObjects; 
    } 

    public void callback(object result) 
    { 
     returnedObjects.Add(result); 
    } 

ответ

1

Рассмотрим список элементов для обработки в очереди, из которых 25 потоков обработки DEQUEUE задач, процесса задачу, добавьте результат затем повторите до тех пор, пока очередь не пуста:

class Program 
    { 
    class State 
    { 
     public EventWaitHandle Done; 
     public int runningThreads; 
     public List<string> itemsToProcess; 
     public List<string> itemsResponses; 
    } 

    static void Main(string[] args) 
    { 
     State state = new State(); 

     state.itemsResponses = new List<string>(1000); 
     state.itemsToProcess = new List<string>(1000); 
     for (int i = 0; i < 1000; ++i) 
     { 
     state.itemsToProcess.Add(String.Format("Request {0}", i)); 
     } 

     state.runningThreads = 25; 
     state.Done = new AutoResetEvent(false); 

     for (int i = 0; i < 25; ++i) 
     { 
     Thread t =new Thread(new ParameterizedThreadStart(Processing)); 
     t.Start(state); 
     } 

     state.Done.WaitOne(); 

     foreach (string s in state.itemsResponses) 
     { 
     Console.WriteLine("{0}", s); 
     } 
    } 

    private static void Processing(object param) 
    { 
     Debug.Assert(param is State); 
     State state = param as State; 

     try 
     { 
     do 
     { 
      string item = null; 
      lock (state.itemsToProcess) 
      { 
      if (state.itemsToProcess.Count > 0) 
      { 
       item = state.itemsToProcess[0]; 
       state.itemsToProcess.RemoveAt(0); 
      } 
      } 
      if (null == item) 
      { 
      break; 
      } 
      // Simulate some processing 
      Thread.Sleep(10); 
      string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId); 
      lock (state.itemsResponses) 
      { 
      state.itemsResponses.Add(response); 
      } 
     } while (true); 

     } 
     catch (Exception) 
     { 
     // ... 
     } 
     finally 
     { 
     int threadsLeft = Interlocked.Decrement(ref state.runningThreads); 
     if (0 == threadsLeft) 
     { 
      state.Done.Set(); 
     } 
     } 
    } 
    } 

Вы можете сделать то же самое, используя асинхронные обратные вызовы, нет необходимости использовать потоки.

0

Наличие некоторой очереди, подобной структуре для хранения ожидающих запросов, является довольно распространенным шаблоном. В веб-приложениях, где может быть несколько уровней обработки, вы видите подход стиля «воронка», причем ранние части обработки меняются с большими очередями. Также может быть некоторая приоритизация, применяемая к очередям, запросы с более высоким приоритетом перетасовываются в верхнюю часть очереди.

Важное замечание, которое необходимо учитывать в вашем решении, заключается в том, что если скорость поступления запроса выше, чем ваша скорость обработки (это может быть связано с атакой «Отказ в обслуживании» или просто за то, что какая-то часть обработки сегодня необычно медленная) ваши очереди будут увеличиваться без ограничений. Вы должны иметь некоторую политику, такую ​​как немедленное отклонение новых запросов, когда глубина очереди превышает некоторое значение.

Смежные вопросы