2015-05-14 3 views
2

Я надеялся получить некоторые рекомендации о том, как использовать EventProcessorHost с рабочей ролью. В основном я надеюсь, что процесс EventProcessorHost будет обрабатывать разделы параллельно, и мне интересно, где я должен поместить этот тип кода в рабочую роль, и если у меня отсутствует какой-либо ключ.Azure EventProcessorHost and Worker role

var manager = NamespaceManager.CreateFromConnectionString(connectionString); 
    var desc = manager.CreateEventHubIfNotExistsAsync(path).Result; 
    var client = Microsoft.ServiceBus.Messaging.EventHubClient.CreateFromConnectionString(connectionString, path); 
    var host = new EventProcessorHost(hostname, path, consumerGroup, connectionString, blobStorageConnectionString); 
    EventHubProcessorFactory<EventData> factory = new EventHubProcessorFactory<EventData>(); 
    host.RegisterEventProcessorFactoryAsync(factory); 

Все, что я прочитал говорит EventProcessorHost будет делить разделы на своих собственных, но выше код достаточно, чтобы обработать все разделы асинхронно?

+0

Что класс EventHubProcessorFactory? Типичное использование EventProcessorHost находится здесь: https://github.com/ppatierno/azuresblite-examples/blob/master/IoTEventHubProcessor/Program.cs Вы можете видеть, что класс IoTEventHubProcessor зарегистрирован и он реализует интерфейс IEventProcessor: https: // github.com/ppatierno/azuresblite-examples/blob/master/IoTEventHubProcessor/IoTEventHubProcessor.cs. Вся логика в методе ProcessEventsAsync() достаточно для обработки всех сообщений параллельно на всех разделах. – ppatierno

ответ

3

Вот упрощенная версия того, как мы обрабатываем наш центр событий из рабочей роли. Мы сохраняем экземпляр в роли mainWorker и вызываем IEventProcessor для его обработки.

Таким образом, мы можем назвать его и закрыть его, когда работник Реагирует на отключение событий и т.д.

EDIT:

Что касается обработки его параллельно, класс IEventProcessor будет просто захватить больше 10 событий из центра событий, когда он завершил обработку текущего. Обращайтесь со всеми лизинговыми разделами для вас.

Это синхронный рабочий процесс. Когда я масштабируюсь до нескольких рабочих ролей, я начинаю видеть, как разделы становятся разделенными между экземплярами, и они становятся быстрее и т. Д. Вам нужно будет свернуть свое собственное решение, если вы хотите, чтобы он обрабатывал концентратор событий по-другому.

public class WorkerRole : RoleEntryPoint 
{ 
    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); 
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false); 
    private EventProcessorHost _eventProcessorHost; 

    public override bool OnStart() 
    { 
     ThreadPool.SetMaxThreads(4096, 2048); 
     ServicePointManager.DefaultConnectionLimit = 500; 
     ServicePointManager.UseNagleAlgorithm = false; 
     ServicePointManager.Expect100Continue = false; 

     var eventClient = EventHubClient.CreateFromConnectionString("consumersConnectionString", 
          "eventHubName"); 
     _eventProcessorHost = new EventProcessorHost(Dns.GetHostName(), eventClient.Path, 
       eventClient.GetDefaultConsumerGroup().GroupName, 
       "consumersConnectionString", "blobLeaseConnectionString"); 
     return base.OnStart(); 
    } 

    public override void Run() 
    { 
     try 
     { 
      RunAsync(this._cancellationTokenSource.Token).Wait(); 
     } 
     finally 
     { 
      _runCompleteEvent.Set(); 
     } 
    } 

    private async Task RunAsync(CancellationToken cancellationToken) 
    { 
     // starts processing here 
     await _eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>(); 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromMinutes(1)); 
     } 
    } 

    public override void OnStop() 
    { 
     _eventProcessorHost.UnregisterEventProcessorAsync().Wait(); 
     _cancellationTokenSource.Cancel(); 
     _runCompleteEvent.WaitOne(); 
     base.OnStop(); 
    } 
} 

У меня есть несколько процессоров для конкретных разделов (вы можете гарантировать ФИФО этот путь), но вы можете реализовать вы собственную логику легко т.е. пропустить использование класса EventDataProcessor и поиск по словарю в моем примере и просто реализовать некоторую логику в методе ProcessEventsAsync.

public class EventProcessor : IEventProcessor 
{ 
    private readonly Dictionary<string, IEventDataProcessor> _eventDataProcessors; 

    public EventProcessor() 
    { 
     _eventDataProcessors = new Dictionary<string, IEventDataProcessor> 
     { 
      {"A", new EventDataProcessorA()}, 
      {"B", new EventDataProcessorB()}, 
      {"C", new EventDataProcessorC()} 
     } 
    } 

    public Task OpenAsync(PartitionContext context) 
    { 
     return Task.FromResult<object>(null); 
    } 

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     foreach(EventData eventData in messages) 
     { 
      // implement your own logic here, you could just process the data here, just remember that they will all be from the same partition in this block 
      try 
      { 
       IEventDataProcessor eventDataProcessor; 
       if(_eventDataProcessors.TryGetValue(eventData.PartitionKey, out eventDataProcessor)) 
       { 
        await eventDataProcessor.ProcessMessage(eventData); 
       } 
      } 
      catch (Exception ex) 
      { 
       _//log exception 
      } 
     } 
     await context.CheckpointAsync(); 
    } 

    public async Task CloseAsync(PartitionContext context, CloseReason reason) 
    { 
     if (reason == CloseReason.Shutdown) 
      await context.CheckpointAsync(); 
    } 
} 

Пример одного из наших EventDataProcessors

public interface IEventDataProcessor 
{ 
    Task ProcessMessage(EventData eventData); 
} 

public class EventDataProcessorA : IEventDataProcessor 
{ 
    public async Task ProcessMessage(EventData eventData) 
    { 
     // Do Something specific with data from Partition "A" 
    } 
} 

public class EventDataProcessorB : IEventDataProcessor 
{ 
    public async Task ProcessMessage(EventData eventData) 
    { 
     // Do Something specific with data from Partition "B" 
    } 
} 

Надежда это помогает, это было скала для нас до сих пор и легко масштабируется для нескольких экземпляров

Смежные вопросы