2016-04-12 2 views
2

Я изменяю программное обеспечение сервера Java. Все приложение однопоточное. Одно из моих изменений занимает много времени, поэтому я решил сделать это асинхронно, чтобы избежать замораживания основного потока.Безопасная блокировка потоков Java

Это пример исходного кода (а не реальный код, просто пример):

public class Packet { 
    private final byte[] data = new byte[1024]; 

    public void setData(int index, byte data) { 
     this.data[index] = data; 
    } 

    public byte getData(int index) { 
     return data[index]; 
    } 

    public void sendPacket(ClientConnection clientConnection) { 
     clientConnection.sendPacket(data); 
    } 
} 

В настоящее время это мой код (смотреть на комментарии):

public class Packet { 
    private final byte[] data = new byte[1024]; 

    public void setData(int index, byte data) { 
     synchronized (this) { 
      this.data[index] = data; 
     } 
    } 

    public byte getData(int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     //This state of data should be sent 
     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       //The thread is now running 
       //The main-thread can move on 
       //The main-thread can also modify data now because we are not inside the synchronized block 
       //But it should not because the state of data when the method sendPacket was called should be sent 
       synchronized (Packet.this) { 
        thisTakesMuchTime(data); 
        clientConnection.sendPacket(data); 
       } 
      } 
     }).start(); 
    } 
} 

Что Я действительно ищу что-то вроде этого:

public class Packet { 
    private final byte[] data = new byte[1024]; 

    public void setData(int index, byte data) { 
     //wait for unlock 
     this.data[index] = data; 
    } 

    public byte getData(int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     //lock 
     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       thisTakesMuchTime(data); 
       clientConnection.sendPacket(data); 
       //unlock 
      } 
     }).start(); 
    } 
} 

Вопрос: Какая наилучшая реализация su ch блокировка в Java? Должен ли я сделать это сам с AtomicInteger, например.

Редактировать: Посмотрите на мой ответ для моей текущей реализации.

+1

это не очень понятно, что вы спрашиваете здесь. каков порядок операций? сначала я вызываю 'Packet.setData', а затем' Packet.sendPacket'? что вы хотите сделать, когда пакет был отправлен? –

+0

В общем, я бы не стал реализовывать свой собственный код блокировки: никогда не изобретайте колесо, если вам это не нужно. И есть такие вещи, как ReentrantLock ... материал, который хорошо документирован; и используется многими другими людьми. Выполнение вещей «самостоятельно» всегда несет риск ошибиться. – GhostCat

+0

Ваша блокировка гарантирует, что вы не можете писать в пакет во время его отправки, и вы не можете отправить его во время записи на него (из разных потоков). Это не похоже на лучший вариант. Вы должны реализовать пул для 'clientConnection' так много пакетов может быть передано в одно и то же время. – OldCurmudgeon

ответ

2

Вы можете сделать копию своих данных и отправить копию, чтобы избежать параллелизма.

public class Packet { 
    private final byte[] data = new byte[1024]; 

    public void setData(final int index, final byte data) { 
     this.data[index] = data; 
    } 

    public byte getData(final int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     byte[] dataToSend = new byte[1024]; 
     System.arraycopy(data, 0, dataToSend, 0, 1024); 
     new Thread(new Runnable() { 
      @Override public void run() { 
       clientConnection.sendPacket(dataToSend); 
      } 
     }).start(); 
    } 
} 

Использование CopyOnWriteArrayList аналогично коду сильфона, который также позволяет избежать параллелизма, но не так эффективно (учитывая вы будете называть setData чаще sendPacket):

public class Packet { 
    private byte[] data = new byte[1024]; 

    public void setData(final int index, final byte data) { 
     byte[] newData = new byte[1024]; 
     System.arraycopy(data, 0, newData, 0, 1024); 
     newData[index] = data; 
     this.data = newData; 
    } 

    public byte getData(final int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     new Thread(new Runnable() { 
      @Override public void run() { 
       clientConnection.sendPacket(data); 
      } 
     }).start(); 
    } 
} 
+0

Да, это сработает, но я хочу избежать копирования на основной теме. – stonar96

+0

@AntonK., Что вы имеете в виду? «Все приложение однопоточное». – ericbn

+0

Извините, моя ошибка ... –

1

Самый простой запирать вас может использовать Reentrant Lock, что означает, что если вы попытаетесь приобрести блокировку, когда у вас уже есть, операция будет успешной.

В своем коде, чтобы достичь резьба вы хотите, вы также должны использовать wait() и notify() блокировать основной поток, пока ваш ребенок нить не приобрела замок:

