1 | package Mojo::IOLoop;
|
---|
2 | use Mojo::Base 'Mojo::EventEmitter';
|
---|
3 |
|
---|
4 | # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
|
---|
5 | # used. Like the death ray."
|
---|
6 | use Carp 'croak';
|
---|
7 | use Mojo::IOLoop::Client;
|
---|
8 | use Mojo::IOLoop::Delay;
|
---|
9 | use Mojo::IOLoop::Server;
|
---|
10 | use Mojo::IOLoop::Stream;
|
---|
11 | use Mojo::IOLoop::Subprocess;
|
---|
12 | use Mojo::Reactor::Poll;
|
---|
13 | use Mojo::Util qw(md5_sum steady_time);
|
---|
14 | use Scalar::Util qw(blessed weaken);
|
---|
15 |
|
---|
16 | use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
|
---|
17 |
|
---|
# Accept limit before a graceful shutdown (0 means no limit) and the cap on
# concurrently handled inbound connections
has max_accepts     => 0;
has max_connections => 1000;

# Reactor is built lazily from whatever backend class "detect" selects, with a
# default "error" subscriber that turns reactor errors into warnings
has reactor => sub {
  my $class = Mojo::Reactor::Poll->detect;
  warn "-- Reactor initialized ($class)\n" if DEBUG;

  my $reactor = $class->new;
  return $reactor->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
};

# Writing to a closed socket should produce an error, not kill the process
$SIG{PIPE} = 'IGNORE';

