options = array_merge( [ "ip" => "localhost", // ip. "port" => 8080, // port. anything under 1000 requires root "ip_source" => null, // "X-Forwarded-For" to get IP from that header, null for raw IP "origin_whitelist" => [], // leave empty to allow any origins. Eg: ["http://localhost"] "max_header_size" => 1024, // in bytes "max_header_rcvtime" => 10, // in seconds "max_tcp_sndtime" => 30, // in seconds "tcp_write_size" => 4096, // in bytes "tcp_keepalive" => true, "ws_ping_interval" => 0, // int for ping interval, 0 for disabled, in seconds "max_ws_rcv_size" => 4096, // in bytes "http_headers" => [], // Eg ["Host" => "localhost"] "debug_log" => true ], $options ); if($this->options["ip_source"] !== null){ $this->options["ip_source"] = strtolower($this->options["ip_source"]); } unset($options); // define socket on all processes $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if( // make it so we can reuse the address on forced close !socket_set_option($this->socket, SOL_SOCKET, SO_REUSEADDR, 1) || !socket_set_option($this->socket, SOL_SOCKET, SO_SNDTIMEO, ["sec" => $this->options["max_tcp_sndtime"], "usec" => 0]) || !socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, $this->options === true ? 1 : 0) || !@socket_bind($this->socket, $this->options["ip"], $this->options["port"]) || !@socket_listen($this->socket, 5) ){ throw new Exception("Could not start websocket server: " . socket_strerror(socket_last_error($this->socket))); socket_close($this->socket); die(); } $pid = pcntl_fork(); if($pid === -1){ throw new Exception("Failed to create incoming connection handler process"); } elseif($pid){ // // parent: // wait for incoming unix signals that correspond to application events // while(true){ //echo "a\n"; sleep(1); } }else{ // // child: // subprocess for handling incoming connections // // reap dead children pcntl_signal(SIGCHLD, SIG_IGN); while(true){ $this->client = new StdClass(); if(($this->client->socket = @socket_accept($this->socket)) === false){ // socket accept failed continue; } $pid = pcntl_fork(); if($pid === -1){ echo "Warn: failed to create connection handler\n"; continue; }elseif($pid){ // parent: loop over to socket_accept call continue; }else{ // // connection handler // $this->client->request = new StdClass(); $this->client->request->handshake = false; $this->client->request->headers = []; if($this->options["ip_source"] === null){ socket_getpeername( $this->client->socket, $this->client->ip, $this->client->port ); }else{ $this->client->ip = "x.x.x.x"; $this->client->port = -1; } $this->log($this->client, "TCP connect"); $buffer = null; $len = 0; $time_start = microtime(true) + $this->options["max_header_rcvtime"]; $newline = false; $firstline = true; // Parse incoming handshake init while(true){ $timeout = $time_start - microtime(true); if($timeout <= 0){ $this->http_disconnect($this->client, 408, "Read timeout"); } $times = explode(".", $timeout, 2); if(count($times) === 2){ $usec = (int)substr($times[1], 0, 6); }else{ $usec = 0; } // timeout after configured amount socket_set_option( $this->client->socket, SOL_SOCKET, SO_RCVTIMEO, [ "sec" => (int)$times[0], "usec" => $usec ] ); // hang till we receive data $this->socket_recv($this->client, $bytes, 1, MSG_WAITALL); $len++; if($len >= $this->options["max_header_size"]){ $this->http_disconnect($this->client, 413, "Headers exceeded configured size"); } if($bytes == "\r"){ continue; } if($bytes == "\n"){ if($newline === true){ // received all headers break; } if($firstline){ $firstline = false; if( preg_match( '/^([A-Z]+) ([^ ]+) HTTP\/([0-9.]+)$/i', $buffer, $header ) === 0 ){ $this->http_disconnect($this->client, 400, "Received malformed HTTP method"); } $header[1] = strtoupper($header[1]); if($header[1] !== "GET"){ $this->http_disconnect($this->client, 405, "Received invalid HTTP method " . $header[1]); } $this->client->request->path = $header[2]; $this->client->request->http_version = (float)$header[3]; $buffer = null; continue; } $newline = true; $buffer = explode(":", $buffer, 2); if(count($buffer) !== 2){ $this->http_disconnect($this->client, 400, "Received invalid header line"); } $buffer[0] = strtolower(trim($buffer[0])); $buffer[1] = trim($buffer[1]); if( strlen($buffer[0]) === 0 || strlen($buffer[1]) === 0 ){ $this->http_disconnect($this->client, 400, "Received 0-length header name or value"); } $this->client->request->headers[$buffer[0]] = $buffer[1]; $buffer = null; continue; } $newline = false; $buffer .= $bytes; } // sanitize headers if( !isset($this->client->request->headers["host"]) || !isset($this->client->request->headers["connection"]) || !$this->check_header_value("upgrade", $this->client->request->headers["connection"]) || !isset($this->client->request->headers["upgrade"]) || !$this->check_header_value("websocket", $this->client->request->headers["upgrade"]) || !isset($this->client->request->headers["sec-websocket-version"]) || !$this->check_header_value("13", $this->client->request->headers["sec-websocket-version"]) || !isset($this->client->request->headers["sec-websocket-key"]) || strlen(base64_decode($this->client->request->headers["sec-websocket-key"])) !== 16 ){ $this->http_disconnect( $this->client, 405, "Not a WebSocket client" ); } // get IP if($this->options["ip_source"] !== null){ if(!isset($this->client->request->headers[$this->options["ip_source"]])){ $this->http_disconnect( $this->client, 405, "Missing " . $this->options["ip_source"] . " header" ); } $parts = explode( ":", $this->client->request->headers[$this->options["ip_source"]], 2 ); $this->client->ip = $parts[0]; if(count($parts) === 2){ $this->client->port = $parts[1]; } } // check origin if( count($this->options["origin_whitelist"]) !== 0 && ( !isset($this->client->request->headers["origin"]) || !in_array($this->client->request->headers["origin"], $this->options["origin_whitelist"]) ) ){ $this->http_disconnect( $this->client, 403, "Origin not allowed, got " . $this->client->request->headers["origin"] ); } // send handshake confirmation $this->http_respond( $this->client, 101, array_merge( $this->options["http_headers"], [ "Connection" => "Upgrade", "Upgrade" => "websocket", "Sec-WebSocket-Accept" => base64_encode( sha1( $this->client->request->headers["sec-websocket-key"] . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true ) ) ] ) ); $this->client->request->handshake = true; $this->log($this->client, "Handshake done"); // handle incoming messages $buffer = null; $bufferlength = 0; // remove socket timeout socket_set_option( $this->client->socket, SOL_SOCKET, SO_RCVTIMEO, [ "sec" => 0, "usec" => 0 ] ); while(true){ // decode message $this->socket_recv($this->client, $header, 2, MSG_WAITALL); $finalmessage = (ord($header[0]) & 0x80) === 128; $opcode = ord($header[0]) & 0x0F; $masked = (ord($header[1]) & 0x80) === 128; $messagelength = ord($header[1]) & 0x7F; switch($messagelength){ case 126: $this->socket_recv($this->client, $messagelength, 2, MSG_WAITALL); $messagelength = unpack("n", $messagelength)[1]; break; case 127: $this->socket_recv($this->client, $messagelength, 8, MSG_WAITALL); $messagelength = unpack("J", $messagelength)[1]; break; } if($messagelength > $this->options["max_ws_rcv_size"]){ $this->ws_disconnect($this->client, self::close_too_big, "Frame exceeds " . $this->options["max_ws_rcv_size"] . "b limit"); } if($masked){ // get mask and message payload $messagelength += 4; $this->socket_recv($this->client, $frame_data, $messagelength, MSG_WAITALL); for($i=4; $i<$messagelength; $i++){ $frame_data[$i] = $frame_data[$i] ^ $frame_data[$i % 4]; } $frame_data = substr($frame_data, 4); }else{ // get message payload $this->socket_recv($this->client, $frame_data, $messagelength, MSG_WAITALL); } // call user functions switch($opcode){ case self::op_text: case self::op_binary: if($finalmessage === false){ // partial message, come back later $buffer = $frame_data; $bufferlength = $messagelength; break; } // final message $this->log($this->client, ($opcode === self::op_text ? "Text" : "Binary") . " frame: {$frame_data}"); break; case self::op_continue: $buffer .= $frame_data; $bufferlength += $messagelength; if($bufferlength > $this->options["max_ws_rcv_size"]){ $this->ws_disconnect($this->client, self::close_too_big, "Message buffer exceeds " . $this->options["max_ws_rcv_size"] . "b limit"); } if($finalmessage){ // we received the entire thing, trigger event $this->log($this->client, "Text frame (buffer): {$buffer}"); $buffer = null; $bufferlength = 0; } break; case self::op_ping: if($messagelength > 126){ $this->ws_disconnect($this->client, self::close_too_big, "Ping frame exceeded 128 bytes"); } $this->log($this->client, "Received ping: " . $frame_data); $this->ws_send($this->client, self::op_pong, $frame_data); break; case self::op_pong: $this->log($this->client, "Received pong: " . $frame_data); break; case self::op_disconnect: $code = unpack("n", $frame_data)[1]; $frame_data = substr($frame_data, 2); $this->log($this->client, $code . ": Client disconnected. " . $frame_data); die(); break; default: // unknown frame $this->ws_disconnect($this->client, self::close_bad_data, "Unsupported opcode"); } } } } } } private function socket_recv(object $client, &$bytes, int $len, int $const){ $state = socket_recv($client->socket, $bytes, $len, $const); if($state === 0){ $this->log($client, "Remote TCP close"); exit(0); }elseif($state === false){ $error = socket_last_error(); if($error === 0){ return; } if($client->request->handshake === false){ $this->http_disconnect($client, 408, socket_strerror($error)); }else{ $this->ws_disconnect($client, self::close_no_status, socket_strerror($error)); } } } private function socket_write(object $client, string $bytes){ $read_ptr = 0; $len = strlen($bytes); while(true){ $sent = socket_write( $client->socket, substr( $bytes, $read_ptr, $this->options["tcp_write_size"] ) ); if($sent === false){ $this->log("Message send failure at byte " . $read_ptr . " (closed)"); exit(0); break; } $read_ptr += $sent; if($read_ptr === $len){ break; } } } public function ws_send(object $client, int $opcode, string $message){ $this->socket_write( $client, $this->ws_encode( $opcode, $message ) ); } public function ws_disconnect(object $client, int $disconnect_opcode, string $message){ $this->ws_send( $client, self::op_disconnect, pack("n", $disconnect_opcode) . $message ); socket_close($client->socket); $this->log($client, $disconnect_opcode . ": " . $message . " (closed)"); exit(0); } public function ws_encode(int $opcode, string $message){ // 0x80 = hardcoded end msg bit=true & mask=false (8bit) // $opcode = 1 to 4 bits $header = 0x80 | $opcode; $len = strlen($message); if($len < 126){ $header = pack("CC", $header, $len); }elseif($len < 65536){ $header = pack("CCn", $header, 126, $len); // 16bit len }else{ $header = pack("CCJ", $header, 127, $len); // 64bit len } return $header . $message; } public function log(object $client, string $message){ if($this->options["debug_log"]){ echo $client->ip . ":" . $client->port . "] " . $message . "\n"; } } public function http_disconnect(object $client, int $errcode, string $message){ $this->http_respond( $client, $errcode, array_merge( $this->options["http_headers"], [ "Content-Type" => "text/plain", "Sec-WebSocket-Version" => "13", "Content-Length" => strlen($message), "Connection" => "close" ] ), $message ); socket_close($client->socket); $this->log($client, $errcode . ": " . $message . " (closed)"); exit(0); } public function http_respond(object $client, int $errcode, array $headers, string $message = null){ switch($errcode){ case 100: $errtext = "Continue"; break; case 101: $errtext = "Switching Protocols"; break; case 200: $errtext = "OK"; break; case 201: $errtext = "Created"; break; case 202: $errtext = "Accepted"; break; case 203: $errtext = "Non-Authoritative Information"; break; case 204: $errtext = "No Content"; break; case 205: $errtext = "Reset Content"; break; case 206: $errtext = "Partial Content"; break; case 300: $errtext = "Multiple Choices"; break; case 301: $errtext = "Moved Permanently"; break; case 302: $errtext = "Moved Temporarily"; break; case 303: $errtext = "See Other"; break; case 304: $errtext = "Not Modified"; break; case 305: $errtext = "Use Proxy"; break; case 400: $errtext = "Bad Request"; break; case 401: $errtext = "Unauthorized"; break; case 402: $errtext = "Payment Required"; break; case 403: $errtext = "Forbidden"; break; case 404: $errtext = "Not Found"; break; case 405: $errtext = "Method Not Allowed"; break; case 406: $errtext = "Not Acceptable"; break; case 407: $errtext = "Proxy Authentication Required"; break; case 408: $errtext = "Request Time-out"; break; case 409: $errtext = "Conflict"; break; case 410: $errtext = "Gone"; break; case 411: $errtext = "Length Required"; break; case 412: $errtext = "Precondition Failed"; break; case 413: $errtext = "Request Entity Too Large"; break; case 414: $errtext = "Request-URI Too Large"; break; case 415: $errtext = "Unsupported Media Type"; break; case 500: $errtext = "Internal Server Error"; break; case 501: $errtext = "Not Implemented"; break; case 502: $errtext = "Bad Gateway"; break; case 503: $errtext = "Service Unavailable"; break; case 504: $errtext = "Gateway Time-out"; break; case 505: $errtext = "HTTP Version not supported"; break; } $headers_write = null; foreach($headers as $name => $value){ $headers_write .= $name . ": " . $value . "\r\n"; } $this->socket_write( $client, "HTTP/1.1 " . $errcode . " " . $errtext . "\r\n" . $headers_write . "\r\n" . $message ); } private function check_header_value(string $value, string $header){ $headers = explode(",", $header); foreach($headers as $part){ if(trim(strtolower($part)) == $value){ return true; } } return false; } }