2009-02-06 5 views
5

У меня есть два потока. Производитель создает куски данных (объекты String), где потребитель обрабатывает эти строки. Уловка в том, что моему приложению нужен только последний обработанный объект данных. Другими словами, если производителю удалось создать две строки «s1», а затем «s2», то я хочу, чтобы потребитель обрабатывал только «s2». «s1» можно безопасно отбросить.Передача рабочего элемента между потоками (Java)

Конечно, нет проблем с реализацией класса, который реализует это поведение, но я хочу использовать стандартный механизм из java.util.concurrent (если такой механизм существует). Обратите внимание, что SynchronousQueue не является хорошим решением: потребитель будет блокировать при запуске «s1» и не получит возможности произвести «s2».

(Короче говоря, я ищу для коллекции одного элемента с блокирующим удалить операцию и неблокирующая множественная операция)

Есть идеи?

ответ

3

Я думаю, что ваш лучший ответ, вероятно, использовать ArrayBlockingQueue, где производитель (у вас есть только один продюсер, правильно?) удаляет любой существующий элемент перед добавлением нового элемента.

Конечно, в этой реализации есть условия гонки: потребитель может начать обработку элемента непосредственно перед тем, как производитель удалит его. Но эти условия гонки всегда будут существовать, независимо от того, какую структуру данных вы используете.

+0

Есть ли действительно состояние гонки? Я думаю, что ReentrantLock внутри ArrayBlockingQueue должен их избегать. –

+0

@Errandir - состояние гонки, о котором я говорил, было, когда в очереди был элемент, ожидающий очереди, и потребитель берет этот элемент непосредственно перед тем, как производитель добавит новый элемент. Это из-за отсутствия лучшего термина - «макро-расы», которое происходит за пределами любой структуры данных, которую вы используете. – kdgregory

0

Вы можете использовать массив размера один за что:

String[] oeq = new String[1]; 

источник Пример:

public class Test { 
    private static final String[] oeq = new String[1]; 
    public static void main(String[] args) { 
     (new Producer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
    } 

    private static class Producer extends Thread { 
     public void run() { 
      int i=0; 
      while(true) { 
       i++; 
       synchronized(oeq) { 
        oeq[0] = ""+i; 
        oeq.notifyAll(); 
       } 
      } 
     } 
    } 

    private static class Consumer extends Thread { 
     public void run() { 
      String workload = null; 
      while(true) { 
       synchronized(oeq) { 
        try { 
         oeq.wait(); 
        } catch(InterruptedException ie) { 
         ie.printStackTrace(); 
        } 
        if(oeq[0] != null) { 
         workload = oeq[0]; 
         oeq[0] = null; 
        } 
       } 
       if(workload != null) { 
        System.out.println(workload); 
       } 
      } 
     } 
    } 
} 
+0

Это будет очень неэффективно. Потребительский поток не должен блокировать процессор, ожидая работы. –

+0

Собственно, потребитель будет потреблять весь процессор, поскольку в цикле нет ожидания() (это может быть то, что вы подразумеваете под «блоком»). Вот почему, я думаю, OP хотел использовать существующий класс JDK - легко написать разбитый доморощенный параллельный объект. – kdgregory

+0

Оба намека верны, я исправил первый, спасибо. –

3

Что относительно класса Exchanger? Это стандартный способ обмена объектами между потоками. Специализируйте его с вашим классом, может быть список строк. Сделайте потребителя только первым/последним.

+0

, вероятно, лучший подход, чем мой, хотя вам нужно установить тайм-аут продюсера на 0 – kdgregory

+0

. Exchanger также блокирует производителя :-( –

+0

, если вы не установите таймаут на 0 (или отрицательное число) – kdgregory

0

Ну, если вам нужна только самая последняя строка, то вам совсем не нужна очередь - все, что вам нужно, это строковая ссылка: производитель устанавливает ее, потребитель читает ее. Если потребитель так долго читает, что продюсер переустанавливает его ... ну и что?

Ссылки на источник и чтение являются атомарными. Единственная проблема - если вы хотите, чтобы потребитель каким-то образом был уведомлен о наличии строки. Но даже тогда ... если потребитель делает что-то, что требует времени, тогда вам действительно не нужны какие-либо причудливые вещи из библиотек параллелизма.

Обратите внимание, кстати, что этот пример работает с любым количеством производителей и/или потребительских потоков.

import java.util.Random; 

public class Example { 
    public static void main(String[] av) { 
     new Example().go(); 
    } 

    Object mutex  = new Object(); 
    String theString = null; 

    void go() { 
     Runnable producer = new Runnable() { 
      public void run() { 
       Random rnd = new Random(); 
       try { 
        for (;;) { 
         Thread.sleep(rnd.nextInt(10000)); 
         synchronized (mutex) { 
          theString = "" + System.currentTimeMillis(); 
          System.out.println("Producer: Setting string to " + theString); 
          mutex.notify(); 
         } 
        } 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 

      } 
     }; 

     Runnable consumer = new Runnable() { 
      public void run() { 
       try { 
        String mostRecentValue = null; 
        Random rnd = new Random(); 
        for (;;) { 
         synchronized (mutex) { 
          // we use == because the producer 
          // creates new string 
          // instances 
          if (theString == mostRecentValue) { 
           System.out.println("Consumer: Waiting for new value"); 
           mutex.wait(); 
           System.out.println("Consumer: Producer woke me up!"); 
          } else { 
           System.out.println("Consumer: There's a new value waiting for me"); 
          } 
          mostRecentValue = theString; 
         } 
         System.out.println("Consumer: processing " + mostRecentValue); 
         Thread.sleep(rnd.nextInt(10000)); 
        } 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 
     }; 


     new Thread(producer).start(); 
     new Thread(consumer).start(); 
    } 
} 
+0

, вам нужно сделать ссылку изменчивой, и работать в 1.5+ JDK, чтобы это было гарантировано (модель памяти Java позволяет потоку сохранять свою собственную копию на неопределенный срок иначе) – kdgregory

+0

Yup - забыли этот бит. – paulmurray

Смежные вопросы