source: main/trunk/greenstone2/perllib/cpan/Mojo/IOLoop.pm

Last change on this file was 32205, checked in by ak19, 6 years ago

First set of commits to do with implementing the new 'paged_html' output option of PDFPlugin that uses using xpdftools' new pdftohtml. So far tested only on Linux (64 bit), but things work there so I'm optimistically committing the changes since they work. 2. Committing the pre-built Linux binaries of XPDFtools for both 32 and 64 bit built by the XPDF group. 2. To use the correct bitness variant of xpdftools, setup.bash now exports the BITNESS env var, consulted by gsConvert.pl. 3. All the perl code changes to do with using xpdf tools' pdftohtml to generate paged_html and feed it in the desired form into GS(3): gsConvert.pl, PDFPlugin.pm and its parent ConvertBinaryPFile.pm have been modified to make it all work. xpdftools' pdftohtml generates a folder containing an html file and a screenshot for each page in a PDF (as well as an index.html linking to each page's html). However, we want a single html file that contains each individual 'page' html's content in a div, and need to do some further HTML style, attribute and structure modifications to massage the xpdftool output to what we want for GS. In order to parse and manipulate the HTML 'DOM' to do this, we're using the Mojo::DOM package that Dr Bainbridge found and which he's compiled up. Mojo::DOM is therefore also committed in this revision. Some further changes and some display fixes are required, but need to check with the others about that.

File size: 18.5 KB
Line 
1package Mojo::IOLoop;
2use Mojo::Base 'Mojo::EventEmitter';
3
4# "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5# used. Like the death ray."
6use Carp 'croak';
7use Mojo::IOLoop::Client;
8use Mojo::IOLoop::Delay;
9use Mojo::IOLoop::Server;
10use Mojo::IOLoop::Stream;
11use Mojo::IOLoop::Subprocess;
12use Mojo::Reactor::Poll;
13use Mojo::Util qw(md5_sum steady_time);
14use Scalar::Util qw(blessed weaken);
15
16use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
17
# Connection limits: 0 accepts means "accept indefinitely", and at most 1000
# accepted connections are handled concurrently by default
has max_accepts     => 0;
has max_connections => 1000;

# Low-level reactor, built on first access from whatever class
# Mojo::Reactor::Poll->detect selects, with a default subscriber that turns
# reactor errors into warnings
has reactor => sub {
  my $backend = Mojo::Reactor::Poll->detect;
  warn "-- Reactor initialized ($backend)\n" if DEBUG;
  my $reactor = $backend->new;
  return $reactor->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
};

# The PIPE signal is ignored process-wide as soon as this module is loaded
$SIG{PIPE} = 'IGNORE';

# Build the singleton and its reactor at load time
__PACKAGE__->singleton->reactor;
31
# Get the acceptor registered under an id, or register an acceptor object and
# return the id it was stored under. Works as class or instance method.
sub acceptor {
  my $self     = _instance(shift);
  my $acceptor = shift;

  # Anything that is not a reference is treated as an id to look up
  unless (ref $acceptor) { return $self->{acceptors}{$acceptor} }

  # Register the object under a fresh id
  my $id = $self->_id;
  $self->{acceptors}{$id} = $acceptor;

  # Hand it our reactor, but only keep a weak reference to it
  my $owner = $acceptor->reactor($self->reactor);
  weaken $owner->{reactor};

  # Cycle the accepting state so the new acceptor gets picked up
  $self->_not_accepting->_maybe_accepting;

  return $id;
}
47
# Open an outgoing connection with Mojo::IOLoop::Client and return its id.
# Arguments are a hash or hash reference followed by a callback; the callback
# is invoked as ($loop, undef, $stream) on success and ($loop, $err, undef) on
# failure. The "stream_class" argument selects the stream class and is removed
# from the arguments before they are passed to "connect".
sub client {
  my $self = _instance(shift);
  my $cb   = pop;
  my $args = ref $_[0] ? $_[0] : {@_};

  my $id     = $self->_id;
  my $client = Mojo::IOLoop::Client->new;
  $self->{out}{$id}{client} = $client;
  weaken $client->reactor($self->reactor)->{reactor};
  my $stream_class = delete $args->{stream_class} || 'Mojo::IOLoop::Stream';

  # The handlers below must not keep the loop alive
  weaken $self;

  # Success: replace the client with a stream built from the new handle
  my $on_connect = sub {
    my $handle = pop;
    delete $self->{out}{$id}{client};
    my $stream = $stream_class->new($handle);
    $self->_stream($stream => $id);
    $self->$cb(undef, $stream);
  };

  # Failure: forget the connection and report the error
  my $on_error = sub {
    my $err = pop;
    $self->_remove($id);
    $self->$cb($err, undef);
  };

  $client->on(connect => $on_connect);
  $client->on(error   => $on_error);
  $client->connect($args);

  return $id;
}
71
# Build a Mojo::IOLoop::Delay object bound (weakly) to this loop. Without
# further arguments the object itself is returned, otherwise the arguments are
# passed along to its "steps" method and that result is returned.
sub delay {
  my $delay = Mojo::IOLoop::Delay->new;
  my $loop  = _instance(shift);
  weaken $delay->ioloop($loop)->{ioloop};
  return $delay unless @_;
  return $delay->steps(@_);
}
77
# Check if the event loop is running, as reported by the reactor
sub is_running {
  my $self = _instance(shift);
  return $self->reactor->is_running;
}
79
# Schedule a callback with the reactor's "next_tick" method; the callback is
# invoked with the loop as its only argument
sub next_tick {
  my $self = _instance(shift);
  my $cb   = shift;

  # The scheduled closure must not keep the loop alive
  weaken $self;
  my $tick = sub { $self->$cb };

  return $self->reactor->next_tick($tick);
}
85
# Run the reactor for a single tick, dies if the loop is already running
sub one_tick {
  my $self = _instance(shift);
  if ($self->is_running) { croak 'Mojo::IOLoop already running' }
  return $self->reactor->one_tick;
}
91
# Create a recurring timer, arguments are the interval in seconds and a
# callback; returns whatever the reactor's "recurring" method returns
sub recurring {
  my $invocant = shift;
  return $invocant->_timer(recurring => @_);
}
93
# Remove anything with an id. A connection that has a stream is closed
# gracefully, everything else is handed to "_remove".
sub remove {
  my $self = _instance(shift);
  my $id   = shift;

  # Look for a connection with that id, incoming first
  my $conn   = $self->{in}{$id} || $self->{out}{$id};
  my $stream = $conn ? $conn->{stream} : undef;
  return $stream->close_gracefully if $stream;

  return $self->_remove($id);
}
100
# Remove everything and stop the event loop
sub reset {
  my $self = _instance(shift);

  # Forget all bookkeeping state first
  my @state = qw(accepting acceptors events in out stop);
  delete @{$self}{@state};

  # Then reset and stop the reactor
  $self->reactor->reset;
  return $self->stop;
}
107
# Accept incoming connections with Mojo::IOLoop::Server and return the
# acceptor id. Arguments are a hash or hash reference followed by a callback,
# which is invoked as ($loop, $stream, $id) for every accepted connection. The
# "stream_class" argument selects the stream class and is removed from the
# arguments before they are passed to "listen".
sub server {
  my $self = _instance(shift);
  my $cb   = pop;
  my $args = ref $_[0] ? $_[0] : {@_};

  my $server       = Mojo::IOLoop::Server->new;
  my $stream_class = delete $args->{stream_class} || 'Mojo::IOLoop::Stream';

  # The accept handler must not keep the loop alive
  weaken $self;
  my $on_accept = sub {
    my $stream = $stream_class->new(pop);
    my $id     = $self->_stream($stream, $self->_id, 1);
    $self->$cb($stream, $id);

    # Enforce connection limit (randomize to improve load balancing)
    my $max = $self->max_accepts;
    if ($max) {
      $self->{accepts} //= $max - int rand $max / 2;
      $self->{accepts} -= 1;
      $self->stop_gracefully if $self->{accepts} <= 0;
    }

    # Stop accepting if connection limit has been reached
    $self->_not_accepting if $self->_limit;
  };
  $server->on(accept => $on_accept);
  $server->listen($args);

  return $self->acceptor($server);
}
134
# The global singleton, created by the first call and shared by all later ones
sub singleton {
  state $loop = shift->SUPER::new;
  return $loop;
}
136
# Start the reactor, dies if the loop is already running
sub start {
  my $self = _instance(shift);
  if ($self->is_running) { croak 'Mojo::IOLoop already running' }
  return $self->reactor->start;
}
142
# Stop the reactor
sub stop {
  my $self = _instance(shift);
  return $self->reactor->stop;
}
144
# Stop accepting new connections, emit "finish", and stop the loop right away
# if no accepted connections are left. Otherwise "_remove" stops the loop once
# the last accepted connection is gone.
sub stop_gracefully {
  my $self = _instance(shift)->_not_accepting;
  return ++$self->{stop} && !$self->emit('finish')->_in && $self->stop;
}
149
# Get the stream object for an id, or turn a stream object into an outgoing
# connection and return its new id
sub stream {
  my $self   = _instance(shift);
  my $stream = shift;

  # An object gets registered under a fresh id
  if (ref $stream) { return $self->_stream($stream => $self->_id) }

  # Otherwise look the id up, incoming connections first
  my $conn = $self->{in}{$stream} || $self->{out}{$stream};
  return $conn ? $conn->{stream} : undef;
}
156
# Build a Mojo::IOLoop::Subprocess object bound (weakly) to this loop. Without
# further arguments the object itself is returned, otherwise the arguments are
# passed along to its "run" method and that result is returned.
sub subprocess {
  my $subprocess = Mojo::IOLoop::Subprocess->new;
  my $loop       = _instance(shift);
  weaken $subprocess->ioloop($loop)->{ioloop};
  return $subprocess unless @_;
  return $subprocess->run(@_);
}
162
# Create a one-shot timer, arguments are the delay in seconds and a callback;
# returns whatever the reactor's "timer" method returns
sub timer {
  my $invocant = shift;
  return $invocant->_timer(timer => @_);
}
164
# Transition the stream registered under an id to a different class: a new
# $class object is built from the handle stolen from the current stream and
# registered under the same id, on the same side (incoming or outgoing) as
# before. Returns the new stream object, dies if there is no stream for the id.
sub transition {
  my ($self, $id, $class) = (_instance(shift), @_);

  # Without this check an unknown id would fail with an opaque "Can't call
  # method "steal_handle" on an undefined value" blamed on this file
  my $old = $self->stream($id) or croak 'Connection does not exist';

  my $new = $class->new($old->steal_handle);

  # The third argument tells "_stream" if this is an accepted connection
  $self->_stream($new, $id, !!$self->{in}{$id});

  return $new;
}
171
# Generate an id that is not yet used by an incoming connection, an outgoing
# connection or an acceptor
sub _id {
  my $self = shift;

  # Keep generating candidates until one is free
  while (1) {
    my $id = md5_sum 'c' . steady_time . rand;
    my $taken
      = $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
    return $id unless $taken;
  }
}
179
# Number of entries in the "in" (accepted connections) table
sub _in {
  my $self  = shift;
  my $table = $self->{in} || {};
  return scalar keys %$table;
}
181
# Resolve an invocant to a loop object: references are returned as they are,
# anything else is asked for its singleton
sub _instance {
  my $invocant = shift;
  return $invocant if ref $invocant;
  return $invocant->singleton;
}
183
# True if no more connections should be accepted, either because the "stop"
# flag is set or because "max_connections" has been reached
sub _limit {
  my $self = shift;
  return 1 if $self->{stop};
  return $self->_in >= $self->max_connections;
}
185
# Start all acceptors and set the "accepting" flag, unless that flag is set
# already or the connection limit has been reached
sub _maybe_accepting {
  my $self = shift;
  return if $self->{accepting};
  return if $self->_limit;

  my $acceptors = $self->{acceptors} || {};
  for my $acceptor (values %$acceptors) { $acceptor->start }

  return $self->{accepting} = 1;
}
192
# Clear the "accepting" flag and stop all acceptors if the flag was set, always
# returns the invocant to allow chaining
sub _not_accepting {
  my $self = shift;

  if (delete $self->{accepting}) {
    my $acceptors = $self->{acceptors} || {};
    for my $acceptor (values %$acceptors) { $acceptor->stop }
  }

  return $self;
}
199
# Number of entries in the "out" (outgoing connections) table
sub _out {
  my $self  = shift;
  my $table = $self->{out} || {};
  return scalar keys %$table;
}
201
# Remove whatever is registered under an id. The reactor is asked first (the
# timer case), then the acceptors, then the connections.
sub _remove {
  my ($self, $id) = @_;

  # Timer
  my $reactor = $self->reactor;
  return unless $reactor;
  return if $reactor->remove($id);

  # Acceptor (cycle the accepting state after dropping one)
  if (delete $self->{acceptors}{$id}) {
    return $self->_not_accepting->_maybe_accepting;
  }

  # Connection
  return unless delete $self->{in}{$id} || delete $self->{out}{$id};

  # A graceful stop completes once the last accepted connection is gone
  if ($self->{stop} && !$self->_in) { return $self->stop }

  # A slot may have become free, so accepting might be possible again
  $self->_maybe_accepting;
  warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
}
219
# Register a stream under an id, as accepted connection if $server is true and
# as outgoing connection otherwise, then connect it with the reactor and start
# it. Returns the id.
sub _stream {
  my ($self, $stream, $id, $server) = @_;

  # Connect stream with reactor
  my $table = $server ? 'in' : 'out';
  $self->{$table}{$id}{stream} = $stream;
  warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
  weaken $stream->reactor($self->reactor)->{reactor};

  # Clean up when the stream closes, if the loop still exists by then
  weaken $self;
  my $on_close = sub { $self && $self->_remove($id) };
  $stream->on(close => $on_close);
  $stream->start;

  return $id;
}
233
# Shared implementation of "timer" and "recurring": calls the reactor method
# named $method with the delay and a wrapper that invokes the callback with
# the loop as its only argument
sub _timer {
  my $self = _instance(shift);
  my ($method, $after, $cb) = @_;

  # The wrapper must not keep the loop alive
  weaken $self;
  my $wrapper = sub { $self->$cb };

  return $self->reactor->$method($after => $wrapper);
}
239
2401;
241
242=encoding utf8
243
244=head1 NAME
245
246Mojo::IOLoop - Minimalistic event loop
247
248=head1 SYNOPSIS
249
250 use Mojo::IOLoop;
251
252 # Listen on port 3000
253 Mojo::IOLoop->server({port => 3000} => sub {
254 my ($loop, $stream) = @_;
255
256 $stream->on(read => sub {
257 my ($stream, $bytes) = @_;
258
259 # Process input chunk
260 say $bytes;
261
262 # Write response
263 $stream->write('HTTP/1.1 200 OK');
264 });
265 });
266
267 # Connect to port 3000
268 my $id = Mojo::IOLoop->client({port => 3000} => sub {
269 my ($loop, $err, $stream) = @_;
270
271 $stream->on(read => sub {
272 my ($stream, $bytes) = @_;
273
274 # Process input
275 say "Input: $bytes";
276 });
277
278 # Write request
279 $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
280 });
281
282 # Add a timer
283 Mojo::IOLoop->timer(5 => sub {
284 my $loop = shift;
285 $loop->remove($id);
286 });
287
288 # Start event loop if necessary
289 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
290
291=head1 DESCRIPTION
292
293L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it
294has been reduced to the absolute minimal feature set required to build solid
295and scalable non-blocking clients and servers.
296
297Depending on operating system, the default per-process and system-wide file
298descriptor limits are often very low and need to be tuned for better
299scalability. The C<LIBEV_FLAGS> environment variable should also be used to
300select the best possible L<EV> backend, which usually defaults to the not very
301scalable C<select>.
302
303 LIBEV_FLAGS=1 # select
304 LIBEV_FLAGS=2 # poll
305 LIBEV_FLAGS=4 # epoll (Linux)
306 LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
307
308The event loop will be resilient to time jumps if a monotonic clock is
309available through L<Time::HiRes>. A TLS certificate and key are also built
310right in, to make writing test servers as easy as possible. Also note that for
311convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is
312loaded.
313
314For better scalability (epoll, kqueue) and to provide non-blocking name
315resolution, SOCKS5 as well as TLS support, the optional modules L<EV> (4.0+),
316L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and
317L<IO::Socket::SSL> (2.009+) will be used automatically if possible. Individual
318features can also be disabled with the C<MOJO_NO_NNR>, C<MOJO_NO_SOCKS> and
319C<MOJO_NO_TLS> environment variables.
320
321See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
322
323=head1 EVENTS
324
325L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the
326following new ones.
327
328=head2 finish
329
330 $loop->on(finish => sub {
331 my $loop = shift;
332 ...
333 });
334
335Emitted when the event loop wants to shut down gracefully and is just waiting
336for all existing connections to be closed.
337
338=head1 ATTRIBUTES
339
340L<Mojo::IOLoop> implements the following attributes.
341
342=head2 max_accepts
343
344 my $max = $loop->max_accepts;
345 $loop = $loop->max_accepts(1000);
346
347The maximum number of connections this event loop is allowed to accept, before
348shutting down gracefully without interrupting existing connections, defaults to
349C<0>. Setting the value to C<0> will allow this event loop to accept new
350connections indefinitely. Note that up to half of this value can be subtracted
351randomly to improve load balancing between multiple server processes, and to
352make sure that not all of them restart at the same time.
353
354=head2 max_connections
355
356 my $max = $loop->max_connections;
357 $loop = $loop->max_connections(100);
358
359The maximum number of accepted connections this event loop is allowed to handle
360concurrently, before stopping to accept new incoming connections, defaults to
361C<1000>.
362
363=head2 reactor
364
365 my $reactor = $loop->reactor;
366 $loop = $loop->reactor(Mojo::Reactor->new);
367
368Low-level event reactor, usually a L<Mojo::Reactor::Poll> or
369L<Mojo::Reactor::EV> object with a default subscriber to the event
370L<Mojo::Reactor/"error">.
371
372 # Watch if handle becomes readable or writable
373 Mojo::IOLoop->singleton->reactor->io($handle => sub {
374 my ($reactor, $writable) = @_;
375 say $writable ? 'Handle is writable' : 'Handle is readable';
376 });
377
378 # Change to watching only if handle becomes writable
379 Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
380
381 # Remove handle again
382 Mojo::IOLoop->singleton->reactor->remove($handle);
383
384=head1 METHODS
385
386L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements
387the following new ones.
388
389=head2 acceptor
390
391 my $server = Mojo::IOLoop->acceptor($id);
392 my $server = $loop->acceptor($id);
393 my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
394
395Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
396
397=head2 client
398
399 my $id
400 = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
401 my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
402 my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
403
404Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and
405create a stream object (usually L<Mojo::IOLoop::Stream>), takes the same
406arguments as L<Mojo::IOLoop::Client/"connect"> in addition to C<stream_class>.
407
408 # Connect to 127.0.0.1 on port 3000 with a custom stream class
409 my $class = 'Mojo::IOLoop::Stream::HTTPClient';
410 Mojo::IOLoop->client({port => 3000, stream_class => $class} => sub {
411 my ($loop, $err, $stream) = @_;
412 ...
413 });
414
415=head2 delay
416
417 my $delay = Mojo::IOLoop->delay;
418 my $delay = $loop->delay;
419 my $delay = $loop->delay(sub {...});
420 my $delay = $loop->delay(sub {...}, sub {...});
421
422Build L<Mojo::IOLoop::Delay> object to use as a promise and/or for flow-control.
423Callbacks will be passed along to L<Mojo::IOLoop::Delay/"steps">.
424
425 # Wrap continuation-passing style APIs with promises
426 my $ua = Mojo::UserAgent->new;
427 sub get {
428 my $promise = Mojo::IOLoop->delay;
429 $ua->get(@_ => sub {
430 my ($ua, $tx) = @_;
431 my $err = $tx->error;
432 if (!$err || $err->{code}) { $promise->resolve($tx) }
433 else { $promise->reject($err->{message}) }
434 });
435 return $promise;
436 }
437 my $mojo = get('https://mojolicious.org');
438 my $cpan = get('https://metacpan.org');
439 Mojo::Promise->race($mojo, $cpan)->then(sub { say shift->req->url })->wait;
440
441 # Synchronize multiple non-blocking operations
442 my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
443 for my $i (1 .. 10) {
444 my $end = $delay->begin;
445 Mojo::IOLoop->timer($i => sub {
446 say 10 - $i;
447 $end->();
448 });
449 }
450 $delay->wait;
451
452 # Sequentialize multiple non-blocking operations
453 Mojo::IOLoop->delay(
454
455 # First step (simple timer)
456 sub {
457 my $delay = shift;
458 Mojo::IOLoop->timer(2 => $delay->begin);
459 say 'Second step in 2 seconds.';
460 },
461
462 # Second step (concurrent timers)
463 sub {
464 my $delay = shift;
465 Mojo::IOLoop->timer(1 => $delay->begin);
466 Mojo::IOLoop->timer(3 => $delay->begin);
467 say 'Third step in 3 seconds.';
468 },
469
470 # Third step (the end)
471 sub { say 'And done after 5 seconds total.' }
472 )->wait;
473
474=head2 is_running
475
476 my $bool = Mojo::IOLoop->is_running;
477 my $bool = $loop->is_running;
478
479Check if event loop is running.
480
481 exit unless Mojo::IOLoop->is_running;
482
483=head2 next_tick
484
485 my $undef = Mojo::IOLoop->next_tick(sub {...});
486 my $undef = $loop->next_tick(sub {...});
487
488Execute callback as soon as possible, but not before returning or other
489callbacks that have been registered with this method, always returns C<undef>.
490
491 # Perform operation on next reactor tick
492 Mojo::IOLoop->next_tick(sub {
493 my $loop = shift;
494 ...
495 });
496
497=head2 one_tick
498
499 Mojo::IOLoop->one_tick;
500 $loop->one_tick;
501
502Run event loop until an event occurs.
503
504 # Don't block longer than 0.5 seconds
505 my $id = Mojo::IOLoop->timer(0.5 => sub {});
506 Mojo::IOLoop->one_tick;
507 Mojo::IOLoop->remove($id);
508
509=head2 recurring
510
511 my $id = Mojo::IOLoop->recurring(3 => sub {...});
512 my $id = $loop->recurring(0 => sub {...});
513 my $id = $loop->recurring(0.25 => sub {...});
514
515Create a new recurring timer, invoking the callback repeatedly after a given
516amount of time in seconds.
517
518 # Perform operation every 5 seconds
519 Mojo::IOLoop->recurring(5 => sub {
520 my $loop = shift;
521 ...
522 });
523
524=head2 remove
525
526 Mojo::IOLoop->remove($id);
527 $loop->remove($id);
528
529Remove anything with an id, connections will be dropped gracefully by allowing
530them to finish writing all data in their write buffers.
531
532=head2 reset
533
534 Mojo::IOLoop->reset;
535 $loop->reset;
536
537Remove everything and stop the event loop.
538
539=head2 server
540
541 my $id = Mojo::IOLoop->server(port => 3000, sub {...});
542 my $id = $loop->server(port => 3000, sub {...});
543 my $id = $loop->server({port => 3000} => sub {...});
544
545Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server>
546and create stream objects (usually L<Mojo::IOLoop::Stream>), takes the same
547arguments as L<Mojo::IOLoop::Server/"listen"> in addition to C<stream_class>.
548
549 # Listen on port 3000 with a custom stream class
550 my $class = 'Mojo::IOLoop::Stream::HTTPServer';
551 Mojo::IOLoop->server({port => 3000, stream_class => $class} => sub {
552 my ($loop, $stream, $id) = @_;
553 ...
554 });
555
556 # Listen on random port
557 my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub {
558 my ($loop, $stream, $id) = @_;
559 ...
560 });
561 my $port = Mojo::IOLoop->acceptor($id)->port;
562
563=head2 singleton
564
565 my $loop = Mojo::IOLoop->singleton;
566
567The global L<Mojo::IOLoop> singleton, used to access a single shared event loop
568object from everywhere inside the process.
569
570 # Many methods also allow you to take shortcuts
571 Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
572 Mojo::IOLoop->start;
573
574 # Restart active timer
575 my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
576 Mojo::IOLoop->singleton->reactor->again($id);
577
578 # Turn file descriptor into handle and watch if it becomes readable
579 my $handle = IO::Handle->new_from_fd($fd, 'r');
580 Mojo::IOLoop->singleton->reactor->io($handle => sub {
581 my ($reactor, $writable) = @_;
582 say $writable ? 'Handle is writable' : 'Handle is readable';
583 })->watch($handle, 1, 0);
584
585=head2 start
586
587 Mojo::IOLoop->start;
588 $loop->start;
589
590Start the event loop, this will block until L</"stop"> is called. Note that
591some reactors stop automatically if there are no events being watched anymore.
592
593 # Start event loop only if it is not running already
594 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
595
596=head2 stop
597
598 Mojo::IOLoop->stop;
599 $loop->stop;
600
601Stop the event loop, this will not interrupt any existing connections and the
602event loop can be restarted by running L</"start"> again.
603
604=head2 stop_gracefully
605
606 Mojo::IOLoop->stop_gracefully;
607 $loop->stop_gracefully;
608
609Stop accepting new connections and wait for already accepted connections to be
610closed, before stopping the event loop.
611
612=head2 stream
613
614 my $stream = Mojo::IOLoop->stream($id);
615 my $stream = $loop->stream($id);
616 my $id = $loop->stream(Mojo::IOLoop::Stream->new);
617
618Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
619
620 # Increase inactivity timeout for connection to 300 seconds
621 Mojo::IOLoop->stream($id)->timeout(300);
622
623=head2 subprocess
624
625 my $subprocess = Mojo::IOLoop->subprocess(sub {...}, sub {...});
626 my $subprocess = $loop->subprocess;
627 my $subprocess = $loop->subprocess(sub {...}, sub {...});
628
629Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive
630operations in subprocesses, without blocking the event loop. Callbacks will be
631passed along to L<Mojo::IOLoop::Subprocess/"run">.
632
633 # Operation that would block the event loop for 5 seconds
634 Mojo::IOLoop->subprocess(
635 sub {
636 my $subprocess = shift;
637 sleep 5;
638 return '♥', 'Mojolicious';
639 },
640 sub {
641 my ($subprocess, $err, @results) = @_;
642 say "Subprocess error: $err" and return if $err;
643 say "I $results[0] $results[1]!";
644 }
645 );
646
647=head2 timer
648
649 my $id = Mojo::IOLoop->timer(3 => sub {...});
650 my $id = $loop->timer(0 => sub {...});
651 my $id = $loop->timer(0.25 => sub {...});
652
653Create a new timer, invoking the callback after a given amount of time in
654seconds.
655
656 # Perform operation in 5 seconds
657 Mojo::IOLoop->timer(5 => sub {
658 my $loop = shift;
659 ...
660 });
661
662=head2 transition
663
664 my $stream =
665 Mojo::IOLoop->transition($id => 'Mojo::IOLoop::Stream::HTTPClient');
666 my $stream = $loop->transition($id => 'Mojo::IOLoop::Stream::HTTPClient');
667
668Transition stream to a different class.
669
670=head1 DEBUGGING
671
672You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced
673diagnostics information printed to C<STDERR>.
674
675 MOJO_IOLOOP_DEBUG=1
676
677=head1 SEE ALSO
678
679L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
680
681=cut
Note: See TracBrowser for help on using the repository browser.