В настоящее время я извлекаю данные запаса из api, к которым у меня есть доступ. Я делаю это в следующих шагах:Розетки и резьба
- петлю через список символов/запасов один на один
- создать сокет-соединение и отправить соответствующее сообщение на апи
- получить данные и отдельные его в строки до «! EndMSG!» принимаются в какой момент данных для этого символа завершения
- преобразования данных (строка) в StringIO, а затем прочитать его в панде dataframe и в конечном итоге записать данные в SQL
- ли следующий символ/акции
Соответствующий фрагмент кода:
def readlines(sock, recv_buffer=4096, delim='\n'):
buffer = ''
while True:
data = sock.recv(recv_buffer)
buffer += str(data.decode('latin-1'))
while buffer.find(delim) != -1:
line, buffer = buffer.split('\n', 1)
yield line
def main():
syms = ['MSFT', 'AAPL', 'GS', 'F']
for sym in syms:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
data = ''
message = sym + #relevant api specific commands
sock.sendall(message.encode())
for line in readlines(sock):
if "!ENDMSG!" in line:
break
data += line + '\n'
sock.close()
data = io.StringIO(data)
df = pd.read_csv(data)
df.to_sql(...)
Я хотел бы включить резьб в это так, что я не должен делать один запас по времени. Однако то, что им не уверены в том, где/как реализовать замки, так что я не рискую получать данные о неправильных запасов в неправильных переменных и т.д.
Это то, что я до сих пор:
import threading
from queue import Queue
q = Queue()
my_lock = threading.Lock()
def readlines(sock, recv_buffer=4096, delim='\n'):
buffer = ''
while True:
data = sock.recv(recv_buffer)
buffer += str(data.decode('latin-1'))
while buffer.find(delim) != -1:
line, buffer = buffer.split('\n', 1)
yield line
def get_symbol_data(sym, sock):
with my_lock:
data = ''
message = sym + #relevant api specific commands
sock.sendall(message.encode())
for line in readlines(sock):
if "!ENDMSG!" in line:
break
data += line + '\n'
data = io.StringIO(data)
df = pd.read_csv(data)
df.to_sql(...)
def threader():
while True:
sym_tuple = q.get()
sym = sym_tuple[0]
sock = sym_tuple[1]
get_symbol_data(sym, sock)
q.task_done()
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
# create 4 threads
for x in range(4):
t = threading.Thread(target=threader)
t.daemon = True
t.start()
syms = ['MSFT', 'AAPL', 'GS', 'F']
for sym in syms:
q.put((sym, sock))
q.join()
sock.close()
Моя попытка при включении резьбы просто висит. Нет ошибок, ничего. Он просто висит. Надеюсь, кто-то может указать мне в правильном направлении.
Я также не уверен, что если я воспользуюсь замком в нужном месте? Кстати, если я не использую блокировку, программа все еще висит. Предположительно, он должен работать, даже если данные все перепутаны из-за того, что не используются блокировки?
Вместо того, чтобы использовать разные сокеты для каждого символа, почему вы не отправляете все сообщения по одному соединению? Предположительно, нет причин, по которым вам нужно ждать ответа на одно сообщение, прежде чем отправлять следующее? Возможно, вам понадобится один поток для отправки сообщений, а один - для чтения ответов, но это может быть все, что вам нужно. Медленность для такого рода проблем обычно связана с ожиданием каждого ответа синхронно после запроса. –
В функции 'readline' ваш' while True' shoud никогда не останавливается, пока, в конце концов, не будет создано исключение. Я прав ? – FunkySayu