2015-02-13 2 views
7

У меня есть поток, который поддерживает список сокетов, и я хотел бы перебирать список, видеть, есть ли что-нибудь, чтобы читать, если так - действовать на него, если нет - перейдите к следующему. Проблема в том, что, как только я сталкиваюсь с первым узлом, все выполнение приостанавливается, пока что-то не дойдет до чтения.Принудительно блокировать чтение с помощью TcpStream

Я использую std::io::Read::read(&mut self, buf: &mut [u8]) -> Result<usize>

От doc

Эта функция не дает никаких гарантий относительно того, ждет она блокирует для данных, но если объект должен блокировать для чтения, но не может его обычно сигнализирует об этом через возвращаемое значение Err.

копания в источник, реализация TcpStream Считанной

impl Read for TcpStream { 
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) } 
} 

Что вызывает

pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { 
    let fd = self.fd(); 
    let dolock = || self.lock_nonblocking(); 
    let doread = |nb| unsafe { 
     let flags = if nb {c::MSG_DONTWAIT} else {0}; 
     libc::recv(fd, 
        buf.as_mut_ptr() as *mut libc::c_void, 
        buf.len() as wrlen, 
        flags) as libc::c_int 
    }; 
    read(fd, self.read_deadline, dolock, doread) 
} 

И, наконец, называет read<T, L, R>(fd: sock_t, deadline: u64, mut lock: L, mut read: R)

Где я могу видеть петлю над неблокирующими читает пока данные не будут восстановлены или не произойдет ошибка.

Есть ли способ принудительно заблокировать чтение с помощью TcpStream?

+0

Почему вы не начинаете нить на разъем? –

+0

@ker Эта реализация предназначена для предполагаемого времени соединения 5-30 минут и должна быть способна обрабатывать параллельные соединения около 200 тыс. Я исхожу из предположения, что количество потоков - это плохая вещь, но в настоящее время я пытаюсь вычислить способ вычисления этого, поскольку это будет план B – nathansizemore

+0

Возможно, вы захотите взглянуть на библиотеки AsyncIO, такие как https: //github.com/carllerche/mio – Levans

ответ

6

Обновлено Ответ

Следует отметить, что, как Руст 1.9.0, std::net::TcpStream добавил функциональность:

fn set_nonblocking(&self, nonblocking: bool) -> Result<()>

Оригинал ответа

не удалось точно получить его с помощью TcpStream и не захотеть вложить отдельный lib для операций ввода-вывода, поэтому я решил, что t o задайте дескриптор файла как неблокирующий перед его использованием и выполните системный вызов для чтения/записи. Определенно не самое безопасное решение, но меньше работы, чем реализация новой библиотеки IO, хотя MIO выглядит великолепно.

extern "system" { 
    fn read(fd: c_int, buffer: *mut c_void, count: size_t) -> ssize_t; 
} 

pub fn new(user: User, stream: TcpStream) -> Socket { 

    // First we need to setup the socket as Non-blocking on POSIX 
    let fd = stream.as_raw_fd(); 
    unsafe { 
     let ret_value = libc::fcntl(fd, 
      libc::consts::os::posix01::F_SETFL, 
      libc::consts::os::extra::O_NONBLOCK); 

     // Ensure we didnt get an error code 
     if ret_value < 0 { 
      panic!("Unable to set fd as non-blocking") 
     } 
    } 

    Socket { 
     user: user, 
     stream: stream 
    } 
} 

pub fn read(&mut self) { 
    let count = 512 as size_t; 
    let mut buffer = [0u8; 512]; 
    let fd = self.stream.as_raw_fd(); 

    let mut num_read = 0 as ssize_t; 
    unsafe { 
     let buf_ptr = buffer.as_mut_ptr(); 
     let void_buf_ptr: *mut c_void = mem::transmute(buf_ptr); 
     num_read = read(fd, void_buf_ptr, count); 
     if num_read > 0 { 
      println!("Read: {}", num_read); 
     } 

     println!("test"); 
    } 
} 
Смежные вопросы