Редактировать: установка, приведенная ниже, по-прежнему будет работать на кластере, если вы используете программу очередей, которая может координировать работу нескольких процессов/серверов, например. RabbitMQ.
Вы также можете использовать более простой алгоритм очередности, который использует только базу данных, а недостатком является то, что он требует опроса (тогда как система, такая как RabbitMQ, позволяет потокам блокироваться до тех пор, пока сообщение не будет доступно). Создайте таблицу Requests со столбцом для уникального requestId
s (например, случайным UUID), который действует как первичный ключ, столбец timestamp
, столбец respourceType
и целое число requestedQuantity
. Вам также понадобится таблица журналов с уникальным столбцом requestId
, который действует как первичный ключ, столбец timestamp
, столбец resourceType
, целое число requestQuantity
и столбец boolean/tinyint/whatever success
.
Когда клиент запрашивает количество ресурсовXX, он генерирует случайный UUID и добавляет строку в таблицу Requests, используя UUID в качестве requestId, а затем обследует таблицу журналов для requestId. Если столбец success
равен true, запрос выполнится, иначе он не сработает.
Сервер с базой данных назначает один поток или процесс каждому ресурсу, например. ProcessX отвечает за ResourceX. ProcessX извлекает все строки из таблицы Requests, где resourceType = ResourceX
, сортируется по метке времени, а затем удаляет их из Requests; он затем обрабатывает каждый запрос в порядке, уменьшая счетчик в памяти для каждого успешного запроса, а в конце обработки запросов он обновляет количество ресурсовX в таблице ресурсов. Затем он записывает каждый запрос и свой статус success
в таблицу журналов. Затем он извлекает все запросы из запросов, где requestType = RequestX
снова и т. Д.
Может быть несколько более эффективным использование целочисленного значения autoincrement в качестве первичного ключа Requests и для сортировки ProcessX по первичному ключу, а не по метке времени.
Одним из вариантов является назначение одного DAOThread
каждого ресурса - это нить единственное, что доступ к таблице базы данных ресурса, так что нет блокировки на уровне базы данных. Worker
(например,веб-сеансы) запрашивает количество ресурсов, используя параллельную очередь. В приведенном ниже примере используется Java BlockingQueue, но большинство языков будет иметь некоторую реализацию параллельной очереди, которую вы можете использовать.
public class Request {
final int value;
final BlockingQueue<ReturnMessage> queue;
}
public class ReturnMessage {
final int value;
final String resourceType;
final boolean isSuccess;
}
public class DAOThread implements Runnable {
private final int MAX_CHANGES = 10;
private String resourceType;
private int quantity;
private int changeCount = 0;
private DBTable table;
private BlockingQueue<Request> queue;
public DAOThread(DBTable table, BlockingQueue<Request> queue) {
this.table = table;
this.resourceType = table.select("resource_type");
this.quantity = table.select("quantity");
this.queue = queue;
}
public void run() {
while(true) {
Requester request = queue.take();
if(request.value <= quantity) {
quantity -= request.value;
if(++changeCount > MAX_CHANGES) {
changeCount = 0;
table.update("quantity", quantity);
}
request.queue.offer(new ReturnMessage(request.value, resourceType, true));
} else {
request.queue.offer(new ReturnMessage(request.value, resourceType, false));
}
}
}
}
public class Worker {
final Map<String, BlockingQueue<Request>> dbMap;
final SynchronousQueue<ReturnMessage> queue = new SynchronousQueue<>();
public class WorkerThread(Map<String, BlockingQueue<Request>> dbMap) {
this.dbMap = dbMap;
}
public boolean request(String resourceType, int value) {
dbMap.get(resourceType).offer(new Request(value, queue));
return queue.take();
}
}
Рабочие отправляют запросы ресурсов в соответствующую очередь DAOThread; DAOThread обрабатывает эти запросы по порядку, либо обновляя количество локального ресурса, если значение запроса не превышает количество и возвращает Success, оставив неизменным количество и возвращая отказ. База данных обновляется только после десяти обновлений, чтобы уменьшить количество ввода-вывода; чем больше MAX_CHANGES, тем сложнее будет восстановить системный сбой. Вы также можете иметь выделенный IOThread, который выполняет всю запись в базе данных - таким образом вам не нужно дублировать какие-либо протоколирование или синхронизацию (например, там должен быть таймер, который каждые несколько секунд сбрасывает текущее количество в базу данных).
Работник использует SynchronousQueue для ожидания ответа от DAOThread (SynchronousQueue - это BlockingQueue, который может содержать только один элемент); если Рабочий работает в своем потоке, вы можете заменить его стандартным многопозиционным BlockingQueue, чтобы Рабочий мог обрабатывать ReturnMessages в любом порядке.
Есть несколько баз данных, например. Riak, которые имеют встроенную поддержку счетчиков, поэтому это может улучшить ваш уровень ввода-вывода и уменьшить или устранить необходимость в MAX_CHANGES.
Вы можете дополнительно увеличить пропускную способность, введя BufferThread
s для буферизации запросов к DAOThread
.
public class BufferThread implements Runnable {
final SynchronousQueue<ReturnMessage> returnQueue = new SynchronousQueue<>();
final int BUFFERSIZE = 10;
private DAOThread daoThread;
private BlockingQueue<Request> queue;
private ArrayList<Request> buffer = new ArrayList<>(BUFFERSIZE);
private int tempTotal = 0;
public BufferThread(DAOThread daoThread, BlockingQueue<Request> queue) {
this.daoThread = daoThread;
this.queue = queue;
}
public void run() {
while(true) {
Request request = queue.poll(100, TimeUnit.MILLISECONDS);
if(request != null) {
tempTotal += request.value;
buffer.add(request);
}
if(buffer.size() == BUFFERSIZE || request == null) {
daoThread.queue.offer(new Request(tempTotal, returnQueue));
ReturnMessage message = returnQueue.take();
if(message.isSuccess()) {
for(Request request: buffer) {
request.queue.offer(new ReturnMessage(request.value, daoThread.resourceType, message.isSuccess));
}
} else {
// send unbuffered requests to DAOThread to see if any can be satisfied
for(Request request: buffer) {
daoThread.queue.offer(request);
}
}
buffer.clear();
tempTotal = 0;
}
}
}
}
Тружеников направлять свои запросы на BufferThreads, которые затем ждать, пока они не буферный BUFFERSIZE
запросов или ждали 100мса для запроса, чтобы прийти через буфер (Request request = queue.poll(100, TimeUnit.MILLISECONDS)
), в какой момент они пересылают буферизированные сообщение DAOThread
. У вас может быть несколько буферов на DAOThread
- вместо отправки Map<String, BlockingQueue<Request>>
Рабочим вы вместо этого отправляете Map<String, ArrayList<BlockingQueue<Request>>>
, одну очередь за BufferThread
, с Рабочем либо с помощью счетчика или генератора случайных чисел, чтобы определить, для какого BufferThread
отправить запрос. Обратите внимание, что если BUFFERSIZE
слишком велико и/или если у вас слишком много BufferThread
с, то работники будут страдать от длительных периодов паузы, пока они ждут заполнения буфера.
Просьба уточнить, какие операции вы хотите выполнить параллельно? Выбрать, рассчитать и обновить? –
Основная проблема связана с обновлением количества продуктов. Например, в настоящее время у нас есть одна строка для продукта «А» с количеством 500. Клиенты обновляют эту величину, вычитая некоторые суммы. Порядок величины этого вычитания может варьироваться от менее 10 для каждого клиента до 100 или 200; но этот случай на самом деле реже. Мы будем искать какой-то алгоритм, который не имеет строгого отношения к управлению блокировкой базы данных, а к разделению/слиянию ресурсов, чтобы уменьшить степень параллелизма в записи одного продукта. Очевидно, клиенты не должны делать количество ресурсов отрицательным – pcan
В чем проблема, которую вы пытаетесь решить с помощью «выбрать для обновления»? Какие СУБД вы используете? Если вы не используете устаревший MySQL, ваша RDBMS может дать вам незакрепленные последовательные чтения. – usr