2013-10-02 3 views
7

В моем текущем сценарии скорость важна. У меня есть карта, которая читается только несколькими потоками, и это прекрасно работает. Теперь возникло требование, которое может потребовать записи на статическую карту раз в то время, пока карты читаются другими потоками. Я считаю, что это смена игры, так как мне нужно будет заблокировать свои карты для обеспечения безопасности потоков. Это создает проблему, так как у меня есть несколько потоков 10-12 потоков, которые будут читать карту. Если одна карта берет замок на карте (с момента его чтения), я считаю, что блокировка была бы необходима, поскольку на карте можно было бы написать что-то. В любом случае, как я уже говорил ранее, если одна карта читает, то другие карты не будут иметь доступ к параллельному доступу к карте, как это было раньше. Есть ли способ, которым я могу обойти эту проблему?Создание стандартной карты, которая является потокобезопасной

+2

Вы должны найти несколько читателей одного сочинителя замков. –

+0

Любые предложения? Есть ли у них стимул? – MistyD

+0

Вы можете найти вопросы на этом сайте, где люди используют Boost для этого. –

ответ

12

Вы можете использовать shared_mutex у вашей карты, чтобы приобрести совместно или уникальный доступ. Как правило, для операции записи потребуется уникальный доступ, а для операций чтения требуется общий доступ.

Любое количество потоков может получить общий доступ, если нитки не имеют уникальный доступ. Если поток пытается получить уникальный доступ, он ждет, пока не будет освобожден весь общий доступ.

Стандартная библиотека и Boost предоставляют shared_lock<T> и unique_lock<T> для получения ограниченной площади shared_mutex.

Опасайтесь, что некоторые люди требуют shared_mutex выполняет плохо, хотя я не видел никаких доказательств или сильного анализа для поддержки этих претензий. Возможно, стоит посмотреть, если это важно для вас.

+0

Спасибо, это похоже на то, что мне нужно – MistyD

+0

Отмечено как ответ после таймера. Спасибо – MistyD

+1

Следите за итераторами ... поток мог бы захватить незащищенный итератор этой карты, указывающий на ячейки памяти, которые потенциально могут быть изменены (в результате добавления или удаления) из кажущихся безопасных областей, защищенных мьютексом, но в целом это не быть потокобезопасным. Вот почему в Java этот прецедент покрывается страшным «ConcurrentModificationException», который вызывается при переходе по Карте, которая изменяется другим потоком. –

3

Одним из решений может быть сохранение указателя на эту карту, а когда вам нужно его изменить - сделайте копию, измените эту копию и затем с помощью атомарной замены указатель на новый экземпляр. Это решение будет потреблять больше памяти, но может быть более эффективным, если у вас много потоков чтения, так как этот метод блокируется.

В приведенном ниже примере только одна тема может изменить карту. Это не означает, что один поток за раз, это означает один и тот же поток для жизни структуры данных. В противном случае модификация должна быть выполнена при сохранении мьютекса, который защищает весь код в updateMap. Нити считывателя могут получить доступ к theData, как обычно, без блокировки.

typedef std::map<...> Data; 

std::atomic<Data *> theData; 

void updateMap(...) 
{ 
    Data *newData = new Data(*theData); 
    // modify newData here 
    Data *old = theData.exchange(newData); 
    delete old; 
} 
+0

Тогда мне нужно будет управлять свопингами и когда это должно произойти. Для этого потребуется много управления и тестирования – MistyD

+0

@MistyD нет, если вы используете std :: atomic или что-то подобное, что было бы довольно просто. Приведите пример к моему ответу. – Slava

+0

Это двойное чтение времени поиска, хотя с 1 разыменования указателя на 2. – OllieB

5

только для C++ удовольствия, читать эту книгу, вы найдете намного больше, стоит, чем потраченных денег, ваш параллелизм мир получит открывают широкая
C++-Concurrency in Action Practical Multithreading
книги дело со всем родом вопросы и практические решения между обменом данными потока, как будить темы, создание пулов потоков и многое другое ... подробнее ... и еще

здесь пример обмена данными между потоками без использования атомной или shared_locks

template<class T> 
class TaskQueue 
{ 
public: 
    TaskQueue(){} 
    TaskQueue& operator = (TaskQueue&) = delete; 

