2014-11-15 2 views
1

Некоторое время назад мы внедрили приложение управления хранилищем, которое отслеживает количество каждого продукта, который у нас есть в магазине. Мы решили проблему одновременного доступа к данным с блокировками базы данных (выберите для обновления), но этот подход привел к низкой производительности, когда многие клиенты пытаются потреблять количество продуктов из одного магазина. Обратите внимание, что мы управляем только небольшим набором типов продуктов (менее 10), поэтому степень параллелизма может быть тяжелой (также мы не заботимся о повторном заполнении запасов). Мы решили разделить количество ресурсов в меньших «ведрах», но этот подход может привести к голоду для клиентов, которые пытаются потреблять количество, которое больше, чем каждый объем ковша: мы должны управлять слиянием ведер и так далее ... Мой вопрос есть: есть некоторые широко принятые решения этой проблемы? Я также искал академические статьи, но тема кажется слишком широкой.Алгоритм параллельного доступа к ресурсам (ресурсам) в базе данных

P.S. 1: наше приложение работает в кластерной среде, поэтому мы не можем полагаться на контроль параллелизма приложений. Задача состоит в том, чтобы найти алгоритм, который структурирует и управляет данными по-другому, чем одна строка, но сохраняя все преимущества, которые имеет транзакция db (с использованием блокировок или нет).

P.S. 2: для вашей информации мы управляем большим количеством аналогичных складов, пример фокусируется на одном, но мы сохраняем все данные в одном дБ (цены все одинаковы и т. Д.).

+0

Просьба уточнить, какие операции вы хотите выполнить параллельно? Выбрать, рассчитать и обновить? –

+0

Основная проблема связана с обновлением количества продуктов. Например, в настоящее время у нас есть одна строка для продукта «А» с количеством 500. Клиенты обновляют эту величину, вычитая некоторые суммы. Порядок величины этого вычитания может варьироваться от менее 10 для каждого клиента до 100 или 200; но этот случай на самом деле реже. Мы будем искать какой-то алгоритм, который не имеет строгого отношения к управлению блокировкой базы данных, а к разделению/слиянию ресурсов, чтобы уменьшить степень параллелизма в записи одного продукта. Очевидно, клиенты не должны делать количество ресурсов отрицательным – pcan

+0

В чем проблема, которую вы пытаетесь решить с помощью «выбрать для обновления»? Какие СУБД вы используете? Если вы не используете устаревший MySQL, ваша RDBMS может дать вам незакрепленные последовательные чтения. – usr

ответ

0

Редактировать: установка, приведенная ниже, по-прежнему будет работать на кластере, если вы используете программу очередей, которая может координировать работу нескольких процессов/серверов, например. RabbitMQ.

Вы также можете использовать более простой алгоритм очередности, который использует только базу данных, а недостатком является то, что он требует опроса (тогда как система, такая как RabbitMQ, позволяет потокам блокироваться до тех пор, пока сообщение не будет доступно). Создайте таблицу Requests со столбцом для уникального requestId s (например, случайным UUID), который действует как первичный ключ, столбец timestamp, столбец respourceType и целое число requestedQuantity. Вам также понадобится таблица журналов с уникальным столбцом requestId, который действует как первичный ключ, столбец timestamp, столбец resourceType, целое число requestQuantity и столбец boolean/tinyint/whatever success.

Когда клиент запрашивает количество ресурсовXX, он генерирует случайный UUID и добавляет строку в таблицу Requests, используя UUID в качестве requestId, а затем обследует таблицу журналов для requestId. Если столбец success равен true, запрос выполнится, иначе он не сработает.

Сервер с базой данных назначает один поток или процесс каждому ресурсу, например. ProcessX отвечает за ResourceX. ProcessX извлекает все строки из таблицы Requests, где resourceType = ResourceX, сортируется по метке времени, а затем удаляет их из Requests; он затем обрабатывает каждый запрос в порядке, уменьшая счетчик в памяти для каждого успешного запроса, а в конце обработки запросов он обновляет количество ресурсовX в таблице ресурсов. Затем он записывает каждый запрос и свой статус success в таблицу журналов. Затем он извлекает все запросы из запросов, где requestType = RequestX снова и т. Д.

Может быть несколько более эффективным использование целочисленного значения autoincrement в качестве первичного ключа Requests и для сортировки ProcessX по первичному ключу, а не по метке времени.


Одним из вариантов является назначение одного DAOThread каждого ресурса - это нить единственное, что доступ к таблице базы данных ресурса, так что нет блокировки на уровне базы данных. Worker (например,веб-сеансы) запрашивает количество ресурсов, используя параллельную очередь. В приведенном ниже примере используется Java BlockingQueue, но большинство языков будет иметь некоторую реализацию параллельной очереди, которую вы можете использовать.

public class Request { 
    final int value; 
    final BlockingQueue<ReturnMessage> queue; 
} 

public class ReturnMessage { 
    final int value; 
    final String resourceType; 
    final boolean isSuccess; 
} 

public class DAOThread implements Runnable { 
    private final int MAX_CHANGES = 10; 
    private String resourceType; 
    private int quantity; 
    private int changeCount = 0; 
    private DBTable table; 
    private BlockingQueue<Request> queue; 

