ExtEvLoop.php 5.9 KB

  1. <?php
  2. namespace React\EventLoop;
  3. use Ev;
  4. use EvIo;
  5. use EvLoop;
  6. use React\EventLoop\Tick\FutureTickQueue;
  7. use React\EventLoop\Timer\Timer;
  8. use SplObjectStorage;
  9. /**
  10. * An `ext-ev` based event loop.
  11. *
  12. * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev),
  13. * which provides an interface to the `libev` library.
  14. * `libev` itself supports a number of system-specific backends (epoll, kqueue).
  15. *
  16. * This loop is known to work with PHP 5.4 through PHP 8+.
  17. *
  18. * @see http://php.net/manual/en/book.ev.php
  19. * @see https://bitbucket.org/osmanov/pecl-ev/overview
  20. */
  21. class ExtEvLoop implements LoopInterface
  22. {
  23. /**
  24. * @var EvLoop
  25. */
  26. private $loop;
  27. /**
  28. * @var FutureTickQueue
  29. */
  30. private $futureTickQueue;
  31. /**
  32. * @var SplObjectStorage
  33. */
  34. private $timers;
  35. /**
  36. * @var EvIo[]
  37. */
  38. private $readStreams = array();
  39. /**
  40. * @var EvIo[]
  41. */
  42. private $writeStreams = array();
  43. /**
  44. * @var bool
  45. */
  46. private $running;
  47. /**
  48. * @var SignalsHandler
  49. */
  50. private $signals;
  51. /**
  52. * @var \EvSignal[]
  53. */
  54. private $signalEvents = array();
  55. public function __construct()
  56. {
  57. $this->loop = new EvLoop();
  58. $this->futureTickQueue = new FutureTickQueue();
  59. $this->timers = new SplObjectStorage();
  60. $this->signals = new SignalsHandler();
  61. }
  62. public function addReadStream($stream, $listener)
  63. {
  64. $key = (int)$stream;
  65. if (isset($this->readStreams[$key])) {
  66. return;
  67. }
  68. $callback = $this->getStreamListenerClosure($stream, $listener);
  69. $event = $this->loop->io($stream, Ev::READ, $callback);
  70. $this->readStreams[$key] = $event;
  71. }
  72. /**
  73. * @param resource $stream
  74. * @param callable $listener
  75. *
  76. * @return \Closure
  77. */
  78. private function getStreamListenerClosure($stream, $listener)
  79. {
  80. return function () use ($stream, $listener) {
  81. \call_user_func($listener, $stream);
  82. };
  83. }
  84. public function addWriteStream($stream, $listener)
  85. {
  86. $key = (int)$stream;
  87. if (isset($this->writeStreams[$key])) {
  88. return;
  89. }
  90. $callback = $this->getStreamListenerClosure($stream, $listener);
  91. $event = $this->loop->io($stream, Ev::WRITE, $callback);
  92. $this->writeStreams[$key] = $event;
  93. }
  94. public function removeReadStream($stream)
  95. {
  96. $key = (int)$stream;
  97. if (!isset($this->readStreams[$key])) {
  98. return;
  99. }
  100. $this->readStreams[$key]->stop();
  101. unset($this->readStreams[$key]);
  102. }
  103. public function removeWriteStream($stream)
  104. {
  105. $key = (int)$stream;
  106. if (!isset($this->writeStreams[$key])) {
  107. return;
  108. }
  109. $this->writeStreams[$key]->stop();
  110. unset($this->writeStreams[$key]);
  111. }
  112. public function addTimer($interval, $callback)
  113. {
  114. $timer = new Timer($interval, $callback, false);
  115. $that = $this;
  116. $timers = $this->timers;
  117. $callback = function () use ($timer, $timers, $that) {
  118. \call_user_func($timer->getCallback(), $timer);
  119. if ($timers->contains($timer)) {
  120. $that->cancelTimer($timer);
  121. }
  122. };
  123. $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
  124. $this->timers->attach($timer, $event);
  125. return $timer;
  126. }
  127. public function addPeriodicTimer($interval, $callback)
  128. {
  129. $timer = new Timer($interval, $callback, true);
  130. $callback = function () use ($timer) {
  131. \call_user_func($timer->getCallback(), $timer);
  132. };
  133. $event = $this->loop->timer($timer->getInterval(), $timer->getInterval(), $callback);
  134. $this->timers->attach($timer, $event);
  135. return $timer;
  136. }
  137. public function cancelTimer(TimerInterface $timer)
  138. {
  139. if (!isset($this->timers[$timer])) {
  140. return;
  141. }
  142. $event = $this->timers[$timer];
  143. $event->stop();
  144. $this->timers->detach($timer);
  145. }
  146. public function futureTick($listener)
  147. {
  148. $this->futureTickQueue->add($listener);
  149. }
  150. public function run()
  151. {
  152. $this->running = true;
  153. while ($this->running) {
  154. $this->futureTickQueue->tick();
  155. $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
  156. $wasJustStopped = !$this->running;
  157. $nothingLeftToDo = !$this->readStreams
  158. && !$this->writeStreams
  159. && !$this->timers->count()
  160. && $this->signals->isEmpty();
  161. $flags = Ev::RUN_ONCE;
  162. if ($wasJustStopped || $hasPendingCallbacks) {
  163. $flags |= Ev::RUN_NOWAIT;
  164. } elseif ($nothingLeftToDo) {
  165. break;
  166. }
  167. $this->loop->run($flags);
  168. }
  169. }
  170. public function stop()
  171. {
  172. $this->running = false;
  173. }
  174. public function __destruct()
  175. {
  176. /** @var TimerInterface $timer */
  177. foreach ($this->timers as $timer) {
  178. $this->cancelTimer($timer);
  179. }
  180. foreach ($this->readStreams as $key => $stream) {
  181. $this->removeReadStream($key);
  182. }
  183. foreach ($this->writeStreams as $key => $stream) {
  184. $this->removeWriteStream($key);
  185. }
  186. }
  187. public function addSignal($signal, $listener)
  188. {
  189. $this->signals->add($signal, $listener);
  190. if (!isset($this->signalEvents[$signal])) {
  191. $this->signalEvents[$signal] = $this->loop->signal($signal, function() use ($signal) {
  192. $this->signals->call($signal);
  193. });
  194. }
  195. }
  196. public function removeSignal($signal, $listener)
  197. {
  198. $this->signals->remove($signal, $listener);
  199. if (isset($this->signalEvents[$signal])) {
  200. $this->signalEvents[$signal]->stop();
  201. unset($this->signalEvents[$signal]);
  202. }
  203. }
  204. }