    void Push(T value){ 
     std::lock_guard<std::mutex> lk(mut); 
     data.push(value); 
     condition.notify_one(); //if you have many threads trying to access the data at same time, this will wake one thread only 
    } 

    void Get(T& value){ 
     std::unique_lock<std::mutex> lk(mut); 
     condition.wait(lk, [this]{ return !data.empty(); }); // in this case it waits if queue is empty, if not needed you can remove this line 
     value = data.front(); 
     data.pop(); 
     lk.unlock(); 
    } 

private: 
    std::mutex mut; 
    std::queue<T> data; //in your case change this to a std::map 
    std::condition_variable condition; 
}; 
+0

Я определенно смотрю на это – MistyD

0

Что вам нужно, это эквивалент ConcurrentHashMap в Java, который позволяет одновременно читать и записывать в базовую хеш-таблицу. Этот класс является частью пакета java.util.concurrent и обеспечивает одновременное чтение и запись (до уровня параллелизма, по умолчанию - 16).

Дополнительную информацию можно найти на странице javadoc.Я цитирую javadoc здесь:

Хэш-таблица, поддерживающая полный параллелизм поиска и регулируемый ожидаемый параллелизм для обновлений. Этот класс подчиняется той же функциональной спецификации, что и Hashtable, и включает версии методов, соответствующих каждому методу Hashtable. Однако, несмотря на то, что все операции являются потокобезопасными, операции поиска не влекут за собой блокировку, и нет никакой поддержки для блокировки всей таблицы таким образом, чтобы предотвратить весь доступ. Этот класс полностью совместим с Hashtable в программах, которые полагаются на безопасность потока, но не на детали синхронизации.

0

Два других ответов довольно хорошо, но я думал, что я должен добавить немного цвета:

Клифф Нажмите написал lock-free concurrent hash map in Java. Было бы нетривиальным адаптировать его к C++ (не GC, отличается памяти модель и т. д.), но это лучшая реализация незащищенной структуры данных, которую я когда-либо видел. Если вы можете использовать JAva вместо C++, это может быть способ пойти.

Я не знаю никаких блокированных сбалансированных двоичных древовидных структур. Это не значит, что они не существуют.

Возможно, проще всего пойти с одним из двух других ответов (объемная копия/атомная своп/что-то вроде shared_ptr или блокировка считывающего устройства) для управления доступом к map. Один из двух будет быстрее в зависимости от относительных величин чтения и записи и размера map; вы должны проверить, какой из них вы должны использовать.

+1

