2015-04-30 4 views
2

Я проектирую систему, в которой будут n производители и m потребителей, где n и m - числа, и n! = M.Производитель/Потребитель: производитель/потребитель не должен блокировать другого производителя/потребителя

Я хотел спроектировать систему таким образом, что,

  1. не производитель не должен блокировать другого производителя при производстве
  2. не потребитель должен блокировать других потребителей, когда не потребляя ни
  3. производитель, ни потребитель блока друг друга в то время как производство/потребление

Например: в java, если я использую синхронизированное ключевое слово, он будет блокировать соответствующий вызывающий.

Я не уверен, какую структуру данных и алгоритм я должен использовать для реализации этой системы.

Может ли кто-нибудь предоставить мне помощь/указатели на это?

+0

Вам все равно потребуется синхронизация между актерами. Вы можете минимизировать время блокировки потока, делая как можно меньше под блокировкой (будь то блок 'sync' или использование явной реализации« Lock »). –

+0

ConcurrentLinkedQueue поможет –

ответ

0

Вам нужен подход, при котором каждое действие будет атомарным и бесперебойным, так что да, на мой взгляд, наилучшим подходом было бы использовать синхронный модификатор на методах установки блокировки.

Другой интересный подход будет использовать атомные переменные ->http://baptiste-wicht.com/posts/2010/09/java-concurrency-atomic-variables.html

Это зависит от ваших данных в этих производителей/потребителей структур.

1

В зависимости от того, сколько тяжелой работы вам нужно сделать, и насколько хорошо ваше решение нужно масштабировать, RxJava имеет немного крутую кривую обучения, но как только вы закончите, это, вероятно, самое элегантное, масштабирующее и выполняющее решение ,

Запустите всех ваших производителей в разных потоках, объедините их с Merg(), переместите потребителей туда, где есть собственный поток на несвязанный буфер с .observeOn(Scheduler.newThread()).


Если вам нужно что-то, что хорошо работает параллельно на нескольких системах, смотрите на mapreduce.

Если вам нужно что-то на другом конце спектра (что-то простое), просто придерживайтесь ConcurrentQueue. Это не поддерживает многоадресную рассылку, но, по крайней мере, решает проблему производителя.

0

использование ожидания() и уведомить() для нарезания связи, и может создать n производителя и m потребительские темы

class Q{ 

int n; 

boolean value=false; 

synchronized int get() { 

if(!value) 

try { wait(); } 

catch(InterruptedException e) 

{ System.out.println("thread interrupted"); } 

System.out.println("Got : "+n); 

value=false; 

notify(); 

return n;} 

synchronized void put(int n) { 

if(value) 

try { wait();} 

catch(InterruptedException e) 

{ System.out.println("thread interrupted"); } 

this.n=n; 

value=true; 

System.out.println("Put : "+n); 

notify();}} 

class Producer implements Runnable{ 

Q q; 

Producer(Q q){ 

this.q=q; 

    new Thread(this,"Producer").start();} 

public void run(){ 

int i=0; 

while(true) 

{ 

q.put(i++);}} 

} 

class Consumer implements Runnable{ 

Q q; 

    Consumer(Q q) { 

    this.q=q; 

    new Thread(this,"Consumer").start();} 

    public void run(){ 

    while(true) 

    { 

    q.get(); 

    }}} 

    class PCFixed 

    { 

    public static void main(String ar[]) 

    { 

    Q q=new Q(); 

    new Producer(q); 

    new Consumer(q); 

    System.out.println("PRESS CONTROL-C TO STOP"); 

    } 

    } 

уходит в бесконечность, изменение, основанное на ур требований

2

Вы, наверное, хотите что-то вроде ConcurrentLinkedQueue. Идея состоит в том, что вы создаете одну очередь. Каждый из ваших n-производителей добавляет рабочие объекты в очередь, и каждый из m-пользователей читает рабочие элементы из очереди.Производитель просто:

while not done 
    create work item 
    add work item to queue 

потребитель так же просто:

while not done 
    get next work item from queue 
    process work item 

В ConcurrentLinkedQueue методы обрабатывать добавление и удаление элементов, синхронизация с другими производителями и потребителями по мере необходимости.

Единственный реальный недостаток заключается в том, что вы должны опросить очередь, чтобы увидеть, есть ли предметы. Поэтому вам, вероятно, понадобится событие автоматического сброса, которое срабатывает всякий раз, когда элемент добавляется в очередь. Например:

add work item to queue 
set ItemAvailable event 

И потребитель будет опрашивать очередь, и если ни один элемент не доступен, подождите на мероприятии:

while not done 
    while ((item = queue.poll) == null) 
     wait on ItemAvailable event 
    process item 

Посмотрите на пример, я связан. Это действительно не сложно.

+0

Я бы добавил, что методы CLQ блокируются и не требуют отклика –

Смежные вопросы