2013-08-16 2 views
7

Предположим, что я представляю вектор-функцию с использованием словаря (почему? Потому что я знаю, что функции редки, но, более подробно об этом позже).Как эффективно вычислить скалярное произведение двух словарей

Как я должен реализовать скалярное произведение двух таких словарей (обозначается, A, B)

Я попробовал наивный подход:

for k in A: 
    if k in B: 
    sum += A[k] * B[k] 

но оказывается медленным.

Некоторые подробности:

  • Я использую словарь для представления функций, потому что

    1. Клавиши особенность являются строками
    2. Есть ~ 20К возможных ключей
    3. Каждый вектор (например, около 1000 ненулевых элементов).
  • Мне действительно интересно вычислить попарно скалярное произведение N = 2000 различных словарей (то есть их линейное ядро).

+0

Вы можете попробовать 'для K, V в A.iteritems(): сумма + = v * B.get (к, 0)' – val

+0

Это может быть очевидным, но я что я добавил бы это в любом случае: вы уверены, что «A» имеет самый маленький 'len()' из двух словарей? В зависимости от обстоятельств это может иметь огромное влияние на скорость. (И поскольку операция коммутативна, это не повлияет на ответ.) – Noah

ответ

1

Вот мой ответ (Вслед за предложением от @ Valentin-Климента):

Сначала я обернуть scipy.sparse dok_matrix. Идея состоит в том, чтобы присвоить каждой из возможных функций индекс.

import scipy.sparse as sps 
import numpy as np 

class MSK: 
    # DD is a dict of dict, whose values are of type float. 
    # features - the set of possible features keys 
    def __init__(self, DD, features): 
     self.features = {k: j for (j, k) in enumerate(features)} 
     self.strings = DD.keys() 
     n = len(self.strings) 
     d = len(self.features) 
     self.M = sps.dok_matrix((n, d), dtype=np.float64) 
     for (i, s) in enumerate(self.strings): 
      v = DD[s] 
      for k in v: 
       j = self.features[k] 
       self.M[i, j] = v[k] 

И мы тестируем, используя следующий код, где число элементов 800, размерность также 800, но разреженности 200 (ровно 200 элементов отличны от нуля).

np.random.seed(1) 
N = 800 
DD = dict() 
R = range(N) 
for i in xrange(N): 
    DD[i] = dict() 
    S = np.random.permutation(R) 
    S = S[:N/4] 
    for j in S: 
     DD[i][j] = np.random.randn(1)[0] 

K = MSK(DD, R) 
import cProfile 
cProfile.runctx("A = K.M * K.M.T", globals(), locals()) 
print A.todense() 

Выход:

2080520 function calls (2080519 primitive calls) in 2.884 seconds 

Допустим, 3 секунды. Моя наивная реализация (которая использует if-statement @ Joowani) занимает около 19 секунд.

(MSK означает MatrixSparseKeys)

+0

FYI, заменив dok_matrix() на lil_matrix(), мы можем получить еще лучшую производительность. –

6

Не уверен, что быстрее, но вот другой подход:

keys = A.viewkeys() & B.viewkeys() 
the_sum = sum(a[k] * b[k] for k in keys) 
+2

Вы можете использовать 'A.keys() и B.keys()' на Python 3 и 'A.viewkeys() & B. viewkeys() 'на Python 2.7. –

+0

На самом деле 'set (A) .intersection (B)' появляется быстрее в некоторых тестах, которые я только что сделал. –

1

Вы должны попытаться использовать namedtuples вместо Dict.

from collections import namedtuple 
A = dict 
B = dict 
_A = namedtuple('_A', A.keys()) 
_B = namedtuple('_B', B.keys()) 
DictA = _A(**A) 
DictB = _B(**B) 

, а затем использовать их как dict. более информация о namedtuples здесь: What are "named tuples" in Python?

+0

Как это ускорит расчет? –

+0

Фактически namedtuples быстрее и эффективнее, чем dict (http://stackoverflow.com/questions/1336791/dictionary-vs-object-which-is-more-efficient-and-why), используя это вместо dict, может улучшить скорость вычисления –

+0

Если вы просматриваете поля по имени, это все равно поиск словаря под капотом. –

7

Хм, кажется, ваш подход на самом деле лучше для плотных векторов:

>>> # Eric's answer 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.4360210521285808 

>>> # My comment 
>>> timeit.timeit('for k,v in A.iteritems(): sum += v*B.get(k,0)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.4082838999682963 

# My comment, more compact 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.38053266868496394 

>>> #Your approach 
>>> timeit.timeit('for k in A: sum += A[k]*B[k] if k in B else 0.', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.35574231962510794 

>>> # Your approach, more compact 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.3400850549682559 

Для более редких из них, ответ Эрика работает лучше, но у вас по-прежнему самый быстрый:

# Mine 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.1390782696843189 

# Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.11702822992151596 

# Yours 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.07878250570843193 

РЕДАКТИРОВАТЬ

После Мессинг вокруг е или немного, кажется, sum([x for x ...]) значительно быстрее, чем sum(x for x in ...). Rebenchmarking с этим и замечание Янне для ключей в ответ Эрика, ваш по-прежнему на вершине (с Joowani-х дает небольшое улучшение):

>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.items()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
1.1604375791416714 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.9234189571552633 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5411289579401455 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5198972138696263 

масштабированию до очень больших размеров, вы видите, точно такая же картина:

>>> #Mine 
>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.iteritems()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
45.328807250833506 

>>> #Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
28.042937058640973 

>>> #Yours 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
16.55080344861699 

>>> #Joowani's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
15.485236119691308 

Я думаю, что трюк Джоувани не улучшает его здесь, потому что векторы примерно одного размера, но в зависимости от вашей проблемы (если некоторые векторы смехотворно меньше других), это может быть более значительным ...

EDIT СНОВА

К сожалению, кажется, что я должен был еще кофе перед публикацией ... Как отметил Эрик (хотя я полностью пропустил это ...), определяя массив в setup держит его в то же самое для всех испытаний, что на самом деле не самый лучший способ тестирования. С СОБСТВЕННЫХ случайных векторов тестируется, результаты не сильно отличаются, но для полноты картины:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.294158102577967 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.068052507449011 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.745110704570834 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.737499445367575 

Для масштабирования:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
5.0510995368395015 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.350612399185138 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.15619379016789 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.185129374341159 

Я думаю, что в нижней строке в том, что вы не можете ожидать значительного прироста умело переупорядочивая ваши выражения для такого рода вещей ... Может быть, вы могли бы попробовать сделать числовую часть в C/Cython или использовать пакет Scipy's Sparse?

+0

Я только что обновил мой из 'set (d.keys())' to 'd.viewkeys()', что может дать ускорение скорости – Eric

+0

Кроме того, я не уверен, что этот тест имеет смысл, если исходный словарь отличается для каждый фрагмент кода – Eric

+0

Правда, но я думаю, что это все равно может дать нам приблизительное представление о скорости каждого из них в среднем. – val

2

В случае, когда A намного длиннее B, это может помочь?

if len(A) > len(B): 
    A, B = B, A 

for k in A: 
    if k in B: 
     the_sum += A[k] * B[k] 
+0

Вау, это было быстрое редактирование Эрика, спасибо! – Joohwan