2015-08-16 3 views
0

Я строю сервер, который отправляет данные через один TCP-сокет для каждого пользователя каждые 2 секунды и в отдельном потоке. Есть также специальные события, которые иногда отправляются вместе с регулярными данными. Иногда данные в нескольких пакетах смешивались, поэтому я создал очередь, чтобы убедиться, что этого не произойдет. Однако проблема все еще существует, мой подход неправильный или что-то не так с моим кодом?Обработка параллелизма Java socket

protected void sendData (byte[] data) { 
     if (isSendingData) { 
      dataQueue.add(data); 
      return; 
     } 

     isSendingData = true; 
     Thread sendThread = new Thread() { 
      public void run() { 
       try { 
        BufferedOutputStream outStream = new BufferedOutputStream(connectionSocket.getOutputStream()); 
        outStream.write(data); 
        outStream.flush(); 

        // check queue, if there are data, send 
        byte[] moreData = null; 
        if (dataQueue.size() > 0) { 
         moreData = dataQueue.remove(0); 
        } 
        isSendingData = false; 
        if (moreData != null) { 
         sendData(moreData); 
        } 
       } 
       catch (Exception e) { 
        System.out.println ("Error sending data to peripheral: " + e); 
        isSendingData = false; 
       } 
      } 
     }; 

     sendThread.start(); 
    } 
+0

Вы удаляете только элемент 0 в dataQueue. Что, если размер> 1? –

+0

создайте свой собственный пакет при отправке данных. У вас есть номер для пакета. –

+0

- это 'sendData', вызываемый одновременно из разных потоков? –

ответ

2

Надлежащая идиома для удаления проблем параллелизма с помощью очереди, чтобы иметь долгоживущую нить запустить бесконечный цикл, который принимает элементы из очереди и обрабатывает их. Как правило, вы будете использовать , блокирующий очередь, чтобы на каждой итерации поток переходил спать до тех пор, пока не будет обработан элемент.

Ваше решение отличается от вышеизложенного во многих аспектах. Например:

if (isSendingData) { 
     dataQueue.add(data); 
     return; 
    } 

    isSendingData = true; 

—, если этот метод вызывается одновременно, это приведет к состоянию гонки: оба потока могут читать isSendingData как ложные, а затем одновременно приступить к передаче данных по сети. Если isSendingData не volatile, у вас также есть гонка данных (полностью отдельно от состояния гонки, описанного выше).

   if (dataQueue.size() > 0) { 
        moreData = dataQueue.remove(0); 
       } 

— это другое состояние гонки: после того, как вы прочитали размер как ноль, другой поток может добавить элемент в очереди. Теперь этот элемент, возможно, никогда не будет обработан. Он будет задерживаться в очереди до тех пор, пока не начнется другой такой поток.

Более очевидный способ, которым ваше решение неверно, заключается в том, что в запущенном вами потоке нет циклов и он предназначен для обработки одного сообщения, а также, возможно, одного дополнительного сообщения в очереди. Это должно быть переработано так, чтобы не было особых случаев, и sendData всегда, безоговорочно, отправляется в очередь и никогда не отправляет его самостоятельно.

1

Я бы сделал это совершенно по-другому. Вы не хотите произвольно длинных очередей в своем приложении.

  • При прослушивании вашего сердечного ритма синхронизируйте свою нишу прослушивания в гнезде.
  • Не нужно посылать что-либо еще.
  • Избавьтесь от очереди, isSendingData и т. Д.
  • Попросите ваше основное приложение синхронизироваться в сокете, когда оно хочет отправить, и просто отправлять, когда это необходимо.
  • Используйте те же самые BufferedOutputStream или BufferedWriter для всех отправок, и промойте их после каждой отправки.
Смежные вопросы