2015-07-17 2 views
0

В настоящее время я извлекаю данные запаса из api, к которым у меня есть доступ. Я делаю это в следующих шагах:Розетки и резьба

  1. петлю через список символов/запасов один на один
  2. создать сокет-соединение и отправить соответствующее сообщение на апи
  3. получить данные и отдельные его в строки до «! EndMSG!» принимаются в какой момент данных для этого символа завершения
  4. преобразования данных (строка) в StringIO, а затем прочитать его в панде dataframe и в конечном итоге записать данные в SQL
  5. ли следующий символ/акции

Соответствующий фрагмент кода:

def readlines(sock, recv_buffer=4096, delim='\n'): 
    buffer = '' 
    while True: 
     data = sock.recv(recv_buffer) 
     buffer += str(data.decode('latin-1')) 

    while buffer.find(delim) != -1: 
     line, buffer = buffer.split('\n', 1) 
     yield line 

def main(): 
    syms = ['MSFT', 'AAPL', 'GS', 'F'] 
    for sym in syms: 
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     sock.connect((host, port)) 

     data = '' 
     message = sym + #relevant api specific commands 
     sock.sendall(message.encode()) 

     for line in readlines(sock): 
      if "!ENDMSG!" in line: 
       break 
      data += line + '\n' 

     sock.close() 

     data = io.StringIO(data) 
     df = pd.read_csv(data) 
     df.to_sql(...) 

Я хотел бы включить резьб в это так, что я не должен делать один запас по времени. Однако то, что им не уверены в том, где/как реализовать замки, так что я не рискую получать данные о неправильных запасов в неправильных переменных и т.д.

Это то, что я до сих пор:

import threading 
from queue import Queue 

q = Queue() 
my_lock = threading.Lock() 

def readlines(sock, recv_buffer=4096, delim='\n'): 
    buffer = '' 
    while True: 
     data = sock.recv(recv_buffer) 
     buffer += str(data.decode('latin-1')) 

    while buffer.find(delim) != -1: 
     line, buffer = buffer.split('\n', 1) 
     yield line 

def get_symbol_data(sym, sock): 
    with my_lock: 
     data = '' 
     message = sym + #relevant api specific commands 
     sock.sendall(message.encode()) 

     for line in readlines(sock): 
      if "!ENDMSG!" in line: 
       break 
      data += line + '\n' 

     data = io.StringIO(data) 
     df = pd.read_csv(data) 
     df.to_sql(...) 

def threader(): 
    while True: 
     sym_tuple = q.get() 
     sym = sym_tuple[0] 
     sock = sym_tuple[1] 
     get_symbol_data(sym, sock) 
     q.task_done() 

def main(): 
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    sock.connect((host, port)) 

    # create 4 threads 
    for x in range(4): 
     t = threading.Thread(target=threader) 
     t.daemon = True 
     t.start() 

    syms = ['MSFT', 'AAPL', 'GS', 'F'] 
    for sym in syms: 
     q.put((sym, sock)) 
    q.join() 
    sock.close() 

Моя попытка при включении резьбы просто висит. Нет ошибок, ничего. Он просто висит. Надеюсь, кто-то может указать мне в правильном направлении.

Я также не уверен, что если я воспользуюсь замком в нужном месте? Кстати, если я не использую блокировку, программа все еще висит. Предположительно, он должен работать, даже если данные все перепутаны из-за того, что не используются блокировки?

+0

Вместо того, чтобы использовать разные сокеты для каждого символа, почему вы не отправляете все сообщения по одному соединению? Предположительно, нет причин, по которым вам нужно ждать ответа на одно сообщение, прежде чем отправлять следующее? Возможно, вам понадобится один поток для отправки сообщений, а один - для чтения ответов, но это может быть все, что вам нужно. Медленность для такого рода проблем обычно связана с ожиданием каждого ответа синхронно после запроса. –

+0

В функции 'readline' ваш' while True' shoud никогда не останавливается, пока, в конце концов, не будет создано исключение. Я прав ? – FunkySayu

ответ

1

Вот мой 2 * [маленький единица валюты]:

  • Что замок должен делать? Теперь каждый поток должен дождаться блокировки перед получением данных. Это не очень эффективно, так как сетевая операция - это, вероятно, то, что может принести наибольшую пользу от распараллеливания.
  • Создайте сокет в каждом потоке. Таким образом, вам не нужно синхронизировать доступ к сокету и, возможно, полностью избавиться от блокировок. Кроме того, используйте пул сокетов.
  • Я не уверен, как вы храните свои данные, но при обновлении кадра данных pandas может потребоваться синхронизация между авторами. Вы упоминаете SQL - надеюсь, ваша база данных позаботится об этом для вас. Другой вариант заключается в том, чтобы считыватели API/сокетов сообщали свои данные второму типу потока (или только основного потока), который собирает/записывает данные на ваше хранилище.

Все перечисленное предполагает, что сетевые операции являются причиной, по которой вы хотите распараллелить в первую очередь. Кто-то упомянул в комментарии, что вы можете повторно использовать сокет для всех символов. Я не знаю, как работает ваш API, но мне кажется, что для этого потребуется, чтобы все символы собирались поочередно.

+0

Спасибо за предложения. Я дам каждой точке некоторую мысль. Что касается причины, по которой я хочу это сделать, это потому, что я собираю данные на 5 000 акций. Значит, много данных. Мне кажется неэффективным отправить запрос на один запас, получить данные, записать данные в db, а затем перейти к следующему. Я пытаюсь выяснить способ быстрее преодолеть все 5 000 акций. – darkpool

+0

Профилировали ли вы свой код? Я предполагал, что это чтение из сокета, которое занимает больше всего времени, но я предполагаю, что он также может обрабатывать или записывать в медленную базу данных. Если это сеть, пара потоков, отбирающих элементы из рабочей очереди (например, в вашей реализации), вероятно, ускорит работу, если только проблема не будет вашей пропускной способностью или скоростью API. Проблема с вашей реализацией - это блокировка, которая в основном устраняет параллелизм. –