2016-06-09 5 views
2

В моем приложении есть 2 фазы, один загружает некоторые большие данные, а другой манипулирует им. поэтому я создал 2 классы, которые реализуют исполняемым: ImageDownloader и ImageManipulator, и они разделяют downloadedBlockingQueue:Java Producer Consumer ArrayBlockingQueue deadlock on take()

 public class ImageDownloader implements Runnable { 

     private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue; 
     private ArrayBlockingQueue<String> imgUrlsBlockingQueue; 

     public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) { 

      this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue; 
      this.imgUrlsBlockingQueue = imgUrlsBlockingQueue; 

     } 

     @Override 
     public void run() { 
      while (!this.imgUrlsBlockingQueue.isEmpty()) { 
       try { 
        String imgUrl = this.imgUrlsBlockingQueue.take(); 
        ImageBean imageBean = doYourThing(imgUrl); 
        this.downloadedImagesBlockingQueue.add(imageBean); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

     public class ImageManipulator implements Runnable { 

     private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue; 
     private AtomicInteger capacity; 

     public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue, 
           AtomicInteger capacity) { 
      this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue; 
      this.capacity = capacity; 
     } 

     @Override 
     public void run() { 
      while (capacity.get() > 0) { 
       try { 
        ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK 
        capacity.decrementAndGet(); 

       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       // .... 
      } 
     } 
    } 




    public class Main { 
     public static void main(String[] args) { 
      String[] imageUrls = new String[]{"url1", "url2"}; 
      int capacity = imageUrls.length; 

      ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity); 
      ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity); 

      ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3); 
      for (int i = 0; i < 3; i++) { 
       Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue); 
       downloaderExecutor.execute(worker); 
      } 
      downloaderExecutor.shutdown(); 

      ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3); 
      AtomicInteger manipulatorCapacity = new AtomicInteger(capacity); 

      for (int i = 0; i < 3; i++) { 
       Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity); 
       manipulatorExecutor.execute(worker); 
      } 
      manipulatorExecutor.shutdown(); 
      while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) { 
      } 
     } 
    } 

Тупик происходит потому, что этот сценарий: t1 проверяет способность его 1.

t2 проверяет 1.

t3 проверяет его 1.

t2 принимает, устанавливает мощность 0, продолжить с потоком и в конце концов выходит. t1 и t3 теперь находятся в тупике, потому что не будет добавлено к загружаемомуImagesBlockingQueue.

В конце концов я хочу что-то вроде этого: когда емкость достигнута & & очередь пуста = сломать «while» и закончить изящно.

, чтобы установить «очередь пуста», поскольку только условие не будет работать, в начале пуста оно будет пустым, пока какой-нибудь ImageDownloader не поместит imageBean в очередь.

ответ

0

Ну, я использовал некоторые из предлагаемых функций, но это полное решение для меня, которое не занято ожиданием и ждет, пока Downloader уведомит об этом.

public ImageManipulator(LinkedBlockingQueue<ImageBean> downloadedImagesBlockingQueue, 
         LinkedBlockingQueue<ImageBean> manipulatedImagesBlockingQueue, 
         AtomicInteger capacity, 
         ManipulatedData manipulatedData, 
         ReentrantLock downloaderReentrantLock, 
         ReentrantLock manipulatorReentrantLock, 
         Condition downloaderNotFull, 
         Condition manipulatorNotFull) { 

    this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue; 
    this.manipulatedImagesBlockingQueue = manipulatedImagesBlockingQueue; 
    this.capacity = capacity; 
    this.downloaderReentrantLock = downloaderReentrantLock; 
    this.manipulatorReentrantLock = manipulatorReentrantLock; 
    this.downloaderNotFull = downloaderNotFull; 
    this.manipulatorNotFull = manipulatorNotFull; 
    this.manipulatedData = manipulatedData; 
} 

@Override 
public void run() { 
    while (capacity.get() > 0) { 
     downloaderReentrantLock.lock(); 
     if (capacity.get() > 0) { //checks if the value is updated. 

      ImageBean imageBean = downloadedImagesBlockingQueue.poll(); 

      if (imageBean != null) { // will be null if no downloader finished is work (successfully downloaded or not) 

       capacity.decrementAndGet(); 
       if (capacity.get() == 0) { //signal all the manipulators to wake up and stop waiting for downloaded images. 
        downloaderNotFull.signalAll(); 
       } 
       downloaderReentrantLock.unlock(); 

       if (imageBean.getOriginalImage() != null) { // the downloader will set it null iff it failes to download it. 

        // business logic 
       } 

       manipulatedImagesBlockingQueue.add(imageBean); 

       signalAllPersisters(); // signal the persisters (which has the same lock/unlock as this manipulator. 

      } else { 
       try { 
        downloaderNotFull.await(); //manipulator will wait for downloaded image - downloader will signalAllManipulators (same as signalAllPersisters() here) when an imageBean will be inserted to queue. 
        downloaderReentrantLock.unlock(); 
       } catch (InterruptedException e) { 
        logger.log(Level.ERROR, e.getMessage(), e); 
       } 
      } 
     } 
    } 

    logger.log(Level.INFO, "Manipulator: " + Thread.currentThread().getId() + " Ended Gracefully"); 
} 

