2008-09-18 4 views
3

Мне нужен очень быстрый алгоритм для следующей задачи. Я уже реализовал несколько алгоритмов, которые завершают его, но они слишком медленны для производительности, в которой я нуждаюсь. Он должен быть достаточно быстрым, чтобы алгоритм мог работать не менее 100 000 раз в секунду на современном процессоре. Он будет реализован на C++.Помощь с алгоритмом слияния векторов

Я работаю с диапазонами/диапазонами, структурой, которая имеет начало и конечную координату на линии.

У меня есть два вектора (динамические массивы) пролетов, и мне нужно их объединить. Один вектор - src, а другой dst. Векторы сортируются по начальным координатам диапазона, а пролеты не перекрываются внутри одного вектора.

Пространства в векторе src должны быть объединены с пролетами вектора dst, так что результирующий вектор по-прежнему сортируется и не имеет перекрытий. То есть. если во время слияния обнаруживаются перекрытия, два пролета объединяются в один. (Слияние двух пролетов - это просто вопрос изменения координат в структуре.)

Теперь есть еще один улов, пролеты в векторе src должны быть «расширены» во время слияния. Это означает, что константа будет добавлена ​​в начало и другая (большая) константа в конечную координату каждого интервала в src. Это означает, что после расширения пространств src они могут перекрываться.


То, что я прибыл в до сих пор является то, что она не может быть сделано полностью в месте, своего рода временного хранения не требуется. Я думаю, что это должно выполняться в линейном времени по количеству элементов src и dst, суммированных.

Любое временное хранилище, вероятно, может быть разделено между несколькими прогонами алгоритма.

Два основных подхода я пытался, которые слишком медленно, являются:

  1. Append все элементы ЦСИ к целевой_адрес, расширяя каждый элемент перед добавлением его. Затем запустите сортировку на месте. Наконец, итерации по результирующему вектору, используя указатель «читать» и «писать», с указателем чтения, работающим впереди указателя записи, слияния с интервалами по мере их поступления. Когда все элементы объединены (указатель чтения достигает конца) dst усекается.

  2. Создайте временный рабочий вектор. Сделайте наивное слияние, как описано выше, путем многократного выбора следующего элемента из src или dst и слияния в рабочий вектор. Когда закончите, скопируйте рабочий вектор в dst, заменив его.

Первый способ имеет проблему, что сортировка является O ((т + п) * лог (т + п)) вместо O (M + N) и имеет несколько накладных расходов. Это также означает, что вектор dst должен расти намного больше, чем ему действительно нужно.

Вторая проблема связана с большим количеством копирования и повторного выделения/освобождения памяти.

Структуры данных, используемые для хранения/управления пролетами/векторами, могут быть изменены, если вы считаете, что это необходимо.

Обновление: Забыл сказать, насколько велики наборы данных. Наиболее распространенными случаями являются от 4 до 30 элементов в любом векторе, и либо dst пуст, либо имеется большое количество перекрытий между пролетами в src и dst.

ответ

0

Как насчет второго метода без повторного выделения - другими словами, выделите свой временный вектор один раз и никогда не выделяйте его снова? Или, если входные векторы достаточно малы (но не постоянный размер), просто используйте alloca вместо malloc.

Кроме того, с точки зрения скорости, вы можете убедиться, что ваш код использует CMOV для сортировки, так как если код фактически ветвления для каждой итерации сортировки слиянием:

if(src1[x] < src2[x]) 
    dst[x] = src1[x]; 
else 
    dst[x] = src2[x]; 

предсказание ветви будет терпеть неудачу в 50% случаев, что будет иметь огромное влияние на производительность. Условный ход, вероятно, будет намного лучше, поэтому убедитесь, что компилятор делает это, а если нет, попробуйте уговорить его сделать это.

0

Сортировка, указанная в подходе 1, может быть сведена к линейному времени (из лог-линейного, как вы ее описываете), потому что два списка ввода уже отсортированы. Просто выполните этап слияния слияния-сортировки. При соответствующем представлении для векторов входного диапазона (например, односвязных списков) это можно сделать на месте.

http://en.wikipedia.org/wiki/Merge_sort

0

я не думаю, что строго линейное решение возможно, из-за расширения СРК вектор пролеты может в худшем случае дело все они пересекаются (в зависимости от величины константы, которую вы добавление)

проблема может быть в реализации, а не в алгоритме; я хотел бы предложить профилирование код для Вашего предварительного решения, чтобы увидеть, где провел время

рассуждения:

для истинно «современного» CPU как на Intel Core 2 Extreme QX9770 работает на частоте 3,2 ГГц, можно ожидать, что о 59 455 MIPS

для 100 000 векторов, вам придется обрабатывать каждый вектор в 594 550 экземплярах. Это много инструкций.

иое: wikipedia MIPS

Кроме того, обратите внимание, что добавление константы к СРК вектора пролетам не ослабляющим-сортировать их, так что вы можете нормализовать вектор ЦСИ охватывает независимо друг от друга, а затем объединить их вектор ДСТА охватывает; это должно уменьшить рабочую нагрузку вашего исходного алгоритма.

