2016-07-06 1 views
0

В настоящее время я пишу программу Python для приема данных из сокета TCP/UDP, а затем записываю данные в файл. Прямо сейчас, моя программа связана с I/O, записывая каждую дейтаграмму в файл, как он есть (я делаю это для очень больших файлов, поэтому замедление значимо). Имея это в виду, я решил, что хочу попробовать получить данные из сокета в одном потоке, а затем записать эти данные в другой поток. До сих пор я придумал следующий черновик. На данный момент он записывает только один блок данных (512 байт) в файл.Получение данных сокета в одном потоке, запись данных в другой - python

f = open("t1.txt","wb") 
def write_to_file(data): 
    f.write(data) 

def recv_data(): 
    dataChunk, addr = sock.recvfrom(buf) #THIS IS THE DATA THAT GETS WRITTEN 
    try: 
     w = threading.Thread(target = write_to_file, args = (dataChunk,)) 
     threads.append(w) 
     w.start() 
     while(dataChunk): 
      sock.settimeout(4) 
      dataChunk,addr = sock.recvfrom(buf) 
    except socket.timeout: 
     print "Timeout" 
     sock.close() 
     f.close() 

threads = [] 
r = threading.Thread(target=recv_data) 
threads.append(r) 
r.start() 

Я предполагаю, что я делаю что-то неправильно, я просто не уверен, что лучший способ использовать потоки. Прямо сейчас, моя проблема в том, что я должен приводить аргумент, когда создаю свой поток, но значение этого аргумента не изменяется должным образом, чтобы отражать новые фрагменты данных, которые входят. Однако, если я поместил строку w=threading.Thread(target=write_to_file, arg=(dataChunk,)) внутри while(dataChunk) loop, не буду ли я создавать новый поток на каждой итерации?

Кроме того, для того, что это стоит, это просто мое небольшое доказательство концепции использования отдельных потоков приема и записи. Это не самая большая программа, которая должна в конечном счете использовать эту концепцию.

ответ

1

У вас должен быть буфер, на который пишет поток чтения, и записывается нить записи. A deque from the collections module идеален, так как позволяет добавлять/удалять с обеих сторон без ухудшения производительности.

Итак, не проходите dataChunk в свою ветку (ни), а в буфер.

import collections # for the buffer 
import time # to ease polling 
import threading 

def write_to_file(path, buffer, terminate_signal): 
    with open(path, 'wb') as out_file: # close file automatically on exit 
     while not terminate_signal.is_set() or buffer: # go on until end is signaled 
     try: 
      data = buffer.pop() # pop from RIGHT end of buffer 
     except IndexError: 
      time.sleep(0.5) # wait for new data 
     else: 
      out_file.write(data) # write a chunk 

def read_from_socket(sock, buffer, terminate_signal): 
    sock.settimeout(4) 
    try: 
     while True: 
     data, _ = sock.recvfrom(buf) 
     buffer.appendleft(data) # append to LEFT of buffer 
    except socket.timeout: 
     print "Timeout" 
     terminate_signal.set() # signal writer that we are done 
     sock.close() 

buffer = collections.deque() # buffer for reading/writing 
terminate_signal = threading.Event() # shared signal 
threads = [ 
    threading.Thread(target=read_from_socket, kwargs=dict(
    sock=sock, 
    buffer=buffer, 
    terminate_signal=terminate_signal 
)), 
    threading.Thread(target= write_to_file, kwargs=dict(
    path="t1.txt", 
    buffer=buffer, 
    terminate_signal=terminate_signal 
)) 
] 
for t in threads: # start both threads 
    t.start() 
for t in threads: # wait for both threads to finish 
    t.join() 
+0

две вещи: 1. Я предполагаю, что вам нужно, чтобы начать каждую нить где-то (по-видимому, как так 'для т в резьб: t.start()' и 2. Он по-прежнему не появляется, чтобы работать Теперь файл составляет всего 226 байт, и, похоже, он только получает последние 226 байт данных, а не что-то в начале. – Swoldier

+0

@Swoldier Вы должны начать, а затем подождать оба потока. Я добавил соответствующий код – MisterMiyagi

+0

@Swoldier Я могу предоставить только черновик - код, который на самом деле не выполнялся *, не запускался *, и он не мог, поскольку, например, 'bug' не определен в' sock.recvfrom (buf) '. – MisterMiyagi

Смежные вопросы