2016-01-17 1 views
1

Требование: У меня есть пакетное задание, которое обрабатывает 1 миллион записей. Я храню 1 миллион записей в Arraylist и повторяю его, чтобы сделать сторонний внешний вызов для каждой записи. Теперь требование - третье лицо отправит HTTP-ответ 200 или 400 или 500. В единственном случае ответ 500, я должен обновить базу данных для этой конкретной записи.Application Design с вызовом базы данных в многопоточной среде

Задача: Чтобы ускорить обработку, я пытаюсь реализовать потоки для стороннего вызова. Но я застрял в том, что после выполнения потоковой обработки, как я могу обработать ответ от стороннего вызова, чтобы сделать обновление базы данных. Я не хочу включать обновление БД в поток, потому что если есть несколько потоков, пытающихся обновить БД, там будет тупик БД.

Мое усилие: То, что я пытался, - объявить одноэлементный arraylist и сохранить номер записи, для которого ответ от стороннего вызова равен 500 в одиночном объекте. Когда все сторонние вызовы будут завершены, я бы перебрал этот одноадресный arraylist для извлечения записей и обновления в БД.

RoadBlock: Даже в этом случае я не могу понять, как я могу сделать последовательность потоков последовательной, чтобы я мог хранить запись в singleton arraylist.

Код:

class callExtPrty implements Runnable{ 

public callExtPrty(String recordNumber) 
    this.recordNumber = recordNumber; 

public void run(){ 

    int response = externalCall(String recordNumber); 
    if response == 500 
      singletonList.add(recordNumber); 

} 

class recordProcessorDAO{ 

    public void processRecords(){ 

    List<String> dbRecordList= new ArrayList<String>(); 

    //DB call to add 1 million records to dbRecordList 

    Iterator<String> recordList = dbRecordList.iterator(); 
    while (recordList.hasNext()) { 

    new callExtPrty(recordList.next()); 

    } 

    //Getting the singleton list populated by the 3rd party call 
    Iterator<String> singletonList = singletonList.iterator(); 
    while (singletonList .hasNext()) { 

     //DB call to update the record fetched from singletonList 

    } 
} 

Может кто-нибудь помочь мне в получении этого разработан надлежащим образом. Реализация Threading должна быть реализована для повышения производительности, так как работа обрабатывает 1 миллион записей за один проход, а задание работает около 12-13 часов.

Благодаря

ответ

0

Вы должны сделать HTTP-вызовы в многопоточном образом, как вы уже сделали. Вместо того, чтобы сделать его работоспособным, вы можете использовать для этого ExecutorService. Это просто проще поддерживать код таким образом.

Что касается обновления базы данных, то, вы должны партии эти обновления и применять их в одном кадре, попробуйте сделать запрос следующим образом: UDDATE Table SET Column=Value WHERE KEY IN(a,b,c,d). Индексируйте ключ, если он еще не проиндексирован.

На данный момент эти значения сохраняются в памяти, если вы не хотите хранить его в памяти, чтобы сделать его отказоустойчивым и повторно выполняемым, вы можете использовать некоторые внешние кеши, такие как Redis, где хранится запрос HTTP: ответы как ключевое значение, и вы можете искать его вместо того, чтобы делать HTTP-вызов в случае, если ваш код ломается/сбой системы, и вам нужно снова запустить все это.

Batching Logic: Предположим, вы получили X-номер HTTP-ответов, из которых Y - HTTP: 500. Теперь вы обновляете базу данных для каждого Y = 1000. Это уменьшит количество запросов к базе данных, которые вы стремитесь значительно.

Это будет сделано в одном основном потоке, который получает обратные вызовы других потоков, обрабатывающих HTTP-вызовы. Таким образом, нет никаких шансов на запись нескольких потоков в БД.

Еще одно предложение: используйте пул соединений, и вы можете кэшировать результаты HTTP-вызовов локально, так как вы говорили о структуре данных списка, которая может иметь дубликаты, вы в конечном итоге сохраните некоторые HTTP-вызовы.

0

Вам просто нужно

  • Divide работы для каждого потока, так что никакие два потока не разделяют ту же самую работу

  • подождать, пока все нити, чтобы закончить, а затем один из потоков должен решить проблема.

  • Не забудьте сообщить в другой раздел, что проблема решена, поэтому они прекращают поиск.

Пример:

private CopyOnWriteArrayList list; 

private class Shared<T> { 
    private T data; 
    public synchronized T getData() { return data; } 
    public synchronized void setData(T data) { this.data = data; } 
} 

public boolean multiThreadedSearch(final int value) { 
    int numThreads = 4; 
    int threadWork = list.size()/numThreads; 
    final Shared<Boolean> found = new Shared<>(); 
    found.setData(false); 
    Thread[] threads = new Thread[numThreads]; 
    for (int i = 0; i < numThreads; ++i) { 
     final int myStart = i * threadWork; 
     final int myEnd = i == numThreads - 1 ? 
      list.size() : (i + 1) * threadWork; 
     threads[i] = new Thread(new Runnable() { 
     public void run() { 
      for (int k = myStart; k < myEnd && !found.getData(); ++k) { 
       if (list.get(k) == value) { 
        found.setData(true); 
       } 
      } 
     } 
     }); 
    } 
    for (Thread t : threads) t.start(); 
    //now wait them to finish 
    for (Thread t : threads) { 
     try { 
     t.join(); 
     } catch (InterruptedException ex) { 
     } 
    } 
    return found.getData(); 
} 

Вы можете позвонить multiThreadedSearch в отдельном потоке или в главном потоке.

0

Вам не нужно класть все в арраист, и вы можете сделать многопоточность намного проще.

Основная стратегия: Вы собираетесь потратить большую часть времени на чтение из соединения с базой данных, кучу времени, ожидающего возвращения каждого запроса, а затем кучу времени, выполняющего вызовы базы данных для ответов HTTP 500. Таким образом, лучший способ разделить это:

Сделайте ThreadPoolExecutor с кучей потоков (скрипте с ним, чтобы найти нужный размер, я бы начал около 8 рабочих потоков), caller запускает политику и SynchonousQueue чтобы накормить его. Здесь ничего сложного.

Запустите свой первоначальный запрос. Как строки приходят, вызов выполнить(), передавая ему исполняемый(), которая выполняет следующие функции для каждой строки базы данных в результатах запроса:

1) сделать запрос HTTP с данными из записи базы данных

2) посмотреть результат

3) сделать запрос базы данных, чтобы обновить запись, если необходимо. Вся эта часть должна находиться в синхронизированном блоке, основанном на чем-то простом и уникальном, таком как ID записи БД. Таким образом, вы не получите двух потоков, обновляющих одну и ту же запись db одновременно.

И все готово.

ThreadPoolExecutor имеет afterExecute() для обработки ошибок или вы можете попытаться поймать метод run(), который еще проще.

0

Вы должны использовать механизм обратного вызова с FutureTask.

Решение вашей проблемы:

  1. newWorkStealingPool Создать из Executors или использовать ForkJoinPool с количеством процессорных ядер, как размер.

  2. В вашей задаче Callable или Runnable, добавьте бизнес-логику с классом Callback.

  3. Когда вы получаете ошибку от стороннего API, вызовите метод класса Callback.

Похожие SE вопросы:

Executing Java callback on a new thread

Java executors: how to be notified, without blocking, when a task completes?

Смежные вопросы