startServer.php 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. <?php
  2. use GuzzleHttp\Psr7\Message;
  3. use GuzzleHttp\Psr7\Response;
  4. use Ratchet\RFC6455\Handshake\PermessageDeflateOptions;
  5. use Ratchet\RFC6455\Messaging\MessageBuffer;
  6. use Ratchet\RFC6455\Messaging\MessageInterface;
  7. use Ratchet\RFC6455\Messaging\FrameInterface;
  8. use Ratchet\RFC6455\Messaging\Frame;
  9. require_once __DIR__ . "/../bootstrap.php";
  10. $loop = \React\EventLoop\Factory::create();
  11. $socket = new \React\Socket\Server('0.0.0.0:9001', $loop);
  12. $closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker;
  13. $negotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator(new \Ratchet\RFC6455\Handshake\RequestVerifier, PermessageDeflateOptions::permessageDeflateSupported());
  14. $uException = new \UnderflowException;
  15. $socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($negotiator, $closeFrameChecker, $uException, $socket) {
  16. $headerComplete = false;
  17. $buffer = '';
  18. $parser = null;
  19. $connection->on('data', function ($data) use ($connection, &$parser, &$headerComplete, &$buffer, $negotiator, $closeFrameChecker, $uException, $socket) {
  20. if ($headerComplete) {
  21. $parser->onData($data);
  22. return;
  23. }
  24. $buffer .= $data;
  25. $parts = explode("\r\n\r\n", $buffer);
  26. if (count($parts) < 2) {
  27. return;
  28. }
  29. $headerComplete = true;
  30. $psrRequest = Message::parseRequest($parts[0] . "\r\n\r\n");
  31. $negotiatorResponse = $negotiator->handshake($psrRequest);
  32. $negotiatorResponse = $negotiatorResponse->withAddedHeader("Content-Length", "0");
  33. if ($negotiatorResponse->getStatusCode() !== 101 && $psrRequest->getUri()->getPath() === '/shutdown') {
  34. $connection->end(Message::toString(new Response(200, [], 'Shutting down echo server.' . PHP_EOL)));
  35. $socket->close();
  36. return;
  37. };
  38. $connection->write(Message::toString($negotiatorResponse));
  39. if ($negotiatorResponse->getStatusCode() !== 101) {
  40. $connection->end();
  41. return;
  42. }
  43. // there is no need to look through the client requests
  44. // we support any valid permessage deflate
  45. $deflateOptions = PermessageDeflateOptions::fromRequestOrResponse($psrRequest)[0];
  46. $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker,
  47. function (MessageInterface $message, MessageBuffer $messageBuffer) {
  48. $messageBuffer->sendMessage($message->getPayload(), true, $message->isBinary());
  49. }, function (FrameInterface $frame) use ($connection, &$parser) {
  50. switch ($frame->getOpCode()) {
  51. case Frame::OP_CLOSE:
  52. $connection->end($frame->getContents());
  53. break;
  54. case Frame::OP_PING:
  55. $connection->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents());
  56. break;
  57. }
  58. }, true, function () use ($uException) {
  59. return $uException;
  60. },
  61. null,
  62. null,
  63. [$connection, 'write'],
  64. $deflateOptions);
  65. array_shift($parts);
  66. $parser->onData(implode("\r\n\r\n", $parts));
  67. });
  68. });
  69. $loop->run();