2014-11-05 4 views
0

проблема: у нас есть некоторый IP-адрес, тогда мы должны выполнить ping каждого из них по времени, указанному пользователем, например, ping 192.168.0.1 каждые 400 мс, ping 192.168.137.20 каждые 40000 мс и т. д. ... как мы можем обрабатывать подобные ситуации асинхронным образом?обрабатывать большое количество асинхронных заданий

это где я нагрузить целевые показатели информации из базы данных и создать датчик для каждого из них:

public class PingService 
{ 
    private CancellationTokenSource _cancel;   
    private List<PingSensor> _pings; 
    private IRepository<IPDevice> _deviceRepository; 


    public PingService() 
    { 
     _cancel = new CancellationTokenSource(); 
     _pings = new List<PingSensor>(); 
     _deviceRepository = ObjectFactory.GetInstance<IRepository<IPDevice>>(); 
    } 

    public void Start() 
    { 
     Action act = new Action(() => 
     { 
      IQueryable<IPDevice> allDevices = _deviceRepository.GetAll(); 

      foreach (IPDevice device in allDevices) 
      { 
       PingResultCollector collector = new PingResultCollector(device); 
       _pings.Add(new PingSensor(device.Address, collector, device.CheckDuration, _cancel.Token)); 
      } 

      foreach (PingSensor _ping in _pings) 
      { 
       _ping.DoDiscovery(); 
      } 
     }); 

     Task.Factory.StartNew(act); 
    } 

    public void Stop() 
    { 
     _cancel.Cancel(); 
    } 
} 

и это, где мы делаем пинг и ждать задержек, указанных пользователем, собирает отклик и .. .

public class PingSensor 
{ 
    private string _address;   
    Ping _ping; 
    private bool lastRequestReplayed; 
    private int _delay;   
    private CancellationToken _cancellationToken; 
    private PingResultCollector _resultCollector; 


    public PingSensor(string address, PingResultCollector resultCollector, int delay, CancellationToken CancellationToken) 
    { 
     _address = address; 
     _cancellationToken = CancellationToken; 
     _ping = new Ping(); 
     _resultCollector = resultCollector; 

     _ping.PingCompleted += _ping_PingCompleted; 
     _delay = delay; 
    } 

    void _ping_PingCompleted(object sender, PingCompletedEventArgs e) 
    { 
     if (_cancellationToken.IsCancellationRequested) 
      return; 

     lastRequestReplayed = true; 
     //_eventHandler(sender, e); 

     if (_resultCollector != null) 
      _resultCollector.CollectData(new PingStatus() 
      { 
       PingStatusId = Guid.NewGuid(), 
       Address = e.Reply == null ? "" : e.Reply.Address.ToString(), 
       Status = e.Reply == null ? e.Error.Message : e.Reply.Status.ToString(), 
       Target = e.UserState.ToString(), 
       ResponseTime = e.Reply == null ? 0 : e.Reply.RoundtripTime, 
       UpdateTime = DateTime.UtcNow 
      }); 
    } 

    public void DoDiscovery() 
    { 
     lastRequestReplayed = true; 
     Action act = new Action(() => 
     { 
      while (_cancellationToken.IsCancellationRequested!=true) 
      { 
       if (_cancellationToken.IsCancellationRequested) 
        return; 

       byte[] data = new byte[2]; 
       if (lastRequestReplayed) 
       { 
        _ping.SendAsync(_address, 30000, data, _address); 
        lastRequestReplayed = false; 
       } 
       if (_cancellationToken.IsCancellationRequested) 
        return; 
       Thread.Sleep(_delay); 
       if (_cancellationToken.IsCancellationRequested) 
        return; 
      } 
     }); 

     Task.Factory.StartNew(act); 
    } 
} 

и проблема с этим кодом является его создать один поток для каждого датчика, что это значит, если я имел 500 цель для пинг у меня также есть 500 нитей! любое предложение? и поэтому извините за мой плохой английский: D

+0

Важно отметить, что это _will not_ start 500 threads. Это будет ** queue ** 500 задач, среда выполнения решит, сколько потоков будет выполнено для выполнения на основе среды, в которой работает код, но поток 'Task'! =. Поэтому, если вам действительно нужны эти 500 задач, чтобы определенно работать параллельно, то это, вероятно, не очень хорошее решение для вас. – CodingGorilla

+0

thx для вашего комментария, я понимаю это, и я не тестировал его на самом деле с 500 задачами, но для 70 целей я получаю 70 потоков, и это точно моя проблема, некоторые задачи наклонены на долгое время, и это может занять 30 секунд одна задача снова работать – cyberw0lf

+0

