LimitingServer.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. <?php
  2. namespace React\Socket;
  3. use Evenement\EventEmitter;
  4. use Exception;
  5. use OverflowException;
  6. /**
  7. * The `LimitingServer` decorator wraps a given `ServerInterface` and is responsible
  8. * for limiting and keeping track of open connections to this server instance.
  9. *
  10. * Whenever the underlying server emits a `connection` event, it will check its
  11. * limits and then either
  12. * - keep track of this connection by adding it to the list of
  13. * open connections and then forward the `connection` event
  14. * - or reject (close) the connection when its limits are exceeded and will
  15. * forward an `error` event instead.
  16. *
  17. * Whenever a connection closes, it will remove this connection from the list of
  18. * open connections.
  19. *
  20. * ```php
  21. * $server = new React\Socket\LimitingServer($server, 100);
  22. * $server->on('connection', function (React\Socket\ConnectionInterface $connection) {
  23. * $connection->write('hello there!' . PHP_EOL);
  24. * …
  25. * });
  26. * ```
  27. *
  28. * See also the `ServerInterface` for more details.
  29. *
  30. * @see ServerInterface
  31. * @see ConnectionInterface
  32. */
  33. class LimitingServer extends EventEmitter implements ServerInterface
  34. {
  35. private $connections = array();
  36. private $server;
  37. private $limit;
  38. private $pauseOnLimit = false;
  39. private $autoPaused = false;
  40. private $manuPaused = false;
  41. /**
  42. * Instantiates a new LimitingServer.
  43. *
  44. * You have to pass a maximum number of open connections to ensure
  45. * the server will automatically reject (close) connections once this limit
  46. * is exceeded. In this case, it will emit an `error` event to inform about
  47. * this and no `connection` event will be emitted.
  48. *
  49. * ```php
  50. * $server = new React\Socket\LimitingServer($server, 100);
  51. * $server->on('connection', function (React\Socket\ConnectionInterface $connection) {
  52. * $connection->write('hello there!' . PHP_EOL);
  53. * …
  54. * });
  55. * ```
  56. *
  57. * You MAY pass a `null` limit in order to put no limit on the number of
  58. * open connections and keep accepting new connection until you run out of
  59. * operating system resources (such as open file handles). This may be
  60. * useful if you do not want to take care of applying a limit but still want
  61. * to use the `getConnections()` method.
  62. *
  63. * You can optionally configure the server to pause accepting new
  64. * connections once the connection limit is reached. In this case, it will
  65. * pause the underlying server and no longer process any new connections at
  66. * all, thus also no longer closing any excessive connections.
  67. * The underlying operating system is responsible for keeping a backlog of
  68. * pending connections until its limit is reached, at which point it will
  69. * start rejecting further connections.
  70. * Once the server is below the connection limit, it will continue consuming
  71. * connections from the backlog and will process any outstanding data on
  72. * each connection.
  73. * This mode may be useful for some protocols that are designed to wait for
  74. * a response message (such as HTTP), but may be less useful for other
  75. * protocols that demand immediate responses (such as a "welcome" message in
  76. * an interactive chat).
  77. *
  78. * ```php
  79. * $server = new React\Socket\LimitingServer($server, 100, true);
  80. * $server->on('connection', function (React\Socket\ConnectionInterface $connection) {
  81. * $connection->write('hello there!' . PHP_EOL);
  82. * …
  83. * });
  84. * ```
  85. *
  86. * @param ServerInterface $server
  87. * @param int|null $connectionLimit
  88. * @param bool $pauseOnLimit
  89. */
  90. public function __construct(ServerInterface $server, $connectionLimit, $pauseOnLimit = false)
  91. {
  92. $this->server = $server;
  93. $this->limit = $connectionLimit;
  94. if ($connectionLimit !== null) {
  95. $this->pauseOnLimit = $pauseOnLimit;
  96. }
  97. $this->server->on('connection', array($this, 'handleConnection'));
  98. $this->server->on('error', array($this, 'handleError'));
  99. }
  100. /**
  101. * Returns an array with all currently active connections
  102. *
  103. * ```php
  104. * foreach ($server->getConnection() as $connection) {
  105. * $connection->write('Hi!');
  106. * }
  107. * ```
  108. *
  109. * @return ConnectionInterface[]
  110. */
  111. public function getConnections()
  112. {
  113. return $this->connections;
  114. }
  115. public function getAddress()
  116. {
  117. return $this->server->getAddress();
  118. }
  119. public function pause()
  120. {
  121. if (!$this->manuPaused) {
  122. $this->manuPaused = true;
  123. if (!$this->autoPaused) {
  124. $this->server->pause();
  125. }
  126. }
  127. }
  128. public function resume()
  129. {
  130. if ($this->manuPaused) {
  131. $this->manuPaused = false;
  132. if (!$this->autoPaused) {
  133. $this->server->resume();
  134. }
  135. }
  136. }
  137. public function close()
  138. {
  139. $this->server->close();
  140. }
  141. /** @internal */
  142. public function handleConnection(ConnectionInterface $connection)
  143. {
  144. // close connection if limit exceeded
  145. if ($this->limit !== null && \count($this->connections) >= $this->limit) {
  146. $this->handleError(new \OverflowException('Connection closed because server reached connection limit'));
  147. $connection->close();
  148. return;
  149. }
  150. $this->connections[] = $connection;
  151. $that = $this;
  152. $connection->on('close', function () use ($that, $connection) {
  153. $that->handleDisconnection($connection);
  154. });
  155. // pause accepting new connections if limit exceeded
  156. if ($this->pauseOnLimit && !$this->autoPaused && \count($this->connections) >= $this->limit) {
  157. $this->autoPaused = true;
  158. if (!$this->manuPaused) {
  159. $this->server->pause();
  160. }
  161. }
  162. $this->emit('connection', array($connection));
  163. }
  164. /** @internal */
  165. public function handleDisconnection(ConnectionInterface $connection)
  166. {
  167. unset($this->connections[\array_search($connection, $this->connections)]);
  168. // continue accepting new connection if below limit
  169. if ($this->autoPaused && \count($this->connections) < $this->limit) {
  170. $this->autoPaused = false;
  171. if (!$this->manuPaused) {
  172. $this->server->resume();
  173. }
  174. }
  175. }
  176. /** @internal */
  177. public function handleError(\Exception $error)
  178. {
  179. $this->emit('error', array($error));
  180. }
  181. }