2015-10-02 3 views
0

Я знаю, что на этот вопрос ответил много раз, но я изо всех сил пытаюсь понять, как это работает.Редактируемая очередь задач, выполняющихся в фоновом потоке

Так в моем приложении пользователь должен иметь возможность выбрать элементы, которые будут добавлены в очередь (отображается в ListView с использованием ObservableList<Task>), и каждый элемент должен быть обработан последовательно с помощью ExecutorService.

Кроме того, эта очередь должна быть доступна для редактирования (измените порядок и удалите элементы из списка).

private void handleItemClicked(MouseEvent event) { 
    if (event.getClickCount() == 2) { 
     File item = listView.getSelectionModel().getSelectedItem(); 
     Task<Void> task = createTask(item); 
     facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited 
     Future result = executor.submit(task); 
     // where executor is an ExecutorService of which type? 

     try { 
      result.get(); 
     } catch (Exception e) { 
      // ... 
     } 
    } 
} 

Пробовали с executor = Executors.newFixedThreadPool(1), но у меня нет контроля над очередью.
Я читал о ThreadPoolExecutor и очередях, но я изо всех сил пытаюсь понять это, поскольку я совершенно новый для параллелизма.

Мне нужно запустить этот метод handleItemClicked в фоновом потоке, чтобы пользовательский интерфейс не зависал, как я могу это сделать наилучшим образом?

Подведено: как я могу реализовать очередь задач, которая редактируется и последовательно обрабатывается фоновым потоком?

Пожалуйста, помогите мне понять это

EDIT Используя SerialTaskQueue класс от vanOekel помог мне, теперь я хочу, чтобы связать список задач моей ListView.

ListProperty<Runnable> listProperty = new SimpleListProperty<>(); 
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue 
queueListView.itemsProperty().bind(listProperty); 

Очевидно, что это не сработало, поскольку оно ожидает ObservableList. Есть элегантный способ сделать это?

ответ

1

Простейшим решением, которое я могу придумать, является сохранение списка задач вне исполнителя и использование обратного вызова для подачи исполнителем следующей задачи, если она доступна. К сожалению, он включает синхронизацию в списке задач и AtomicBoolean, чтобы указать выполнение задачи.

Обратный вызов - это просто Runnable, который обертывает исходную задачу для запуска, а затем «обращается», чтобы увидеть, есть ли другая задача для выполнения, и если это так, выполняет ее с использованием (фонового) исполнителя.

Синхронизация необходима для сохранения списка задач в порядке и в известном состоянии. Список задач может быть изменен двумя потоками одновременно: через обратный вызов, выполняемый в потоке исполнителя (фоновом), и через метод handleItemClicked, выполняемый через поток переднего плана пользовательского интерфейса. Это, в свою очередь, означает, что он никогда точно не известен, например, когда список задач пуст. Чтобы сохранить список задач в порядке и в известном фиксированном состоянии, необходима синхронизация списка задач.

Это все еще оставляет неопределенный момент, чтобы решить, когда задача готова к выполнению. Здесь находится AtomicBoolean: набор значений всегда доступен непосредственно и читается любым другим потоком, а метод compareAndSet всегда гарантирует, что только один поток получит «ОК».

Сочетание синхронизации и использования AtomicBoolean позволяет создать один метод с «критическим сектором», который может одновременно вызываться обоими передними и фоновыми потоками для запуска выполнения новой задачи, если возможное. Приведенный ниже код разработан и настроен таким образом, что может существовать один такой метод (runNextTask).Хорошая практика - сделать «критический раздел» в параллельном коде максимально простым и явным (что, в свою очередь, обычно приводит к эффективному «критическому разделу»).

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicBoolean; 

public class SerialTaskQueue { 

    public static void main(String[] args) { 

     ExecutorService executor = Executors.newSingleThreadExecutor(); 
     // all operations on this list must be synchronized on the list itself. 
     SerialTaskQueue tq = new SerialTaskQueue(executor); 
     try { 
      // test running the tasks one by one 
      tq.add(new SleepSome(10L)); 
      Thread.sleep(5L); 
      tq.add(new SleepSome(20L)); 
      tq.add(new SleepSome(30L)); 

      Thread.sleep(100L); 
      System.out.println("Queue size: " + tq.size()); // should be empty 
      tq.add(new SleepSome(10L)); 

      Thread.sleep(100L); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      executor.shutdownNow(); 
     } 
    } 

    // all lookups and modifications to the list must be synchronized on the list. 
    private final List<Runnable> tasks = new LinkedList<Runnable>(); 
    // atomic boolean used to ensure only 1 task is executed at any given time 
    private final AtomicBoolean executeNextTask = new AtomicBoolean(true); 
    private final Executor executor; 

    public SerialTaskQueue(Executor executor) { 
     this.executor = executor; 
    } 

    public void add(Runnable task) { 

     synchronized(tasks) { tasks.add(task); } 
     runNextTask(); 
    } 

    private void runNextTask() { 
     // critical section that ensures one task is executed. 
     synchronized(tasks) { 
      if (!tasks.isEmpty() 
        && executeNextTask.compareAndSet(true, false)) { 
       executor.execute(wrapTask(tasks.remove(0))); 
      } 
     } 
    } 

    private CallbackTask wrapTask(Runnable task) { 

     return new CallbackTask(task, new Runnable() { 
      @Override public void run() { 
       if (!executeNextTask.compareAndSet(false, true)) { 
        System.out.println("ERROR: programming error, the callback should always run in execute state."); 
       } 
       runNextTask(); 
      } 
     }); 
    } 

    public int size() { 
     synchronized(tasks) { return tasks.size(); } 
    } 

    public Runnable get(int index) { 
     synchronized(tasks) { return tasks.get(index); } 
    } 

    public Runnable remove(int index) { 
     synchronized(tasks) { return tasks.remove(index); } 
    } 

    // general callback-task, see https://stackoverflow.com/a/826283/3080094 
    static class CallbackTask implements Runnable { 

     private final Runnable task, callback; 

     public CallbackTask(Runnable task, Runnable callback) { 
      this.task = task; 
      this.callback = callback; 
     } 

     @Override public void run() { 
      try { 
       task.run(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } finally { 
       try { 
        callback.run(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    // task that just sleeps for a while 
    static class SleepSome implements Runnable { 

     static long startTime = System.currentTimeMillis(); 

     private final long sleepTimeMs; 
     public SleepSome(long sleepTimeMs) { 
      this.sleepTimeMs = sleepTimeMs; 
     } 
     @Override public void run() { 
      try { 
       System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms."); 
       Thread.sleep(sleepTimeMs); 
       System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms."); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); } 
    } 
} 

Update: если нужно выполнить серийный группы задач, взгляните на адаптированной реализации here.

+0

спасибо, что помогло мне. Еще один вопрос: каким образом я могу привязать этот список «List tasks = new LinkedList ();' list к моему «ListView»? – lenny

+1

@ lenny42 Я не знаком с 'ObservableList', но можете ли вы сделать это наоборот? Используйте что-то вроде 'tasks = FXCollections.observableArrayList()' (см. Http://stackoverflow.com/a/26195354/3080094)? – vanOekel

+0

@ lenny42 Или используйте [ModifiableObservableListBase] (http://docs.oracle.com/javase/8/javafx/api/javafx/collections/ModifiableObservableListBase.html) и используйте обновленную версию 'SerialTaskQueue' в качестве делегата, как показано на рисунке в примере javadoc? – vanOekel

Смежные вопросы