2014-12-03 2 views
2

Мои задачи содержат некоторый идентификатор. Мне нужен ScheduledExecutionService, который будет выполнять заданные задачи после определенного интервала времени (как это делают все стандартные реализации), но с одним ограничением: он не должен запускать задачу до тех пор, пока не будет завершена предыдущая задача с тем же идентификатором (другие задачи, конечно же, должны выполняться одновременно).ScheduledExecutorService с частичным упорядочением заданий

Другими словами, для данного идентификатора все задачи должны выполняться последовательно.

Есть ли возможность использовать реализацию в стандартной или 3-й партийной библиотеке или простой способ ее реализовать?

Также возвращен ScheduledFuture.cancel должен работать правильно, поскольку запланированное задание может быть отменено выполняемой в настоящее время задачей.

ответ

1

Вот приблизительное представление о том, как я это сделаю. (Не проверено)

public class Solution { 

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1000); 
    //a map of taskQueues. We queue all tasks of same ID. 
    private static final ConcurrentHashMap<Long, BlockingDeque<MyTask>> mapOfTasks = new ConcurrentHashMap<>(1000); 

    public boolean submitWithChecks(Runnable task, long ID, long interval) { 


     final BlockingDeque<MyTask> queue; 
     if(mapOfTasks.containsKey(ID)) queue = mapOfTasks.get(ID); 
     else queue = new LinkedBlockingDeque<>(1000); 

     //At this point we have a valid queue object 

     try { 
      //insert the task into the queue 
      queue.putLast(new MyTask(task, ID, interval)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
      return false; 
     } 

     //If the queue was already present it will get Updated my previous queue.putLast operation 
     //If the queue was not present, we put it in the map and start a new queueEater thread. 
     if(!mapOfTasks.containsKey(ID)) { 
      mapOfTasks.put(ID, queue); 
      scheduler.submit(new QueueEater(ID)); //start a new task execution queue 
     } 
     return true; 
    } 

    private class QueueEater implements Runnable { 

     //This queueEater will consume the queue with this taskID 
     private final Long ID; 

     private QueueEater(Long id) { 
      ID = id; 
     } 

     @Override 
     public void run() { 

      //QueueEater will poll the mapOfTasks for the queue Object with its associated ID 
      while (mapOfTasks.containsKey(ID)) { 

       final BlockingDeque<MyTask> tasks = mapOfTasks.get(ID); 
       if(tasks.size() == 0) { 
        mapOfTasks.remove(ID); 
        return; //if task queue empty kill this thread; 
       } 

       final MyTask myTask; 
       try { 
        myTask = tasks.takeFirst(); 
        //schedule the task with given interval 
        final Future future = scheduler.schedule(myTask.getTask(), myTask.getInterval(), TimeUnit.SECONDS); 
        future.get(); //wait till this task gets executed before scheduling new task 
       } catch (InterruptedException | ExecutionException e) { 
        e.printStackTrace(); 
       } 
      } 

     } 
    } 

    private class MyTask { 

     private final Runnable task; 
     private final long ID; 
     private final long interval; 

     public long getInterval() { 
      return interval; 
     } 

     public long getID() { 
      return ID; 
     } 

     public Runnable getTask() { 
      return task; 
     } 

     private MyTask(Runnable task, long id, long interval) { 
      this.task = task; 
      ID = id; 
      this.interval = interval; 
     } 
    } 
} 
+1

Я сделал аналогичное решение. https://gist.github.com/vbezhenar/cc49928995c03c1289bb вот код. – vbezhenar

Смежные вопросы