2016-07-14 2 views
1

Я запускаю сервер webssocket в php, который делает свою работу довольно хорошо. За исключением одного:stream_socket_accept() и fgetc stalls «случайно» на другом рабочем сервере websocket

Иногда stream_socket_accept() останавливается на 60 секунд. Это может произойти через несколько секунд после запуска сервера, это также может произойти через несколько часов после запуска сервера. Я не могу воспроизвести поведение сам.

Иногда он останавливается при вызове stream_socket_accept(), иногда он сжимается при чтении заголовка из клиента сразу после возврата stream_socket_accept.

К тому же: default_socket_timeout задает ширину системы до 10 секунд, а php.ini показывает это значение.

Даже с stream_socket_accept ($ socket, 0); он остановится. Указанный тайм-аут просто игнорируется.

Мои вопросы:

  1. Почему стойло в первую очередь? Когда слушатель указывает на новое соединение, stream_socket_accept не должен останавливаться, не так ли?

  2. Почему fgetc кабина на самом первом байте соединения (при получении заголовка входящего соединения, сразу после stream_socket_accept()?

  3. Почему это стойло ровно 60 секунд (стандарт default_timeout для сокетов), когда я определенно изменил это до 10 секунд (с указанием в phpinfo()).

Я бегу из идей.

ЛЮБАЯ идея высоко ценится.

Вот полный код сокета (который также выполняет некоторую управляющую логику агента, которая работает).

Моя надежда заключается в том, что кто-то что-то видит.

<?php 

// STALLING HAPPENS SOMETIMES IN LINE 52 fgetc() 
// AND IN LINE 271 stream_socket_accept() 

class WS { 

    const 
     //! UUID magic string 
     Magic='258EAFA5-E914-47DA-95CA-C5AB0DC85B11', 
     //! Packet size 
     Packet=65536; 

    //@{ Mask bits for first byte of header 
    const 
     Text=0x01, 
     Binary=0x02, 
     Close=0x08, 
     Ping=0x09, 
     Pong=0x0a, 
     OpCode=0x0f, 
     Finale=0x80; 
    //@} 

    //@{ Mask bits for second byte of header 
    const 
     Length=0x7f; 
    //@} 

} 

//! RFC6455 server socket 
class Server { 

    protected 
     $addr, 
     $ctx, 
     $wait, 
     $sockets, 
     $agents=[], 
     $events=[]; 

    /** 
    * Allocate stream socket 
    * @return NULL 
    * @param $socket resource 
    **/ 
    function alloc($socket) { 
     trace("Fetching http header...");   
     // if header does not start with "GET" 
     // immediately close connection 
     foreach(['G','E','T'] as $get) { 
      $character=fgetc($socket); 
      $metadata=stream_get_meta_data($socket); 

      // this MUST NOT BE REWRITTEN! 
      // unread_bytes can not be checked against 0 
      if ($character==$get && !feof($socket) && $metadata['unread_bytes'] > 0) 
       continue; 
      else { 
       trace("Error: Header does not start with GET – connection closed"); 
       stream_socket_shutdown($socket,STREAM_SHUT_RDWR); 
       return; 
      } 
     } 

     $str="GET"; 
     do { 
      $str.=fgetc($socket); 
      $metadata=stream_get_meta_data($socket); 
     } while (!feof($socket) && $metadata['unread_bytes'] > 0); 

     // Get WebSocket headers 
     $hdrs=[]; 
     $CRLF="\r\n"; 
     $verb=NULL; 
     $uri=NULL; 
     foreach (explode($CRLF,$str) as $line) 
      if (preg_match('/^(\w+)\s(.+)\sHTTP\/1\.\d$/', 
       trim($line),$match)) { 
       $verb=$match[1]; 
       $uri=$match[2]; 
      } 
      else 
      if (preg_match('/^(.+): (.+)/',trim($line),$match)) 
       // Standardize header 
       $hdrs[ 
        strtr(
         ucwords(
          strtolower(
           strtr($match[1],'-',' ') 
          ) 
         ),' ','-' 
        ) 
       ]=$match[2]; 
     if (empty($hdrs['Upgrade']) && 
      empty($hdrs['Sec-Websocket-Key'])) { 
      // Not a WebSocket request 
      $this->write(
       $socket, 
       $str='HTTP/1.1 400 Bad Request'.$CRLF. 
        'Connection: close'.$CRLF.$CRLF 
      ); 
      stream_socket_shutdown($socket,STREAM_SHUT_RDWR); 
      // 1 @fclose($socket); 
      return; 
     } 
     // Handshake 
     $bytes=$this->write(
      $socket, 
      $str='HTTP/1.1 101 Switching Protocols'.$CRLF. 
       'Upgrade: websocket'.$CRLF. 
       'Connection: Upgrade'.$CRLF. 
       'Sec-WebSocket-Accept: '. 
        base64_encode(
         sha1(
          $hdrs['Sec-Websocket-Key']. 
          WS::Magic, 
          TRUE 
         ) 
        ).$CRLF.$CRLF 
     ); 
     if (is_int($bytes)) { 
      // Connect agent to server 
      $this->sockets[]=$socket; 
      $this->agents[(int)$socket]= 
       new Agent($this,$socket,$verb,$uri,$hdrs); 
     } 
     else 
      stream_socket_shutdown($socket,STREAM_SHUT_RDWR); 
    } 

    /** 
    * Free stream socket 
    * @return bool 
    * @param $socket resource 
    **/ 
    function free($socket) { 
     unset($this->sockets[array_search($socket,$this->sockets)]); 
     unset($this->agents[(int)$socket]); 
     stream_socket_shutdown($socket,STREAM_SHUT_WR); 
     // 1 @fclose($socket); 
    } 

    /** 
    * Read from stream socket 
    * @return string|FALSE 
    * @param $socket resource 
    **/ 
    function read($socket) { 
     return is_string([email protected]($socket,WS::Packet)) && strlen($str)? 
      $str: 
      FALSE; 
    } 

    /** 
    * Write to stream socket 
    * @return int|FALSE 
    * @param $socket resource 
    * @param $str string 
    **/ 
    function write($socket,$str) { 
     for ($i=0,$bytes=0;$i<strlen($str);$i+=$bytes) { 
      if (([email protected]($socket,substr($str,$i))) && 
       @fflush($socket)) 
       continue; 
      return FALSE; 
     } 
     return $bytes; 
    } 

    /** 
    * Return socket agents 
    * @return array 
    * @param $uri string 
    ***/ 
    function agents($uri=NULL) { 
     return array_filter(
      $this->agents, 
      function($val) use($uri) { 
       return $uri?($val->uri()==$uri):TRUE; 
      } 
     ); 
    } 

    /** 
    * Return event handlers 
    * @return array 
    **/ 
    function events() { 
     return $this->events; 
    } 

    /** 
    * Bind function to event handler 
    * @return object 
    * @param $event string 
    * @param $func callable 
    **/ 
    function on($event,$func) { 
     $this->events[$event]=$func; 
     return $this; 
    } 

    /** 
    * Execute the server process 
    * @return object 
    **/ 
    function run() { 
     $fw=\Base::instance(); 
     // Activate WebSocket listener 
     $listen=stream_socket_server(
      $this->addr,$errno,$errstr, 
      STREAM_SERVER_BIND|STREAM_SERVER_LISTEN, 
      $this->ctx 
     ); 
     // stream_set_timeout($listen,0); 
     stream_set_read_buffer($listen,WS::Packet); 
     stream_set_write_buffer($listen,WS::Packet); 
     $socket=socket_import_stream($listen); 
     socket_set_option(
      $socket, 
      SOL_SOCKET, 
      SO_REUSEADDR, 
      1 
     ); 
     socket_set_option(
      $socket, 
      SOL_SOCKET, 
      SO_LINGER, 
      ['l_onoff'=>1,'l_linger'=>1] 
     ); 
     register_shutdown_function(function() use($listen) { 
      foreach ($this->sockets as $socket) 
       if ($socket!=$listen) 
        $this->free($socket); 
      stream_socket_shutdown($listen,STREAM_SHUT_RDWR); 
      @fclose($listen); 
      if (isset($this->events['stop']) && 
       is_callable($func=$this->events['stop'])) 
       $func($this); 
     }); 
     if ($errstr) 
      user_error($errstr,E_USER_ERROR); 
     if (isset($this->events['start']) && 
      is_callable($func=$this->events['start'])) 
      $func($this); 
     $this->sockets=[$listen]; 
     $empty=[]; 
     $wait=$this->wait; 
     while (TRUE) { 
      $active=$this->sockets; 
      $mark=microtime(TRUE); 
      trace("Waiting for socket action..."); 
      [email protected]_select(
       $active,$empty,$empty,(int)$wait,round(1e6*($wait-(int)$wait)) 
      ); 
      if (is_bool($count) && $wait) { 
       if (isset($this->events['error']) && 
        is_callable($func=$this->events['error'])) 
        $func($this); 
       die; 
      } 
      if ($count) { 
       // Process active connections 
       foreach ($active as $socket) { 
        if (!is_resource($socket)) 
         continue;      
        if ($socket==$listen) { 
         trace("New connection pending..."); 
         if ($socket=stream_socket_accept($listen)) { 
          $this->alloc($socket); 
          trace("alloc() finished"); 
         } 
         else { 
          trace("Connection failed...");      
          continue;       
         } 
        } 
        else { 
         $id=(int)$socket; 
         if (isset($this->agents[$id]) && 
          $raw=$this->agents[$id]->fetch()) { 
          list($op,$data)=$raw; 
          // Dispatch 
          switch ($op & WS::OpCode) { 
          case WS::Text: 
           $data=trim($data); 
          case WS::Binary: 
          case WS::Pong: 
           if (isset($this->events['receive']) && 
            is_callable($func=$this->events['receive'])) 
            $func($this->agents[$id],$op,$data); 
           break; 
          case WS::Ping: 
           $this->agents[$id]->send(WS::Pong); 
           break; 
          default: 
           if (isset($this->events['invalid']) && 
            is_callable($func=$this->events['invalid'])) 
            $func($this->agents[$id],$op,$data); 
          case WS::Close: 
           $this->free($socket); 
           break; 
          } 
         } 
        } 
       } 
       $wait-=microtime(TRUE)-$mark; 
       while ($wait<1e-6) { 
        $wait+=$this->wait; 
        $count=0; 
       } 
      } 
      if (!$count) { 
       $mark=microtime(TRUE); 
       foreach ($this->sockets as $socket) { 
        if (!is_resource($socket)) 
         continue; 
        $id=(int)$socket; 
        if ($socket!=$listen && 
         isset($this->agents[$id]) && 
         is_string($this->agents[$id]->send(WS::Ping)) && 
         isset($this->events['idle']) && 
         is_callable($func=$this->events['idle'])) 
         $func($this->agents[$id]); 
       } 
       $wait=$this->wait-microtime(TRUE)+$mark; 
      } 
     } 
    } 

    /** 
    * Instantiate object 
    * @return object 
    * @param $addr string 
    * @param $ctx resource 
    * @param $wait int 
    **/ 
    function __construct($addr,$ctx=NULL,$wait=60) { 
     $this->addr=$addr; 
     $this->ctx=$ctx?:stream_context_create(); 
     $this->wait=$wait; 
     $this->events=[]; 
    } 

} 

//! RFC6455 remote socket 
class Agent { 

    protected 
     $server, 
     $id, 
     $socket, 
     $flag, 
     $verb, 
     $uri, 
     $headers, 
     $events, 
     $buffer=''; 

    /** 
    * Return server instance 
    * @return object 
    **/ 
    function server() { 
     return $this->server; 
    } 

    /** 
    * Return socket ID 
    * @return string 
    **/ 
    function id() { 
     return $this->id; 
    } 

    /** 
    * Return request method 
    * @return string 
    **/ 
    function verb() { 
     return $this->verb; 
    } 

    /** 
    * Return request URI 
    * @return string 
    **/ 
    function uri() { 
     return $this->uri; 
    } 

    /** 
    * Return socket headers 
    * @return string 
    **/ 
    function headers() { 
     return $this->headers; 
    } 

    /** 
    * Frame and transmit payload 
    * @return string|FALSE 
    * @param $socket resource 
    * @param $op int 
    * @param $payload string 
    **/ 
    function send($op,$data='') { 
     $mask=WS::Finale | $op & WS::OpCode; 
     $len=strlen($data); 
     $str=''; 
     if ($len<126) 
      $str=pack('CC',$mask,$len); 
     else 
     if ($len>125 && $len<65536) 
      $str=pack('CCn',$mask,126,$len); 
     else 
     if ($len>65535) 
      $str=pack('CCNN',$mask,127,$len); 
     $str.=$data; 
     $server=$this->server(); 
     if (is_bool($server->write($this->socket,$str))) { 
      $server->free($this->socket); 
      return FALSE; 
     } 
     if (!in_array($op,[WS::Pong,WS::Close]) && 
      isset($this->events['send']) && 
      is_callable($func=$this->events['send'])) 
      $func($this,$op,$data); 
     return $data; 
    } 

    /** 
    * Retrieve and unmask payload 
    * @return array|FALSE 
    **/ 
    function fetch() { 
     // Unmask payload 
     $server=$this->server(); 
     if (is_bool($str=$server->read($this->socket))) { 
      $server->free($this->socket); 
      return FALSE; 
     } 
     $buf=($this->buffer.=$str); 
     $op=ord($buf[0]) & WS::OpCode; 
     $len=ord($buf[1]) & WS::Length; 
     $pos=2; 
     if ($len==126) { 
      $len=ord($buf[2])*256+ord($buf[3]); 
      $pos+=2; 
     } 
     else 
     if ($len==127) { 
      for ($i=0,$len=0;$i<8;$i++) 
       $len=$len*256+ord($buf[$i+2]); 
      $pos+=8; 
     } 
     for ($i=0,$mask=[];$i<4;$i++) 
      $mask[$i]=ord($buf[$pos+$i]); 
     $pos+=4; 
     if (strlen($buf)<$len+$pos) 
      return FALSE; 
     $this->buffer=''; 
     for ($i=0,$data='';$i<$len;$i++) 
      $data.=chr(ord($buf[$pos+$i])^$mask[$i%4]); 
     return [$op,$data]; 
    } 

    /** 
    * Destroy object 
    * @return NULL 
    **/ 
    function __destruct() { 
     if (isset($this->events['disconnect']) && 
      is_callable($func=$this->events['disconnect'])) 
      $func($this); 
    } 

    /** 
    * Instantiate object 
    * @return object 
    * @param $server object 
    * @param $socket resource 
    * @param $verb string 
    * @param $uri string 
    * @param $hdrs array 
    **/ 
    function __construct($server,$socket,$verb,$uri,array $hdrs) { 
     $this->server=$server; 
     $this->id=stream_socket_get_name($socket,TRUE); 
     $this->socket=$socket; 
     $this->verb=$verb; 
     $this->uri=$uri; 
     $this->headers=$hdrs; 
     $this->events=$server->events(); 
     if (isset($this->events['connect']) && 
      is_callable($func=$this->events['connect'])) 
      $func($this); 
    } 

} 

/** 
* Simple console logger 
* @return NULL 
* @param $line string 
**/ 
function trace($line) { 
    echo "\r".date('H:i:s').' '.$line.PHP_EOL; 
} 

/** 
* Process handler for graceful exit (routed to registered shutdown handler) 
* @return NULL 
**/ 
function kill($signal) { 
    die; 
} 

pcntl_signal(SIGINT,'kill'); 
pcntl_signal(SIGTERM,'kill'); 

if (PHP_SAPI!='cli') { 
    // Prohibit direct HTTP access 
    header('HTTP/1.1 404 Not Found'); 
    die; 
} 

chdir(__DIR__); 

require('lib/base.php'); 
error_reporting((E_ALL|E_STRICT)&~(E_NOTICE|E_USER_NOTICE|E_WARNING|E_USER_WARNING)); 

// Load .ini files 
$fw=Base::instance(); 

$fw-> 
    config('app/ini/config.ini')-> 
    config('app/ini/dev.ini'); 

if (!is_file($pid='ws.pid') || 
    !is_dir('/proc/'.file_get_contents($pid))) { 

    // Override any error handler specified in .ini files 
    ini_set('error_log','/dev/null'); 
    $fw->DEBUG=2; 
    $fw->ONERROR=function($fw) { 
     trace($fw->get('ERROR.text')); 
     foreach (explode("\n",trim($fw->get('ERROR.trace'))) as $line) 
      trace($line); 
    }; 

    $fw->VERBOSE=(bool)preg_grep('/[\/-]v/',$argv); 

    // Instantiate the server 
    $ws=new Server(
     $fw->get('SITE.websocket'), 
     stream_context_create([ 
      'ssl'=>$fw->get('SSL')+[ 
       'allow_self_signed'=>TRUE, 
       'verify_peer'=>FALSE 
      ] 
     ]) 
    ); 

    // Intercept OpenSSL errors 
    $err=FALSE; 
    while (TRUE) 
     if ($msg=openssl_error_string()) { 
      $err=TRUE; 
      trace($msg); 
     } 
     else 
      break; 
    if ($err) 
     die; 

    $ws-> 
     on('start',function($server) use($fw) { 
      trace('WebSocket server started ('.$fw->get('SITE.websocket').')'); 
      file_put_contents('ws.pid',getmypid()); 
     })-> 
     on('error',function($server) use($fw) { 
      if ($err=socket_last_error()) { 
       trace(socket_strerror($err)); 
       socket_clear_error(); 
      } 
      if ($err=error_get_last()) 
       trace($err['message']); 
     })-> 
     on('stop',function($server) use($fw) { 
      trace('Shutting down ('.$fw->get('SITE.websocket').')'); 
      @unlink('ws.pid'); 
     })-> 
     on('connect',function($agent) use($fw) { 
      trace(
       '(0x00'.$agent->uri().') '.$agent->id().' connected '. 
       '<'.(count($agent->server()->agents())+1).'>' 
      ); 
      if ($fw->VERBOSE) { 
       $hdrs=$agent->headers(); 
       trace(
        $hdrs['User-Agent'].' '. 
        '[v'.$hdrs['Sec-Websocket-Version'].']' 
       ); 
      } 
      $agent->hash=dechex(crc32(file_get_contents(__FILE__))); 
      $agent->feature=[]; 
      $agent->query=''; 
      $agent->session=[]; 
     })-> 
     on('disconnect',function($agent) use($fw) { 
      trace('(0x08'.$agent->uri().') '.$agent->id().' disconnected'); 
      if ($err=socket_last_error()) { 
       trace(socket_strerror($err)); 
       socket_clear_error(); 
      } 
      if (preg_match('/^\/(.+)/',$agent->uri(),$match)) { 
       $class='WebSocket\\'.$match[1]; 
       if (isset($agent->feature[$class])) { 
        $obj=$agent->feature[$class]; 
        foreach ($agent->feature as $key=>$obj) 
         if (is_callable([$obj,'disconnect'])) 
          $fw->call([$obj,'disconnect'],[$fw,$agent]); 
       } 
      } 
     })-> 
     on('idle',function($agent) use($fw) { 
      foreach ($agent->feature as $key=>$obj) 
       if (is_callable([$obj,'idle'])) 
        $fw->call([$obj,'idle'],[$fw,$agent]); 
     })-> 
     on('receive',function($agent,$op,$data) use($fw) { 
      switch($op) { 
      case WS::Pong: 
       $text='pong'; 
       break; 
      case WS::Text: 
       $data=trim($data); 
      case WS::Binary: 
       $text='data'; 
       break; 
      default: 
       $text='unknown'; 
       break; 
      } 
      trace(
       '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT). 
       $agent->uri().') '.$agent->id().' '.$text.' received' 
      ); 
      if ($op==WS::Text && $data) { 
       if ($fw->VERBOSE) 
        trace($data); 
       $in=json_decode($data,TRUE); 
       if (json_last_error()==JSON_ERROR_NONE && 
        preg_match('/^\/(.+)/',$agent->uri(),$match)) { 
        $class='WebSocket\\'.$match[1]; 
        if (isset($agent->feature[$class])) { 
         if (isset($in['query'])) 
          $agent->query=$in['query']; 
         if (isset($in['session'])) 
          foreach ($in['session'] as $key=>$val) 
           $agent->session[$key]=$val; 
         $obj=$agent->feature[$class]; 
         if (isset($in['func']) && 
          is_callable([$obj,$in['func']])) 
          $fw->call([$obj,$in['func']],[$fw,$agent]); 
         return; 
        } 
        else 
        if (isset($in['nonce']) && 
         isset($agent->headers()['Cookie']) && 
         preg_match(
          '/PHPSESSID=(\w+)/', 
          $agent->headers()['Cookie'], 
          $match 
         ) && 
         Bcrypt::instance()-> 
          verify($match[1],'$2y$12$'.$in['nonce'])) { 
         if (isset($in['session'])) 
          foreach ($in['session'] as $key=>$val) 
           $agent->session[$key]=$val; 
         if (empty($agent->feature[$class])) 
          $agent->feature[$class]=new $class($fw,$agent); 
         return; 
        } 
        else 
         trace(
          '(0x00'.$agent->uri().') '.$agent->id().' '. 
          'authentication failed'); 
       } 
      } 
     })-> 
     on('send',function($agent,$op,$data) use($fw) { 
      switch($op) { 
      case WS::Ping: 
       $text='ping'; 
       break; 
      case WS::Text: 
       $data=trim($data); 
      case WS::Binary: 
       $text='data'; 
       break; 
      default: 
       $text='unknown'; 
       break; 
      } 
      trace(
       '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT). 
       $agent->uri().') '.$agent->id().' '.$text.' sent' 
      ); 
      if ($op==WS::Text && $fw->VERBOSE) 
       trace($data); 
     })-> 
     on('invalid',function($agent,$op,$data) use($fw) { 
      trace(
       '(0x'.str_pad(dechex($op),2,'0',STR_PAD_LEFT). 
       $agent->uri().') '.$agent->id().' invalid opcode' 
      ); 
     })-> 
     run(); 

} 
else 
    trace('A socket server instance is already running!'); 

ответ

0

Это отчасти ответ на вопрос:

таймаут игнорируется, поскольку PHP-кли использует другой php.ini, конечно. Я совсем забыл об этом. простой ini_set сделал трюк.

Смежные вопросы