+0

100k действительно может быть недосказанным, я действительно не подсчитал число. Также, когда я сказал «современный» процессор, я на самом деле думал «что-то до 5 лет», Athlon XP 3000+ не является нереалистичной целью. – jfs 2008-09-18 02:38:24

8

Мы знаем, что абсолютное время выполнения наилучшего случая - это O (m + n), это связано с тем, что вам, по крайней мере, нужно сканировать все данные, чтобы быть способный объединить списки. Учитывая это, ваш второй метод должен дать вам такой тип поведения.

Профилировали ли вы свой второй метод, чтобы узнать, что такое узкие места? Вполне возможно, что в зависимости от объема данных, о которых вы говорите, на самом деле невозможно сделать то, что вы хотите за указанный промежуток времени. Один из способов проверить это - сделать что-то простое, например суммировать все начальные и конечные значения пролетов в каждом векторе в цикле и время. В основном здесь вы выполняете минимальный объем работы для каждого элемента в векторах. Это даст вам базовый уровень для максимальной производительности, которую вы можете ожидать.

Помимо этого вы можете избежать копирования элемента вектора по элементу с помощью метода stl swap, и вы можете предварительно распределить вектор temp на определенный размер, чтобы избежать запуска расширения массива при слиянии элементов.

Возможно, вы захотите использовать 2 вектора в своей системе и всякий раз, когда вам нужно выполнить слияние, вы слились с неиспользуемым вектором, а затем замените (это похоже на двойную буферизацию, используемую в графике). Таким образом, вам не нужно перераспределять векторы каждый раз, когда вы выполняете слияние.

Тем не менее, вам лучше всего профилировать сначала и выяснить, каково ваше узкое место. Если распределения минимальны по сравнению с фактическим процессом слияния, вам нужно выяснить, как сделать это быстрее.

Некоторые дополнительные дополнительные ускорения могут возникать из-за прямого доступа к исходным данным векторов, что позволяет избежать проверки границ каждого доступа к данным.

+0

Спасибо, что напомнили мне о std :: swap, это может быть на самом деле сделка. Я вернусь после проверки этого;) – jfs 2008-09-18 02:32:47

0

1 прямо - полный сорт происходит медленнее, чем слияние двух отсортированных списков.

Итак, вы смотрите на настройку 2 (или что-то совершенно новое).

Если вы измените структуры данных на дважды связанные списки, вы можете объединить их в постоянное рабочее пространство.

Используйте распределитель кучи фиксированного размера для узлов списка, как для уменьшения использования памяти для каждого узла, так и для повышения вероятности того, что узлы находятся близко друг к другу в памяти, что уменьшает пропуски страниц.

Возможно, вам удастся найти код онлайн или в вашей любимой книге алгоритмов, чтобы оптимизировать слияние связанных списков. Вы хотите настроить это, чтобы объединить объединительные соединения одновременно с объединением списков.

Чтобы оптимизировать слияние, сначала обратите внимание, что для каждого запуска значений, выходящих с одной стороны, без одной с другой стороны, вы можете вставить весь прогон в список dst за один раз, вместо того, чтобы вставлять каждый узел в очередь. И вы можете сохранить одну запись для каждой вставки над обычной операцией списка, оставив конец «болтаться», зная, что позже вы ее исправите. И при условии, что вы не делаете удаления нигде в своем приложении, список может быть односвязным, что означает одну запись на узел.

В течение 10 микросекунд выполнения - вид зависит от п и м ...

0

Если последняя реализация еще не достаточно быстро, то в конечном итоге приходится искать альтернативные подходы.

Для чего вы используете выходы этой функции?

+0

Ну, вам не хватает одной вещи, есть не одна «дельта», ее два. Левая и правая части диапазона имеют разные значения, в частности справа имеет большее добавленное значение, чем левое. – jfs 2008-09-18 03:06:30

0

Я написал новый класс контейнера только для этого алгоритма, с учетом потребностей. Это также дало мне возможность настроить другой код вокруг моей программы, который получил небольшое ускорение скорости в одно и то же время.

Это значительно быстрее, чем старая реализация с использованием векторов STL, но в остальном это в основном то же самое. Но пока это быстрее, все равно не очень быстро ... к сожалению.

Профилирование не раскрывает, что является настоящим узким местом. Профилировщик MSVC, по-видимому, иногда ставит «вину» за неправильные вызовы (предположительно идентичные прогоны назначают разные времена работы), и большинство звонков объединяются в одну большую щель.

Глядя на разборку сгенерированного кода, видно, что в сгенерированном коде очень много прыжков, я думаю, что это может быть основной причиной отсталости сейчас.

class SpanBuffer { 
private: 
    int *data; 
    size_t allocated_size; 
    size_t count; 

    inline void EnsureSpace() 
    { 
     if (count == allocated_size) 
      Reserve(count*2); 
    } 

public: 
    struct Span { 
     int start, end; 
    }; 

public: 
    SpanBuffer() 
     : data(0) 
     , allocated_size(24) 
     , count(0) 
    { 
     data = new int[allocated_size]; 
    } 