# Build the singleton and its reactor at load time
__PACKAGE__->singleton->reactor;
|
---|
31 |
|
---|
# Look up an acceptor by id, or register an acceptor object with this loop and
# return the id it was stored under. Works as class or instance method.
sub acceptor {
  my $self     = _instance(shift);
  my $acceptor = shift;

  # Anything that is not a reference is treated as an id
  if (!ref $acceptor) { return $self->{acceptors}{$acceptor} }

  # Store the object under a fresh id
  my $id = $self->_id;
  $self->{acceptors}{$id} = $acceptor;

  # Share our reactor, weakened in the acceptor's "reactor" slot
  my $connected = $acceptor->reactor($self->reactor);
  weaken $connected->{reactor};

  # Restart the accept cycle so the new acceptor gets picked up
  $self->_not_accepting->_maybe_accepting;

  return $id;
}
|
---|
47 |
|
---|
# Open an outgoing connection with Mojo::IOLoop::Client and turn the resulting
# handle into a stream object. Arguments are either a hash reference or a list
# of key/value pairs, followed by a callback. The callback is invoked as a
# method on the loop with (undef, $stream) on success and ($err, undef) on
# failure. Returns the connection id. Works as class or instance method.
sub client {
  my ($self, $cb) = (_instance(shift), pop);

  # Work on a shallow copy, deleting "stream_class" below must not modify a
  # hash reference that still belongs to the caller
  my $args = ref $_[0] ? {%{$_[0]}} : {@_};

  # Track the pending client under the id the stream will later use
  my $id = $self->_id;
  my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new;
  weaken $client->reactor($self->reactor)->{reactor};
  my $class = delete $args->{stream_class} || 'Mojo::IOLoop::Stream';

  # The callbacks below only hold a weak reference to the loop
  weaken $self;
  $client->on(
    connect => sub {

      # Client is done, replace it with a stream for the new handle
      delete $self->{out}{$id}{client};
      my $stream = $class->new(pop);
      $self->_stream($stream => $id);
      $self->$cb(undef, $stream);
    }
  );
  $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) });
  $client->connect($args);

  return $id;
}
|
---|
71 |
|
---|
# Build a Mojo::IOLoop::Delay object bound to this loop. With arguments they
# are handed to "steps" and its result is returned, otherwise the delay itself.
sub delay {
  my $delay = Mojo::IOLoop::Delay->new;
  my $loop  = _instance(shift);

  # The delay only keeps a weak reference to the loop
  weaken $delay->ioloop($loop)->{ioloop};

  return $delay unless @_;
  return $delay->steps(@_);
}
|
---|
77 |
|
---|
# Ask the reactor whether it is running. Works as class or instance method.
sub is_running {
  my $self = _instance(shift);
  return $self->reactor->is_running;
}
|
---|
79 |
|
---|
# Schedule a callback with the reactor's "next_tick", the callback is invoked
# as a method on the loop. Works as class or instance method.
sub next_tick {
  my $self = _instance(shift);
  my $cb   = shift;

  # The scheduled closure only holds a weak reference to the loop
  weaken $self;
  my $tick = sub { $self->$cb };

  return $self->reactor->next_tick($tick);
}
|
---|
85 |
|
---|
# Run the reactor for a single tick, croaks if the loop is already running.
# Works as class or instance method.
sub one_tick {
  my $self = _instance(shift);
  if ($self->is_running) { croak 'Mojo::IOLoop already running' }
  return $self->reactor->one_tick;
}
|
---|
91 |
|
---|
# Create a recurring timer, all arguments are forwarded to "_timer"
sub recurring {
  my $self = shift;
  return $self->_timer('recurring', @_);
}
|
---|
93 |
|
---|
# Remove whatever is registered under an id. Connections that already have a
# stream are closed gracefully, everything else is handed to "_remove". Works
# as class or instance method.
sub remove {
  my $self = _instance(shift);
  my $id   = shift;

  # Inbound connections are checked first, then outbound ones
  my $connection = $self->{in}{$id} || $self->{out}{$id};
  my $stream     = $connection ? $connection->{stream} : undef;
  return $stream->close_gracefully if $stream;

  $self->_remove($id);
}
|
---|
100 |
|
---|
# Drop the loop's bookkeeping and event subscribers, reset the reactor and
# stop it. Works as class or instance method.
sub reset {
  my $self = _instance(shift);

  # Forget acceptors, connections, subscribers and shutdown state
  delete $self->{$_} for qw(accepting acceptors events in out stop);

  $self->reactor->reset;
  $self->stop;
}
|
---|
107 |
|
---|
# Listen with Mojo::IOLoop::Server and turn every accepted handle into a
# stream object. Arguments are either a hash reference or a list of key/value
# pairs, followed by a callback. The callback is invoked as a method on the
# loop with ($stream, $id) for every accepted connection. Returns the acceptor
# id. Works as class or instance method.
sub server {
  my ($self, $cb) = (_instance(shift), pop);

  # Work on a shallow copy, deleting "stream_class" below must not modify a
  # hash reference that still belongs to the caller
  my $args = ref $_[0] ? {%{$_[0]}} : {@_};

  my $server = Mojo::IOLoop::Server->new;
  my $class = delete $args->{stream_class} || 'Mojo::IOLoop::Stream';

  # The accept callback only holds a weak reference to the loop
  weaken $self;
  $server->on(
    accept => sub {

      # Register the new handle as an inbound connection
      my $stream = $class->new(pop);
      $self->$cb($stream, $self->_stream($stream, $self->_id, 1));

      # Enforce connection limit (randomize to improve load balancing)
      if (my $max = $self->max_accepts) {
        $self->{accepts} //= $max - int rand $max / 2;
        $self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
      }

      # Stop accepting if connection limit has been reached
      $self->_not_accepting if $self->_limit;
    }
  );
  $server->listen($args);

  return $self->acceptor($server);
}
|
---|
134 |
|
---|
# Return the shared loop object, it is constructed once on the first call and
# the same object is returned afterwards
sub singleton {
  my $class = shift;
  state $loop = $class->SUPER::new;
  return $loop;
}
|
---|
136 |
|
---|
# Start the reactor, croaks if the loop is already running. Works as class or
# instance method.
sub start {
  my $self = _instance(shift);
  if ($self->is_running) { croak 'Mojo::IOLoop already running' }
  return $self->reactor->start;
}
|
---|
142 |
|
---|
# Stop the reactor. Works as class or instance method.
sub stop {
  my $self = _instance(shift);
  return $self->reactor->stop;
}
|
---|
144 |
|
---|
# Stop accepting new connections, mark the loop as stopping and emit "finish".
# The loop is stopped right away if no inbound connections are left, otherwise
# "_remove" stops it once the last one is gone. Works as class or instance
# method.
sub stop_gracefully {
  my $loop = _instance(shift);
  my $self = $loop->_not_accepting;

  # Count the request, then check for remaining inbound connections
  my $idle = ++$self->{stop} && !$self->emit('finish')->_in;

  return $idle && $self->stop;
}
|
---|
149 |
|
---|
# Look up the stream object for a connection id, or register a stream object
# as an outbound connection and return its new id. Works as class or instance
# method.
sub stream {
  my $self   = _instance(shift);
  my $stream = shift;

  # A reference is a stream object to register
  if (ref $stream) { return $self->_stream($stream, $self->_id) }

  # Otherwise it is an id, inbound connections are checked first
  my $c = $self->{in}{$stream};
  $c ||= $self->{out}{$stream};
  $c ||= {};

  return $c->{stream};
}
|
---|
156 |
|
---|
# Build a Mojo::IOLoop::Subprocess object bound to this loop. With arguments
# they are handed to "run" and its result is returned, otherwise the
# subprocess object itself.
sub subprocess {
  my $subprocess = Mojo::IOLoop::Subprocess->new;
  my $loop       = _instance(shift);

  # The subprocess only keeps a weak reference to the loop
  weaken $subprocess->ioloop($loop)->{ioloop};

  return $subprocess unless @_;
  return $subprocess->run(@_);
}
|
---|
162 |
|
---|
# Create a one-shot timer, all arguments are forwarded to "_timer"
sub timer {
  my $self = shift;
  return $self->_timer('timer', @_);
}
|
---|
164 |
|
---|
# Replace the stream registered under an id with a new object of the given
# class, built from the handle taken over from the old stream. The new stream
# keeps the id and the inbound/outbound side of the old one and is returned.
# Works as class or instance method.
sub transition {
  my $self = _instance(shift);
  my ($id, $class) = @_;

  # Take the handle away from the current stream
  my @handle = $self->stream($id)->steal_handle;
  my $new    = $class->new(@handle);

  # Inbound connections stay inbound
  my $inbound = !!$self->{in}{$id};
  $self->_stream($new, $id, $inbound);

  return $new;
}
|
---|
171 |
|
---|
# Generate an id that is not yet used by an inbound connection, an outbound
# connection or an acceptor of this loop
sub _id {
  my $self = shift;

  while (1) {
    my $id = md5_sum('c' . steady_time() . rand());

    # Try again on the off chance of a collision
    next if $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};

    return $id;
  }
}
|
---|
179 |
|
---|
# Number of inbound connections
sub _in {
  my $self    = shift;
  my $inbound = $self->{in} || {};
  return scalar keys %$inbound;
}
|
---|
181 |
|
---|
# Resolve the invocant to a loop object, references are returned as they are
# and anything else is asked for its singleton
sub _instance {
  my $invocant = $_[0];
  return $invocant if ref $invocant;
  return $invocant->singleton;
}
|
---|
183 |
|
---|
# True if no more connections should be accepted, either because the loop is
# stopping or because "max_connections" inbound connections are open
sub _limit {
  my $self = $_[0];
  return 1 if $self->{stop};
  return $self->_in >= $self->max_connections;
}
|
---|
185 |
|
---|
# Start all acceptors and remember that we are accepting, unless that is
# already the case or the connection limit has been reached
sub _maybe_accepting {
  my $self = shift;

  return if $self->{accepting};
  return if $self->_limit;

  my $acceptors = $self->{acceptors} || {};
  for my $acceptor (values %$acceptors) { $acceptor->start }

  return $self->{accepting} = 1;
}
|
---|
192 |
|
---|
# Stop all acceptors if we are currently accepting and clear that state,
# always returns the loop to allow chaining
sub _not_accepting {
  my $self = shift;

  if (delete $self->{accepting}) {
    my $acceptors = $self->{acceptors} || {};
    for my $acceptor (values %$acceptors) { $acceptor->stop }
  }

  return $self;
}
|
---|
199 |
|
---|
# Number of outbound connections
sub _out {
  my $self     = shift;
  my $outbound = $self->{out} || {};
  return scalar keys %$outbound;
}
|
---|
201 |
|
---|
# Remove whatever is registered under an id right away. The reactor gets the
# first chance (timers), then acceptors, then connections.
sub _remove {
  my ($self, $id) = @_;

  # Timer
  my $reactor = $self->reactor;
  return unless $reactor;
  return if $reactor->remove($id);

  # Acceptor, restart the accept cycle without it
  if (delete $self->{acceptors}{$id}) {
    return $self->_not_accepting->_maybe_accepting;
  }

  # Connection
  return unless delete $self->{in}{$id} || delete $self->{out}{$id};

  # Finish a graceful shutdown once the last inbound connection is gone
  if ($self->{stop} && !$self->_in) { return $self->stop }

  # A slot may have become free again
  $self->_maybe_accepting;
  warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
}
|
---|
219 |
|
---|
# Register a stream under an id as inbound (true fourth argument) or outbound
# connection, connect it with our reactor, start it and return the id
sub _stream {
  my ($self, $stream, $id, $server) = @_;

  # Connect stream with reactor
  my $side = $server ? 'in' : 'out';
  $self->{$side}{$id}{stream} = $stream;
  warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
  my $connected = $stream->reactor($self->reactor);
  weaken $connected->{reactor};

  # Clean up when the stream closes, the handler only holds a weak reference
  # to the loop
  weaken $self;
  my $cleanup = sub { $self && $self->_remove($id) };
  $stream->on(close => $cleanup);
  $stream->start;

  return $id;
}
|
---|
233 |
|
---|
# Create a timer with the given reactor method ("timer" or "recurring"), the
# callback is invoked as a method on the loop. Works as class or instance
# method.
sub _timer {
  my $self = _instance(shift);
  my ($method, $after, $cb) = @_;

  # The timer closure only holds a weak reference to the loop
  weaken $self;
  my $wrapper = sub { $self->$cb };

  return $self->reactor->$method($after, $wrapper);
}
|
---|
239 |
|
---|
240 | 1;
|
---|
241 |
|
---|
242 | =encoding utf8
|
---|
243 |
|
---|
244 | =head1 NAME
|
---|
245 |
|
---|
246 | Mojo::IOLoop - Minimalistic event loop
|
---|
247 |
|
---|
248 | =head1 SYNOPSIS
|
---|
249 |
|
---|
250 | use Mojo::IOLoop;
|
---|
251 |
|
---|
252 | # Listen on port 3000
|
---|
253 | Mojo::IOLoop->server({port => 3000} => sub {
|
---|
254 | my ($loop, $stream) = @_;
|
---|
255 |
|
---|
256 | $stream->on(read => sub {
|
---|
257 | my ($stream, $bytes) = @_;
|
---|
258 |
|
---|
259 | # Process input chunk
|
---|
260 | say $bytes;
|
---|
261 |
|
---|
262 | # Write response
|
---|
263 | $stream->write('HTTP/1.1 200 OK');
|
---|
264 | });
|
---|
265 | });
|
---|
266 |
|
---|
267 | # Connect to port 3000
|
---|
268 | my $id = Mojo::IOLoop->client({port => 3000} => sub {
|
---|
269 | my ($loop, $err, $stream) = @_;
|
---|
270 |
|
---|
271 | $stream->on(read => sub {
|
---|
272 | my ($stream, $bytes) = @_;
|
---|
273 |
|
---|
274 | # Process input
|
---|
275 | say "Input: $bytes";
|
---|
276 | });
|
---|
277 |
|
---|
278 | # Write request
|
---|
279 | $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
|
---|
280 | });
|
---|
281 |
|
---|
282 | # Add a timer
|
---|
283 | Mojo::IOLoop->timer(5 => sub {
|
---|
284 | my $loop = shift;
|
---|
285 | $loop->remove($id);
|
---|
286 | });
|
---|
287 |
|
---|
288 | # Start event loop if necessary
|
---|
289 | Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|
---|
290 |
|
---|
291 | =head1 DESCRIPTION
|
---|
292 |
|
---|
293 | L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it
|
---|
294 | has been reduced to the absolute minimal feature set required to build solid
|
---|
295 | and scalable non-blocking clients and servers.
|
---|
296 |
|
---|
297 | Depending on operating system, the default per-process and system-wide file
|
---|
298 | descriptor limits are often very low and need to be tuned for better
|
---|
299 | scalability. The C<LIBEV_FLAGS> environment variable should also be used to
|
---|
300 | select the best possible L<EV> backend, which usually defaults to the not very
|
---|
301 | scalable C<select>.
|
---|
302 |
|
---|
303 | LIBEV_FLAGS=1 # select
|
---|
304 | LIBEV_FLAGS=2 # poll
|
---|
305 | LIBEV_FLAGS=4 # epoll (Linux)
|
---|
306 | LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
|
---|
307 |
|
---|
308 | The event loop will be resilient to time jumps if a monotonic clock is
|
---|
309 | available through L<Time::HiRes>. A TLS certificate and key are also built
|
---|
310 | right in, to make writing test servers as easy as possible. Also note that for
|
---|
311 | convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is
|
---|
312 | loaded.
|
---|
313 |
|
---|
314 | For better scalability (epoll, kqueue) and to provide non-blocking name
|
---|
315 | resolution, SOCKS5 as well as TLS support, the optional modules L<EV> (4.0+),
|
---|
316 | L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and
|
---|
317 | L<IO::Socket::SSL> (2.009+) will be used automatically if possible. Individual
|
---|
318 | features can also be disabled with the C<MOJO_NO_NNR>, C<MOJO_NO_SOCKS> and
|
---|
319 | C<MOJO_NO_TLS> environment variables.
|
---|
320 |
|
---|
321 | See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
|
---|
322 |
|
---|
323 | =head1 EVENTS
|
---|
324 |
|
---|
325 | L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the
|
---|
326 | following new ones.
|
---|
327 |
|
---|
328 | =head2 finish
|
---|
329 |
|
---|
330 | $loop->on(finish => sub {
|
---|
331 | my $loop = shift;
|
---|
332 | ...
|
---|
333 | });
|
---|
334 |
|
---|
335 | Emitted when the event loop wants to shut down gracefully and is just waiting
|
---|
336 | for all existing connections to be closed.
|
---|
337 |
|
---|
338 | =head1 ATTRIBUTES
|
---|
339 |
|
---|
340 | L<Mojo::IOLoop> implements the following attributes.
|
---|
341 |
|
---|
342 | =head2 max_accepts
|
---|
343 |
|
---|
344 | my $max = $loop->max_accepts;
|
---|
345 | $loop = $loop->max_accepts(1000);
|
---|
346 |
|
---|
347 | The maximum number of connections this event loop is allowed to accept, before
|
---|
348 | shutting down gracefully without interrupting existing connections, defaults to
|
---|
349 | C<0>. Setting the value to C<0> will allow this event loop to accept new
|
---|
350 | connections indefinitely. Note that up to half of this value can be subtracted
|
---|
351 | randomly to improve load balancing between multiple server processes, and to
|
---|
352 | make sure that not all of them restart at the same time.
|
---|
353 |
|
---|
354 | =head2 max_connections
|
---|
355 |
|
---|
356 | my $max = $loop->max_connections;
|
---|
357 | $loop = $loop->max_connections(100);
|
---|
358 |
|
---|
359 | The maximum number of accepted connections this event loop is allowed to handle
|
---|
360 | concurrently, before stopping to accept new incoming connections, defaults to
|
---|
361 | C<1000>.
|
---|
362 |
|
---|
363 | =head2 reactor
|
---|
364 |
|
---|
365 | my $reactor = $loop->reactor;
|
---|
366 | $loop = $loop->reactor(Mojo::Reactor->new);
|
---|
367 |
|
---|
368 | Low-level event reactor, usually a L<Mojo::Reactor::Poll> or
|
---|
369 | L<Mojo::Reactor::EV> object with a default subscriber to the event
|
---|
370 | L<Mojo::Reactor/"error">.
|
---|
371 |
|
---|
372 | # Watch if handle becomes readable or writable
|
---|
373 | Mojo::IOLoop->singleton->reactor->io($handle => sub {
|
---|
374 | my ($reactor, $writable) = @_;
|
---|
375 | say $writable ? 'Handle is writable' : 'Handle is readable';
|
---|
376 | });
|
---|
377 |
|
---|
378 | # Change to watching only if handle becomes writable
|
---|
379 | Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
|
---|
380 |
|
---|
381 | # Remove handle again
|
---|
382 | Mojo::IOLoop->singleton->reactor->remove($handle);
|
---|
383 |
|
---|
384 | =head1 METHODS
|
---|
385 |
|
---|
386 | L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements
|
---|
387 | the following new ones.
|
---|
388 |
|
---|
389 | =head2 acceptor
|
---|
390 |
|
---|
391 | my $server = Mojo::IOLoop->acceptor($id);
|
---|
392 | my $server = $loop->acceptor($id);
|
---|
393 | my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
|
---|
394 |
|
---|
395 | Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
|
---|
396 |
|
---|
397 | =head2 client
|
---|
398 |
|
---|
399 | my $id
|
---|
400 | = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
|
---|
401 | my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
|
---|
402 | my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
|
---|
403 |
|
---|
404 | Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and
|
---|
405 | create a stream object (usually L<Mojo::IOLoop::Stream>), takes the same
|
---|
406 | arguments as L<Mojo::IOLoop::Client/"connect"> in addition to C<stream_class>.
|
---|
407 |
|
---|
408 | # Connect to 127.0.0.1 on port 3000 with a custom stream class
|
---|
409 | my $class = 'Mojo::IOLoop::Stream::HTTPClient';
|
---|
410 | Mojo::IOLoop->client({port => 3000, stream_class => $class} => sub {
|
---|
411 | my ($loop, $err, $stream) = @_;
|
---|
412 | ...
|
---|
413 | });
|
---|
414 |
|
---|
415 | =head2 delay
|
---|
416 |
|
---|
417 | my $delay = Mojo::IOLoop->delay;
|
---|
418 | my $delay = $loop->delay;
|
---|
419 | my $delay = $loop->delay(sub {...});
|
---|
420 | my $delay = $loop->delay(sub {...}, sub {...});
|
---|
421 |
|
---|
422 | Build L<Mojo::IOLoop::Delay> object to use as a promise and/or for flow-control.
|
---|
423 | Callbacks will be passed along to L<Mojo::IOLoop::Delay/"steps">.
|
---|
424 |
|
---|
425 | # Wrap continuation-passing style APIs with promises
|
---|
426 | my $ua = Mojo::UserAgent->new;
|
---|
427 | sub get {
|
---|
428 | my $promise = Mojo::IOLoop->delay;
|
---|
429 | $ua->get(@_ => sub {
|
---|
430 | my ($ua, $tx) = @_;
|
---|
431 | my $err = $tx->error;
|
---|
432 | $promise->resolve($tx) if !$err || $err->{code};
|
---|
433 | $promise->reject($err->{message});
|
---|
434 | });
|
---|
435 | return $promise;
|
---|
436 | }
|
---|
437 | my $mojo = get('https://mojolicious.org');
|
---|
438 | my $cpan = get('https://metacpan.org');
|
---|
439 | Mojo::Promise->race($mojo, $cpan)->then(sub { say shift->req->url })->wait;
|
---|
440 |
|
---|
441 | # Synchronize multiple non-blocking operations
|
---|
442 | my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
|
---|
443 | for my $i (1 .. 10) {
|
---|
444 | my $end = $delay->begin;
|
---|
445 | Mojo::IOLoop->timer($i => sub {
|
---|
446 | say 10 - $i;
|
---|
447 | $end->();
|
---|
448 | });
|
---|
449 | }
|
---|
450 | $delay->wait;
|
---|
451 |
|
---|
452 | # Sequentialize multiple non-blocking operations
|
---|
453 | Mojo::IOLoop->delay(
|
---|
454 |
|
---|
455 | # First step (simple timer)
|
---|
456 | sub {
|
---|
457 | my $delay = shift;
|
---|
458 | Mojo::IOLoop->timer(2 => $delay->begin);
|
---|
459 | say 'Second step in 2 seconds.';
|
---|
460 | },
|
---|
461 |
|
---|
462 | # Second step (concurrent timers)
|
---|
463 | sub {
|
---|
464 | my $delay = shift;
|
---|
465 | Mojo::IOLoop->timer(1 => $delay->begin);
|
---|
466 | Mojo::IOLoop->timer(3 => $delay->begin);
|
---|
467 | say 'Third step in 3 seconds.';
|
---|
468 | },
|
---|
469 |
|
---|
470 | # Third step (the end)
|
---|
471 | sub { say 'And done after 5 seconds total.' }
|
---|
472 | )->wait;
|
---|
473 |
|
---|
474 | =head2 is_running
|
---|
475 |
|
---|
476 | my $bool = Mojo::IOLoop->is_running;
|
---|
477 | my $bool = $loop->is_running;
|
---|
478 |
|
---|
479 | Check if event loop is running.
|
---|
480 |
|
---|
481 | exit unless Mojo::IOLoop->is_running;
|
---|
482 |
|
---|
483 | =head2 next_tick
|
---|
484 |
|
---|
485 | my $undef = Mojo::IOLoop->next_tick(sub {...});
|
---|
486 | my $undef = $loop->next_tick(sub {...});
|
---|
487 |
|
---|
488 | Execute callback as soon as possible, but not before returning or other
|
---|
489 | callbacks that have been registered with this method, always returns C<undef>.
|
---|
490 |
|
---|
491 | # Perform operation on next reactor tick
|
---|
492 | Mojo::IOLoop->next_tick(sub {
|
---|
493 | my $loop = shift;
|
---|
494 | ...
|
---|
495 | });
|
---|
496 |
|
---|
497 | =head2 one_tick
|
---|
498 |
|
---|
499 | Mojo::IOLoop->one_tick;
|
---|
500 | $loop->one_tick;
|
---|
501 |
|
---|
502 | Run event loop until an event occurs.
|
---|
503 |
|
---|
504 | # Don't block longer than 0.5 seconds
|
---|
505 | my $id = Mojo::IOLoop->timer(0.5 => sub {});
|
---|
506 | Mojo::IOLoop->one_tick;
|
---|
507 | Mojo::IOLoop->remove($id);
|
---|
508 |
|
---|
509 | =head2 recurring
|
---|
510 |
|
---|
511 | my $id = Mojo::IOLoop->recurring(3 => sub {...});
|
---|
512 | my $id = $loop->recurring(0 => sub {...});
|
---|
513 | my $id = $loop->recurring(0.25 => sub {...});
|
---|
514 |
|
---|
515 | Create a new recurring timer, invoking the callback repeatedly after a given
|
---|
516 | amount of time in seconds.
|
---|
517 |
|
---|
518 | # Perform operation every 5 seconds
|
---|
519 | Mojo::IOLoop->recurring(5 => sub {
|
---|
520 | my $loop = shift;
|
---|
521 | ...
|
---|
522 | });
|
---|
523 |
|
---|
524 | =head2 remove
|
---|
525 |
|
---|
526 | Mojo::IOLoop->remove($id);
|
---|
527 | $loop->remove($id);
|
---|
528 |
|
---|
529 | Remove anything with an id, connections will be dropped gracefully by allowing
|
---|
530 | them to finish writing all data in their write buffers.
|
---|
531 |
|
---|
532 | =head2 reset
|
---|
533 |
|
---|
534 | Mojo::IOLoop->reset;
|
---|
535 | $loop->reset;
|
---|
536 |
|
---|
537 | Remove everything and stop the event loop.
|
---|
538 |
|
---|
539 | =head2 server
|
---|
540 |
|
---|
541 | my $id = Mojo::IOLoop->server(port => 3000, sub {...});
|
---|
542 | my $id = $loop->server(port => 3000, sub {...});
|
---|
543 | my $id = $loop->server({port => 3000} => sub {...});
|
---|
544 |
|
---|
545 | Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server>
|
---|
546 | and create stream objects (usually L<Mojo::IOLoop::Stream>), takes the same
|
---|
547 | arguments as L<Mojo::IOLoop::Server/"listen"> in addition to C<stream_class>.
|
---|
548 |
|
---|
549 | # Listen on port 3000 with a custom stream class
|
---|
550 | my $class = 'Mojo::IOLoop::Stream::HTTPServer';
|
---|
551 | Mojo::IOLoop->server({port => 3000, stream_class => $class} => sub {
|
---|
552 | my ($loop, $stream, $id) = @_;
|
---|
553 | ...
|
---|
554 | });
|
---|
555 |
|
---|
556 | # Listen on random port
|
---|
557 | my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub {
|
---|
558 | my ($loop, $stream, $id) = @_;
|
---|
559 | ...
|
---|
560 | });
|
---|
561 | my $port = Mojo::IOLoop->acceptor($id)->port;
|
---|
562 |
|
---|
563 | =head2 singleton
|
---|
564 |
|
---|
565 | my $loop = Mojo::IOLoop->singleton;
|
---|
566 |
|
---|
567 | The global L<Mojo::IOLoop> singleton, used to access a single shared event loop
|
---|
568 | object from everywhere inside the process.
|
---|
569 |
|
---|
570 | # Many methods also allow you to take shortcuts
|
---|
571 | Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
|
---|
572 | Mojo::IOLoop->start;
|
---|
573 |
|
---|
574 | # Restart active timer
|
---|
575 | my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
|
---|
576 | Mojo::IOLoop->singleton->reactor->again($id);
|
---|
577 |
|
---|
578 | # Turn file descriptor into handle and watch if it becomes readable
|
---|
579 | my $handle = IO::Handle->new_from_fd($fd, 'r');
|
---|
580 | Mojo::IOLoop->singleton->reactor->io($handle => sub {
|
---|
581 | my ($reactor, $writable) = @_;
|
---|
582 | say $writable ? 'Handle is writable' : 'Handle is readable';
|
---|
583 | })->watch($handle, 1, 0);
|
---|
584 |
|
---|
585 | =head2 start
|
---|
586 |
|
---|
587 | Mojo::IOLoop->start;
|
---|
588 | $loop->start;
|
---|
589 |
|
---|
590 | Start the event loop, this will block until L</"stop"> is called. Note that
|
---|
591 | some reactors stop automatically if there are no events being watched anymore.
|
---|
592 |
|
---|
593 | # Start event loop only if it is not running already
|
---|
594 | Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|
---|
595 |
|
---|
596 | =head2 stop
|
---|
597 |
|
---|
598 | Mojo::IOLoop->stop;
|
---|
599 | $loop->stop;
|
---|
600 |
|
---|
601 | Stop the event loop, this will not interrupt any existing connections and the
|
---|
602 | event loop can be restarted by running L</"start"> again.
|
---|
603 |
|
---|
604 | =head2 stop_gracefully
|
---|
605 |
|
---|
606 | Mojo::IOLoop->stop_gracefully;
|
---|
607 | $loop->stop_gracefully;
|
---|
608 |
|
---|
609 | Stop accepting new connections and wait for already accepted connections to be
|
---|
610 | closed, before stopping the event loop.
|
---|
611 |
|
---|
612 | =head2 stream
|
---|
613 |
|
---|
614 | my $stream = Mojo::IOLoop->stream($id);
|
---|
615 | my $stream = $loop->stream($id);
|
---|
616 | my $id = $loop->stream(Mojo::IOLoop::Stream->new);
|
---|
617 |
|
---|
618 | Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
|
---|
619 |
|
---|
620 | # Increase inactivity timeout for connection to 300 seconds
|
---|
621 | Mojo::IOLoop->stream($id)->timeout(300);
|
---|
622 |
|
---|
623 | =head2 subprocess
|
---|
624 |
|
---|
625 | my $subprocess = Mojo::IOLoop->subprocess(sub {...}, sub {...});
|
---|
626 | my $subprocess = $loop->subprocess;
|
---|
627 | my $subprocess = $loop->subprocess(sub {...}, sub {...});
|
---|
628 |
|
---|
629 | Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive
|
---|
630 | operations in subprocesses, without blocking the event loop. Callbacks will be
|
---|
631 | passed along to L<Mojo::IOLoop::Subprocess/"run">.
|
---|
632 |
|
---|
633 | # Operation that would block the event loop for 5 seconds
|
---|
634 | Mojo::IOLoop->subprocess(
|
---|
635 | sub {
|
---|
636 | my $subprocess = shift;
|
---|
637 | sleep 5;
|
---|
638 | return '♥', 'Mojolicious';
|
---|
639 | },
|
---|
640 | sub {
|
---|
641 | my ($subprocess, $err, @results) = @_;
|
---|
642 | say "Subprocess error: $err" and return if $err;
|
---|
643 | say "I $results[0] $results[1]!";
|
---|
644 | }
|
---|
645 | );
|
---|
646 |
|
---|
647 | =head2 timer
|
---|
648 |
|
---|
649 | my $id = Mojo::IOLoop->timer(3 => sub {...});
|
---|
650 | my $id = $loop->timer(0 => sub {...});
|
---|
651 | my $id = $loop->timer(0.25 => sub {...});
|
---|
652 |
|
---|
653 | Create a new timer, invoking the callback after a given amount of time in
|
---|
654 | seconds.
|
---|
655 |
|
---|
656 | # Perform operation in 5 seconds
|
---|
657 | Mojo::IOLoop->timer(5 => sub {
|
---|
658 | my $loop = shift;
|
---|
659 | ...
|
---|
660 | });
|
---|
661 |
|
---|
662 | =head2 transition
|
---|
663 |
|
---|
664 | my $stream =
|
---|
665 | Mojo::IOLoop->transition($id => 'Mojo::IOLoop::Stream::HTTPClient');
|
---|
666 | my $stream = $loop->transition($id => 'Mojo::IOLoop::Stream::HTTPClient');
|
---|
667 |
|
---|
668 | Transition stream to a different class.
|
---|
669 |
|
---|
670 | =head1 DEBUGGING
|
---|
671 |
|
---|
672 | You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced
|
---|
673 | diagnostics information printed to C<STDERR>.
|
---|
674 |
|
---|
675 | MOJO_IOLOOP_DEBUG=1
|
---|
676 |
|
---|
677 | =head1 SEE ALSO
|
---|
678 |
|
---|
679 | L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
|
---|
680 |
|
---|
681 | =cut
|
---|