2015-03-31 3 views
5

Я использую ThreadPoolTaskExecutor (весны), чтобы выполнить некоторые задачи асинхронно.очередь пула потоков с уникальными задачами

Необходимая задача будет загружать некоторый объект из внешней БД в мою системную память. Я использую максимальный размер пула потоков 10 и максимальный размер очереди 100.

Предположим, что все 10 потоков заняты получением объектов из моей БД и задача создана, она перейдет в очередь. Теперь создается еще одна задача, которая должна получить тот же объект (тот же ключ в БД) из БД, он также перейдет в очередь (при условии, что все 10 потоков все еще заняты).

Таким образом, моя очередь может быть полностью заполнена дублированными задачами, которые будут выполняться по очереди, и я не хочу, чтобы это произошло.

Я думал, что решение должно прийти в виде уникальной коллекции, которая служит очереди пула потоков. Под капотом ThreadPoolTaskExecutor использует LinkedBlockingQueue, который не обеспечивает уникальности.

Я подумал о нескольких возможных решений, но ни один не удовлетворяет меня:

  • Использование ThreadPoolExecutor вместо ThreadPoolTaskExecutor. ThreadPoolExecutor предоставляет конструктор, который позволяет мне определять тип очереди пула потоков, но ему необходимо реализовать интерфейс BlockingQueue. Я не смог найти реализацию, которая сохранит уникальность.

Это привело меня, чтобы попытаться расширить LinkedBlockingQueue и переопределить добавить:

public boolean add(E e) 
    if(!this.contains(e)) { 
     return super.add(e); 
    } else { 
     return false; 
    } 
} 

Но, насколько я могу сказать, что это приведет к значительному снижению производительности, так как метод contains ограничен О (п) - плохая идея.

Что может решить моя проблема? Я нацелен на хорошую производительность (в случае компрометации производительности памяти я не против отказаться от производительности памяти).

ответ

6

Использование Guava и ListenableFuture вы могли бы сделать что-то подобное (не проверено)

Set<String> uniqueQueue = Sets.newConcurrentHashSet(); 
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100)); 
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); 

String t1 = "abc"; 
if(uniqueQueue.add(t1)) { 
    ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1); 
    Futures.addCallback(future, new FutureCallback<String>() { 
     @Override 
     public void onSuccess(String result) { 
      uniqueQueue.remove(t1); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      uniqueQueue.remove(t1); 
     } 
    }); 
} 

в результате

  • только элементы, которые в настоящее время не обрабатываются или в очереди будут добавлены в очередь (uniqueQueue)
  • элементы, которые были обработаны, будут удалены с uniqueQueue
  • вы будете иметь только Maxium 100 элементов в очереди

эта реализация не обрабатывает

  • Exceptions выброшены методом submit()
  • Максимальное количество элементов в unqiueQueue

Что касается вашего требования по загрузке объектов из базы данных в память, вы можете взглянуть на Guava's Caches.

UPDATE:

+0

Я уже думал об использовании обходного пути, который более или менее похож на ваше предложение (применяя уникальность с использованием дополнительного набора). Если я не получу лучшего ответа в ближайшие дни, я приму ваше. – forhas

+0

, пожалуйста, дайте мне знать, если вы придете к лучшему решению в ближайшие дни. –

+0

Я определенно буду, спасибо. – forhas

0

Если вы разрешили управлять базой данных, я предложил бы использовать саму базу данных с целью предотвращения дублирования усилий:

  • Добавить столбец-столбец в таблицу
  • Добавить статую s столбец в таблицу (возможно, «новый» и «сделано»)
  • Убедитесь, что уровень изоляции DB, по крайней мере READ_COMMITTED

Тогда попробуйте что-то вроде этого, в основном потоке:

Random rand = new Random(); 
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1; 
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ## 
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId; 
// now execute those sql statements with QueryRunner or whatever you use in-house 

Строки, которые возвращаются из выбранного, это то, что вы добавляете в очередь.

Затем у вас есть класс, который реализует Runnable, который обрабатывает эти строки, путем извлечения их из очереди. Как только он обрабатывает строку, вы выполняете другое обновление SQL (внутри Runnable), чтобы установить lockId обратно в ноль и установить статус «done».

У этого есть преимущество, даже если у вас есть несколько машин, каждый со своей собственной очередью.

+0

Кто сказал, что я использую реляционную БД (я не)? И я не ищу решение для оптимизации БД, это может быть любая задача, выполняемая там (доступ к БД - это только частный случай). – forhas

+0

Это справедливо; Я просто продолжал то, что я видел в вашем вопросе («загрузит какой-то объект из внешней БД»). То, что я написал, может также применяться к не-RDB, в зависимости от ваших настроек согласованности. – spudone

+0

Название вопросов: «очередь пула потоков с уникальными задачами». У тебя были хорошие намерения, но я думаю, что ты пропустил немного. Спасибо, что поделились :) – forhas

1

Раствор аналогично принятому решению, но на основе Spring (в отличие от гуавы):

Создать интерфейс RunnableWithId:

public interface RunnableWithId extends Runnable { 

    /** 
    * @return A unique id for this task 
    */ 
    String getTaskId(); 
} 

Создать другой интерфейс TaskWithIdExecutor:

import org.springframework.core.task.TaskExecutor; 


public interface TaskWithIdExecutor extends TaskExecutor { 

    /** 
    * Executes the given task if it is not queued or already running 
    * 
    * @param task The task to execute 
    */ 
    void executeIfNotQueuedOrRunningAlready(RunnableWithId task); 
} 

Создайте свой собственный исполнитель UniquTaskExecutor:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
import org.springframework.util.concurrent.ListenableFuture; 
import org.springframework.util.concurrent.ListenableFutureCallback; 

import java.util.Set; 

/** 
* In addition to all the abilities of ThreadPoolTaskExecutor adds the ability 
* to execute a task only if it is not already running/queued using the 
* executeIfNotQueuedOrRunningAlready method. 
* 
* @see ThreadPoolTaskExecutor 
*/ 
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor { 

    private Set<String> queuedTasks; 

    public UniquTaskExecutor() { 
     queuedTasks = Sets.newConcurrentHashSet(); 
    } 

    @Override 
    public void execute(Runnable task) { 
     super.execute(task); 
    } 

    /** 
    * @param task The task to execute 
    */ 
    @Override 
    public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) { 
     if (queuedTasks.add(task.getTaskId())) { 
      ListenableFuture<?> res = submitListenable(task); 
      res.addCallback(new ListenableFutureCallback<Object>() { 
       @Override 
       public void onFailure(Throwable throwable) { 
        queuedTasks.remove(task.getTaskId()); 
       } 

       @Override 
       public void onSuccess(Object o) { 
        queuedTasks.remove(task.getTaskId()); 
       } 
      }); 
     } 
    } 
} 

Используйте executeIfNotQueuedOrRunningAlready метод UniquTaskExecutor добиться уникальности в расстрелах задач.

Смежные вопросы