    public DAOThread(DBTable table, BlockingQueue<Request> queue) { 
    this.table = table; 
    this.resourceType = table.select("resource_type"); 
    this.quantity = table.select("quantity"); 
    this.queue = queue; 
    } 

    public void run() { 
    while(true) { 
     Requester request = queue.take(); 
     if(request.value <= quantity) { 
     quantity -= request.value; 
     if(++changeCount > MAX_CHANGES) { 
      changeCount = 0; 
      table.update("quantity", quantity); 
     } 
     request.queue.offer(new ReturnMessage(request.value, resourceType, true)); 
     } else { 
     request.queue.offer(new ReturnMessage(request.value, resourceType, false)); 
     } 
    } 
    } 
} 

public class Worker { 
    final Map<String, BlockingQueue<Request>> dbMap; 
    final SynchronousQueue<ReturnMessage> queue = new SynchronousQueue<>(); 

    public class WorkerThread(Map<String, BlockingQueue<Request>> dbMap) { 
    this.dbMap = dbMap; 
    } 

    public boolean request(String resourceType, int value) { 
    dbMap.get(resourceType).offer(new Request(value, queue)); 
    return queue.take(); 
    } 
} 

Рабочие отправляют запросы ресурсов в соответствующую очередь DAOThread; DAOThread обрабатывает эти запросы по порядку, либо обновляя количество локального ресурса, если значение запроса не превышает количество и возвращает Success, оставив неизменным количество и возвращая отказ. База данных обновляется только после десяти обновлений, чтобы уменьшить количество ввода-вывода; чем больше MAX_CHANGES, тем сложнее будет восстановить системный сбой. Вы также можете иметь выделенный IOThread, который выполняет всю запись в базе данных - таким образом вам не нужно дублировать какие-либо протоколирование или синхронизацию (например, там должен быть таймер, который каждые несколько секунд сбрасывает текущее количество в базу данных).

Работник использует SynchronousQueue для ожидания ответа от DAOThread (SynchronousQueue - это BlockingQueue, который может содержать только один элемент); если Рабочий работает в своем потоке, вы можете заменить его стандартным многопозиционным BlockingQueue, чтобы Рабочий мог обрабатывать ReturnMessages в любом порядке.

Есть несколько баз данных, например. Riak, которые имеют встроенную поддержку счетчиков, поэтому это может улучшить ваш уровень ввода-вывода и уменьшить или устранить необходимость в MAX_CHANGES.

Вы можете дополнительно увеличить пропускную способность, введя BufferThread s для буферизации запросов к DAOThread.

public class BufferThread implements Runnable { 
    final SynchronousQueue<ReturnMessage> returnQueue = new SynchronousQueue<>(); 
    final int BUFFERSIZE = 10; 

    private DAOThread daoThread; 
    private BlockingQueue<Request> queue; 
    private ArrayList<Request> buffer = new ArrayList<>(BUFFERSIZE); 
    private int tempTotal = 0; 

    public BufferThread(DAOThread daoThread, BlockingQueue<Request> queue) { 
    this.daoThread = daoThread; 
    this.queue = queue; 
    } 

    public void run() { 
    while(true) { 
     Request request = queue.poll(100, TimeUnit.MILLISECONDS); 
     if(request != null) { 
     tempTotal += request.value; 
     buffer.add(request); 
     } 
     if(buffer.size() == BUFFERSIZE || request == null) { 
     daoThread.queue.offer(new Request(tempTotal, returnQueue)); 
     ReturnMessage message = returnQueue.take(); 
     if(message.isSuccess()) { 
      for(Request request: buffer) { 
      request.queue.offer(new ReturnMessage(request.value, daoThread.resourceType, message.isSuccess)); 
      } 
     } else { 
      // send unbuffered requests to DAOThread to see if any can be satisfied 
      for(Request request: buffer) { 
      daoThread.queue.offer(request); 
      } 
     } 
     buffer.clear(); 
     tempTotal = 0; 
     } 
    } 
    } 
} 

Тружеников направлять свои запросы на BufferThreads, которые затем ждать, пока они не буферный BUFFERSIZE запросов или ждали 100мса для запроса, чтобы прийти через буфер (Request request = queue.poll(100, TimeUnit.MILLISECONDS)), в какой момент они пересылают буферизированные сообщение DAOThread. У вас может быть несколько буферов на DAOThread - вместо отправки Map<String, BlockingQueue<Request>> Рабочим вы вместо этого отправляете Map<String, ArrayList<BlockingQueue<Request>>>, одну очередь за BufferThread, с Рабочем либо с помощью счетчика или генератора случайных чисел, чтобы определить, для какого BufferThread отправить запрос. Обратите внимание, что если BUFFERSIZE слишком велико и/или если у вас слишком много BufferThread с, то работники будут страдать от длительных периодов паузы, пока они ждут заполнения буфера.

+0

Благодарим вас за ответ. К сожалению, приложение работает в кластерной среде, поэтому подход нити не может быть принят (мы думали, что это механизм распределенного блокирования, но это должен быть последний вариант для связанных с ним проблем). Вместо этого мы предпочли бы стратегию управления структурой данных на самой БД, не исключая вообще блокировок. Совет о Риак кажется интересным. Тем не менее, я собираюсь улучшить вопрос с этими деталями. Еще раз спасибо! – pcan

+0

@ piero86 Я отредактировал верхушку своего ответа на адрес кластеров –

Смежные вопросы