2014-11-14 3 views
1

При компиляции моего кода ниже он находится в состоянии тупика, и я не знаю, как я могу это исправить. Я пытаюсь написать конвейер как последовательность потоков, связанных вместе как буфер, и каждый поток может прочитать предыдущий узел в конвейере и, следовательно, записать в следующий. Общая цель состоит в том, чтобы проливать случайно созданный arraylist данных более чем на 10 потоков и сортировать их.Является ли мой код в состоянии тупика?

class Buffer{ 
    // x is the current node 
    private int x; 
    private boolean item; 
    private Lock lock = new ReentrantLock(); 
    private Condition full = lock.newCondition(); 
    private Condition empty = lock.newCondition(); 

    public Buffer(){item = false;} 

    public int read(){ 
     lock.lock(); 
     try{ 
      while(!item) 
      try{full.await();} 
      catch(InterruptedException e){} 
      item = false; 
      empty.signal(); 
      return x; 
    }finally{lock.unlock();} 
    } 

    public void write(int k){ 
     lock.lock(); 
     try{ 
     while(item) 
      try{empty.await();} 
      catch(InterruptedException e){} 
     x = k; item = true; 
     full.signal(); 
     }finally{lock.unlock();} 

    } 
} 

class Pipeline extends Thread { 

    private Buffer b; 
    //private Sorted s; 
    private ArrayList<Integer> pipe; // array pipeline 
    private int ub; // upper bounds 
    private int lb; // lower bounds 

    public Pipeline(Buffer bf, ArrayList<Integer> p, int u, int l) { 
     pipe = p;ub = u;lb = l;b = bf;//s = ss; 
    } 

    public void run() { 
     while(lb < ub) { 
      if(b.read() > pipe.get(lb+1)) { 
       b.write(pipe.get(lb+1)); 
      } 

      lb++; 
     } 

     if(lb == ub) { 
      // store sorted array segment 
      Collections.sort(pipe); 
      new Sorted(pipe, this.lb, this.ub); 
     } 
    } 

} 

class Sorted { 

    private volatile ArrayList<Integer> shared; 
    private int ub; 
    private int lb; 

    public Sorted(ArrayList<Integer> s, int u, int l) { 
     ub = u;lb = l;shared = s; 
     // merge data to array from given bounds 
    } 
} 

class Test1 { 
    public static void main(String[] args) { 


     int N = 1000000; 
     ArrayList<Integer> list = new ArrayList<Integer>(); 

     for(int i=0;i<N;i++) { 
      int k = (int)(Math.random()*N); 
      list.add(k); 
     } 

     // write to buffer 
     Buffer b = new Buffer(); 
     b.write(list.get(0)); 

     //Sorted s = new Sorted(); 

     int maxBuffer = 10; 
     int index[] = new int[maxBuffer+1]; 
     Thread workers[] = new Pipeline[maxBuffer]; 

     // Distribute data evenly over threads 
     for(int i=0;i<maxBuffer;i++) 
      index[i] = (i*N)/maxBuffer; 

     for(int i=0;i<maxBuffer;i++) { 
      // create instacen of pipeline 
      workers[i] = new Pipeline(b,list,index[i],index[i+1]); 
      workers[i].start(); 
     } 

     // join threads 
     try { 
      for(int i=0;i<maxBuffer;i++) { 
       workers[i].join(); 
      } 
     } catch(InterruptedException e) {} 

     boolean sorted = true; 

     System.out.println(); 
     for(int i=0;i<list.size()-1;i++) { 
      if(list.get(i) > list.get(i+1)) { 
       sorted = false; 
      } 
     } 

     System.out.println(sorted); 
    } 
} 

ответ

0

При запуске методов запуска все потоки будут блокироваться до тех пор, пока первый поток не достигнет full.await(). то один за другим, все потоки в конечном итоге попадают в full.await(). они будут ждать этого сигнала.

Однако единственное место, где происходит full.signal, - это после того, как один из методов чтения заканчивается. Поскольку этот код никогда не был достигнут (потому что сигнал никогда не срабатывает), вы оказываетесь в ожидании всех потоков.

Короче говоря, только после того, как 1 прочитанный заканчивается, появится триггер записи. Если вы отмените логику, вы начнете пустым, вы напишете в буфер (с сигналом и т. Д. И т. Д.), А затем потоки попытаются прочитать, я ожидаю, что это сработает.

Как правило, вы хотите писать в конвейер перед чтением с него. (или читать нечего).

Надеюсь, я не ошибаюсь в вашем коде, но это то, что я вижу при первом сканировании.

0

Ваш класс Buffer перелистывает режим чтения и записи. Каждое чтение должно сопровождаться записью, прочитанной и т. Д.

Вы пишете буфер сначала в своем методе main.

Теперь одна из ваших нитей достигает if(b.read() > pipe.get(lb+1)) { в Pipeline#run. Если это условие оценивается как false, тогда ничего не записывается. И так как каждый другой поток все равно должен быть тем же самым if(b.read(), вы получите все потоки чтения, которые не могут прогрессировать. Вам нужно либо написать в ветке else, либо разрешить несколько чтений.

Смежные вопросы