2

Я работал над алгоритмом обхода графика по простой сети, и я хотел бы запустить его с использованием многопроцессорности, так как он потребует большого количества ограничений ввода-вывода, когда я масштабировать его по всей сети. Простая версия работает довольно быстро:Преобразование графика для многопроцессорности в Python

already_seen = {} 
already_seen_get = already_seen.get 

GH_add_node = GH.add_node 
GH_add_edge = GH.add_edge 
GH_has_node = GH.has_node 
GH_has_edge = GH.has_edge 


def graph_user(user, depth=0): 
    logger.debug("Searching for %s", user) 
    logger.debug("At depth %d", depth) 
    users_to_read = followers = following = [] 

    if already_seen_get(user): 
     logging.debug("Already seen %s", user) 
     return None 

    result = [x.value for x in list(view[user])] 

    if result: 
     result = result[0] 
     following = result['following'] 
     followers = result['followers'] 
     users_to_read = set().union(following, followers) 

    if not GH_has_node(user): 
     logger.debug("Adding %s to graph", user) 
     GH_add_node(user) 

    for follower in users_to_read: 
     if not GH_has_node(follower): 
      GH_add_node(follower) 
      logger.debug("Adding %s to graph", follower) 
      if depth < max_depth: 
       graph_user(follower, depth + 1) 

     if GH_has_edge(follower, user): 
      GH[follower][user]['weight'] += 1 
     else: 
      GH_add_edge(user, follower, {'weight': 1}) 

Ее на самом деле значительно быстрее, чем мой многопроцессорной версии:

to_write = Queue() 
to_read = Queue() 
to_edge = Queue() 
already_seen = Queue() 


def fetch_user(): 
    seen = {} 
    read_get = to_read.get 
    read_put = to_read.put 
    write_put = to_write.put 
    edge_put = to_edge.put 
    seen_get = seen.get 

    while True: 
     try: 
      logging.debug("Begging for a user") 

      user = read_get(timeout=1) 
      if seen_get(user): 
       continue 

      logging.debug("Adding %s", user) 
      seen[user] = True 
      result = [x.value for x in list(view[user])] 
      write_put(user, timeout=1) 

      if result: 
       result = result.pop() 
       logging.debug("Got user %s and result %s", user, result) 
       following = result['following'] 
       followers = result['followers'] 
       users_to_read = list(set().union(following, followers)) 

       [edge_put((user, x, {'weight': 1})) for x in users_to_read] 

       [read_put(y, timeout=1) for y in users_to_read if not seen_get(y)] 

     except Empty: 
      logging.debug("Fetches complete") 
      return 


def write_node(): 
    users = [] 
    users_app = users.append 
    write_get = to_write.get 

    while True: 
     try: 
      user = write_get(timeout=1) 
      logging.debug("Writing user %s", user) 
      users_app(user) 
     except Empty: 
      logging.debug("Users complete") 
      return users 


def write_edge(): 
    edges = [] 
    edges_app = edges.append 
    edge_get = to_edge.get 

    while True: 
     try: 
      edge = edge_get(timeout=1) 
      logging.debug("Writing edge %s", edge) 
      edges_app(edge) 
     except Empty: 
      logging.debug("Edges Complete") 
      return edges 


if __name__ == '__main__': 
    pool = Pool(processes=1) 
    to_read.put(me) 

    pool.apply_async(fetch_user) 
    users = pool.apply_async(write_node) 
    edges = pool.apply_async(write_edge) 

    GH.add_weighted_edges_from(edges.get()) 
    GH.add_nodes_from(users.get()) 

    pool.close() 
    pool.join() 

То, что я не могу понять, почему одна версия процесс намного быстрее. Теоретически, многопроцессорная версия должна записывать и читать одновременно. Я подозреваю, что в очередях есть блокировка, и это является причиной замедления, но на самом деле у меня нет никаких доказательств этого. Когда я масштабирую количество процессов fetch_user, он работает быстрее, но затем у меня возникают проблемы с синхронизацией данных, просматриваемых по ним. Так что некоторые мысли у меня были

  • Это даже хорошее приложение для многопроцессорной обработки? Я был изначально , используя его, потому что я хотел быть в состоянии , чтобы получить из db в parallell.
  • Как я могу избежать конкуренции ресурсов при чтении и записи из одной очереди?
  • Я пропустил некоторые очевидные оговорки в отношении дизайна?
  • Что я могу сделать, чтобы разделить таблицу поиска между читателями, чтобы я не продолжал получать один и тот же пользователь дважды?
  • При увеличении количества процессов отбора, которые они, в конце концов, записывают. Похоже, что очередь записи не записывается, но очередь чтения заполнена. Есть ли лучший способ справиться с этой ситуацией, чем с тайм-аутами и обработкой исключений?

ответ

1

Queues в Python синхронизированы. Это означает, что только один поток за раз может читать/писать, это определенно спровоцирует узкое место в вашем приложении.

Одним из лучших решений является распространение обработки на основе hash function и назначение обработки потокам с простой работой модуля. Так, например, если у вас есть 4 темы, вы можете иметь 4 очереди:

thread_queues = [] 
for i in range(4): 
    thread_queues = Queue() 

for user in user_list: 
    user_hash=hash(user.user_id) #hash in here is just shortcut to some standard hash utility 
    thread_id = user_hash % 4 
    thread_queues[thread_id].put(user) 

# From here ... your pool of threads access thread_queues but each thread ONLY accesses 
# one queue based on a numeric id given to each of them. 

Большинство хэш-функции будет равномерно распределять ваши данные. Обычно я использую UMAC. Но, возможно, вы можете просто попробовать с хэш-функцией из реализации Python String.

Другим усовершенствованием было бы избежать использования очередей и использовать несинхронизирующий объект, такой список.

+0

Я, наконец, обошел это, и синхронизация была узким местом. Проблема заключалась в том, чтобы решить, на какой раздел данных. – dcolish

Смежные вопросы