Эта реализация на самом деле легко перевести на C++, но в этом нет необходимости. Intel [TBB] (https://www.threadingbuildingblocks.org) отлично работает и предоставляет еще пару незакрепленных структур данных. Однако я не проверял, реализует ли реализация Intel совместное изменение размера, например, реализация Cliff Click. Теперь, если только у кого-то была куча min-max ... –

4

Вот моя реализация THREADSAFE родового перезначительного HashMap без использования СТЛИ контейнеры:

#pragma once 

#include <iomanip> 
#include <exception> 
#include <mutex> 
#include <condition_variable> 

/* 
* wrapper for items stored in the map 
*/ 
template<typename K, typename V> 
class HashItem { 
public: 
    HashItem(K key, V value) { 
     this->key = key; 
     this->value = value; 
     this->nextItem = nullptr; 
    } 

    /* 
    * copy constructor 
    */ 
    HashItem(const HashItem & item) { 
     this->key = item.getKey(); 
     this->value = item.getValue(); 
     this->nextItem = nullptr; 
    } 

    void setNext(HashItem<K, V> * item) { 
     this->nextItem = item; 
    } 

    HashItem * getNext() { 
     return nextItem; 
    } 

    K getKey() { 
     return key; 
    } 

    V getValue() { 
     return value; 
    } 

    void setValue(V value) { 
     this->value = value; 
    } 

private: 
    K key; 
    V value; 
    HashItem * nextItem; 

}; 

/* 
* template HashMap for storing items 
* default hash function HF = std::hash<K> 
*/ 
template <typename K, typename V, typename HF = std::hash<K>> 
class HashMap { 
public: 
    /* 
    * constructor 
    * @mSize specifies the bucket size og the map 
    */ 
    HashMap(std::size_t mSize) { 
     // lock initialization for single thread 
     std::lock_guard<std::mutex>lock(mtx); 
     if (mSize < 1) 
      throw std::exception("Number of buckets ust be greater than zero."); 

     mapSize = mSize; 
     numOfItems = 0; 
     // initialize 
     hMap = new HashItem<K, V> *[mapSize](); 
    } 

    /* 
    * for simplicity no copy constructor 
    * anyway we want test how different threads 
    * use same instance of the map 
    */ 
    HashMap(const HashMap & hmap) = delete; 

    /* 
    * inserts item 
    * replaces old value with the new one when item already exists 
    * @key key of the item 
    * @value value of the item 
    */ 
    void insert(const K & key, const V & value) { 
     std::lock_guard<std::mutex>lock(mtx); 
     insertHelper(this->hMap, this->mapSize, numOfItems, key, value); 
     condVar.notify_all(); 
    } 

    /* 
    * erases item with key when siúch item exists 
    * @key of item to erase 
    */ 
    void erase(const K & key) { 
     std::lock_guard<std::mutex>lock(mtx); 
     // calculate the bucket where item must be inserted 
     std::size_t hVal = hashFunc(key) % mapSize; 
     HashItem<K, V> * prev = nullptr; 
     HashItem<K, V> * item = hMap[hVal]; 

     while ((item != nullptr) && (item->getKey() != key)) { 
      prev = item; 
      item = item->getNext(); 
     } 
     // no item found with the given key 
     if (item == nullptr) { 
      return; 
     } 
     else { 
      if (prev == nullptr) { 
       // item found is the first item in the bucket 
       hMap[hVal] = item->getNext(); 
      } 
      else { 
       // item found in one of the entries in the bucket 
       prev->setNext(item->getNext()); 
      } 
      delete item; 
      numOfItems--; 
     } 
     condVar.notify_all(); 
    } 

    /* 
    * get element with the given key by reference 
    * @key is the key of item that has to be found 
    * @value is the holder where the value of item with key will be copied 
    */ 
    bool getItem(const K & key, V & value) const { 
     std::lock_guard<std::mutex>lock(mtx); 
     // calculate the bucket where item must be inserted 
     std::size_t hVal = hashFunc(key) % mapSize; 
     HashItem<K, V> * item = hMap[hVal]; 

     while ((item != nullptr) && (item->getKey() != key)) 
      item = item->getNext(); 
     // item not found 
     if (item == nullptr) { 
      return false; 
     } 

     value = item->getValue(); 
     return true; 
    } 


    /* 
    * get element with the given key by reference 
    * @key is the key of item that has to be found 
    * shows an example of thread waitung for some condition 
    * @value is the holder where the value of item with key will be copied 
    */ 
    bool getWithWait(const K & key, V & value) { 
     std::unique_lock<std::mutex>ulock(mtxForWait); 
     condVar.wait(ulock, [this] {return !this->empty(); }); 
     // calculate the bucket where item must be inserted 
     std::size_t hVal = hashFunc(key) % mapSize; 
     HashItem<K, V> * item = hMap[hVal]; 

     while ((item != nullptr) && (item->getKey() != key)) 
      item = item->getNext(); 
     // item not found 
     if (item == nullptr) { 
      return false; 
     } 

     value = item->getValue(); 
     return true; 
    } 


    /* 
    * resizes the map 
    * creates new map on heap 
    * copies the elements into new map 
    * @newSize specifies new bucket size 
    */ 
    void resize(std::size_t newSize) { 
     std::lock_guard<std::mutex>lock(mtx); 
     if (newSize < 1) 
      throw std::exception("Number of buckets must be greater than zero."); 

     resizeHelper(newSize); 
     condVar.notify_all(); 
    } 

    /* 
    * outputs all items of the map 
    */ 
    void outputMap() const { 
     std::lock_guard<std::mutex>lock(mtx); 
     if (numOfItems == 0) { 
      std::cout << "Map is empty." << std::endl << std::endl; 
      return; 
     } 
     std::cout << "Map contains " << numOfItems << " items." << std::endl; 
     for (std::size_t i = 0; i < mapSize; i++) { 
      HashItem<K, V> * item = hMap[i]; 
      while (item != nullptr) { 
       std::cout << "Bucket: " << std::setw(3) << i << ", key: " << std::setw(3) << item->getKey() << ", value:" << std::setw(3) << item->getValue() << std::endl; 
       item = item->getNext(); 
      } 
     } 
     std::cout << std::endl; 
    } 

    /* 
    * returns true when map has no items 
    */ 
    bool empty() const { 
     std::lock_guard<std::mutex>lock(mtx); 
     return numOfItems == 0; 
    } 

    void clear() { 
     std::lock_guard<std::mutex>lock(mtx); 
     deleteMap(hMap, mapSize); 
     numOfItems = 0; 
     hMap = new HashItem<K, V> *[mapSize](); 
    } 

    /* 
    * returns number of items stored in the map 
    */ 
    std::size_t size() const { 
     std::lock_guard<std::mutex>lock(mtx); 
     return numOfItems; 
    } 

    /* 
    * returns number of buckets 
    */ 
    std::size_t bucket_count() const { 
     std::lock_guard<std::mutex>lock(mtx); 
     return mapSize; 
    } 

    /* 
    * desctructor 
    */ 
    ~HashMap() { 
     std::lock_guard<std::mutex>lock(mtx); 
     deleteMap(hMap, mapSize); 
    } 

private: 
    std::size_t mapSize; 
    std::size_t numOfItems; 
    HF hashFunc; 
    HashItem<K, V> ** hMap; 
    mutable std::mutex mtx; 
    mutable std::mutex mtxForWait; 
    std::condition_variable condVar; 

    /* 
    * help method for inserting key, value item into the map hm 
    * mapSize specifies the size of the map, items - the number 
    * of stored items, will be incremented when insertion is completed 
    * @hm HashMap 
    * @mSize specifies number of buckets 
    * @items holds the number of items in hm, will be incremented when insertion successful 
    * @key - key of item to insert 
    * @value - value of item to insert 
    */ 
    void insertHelper(HashItem<K, V> ** hm, const std::size_t & mSize, std::size_t & items, const K & key, const V & value) { 
     std::size_t hVal = hashFunc(key) % mSize; 
     HashItem<K, V> * prev = nullptr; 
     HashItem<K, V> * item = hm[hVal]; 

     while ((item != nullptr) && (item->getKey() != key)) { 
      prev = item; 
      item = item->getNext(); 
     } 

     // inserting new item 
     if (item == nullptr) { 
      item = new HashItem<K, V>(key, value); 
      items++; 
      if (prev == nullptr) { 
       // insert new value as first item in the bucket 
       hm[hVal] = item; 
      } 
      else { 
       // append new item on previous in the same bucket 
       prev->setNext(item); 
      } 
     } 
     else { 
      // replace existing value 
      item->setValue(value); 
     } 
    } 

    /* 
    * help method to resize the map 
    * @newSize specifies new number of buckets 
    */ 
    void resizeHelper(std::size_t newSize) { 
     HashItem<K, V> ** newMap = new HashItem<K, V> *[newSize](); 
     std::size_t items = 0; 
     for (std::size_t i = 0; i < mapSize; i++) { 
      HashItem<K, V> * item = hMap[i]; 
      while (item != nullptr) { 
       insertHelper(newMap, newSize, items, item->getKey(), item->getValue()); 
       item = item->getNext(); 
      } 
     } 

     deleteMap(hMap, mapSize); 
     hMap = newMap; 
     mapSize = newSize; 
     numOfItems = items; 
     newMap = nullptr; 

    } 

    /* 
    * help function for deleting the map hm 
    * @hm HashMap 
    * @mSize number of buckets in hm 
    */ 
    void deleteMap(HashItem<K, V> ** hm, std::size_t mSize) { 
     // delete all nodes 
     for (std::size_t i = 0; i < mSize; ++i) { 
      HashItem<K, V> * item = hm[i]; 
      while (item != nullptr) { 
       HashItem<K, V> * prev = item; 
       item = item->getNext(); 
       delete prev; 
      } 
      hm[i] = nullptr; 
     } 
     // delete the map 
     delete[] hm; 
    } 
}; 
+0

Простите за возрождение довольно старого потока ... но вы уверены, что 'getWithWait' будет работать нормально? Представьте, что один поток вызывает это, другие вызовы, скажем, 'erase'. Тогда разные мьютексы заблокированы, верно? Когда вызывается с тем же аргументом 'key', возникает условие гонки, не так ли? – alagner

Смежные вопросы