У вас есть возможность использовать .Net 4.5? –

ответ

1

я могу продемонстрировать концепцию с рабочим образцом кода .. до вас, чтобы преобразовать в код, который подходит вашему сценарию 100%

using System; 
using System.Collections.Generic; 
using System.Net; 
using System.Net.NetworkInformation; 
using System.Threading.Tasks; 

namespace ConsoleApplication2 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      // Pick your long list of ip address 
      List<IPAddress> ips = new List<IPAddress> 
      { 
       new IPAddress(new byte[] {127, 0, 0, 1}), 
       new IPAddress(new byte[] {198, 252, 206, 16}), 
       new IPAddress(new byte[] {74, 125, 129, 99}), 
       // Add more ips as you like 
      }; 

      // Exactly what do you do with initiated tasks will depend on your specific scenario. 
      List<Task> tps = new List<Task>(); 

      foreach(var ip in ips) 
      { 
       // Delay could vary by IP, but I am hardcoding 10s here. 
       tps.Add(InitiatePings(ip, 10000)); 
      } 

      // Needed so that console app doesn't exit.. 
      Console.ReadLine(); 
     } 

     private static async Task InitiatePings(IPAddress ip, int delay) 
     { 
      while (true) 
      { 
       // Note, this API is different from SendAsync API you are using 
       // You may also want to reuse Ping instance instead of creating new one each time. 
       var result = await new Ping().SendPingAsync(ip); 
       // Process your result here, however you want. 
       Console.WriteLine(result.Address + "-" + result.Status + "-" + result.RoundtripTime); 
       // Assumes that the delay is not absolute, but time between ping, result processing and next ping. 
       await Task.Delay(delay); 
      } 
     } 
    } 
} 
1

Почему не просто установить Timer для каждого IP-адреса, который вы хотите выполнить ping? То есть:

List<System.Threading.Timer> timers = new List<System.Threading.Timer>(); 

Вы добавляете отдельные таймеры с:

timers.Add(new Timer(PingTimerProc, "192.168.1.1", 400, 400)); 
timers.Add(new Timer(PingTimerProc, "10.10.200.50", 42000, 42000)); 
// add other timers 

И ваш прок таймер:

void PingTimerProc(object state) 
{ 
    string ipAddress = (string)state; 
    // do the ping here 
} 

Единственный потенциальный недостаток в том, что при достаточно большом числе IP адреса, которые вам нужно пинговать очень часто, вы рискуете не иметь возможности обрабатывать все события таймера достаточно быстро. Это в конечном итоге приведет к измельчению, потому что у вас будет куча незавершенных потоков.

Если вы делаете это с помощью одной нити, и у вас есть много пингов за такие короткие промежутки времени, вы просто начнете отставать. Это не приведет к убийству вашего процесса.

Если бы я сделал это с помощью одного потока, я бы создал очередь приоритетов IP-адресов и связанные с ними времена пинга. И долговременная задача, установленная для обслуживания этой очереди. Базовая структура будет что-то вроде этого:

class PingJob 
{ 
    string IPAddress; 
    int Period; // milliseconds between pings 
    TimeSpan NextPingTime; 
} 

var queue = new PriorityQueue<PingJob>(); 

// create the ping jobs and add them to the queue 
queue.Add(new PingJob { IPAddress = "192.168.1.1", Period = 400, NextPingTime = TimeSpan.FromMilliseconds(400) }); 
queue.Add(new PingJob { IPAddress = "10.10.200.50", Period = 42000, NextPingTime = TimeSpan.FromMilliseconds(42000) }); 
// etc. 

// start a thread that processes the pings 
var pingTask = Task.Factory.StartNew(PingTaskProc, TaskCreationOptions.LongRunning); 

И прок задачи:

void PingTaskProc() 
{ 
    var pingTimer = Stopwatch.StartNew(); 
    while (true) 
    { 
     var p = queue.Dequeue(); // get ping job 
     if (p.NextPingTime > pingTimer.Elapsed) 
     { 
      // wait for ping time to come up 
      Thread.Sleep(p.NextPingTime - pingTimer.Elapsed); 
     } 
     // ping the IP address 
     DoPing(p.IPAddress); 
     // update its next ping time 
     p.NextPingTime += TimeSpan.FromMilliseconds(p.Period); 
     // And add it back to the queue 
     queue.Add(p); 
    } 
} 

Это предполагает, что у вас есть общая структура данных очереди приоритетов доступны. Есть много доступных, в том числе mine.

Все становится интереснее, если вы можете добавлять и удалять IP-адреса из очереди или изменять их периоды пинга. Но вы этого не указали.

Смежные вопросы