private void signalAllPersisters() { 
    manipulatorReentrantLock.lock(); 
    manipulatorNotFull.signalAll(); 
    manipulatorReentrantLock.unlock(); 
} 

Для полного потока вы можете проверить этот проект на моем GitHub: Реализации https://github.com/roy-key/image-service/

0

Вам не нужен capacity у вашего потребителя. Теперь он читается и обновляется в нескольких потоках, что вызывает проблему синхронизации.

  1. initImgUrlsBlockingQueue создает блокировки URL очереди с capacity количеством элементов URL. (Верно?)
  2. ImageDownloader потребляет imgUrlsBlockingQueue и производят изображения, оно заканчивается, когда все адреса будут загружены, или, если capacity означает количество изображений, которые должны быть загружены, потому что может быть какой-то сбой, он заканчивается, когда он добавил capacity номер изображений.
  3. Перед ImageDownloader заканчивается, это добавить маркер к downloadedImagesBlockingQueue, например, нулевой элемент , статический окончательный ImageBean static final ImageBean marker = new ImageBean().
  4. Все ImageManipulator истощает очередь, используя следующую конструкцию, и когда она видит нулевой элемент, она снова добавляет ее в очередь и завершает работу.

    // use identity comparison 
    while ((imageBean = downloadedImagesBlockingQueue.take()) != marker) { 
        // process image 
    } 
    downloadedImagesBlockingQueue.add(marker); 
    

Обратите внимание, что BlockingQueue обещает его метод называют его атомным, однако, если вы проверить его способность первой, и потребляют элемент в соответствии с мощностью, инициативная группа не будет атомарным.

+0

Нет BlockingQueue может иметь нулевые элементы: https://docs.oracle.com/javase/8/docs/api/java /util/concurrent/BlockingQueue.html#add-E- –

+1

Это всего лишь пример. Как я уже сказал, элемент ** маркер **. Вы можете объявить статическое конечное поле «статический конечный маркер ImageBean = новый ImageBean()» и использовать его как маркер и сравнивать, используя сравнение идентичности 'while ((imageBean = скачаноImagesBlockingQueue.take())!)) –

1

There площадь несколько вещей, которые вы можете сделать, чтобы предотвратить затор:

  • Используйте LinkedBlockingQueue, который имеет емкость
  • Используйте offer для добавления в очередь, которая не блокирует
  • Использование drainTo или poll взять предметы из очереди, которые не блокируются

Есть также несколько советов, GHT рассмотреть следующие вопросы:

  • Использовать ThreadPool:
    final ExecutorService executorService = Executors.newFixedThreadPool(4);
  • Если вы используете фиксированный размер ThreadPool вы можете добавить "poison pill" сек, когда вы закончили добавление данных в очереди, соответствующей размеру вашего ThreadPool и проверки это когда вы poll

Использование ThreadPool так просто, как это:

final ExecutorService executorService = Executors.newFixedThreadPool(4); 

    final Future<?> result = executorService.submit(new Runnable() { 
     @Override 
     public void run() { 

     } 
    }); 

Существует также менее известный ExecutorCompletionService, который абстрагирует весь этот процесс. Подробнее here.

-1

Ваша проблема заключается в том, что вы пытаетесь использовать счетчик для отслеживания элементов очереди и не составляете операции, которые должны быть атомарными. Вы делаете чек, принимаете, уменьшаете. Это позволяет размеру очереди и счетчику деинхронизировать, а ваши потоки блокируются навсегда. Было бы лучше написать примитив синхронизации, который является «закрываемым», так что вам не нужно поддерживать соответствующий счетчик. Тем не менее, быстрое решение было бы изменить его так, вы получите и декремент счетчика атомарен:

while (capacity.getAndDecrement() > 0) { 
    try { 
     ImageBean imageBean = downloadedImagesBlockingQueue.take(); 

    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

В этом случае, если есть 3 нити и только один элемент, оставшаяся в очереди, то только один поток будет атомарно декремент счетчик и посмотреть, что он может принимать без блокировки. Оба других потока будут видеть 0 или < 0 и вырваться из цикла.

Вам также необходимо сделать окончательные переменные экземпляра вашего класса окончательными, чтобы они имели правильную видимость памяти. Вы также должны определить, как будете обрабатывать прерывания, а не полагаться на шаблон трассировки печати по умолчанию.

Смежные вопросы