    SpanBuffer(const SpanBuffer &src) 
     : data(0) 
     , allocated_size(src.allocated_size) 
     , count(src.count) 
    { 
     data = new int[allocated_size]; 
     memcpy(data, src.data, sizeof(int)*count); 
    } 

    ~SpanBuffer() 
    { 
     delete [] data; 
    } 

    inline void AddIntersection(int x) 
    { 
     EnsureSpace(); 
     data[count++] = x; 
    } 

    inline void AddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 
     EnsureSpace(); 
     data[count] = s; 
     data[count+1] = e; 
     count += 2; 
    } 

    inline void Clear() 
    { 
     count = 0; 
    } 

    inline size_t GetCount() const 
    { 
     return count; 
    } 

    inline int GetIntersection(size_t i) const 
    { 
     return data[i]; 
    } 

    inline const Span * GetSpanIteratorBegin() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data); 
    } 

    inline Span * GetSpanIteratorBegin() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data); 
    } 

    inline const Span * GetSpanIteratorEnd() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data+count); 
    } 

    inline Span * GetSpanIteratorEnd() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data+count); 
    } 

    inline void MergeOrAddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 

     if (count == 0) 
     { 
      AddSpan(s, e); 
      return; 
     } 

     int *lastspan = data + count-2; 

     if (s > lastspan[1]) 
     { 
      AddSpan(s, e); 
     } 
     else 
     { 
      if (s < lastspan[0]) 
       lastspan[0] = s; 
      if (e > lastspan[1]) 
       lastspan[1] = e; 
     } 
    } 

    inline void Reserve(size_t minsize) 
    { 
     if (minsize <= allocated_size) 
      return; 

     int *newdata = new int[minsize]; 

     memcpy(newdata, data, sizeof(int)*count); 

     delete [] data; 
     data = newdata; 

     allocated_size = minsize; 
    } 

    inline void SortIntersections() 
    { 
     assert((count & 1) == 0); 
     std::sort(data, data+count, std::less<int>()); 
     assert((count & 1) == 0); 
    } 

    inline void Swap(SpanBuffer &other) 
    { 
     std::swap(data, other.data); 
     std::swap(allocated_size, other.allocated_size); 
     std::swap(count, other.count); 
    } 
}; 


struct ShapeWidener { 
    // How much to widen in the X direction 
    int widen_by; 
    // Half of width difference of src and dst (width of the border being produced) 
    int xofs; 

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call 
    SpanBuffer buffer; 

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst); 

    ShapeWidener(int _xofs) : xofs(_xofs) { } 
}; 


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst) 
{ 
    if (src.GetCount() == 0) return; 
    if (src.GetCount() + dst.GetCount() == 0) return; 

    assert((src.GetCount() & 1) == 0); 
    assert((dst.GetCount() & 1) == 0); 

    assert(buffer.GetCount() == 0); 

    dst.Swap(buffer); 

    const int widen_s = xofs - widen_by; 
    const int widen_e = xofs + widen_by; 

    size_t resta = src.GetCount()/2; 
    size_t restb = buffer.GetCount()/2; 
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin(); 
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin(); 

    while (resta > 0 || restb > 0) 
    { 
     if (restb == 0) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else if (resta == 0) 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
     else if (spa->start < spb->start) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
    } 

    buffer.Clear(); 
} 
+0

Попробуйте использовать deque вместо вектора. Deque уменьшает количество распределений памяти за счет отсутствия непрерывной памяти. – moswald 2008-09-18 06:28:22

0

Я бы всегда сортировал свой вектор пролетов. Это делает алгоритмы реализации LOT более легкими - и их можно делать в линейном времени.

ОК, так что я бы сортировать пролеты на основе: поверочного минимума

  • в порядке возрастания
  • затем охватывают максимум в порядке убывания

Вам нужно создать функцию, чтобы сделать что.

Тогда я бы использовал std :: set_union для слияния векторов (вы можете объединить несколько, прежде чем продолжить).

Затем для каждого последовательного набора пролетов с одинаковыми минимумами вы сохраняете первый и удаляете остальные (они являются суб-пролетами первого пролета).

Затем вам необходимо объединить свои пролеты. Это должно быть довольно выполнимо сейчас и возможно в линейном времени.

ОК, вот трюк. Не пытайтесь делать это на месте. Используйте один или несколько временных векторов (и зарезервируйте достаточно места раньше времени). Затем в конце вызовите std :: vector :: swap, чтобы поместить результаты во входной вектор по вашему выбору.

Надеюсь, этого достаточно, чтобы вы начали.

0

Какова ваша целевая система? Является ли он многоядерным? Если это так, вы можете рассмотреть многопоточность этого алгоритма.

+0

Моя целевая система - это настольная система за последние 5 лет или около того, я не могу ничего принимать о поддержке SMP или наборах инструкций SIMD. (Ну, я могу предположить MMX на x86, но это все.) – jfs 2008-09-18 20:58:48

Смежные вопросы