2016-02-05 4 views
1

Предположим, что у вас есть сетка G из n x m клеток, где n и m огромны. Кроме того, предположим, что у нас есть множество задач, каждая задача которых принадлежит одной ячейке в G и должна выполняться параллельно (в пуле потоков или другом пуле ресурсов).Пул потоков на ключ в Java

Однако задача, относящаяся к той же самой ячейке, должна выполняться серийно, то есть она должна ждать выполнения предыдущей задачи в одной и той же ячейке.

Как я могу решить эту проблему? Я искал и использовал несколько пулов потоков (Executors, Thread), но не повезло.

Минимальный Рабочий пример

import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class MWE { 

    public static void main(String[] args) { 
     ExecutorService threadPool = Executors.newFixedThreadPool(16); 
     Random r = new Random(); 

     for (int i = 0; i < 10000; i++) { 
      int nx = r.nextInt(10); 
      int ny = r.nextInt(10); 

      Runnable task = new Runnable() { 
       public void run() { 
        try { 
        System.out.println("Task is running"); 
        Thread.sleep(1000); 
        } catch (InterruptedException e) { 
        e.printStackTrace(); 
        } 
       } 
      }; 

      threadPool.submit(new Thread(task)); // Should use nx,ny here somehow 
     } 
    } 

} 
+0

Можете ли вы опубликовать код адаптера для этой сетки? – petey

+0

FYI: проект [tag: netty] использует этот вид планирования в своем основном ядре – Ferrybig

+0

Вы говорите, что в каждой ячейке есть список, содержащий много задач? Итак, это сетка, и каждая ячейка сетки - это список? –

ответ

1

Механизм обратного вызова с синхронизированным блоком может работать эффективно здесь. Я ранее отвечал на аналогичный вопрос here. Есть некоторые ограничения (см. Связанный ответ), но это достаточно просто, чтобы отслеживать, что происходит (хорошая ремонтопригодность). Я адаптировал исходный код и сделал его более эффективным для вашего случая, когда большинство задач будут выполняться параллельно (с n и m огромны), но порой должны быть серийными (когда задача для одной и той же точки в сетка G).

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.locks.ReentrantLock; 

// Adapted from https://stackoverflow.com/a/33113200/3080094 
public class GridTaskExecutor { 

    public static void main(String[] args) { 

     final int maxTasks = 10_000; 
     final CountDownLatch tasksDone = new CountDownLatch(maxTasks); 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(16); 
     try { 
      GridTaskExecutor gte = new GridTaskExecutor(executor); 
      Random r = new Random(); 

      for (int i = 0; i < maxTasks; i++) { 

       final int nx = r.nextInt(10); 
       final int ny = r.nextInt(10); 

       Runnable task = new Runnable() { 
        public void run() { 
         try { 
          // System.out.println("Task " + nx + "/" + ny + " is running"); 
          Thread.sleep(1); 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } finally { 
          tasksDone.countDown(); 
         } 
        } 
       }; 
       gte.addTask(task, nx, ny); 
      } 
      tasksDone.await(); 
      System.out.println("All tasks done, task points remaining: " + gte.size()); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      executor.shutdownNow(); 
     } 
    } 

    private final Executor executor; 
    private final Map<Long, List<CallbackPointTask>> tasksWaiting = new HashMap<>(); 
    // make lock fair so that adding and removing tasks is balanced. 
    private final ReentrantLock lock = new ReentrantLock(true); 

    public GridTaskExecutor(Executor executor) { 
     this.executor = executor; 
    } 

    public void addTask(Runnable r, int x, int y) { 

     Long point = toPoint(x, y); 
     CallbackPointTask pr = new CallbackPointTask(point, r); 
     boolean runNow = false; 
     lock.lock(); 
     try { 
      List<CallbackPointTask> pointTasks = tasksWaiting.get(point); 
      if (pointTasks == null) { 
       if (tasksWaiting.containsKey(point)) { 
        pointTasks = new LinkedList<CallbackPointTask>(); 
        pointTasks.add(pr); 
        tasksWaiting.put(point, pointTasks); 
       } else { 
        tasksWaiting.put(point, null); 
        runNow = true; 
       } 
      } else { 
       pointTasks.add(pr); 
      } 
     } finally { 
      lock.unlock(); 
     } 
     if (runNow) { 
      executor.execute(pr); 
     } 
    } 

    private void taskCompleted(Long point) { 

     lock.lock(); 
     try { 
      List<CallbackPointTask> pointTasks = tasksWaiting.get(point); 
      if (pointTasks == null || pointTasks.isEmpty()) { 
       tasksWaiting.remove(point); 
      } else { 
       System.out.println(Arrays.toString(fromPoint(point)) + " executing task " + pointTasks.size()); 
       executor.execute(pointTasks.remove(0)); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    // for a general callback-task, see https://stackoverflow.com/a/826283/3080094 
    private class CallbackPointTask implements Runnable { 

     final Long point; 
     final Runnable original; 

     CallbackPointTask(Long point, Runnable original) { 
      this.point = point; 
      this.original = original; 
     } 

     @Override 
     public void run() { 

      try { 
       original.run(); 
      } finally { 
       taskCompleted(point); 
      } 
     } 
    } 

    /** Amount of points with tasks. */ 
    public int size() { 

     int l = 0; 
     lock.lock(); 
     try { 
      l = tasksWaiting.size(); 
     } finally { 
      lock.unlock(); 
     } 
     return l; 
    } 

    // https://stackoverflow.com/a/12772968/3080094 
    public static long toPoint(int x, int y) { 
     return (((long)x) << 32) | (y & 0xffffffffL); 
    } 

    public static int[] fromPoint(long p) { 
     return new int[] {(int)(p >> 32), (int)p }; 
    } 

} 
+0

Спасибо! Кажется, это работает отлично! –

1

Если я вас правильно, вы хотите выполнить X задачи (X очень большой) в Y очередей (Y гораздо меньше, чем X).
Java 8 имеет класс CompletableFuture, который представляет собой (асинхронное) вычисление. В основном, это реализация Java Promise. Вот как можно организовать цепочку вычислений (общие типы опущенные):

// start the queue with a "completed" task 
CompletableFuture queue = CompletableFuture.completedFuture(null); 
// append a first task to the queue 
queue = queue.thenRunAsync(() -> System.out.println("first task running")); 
// append a second task to the queue 
queue = queue.thenRunAsync(() -> System.out.println("second task running")); 
// ... and so on 

При использовании thenRunAsync(Runnable), задачи будут выполняться с использованием пула потоков (есть и другие possibilites - см API docs). Вы также можете предоставить свой собственный пул потоков. Вы можете создать Y таких цепей (возможно, ссылаясь на них в некоторой таблице).

1

Это системы, такие как Akka в java-мире, имеют смысл. Если X и Y большие, вам может потребоваться обработать их с помощью механизма передачи сообщений, а не объединить их в огромную цепочку обратных вызовов и фьючерсов , Один актер имеет список задач, которые предстоит выполнить, и передается ячейка, и актер в конечном итоге вычислит результат и сохранит его. Если что-то не удается на промежуточном этапе, это не конец света.