2015-11-25 6 views
2

Я столкнулся с проблемой производительности из-за куском питона кода, который необходимо сделать следующее:Эффективного 1D сравнения массива, масштабирование и суммирование

У меня есть 2 массивов A и B с неупорядоченными значениями, и я хочу, чтобы построить новый массив C, который бы содержит для каждого индекса я следующее:

C[i]= sum(flag*B[k] for k so that flag = A[k]<=A[i]) 

Я сделал это двумя способами:

1) довольно прямой путь:

M = len(A) 
C = np.zeros(M) 
for i in xrange(M): 
    value = A[i] 
    flag = A <= value 
    C[i] = np.sum(flag * B) 

2) пытается использовать Numpy функцию сортировки:

indices_sorted = np.argsort(A) 
C_sort = np.zeros(M) 
for i in xrange(M): 
    index = np.where(indices_sorted==i) 
    for k in xrange(index[0][0]+1): 
     C_sort[i] += B[indices_sorted[k]] 

Результатом является то, что первый из них является гораздо быстрее (фактор 40-50) за 5000 элементов массивов.

Я не ожидал, что второй один быть, что плохо, и первая попытка не достаточно быстро, ни ...

Могли бы вы, ребята, дайте мне лучший способ сделать это ??

Заранее спасибо.

+0

Ваше объяснение отсутствует что-то - «так, что А [к] " какие? Не могли бы вы привести полный воспроизводимый пример, включая ожидаемый результат? –

+0

Я думаю, что вы неправильно поняли, он говорит: A [k] <= A [i] –

+0

Ну, теперь это происходит после этого редактирования: http://stackoverflow.com/revisions/33923805/2 –

ответ

3

Предполагая A и B быть 1D массивы той же формы, вы можете использовать broadcasting путем расширения A к 2D массиву, а затем выполнить сравнение, таким образом, в основном делает сравнение каждого элемента с каждым другим элементом в векторизованном образе. Затем выполните элементное умножение с B, где снова появится broadcasting. Наконец, суммируем вдоль второй оси для конечного выхода. Реализация будет выглядеть следующим образом -

C = ((A <= A[:,None])*B).sum(1) 

Вы можете имитировать же поведение elementwise multiplication and summing с matrix-multiplication использованием np.dot для гораздо более эффективного решения, как так -

C = (A <= A[:,None]).dot(B) 

Вот другой подход, основанный на индексации с np.take и подсчет с np.bincount -

row,col = np.nonzero(A <= A[:,None]) 
C = np.bincount(row,np.take(B,col)) 

Для огромных данных объем памяти с созданием маски 2D(A <= A[:,None] может компенсировать производительность. Таким образом, в качестве оптимизации существующего кода цикла вы можете ввести matrix-multiplication для замены элементарного умножения и суммирования. Таким образом, np.sum(flag * B) может быть заменен на flag.dot(B). Приведение в несколько других трюков оптимизации, вы бы модифицированную версию, как так -

M = len(A) 
C = np.empty(M) 
for i in xrange(M): 
    C[i] = (A <= A[i]).dot(B) 

Наконец! Вот победитель с np.cumsum -

idx = A.argsort() 
C = B[idx].cumsum()[idx.argsort()] 

Вот краткое объяснение о том, как и почему это работает:

Вы выполняете сравнение поэлементного, а затем суммирование элементов из B на основе результатов сравнения.Теперь, если A были отсортированным массивом, тогда выход C будет по существу cumsum версии B. Таким образом, для общего несортированного случая вам необходимо отсортировать B от argsort A, выполнить на нем cumsum и, наконец, переустановить элементы на основе оригинального несортированного ордера.


выполнения тестов

Определить подходы -

def org_app(A,B): 
    M = len(A) 
    C = np.zeros(M) 
    for i in range(M): 
     value = A[i] 
     flag = A <= value 
     C[i] = np.sum(flag * B) 
    return C 

def sum_based(A,B): 
    return ((A <= A[:,None])*B).sum(1) 

def dot_based(A,B): 
    return (A <= A[:,None]).dot(B) 

def bincount_based(A,B): 
    row,col = np.nonzero(A <= A[:,None]) 
    return np.bincount(row,np.take(B,col)) 

def org_app_modified(A,B): 
    M = len(A) 
    C = np.empty(M) 
    for i in xrange(M): 
     C[i] = (A <= A[i]).dot(B) 
    return C 

def cumsum_trick(A,B): 
    idx = A.argsort() 
    return B[idx].cumsum()[idx.argsort()] 

Настройка входов и тайминги -

In [212]: # Inputs 
    ...: N = 5000 
    ...: A = np.random.rand(N) 
    ...: B = np.random.rand(N) 
    ...: 

In [213]: %timeit org_app(A,B) 
    ...: %timeit sum_based(A,B) 
    ...: %timeit dot_based(A,B) 
    ...: %timeit bincount_based(A,B) 
    ...: %timeit org_app_modified(A,B) 
    ...: %timeit cumsum_trick(A,B) 
    ...: 
1 loops, best of 3: 266 ms per loop 
1 loops, best of 3: 411 ms per loop 
1 loops, best of 3: 322 ms per loop 
1 loops, best of 3: 1.01 s per loop 
10 loops, best of 3: 196 ms per loop 
1000 loops, best of 3: 835 µs per loop 
+0

Спасибо Divakar, Я ценю элегантный способ вашего решения, но производительность мудрым я следующее: Mine 1: 0.2s Mine 2: 8.4s Yours 1: 0.34s Yours 2: 0.23s Мой первый метод по-прежнему является самым эффективным, у вас есть идея, почему? –

+0

@MichaelB Оба 'A' ​​и' B' являются массивами 1D NumPy по 5000 элементов каждый, правильно? – Divakar

+0

Да, это 1D numpy массивы в моем случае. Я проверяю сейчас и дам вам знать ... –

Смежные вопросы