public class Packet { 
    private final ReentrantLock lock = new ReentrantLock(); 
    private final byte[] data = new byte[1024]; 

    public void setData(int index, byte data) { 
     lock.lock(); //wait for unlock 
     try { 
      this.data[index] = data; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public byte getData(int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     Thread thread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       lock.lock(); //lock 
       try { 
        synchronized(this) { 
         this.notify(); 
        } 

        thisTakesMuchTime(data); 
        clientConnection.sendPacket(data); 
       } finally { 
        lock.unlock(); //unlock 
       } 
      } 
     }).start(); 

     synchronized(thread) { 
      try { 
       thread.wait(); 
      } catch (InterruptedException e) { 
       //handle 
      } 
     } 
    } 
} 

рассмотреть также с помощью ExecutorService и не создает необработанные объекты Thread.

+0

Я думаю, что вы пропустили блокировку внутри 'setData', правильно? Но он не является функционально идентичным. – stonar96

+0

Произошла некоторая проблема с копировальной пастой, с тех пор я обновился. – Danikov

+0

Хорошо, но первый код по-прежнему не функционально идентичен второму коду. Во втором коде основной поток по-прежнему может вызывать 'setData' после вызова' sendPacket' и до отправки пакета, что на самом деле является моей проблемой. Первый код решит эту проблему. – stonar96

0

Какова наилучшая реализация такой блокировки на Java? Должен ли я сделать это сам с AtomicInteger, например.

Я думаю, что ответ @ ericbn будет работать нормально. Захват копии данных с использованием основного потока, но все же внутри Packet в порядке.

Однако вы беспокоитесь о 1k буфере? real расход здесь не создает копию данных в основном потоке, он разветвляет нить каждый раз, когда вы идете отправить пакет. Это чрезвычайно дорого по сравнению с созданием объекта. Я бы использовал пул потоков и отправлял ему пакетные задания.

// you might want this to be bounded so you don't queue up too many packets 
private final ExecutorService threadPool = Executors.newSingleThreadExecutor(); 
... 

public void sendPacket(ClientConnection clientConnection) { 
    byte[] dataToWrite = new byte[data.length]; 
    System.arraycopy(data, 0, dataToWrite, 0, dataToWrite.length); 
    threadPool.submit(new PacketToWrite(dataToWrite, clientConnection)); 
    // you can clear or reset the `data` array now 
} 

private static class PacketToWrite implements Runnable { 
    private final byte[] dataToWrite; 
    private final ClientConnection clientConnection; 
    public PacketToWrite(byte[] dataToWrite, ClientConnection clientConnection) { 
     this.dataToWrite = dataToWrite; 
     this.clientConnection = clientConnection; 
    } 
    public void run() { 
     thisTakesMuchTime(data); 
     clientConnection.sendPacket(data); 
    } 
} 

Вы отправляете данные по сети, поэтому пропускная способность дополнительного объекта ничто по сравнению с задержкой в ​​сети.

+0

Он будет работать с копией массива данных. Но реальный массив данных намного больше, и пакет отправляется очень часто. Поэтому я действительно хочу избежать копирования массива. У меня уже есть решение с собственной реализацией блокировки. Я знаю, что это заморозит мой основной поток, но только в редких случаях ('setData' вызывается редко). – stonar96

+0

Я только что опубликовал свою текущую реализацию в качестве ответа. – stonar96

0

Моя текущая реализация:

Пакет:

public class Packet { 
    private final Lock lock = new Lock(); 
    private final byte[] data = new byte[1024]; 

    public void setData(int index, byte data) { 
     lock.waitUntilUnlock(); 
     this.data[index] = data; 
    } 

    public byte getData(int index) { 
     return data[index]; 
    } 

    public void sendPacket(final ClientConnection clientConnection) { 
     lock.lock(); 
     new Thread(new Runnable() { // I use an ExecutorService 
      @Override 
       public void run() { 
       thisTakesMuchTime(data); 
       clientConnection.sendPacket(data); 
       lock.unlock(); 
      } 
     }).start(); 
    } 
} 

Lock:

public class Lock { 
    private final AtomicInteger lockCount = new AtomicInteger(); 

    public void lock() { // Main thread 
     lockCount.incrementAndGet(); 
    } 

    public synchronized void unlock() { 
     lockCount.decrementAndGet(); 
     notifyAll(); 
    } 

    public synchronized void waitUntilUnlock() { // Main thread 
     try { 
      while (lockCount.get() > 0) { 
       wait(); 
      } 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 
Смежные вопросы