1 | package Mojo::Reactor::Poll;
|
---|
2 | use Mojo::Base 'Mojo::Reactor';
|
---|
3 |
|
---|
4 | use Carp 'croak';
|
---|
5 | use IO::Poll qw(POLLERR POLLHUP POLLIN POLLNVAL POLLOUT POLLPRI);
|
---|
6 | use List::Util 'min';
|
---|
7 | use Mojo::Util qw(md5_sum steady_time);
|
---|
8 | use Time::HiRes 'usleep';
|
---|
9 |
|
---|
10 | sub again {
|
---|
11 | croak 'Timer not active' unless my $timer = shift->{timers}{shift()};
|
---|
12 | $timer->{time} = steady_time + $timer->{after};
|
---|
13 | }
|
---|
14 |
|
---|
15 | sub io {
|
---|
16 | my ($self, $handle, $cb) = @_;
|
---|
17 | $self->{io}{fileno($handle) // croak 'Handle is closed'} = {cb => $cb};
|
---|
18 | return $self->watch($handle, 1, 1);
|
---|
19 | }
|
---|
20 |
|
---|
21 | sub is_running { !!shift->{running} }
|
---|
22 |
|
---|
23 | sub next_tick {
|
---|
24 | my ($self, $cb) = @_;
|
---|
25 | push @{$self->{next_tick}}, $cb;
|
---|
26 | $self->{next_timer} //= $self->timer(0 => \&_next);
|
---|
27 | return undef;
|
---|
28 | }
|
---|
29 |
|
---|
30 | sub one_tick {
|
---|
31 | my $self = shift;
|
---|
32 |
|
---|
33 | # Just one tick
|
---|
34 | local $self->{running} = 1 unless $self->{running};
|
---|
35 |
|
---|
36 | # Wait for one event
|
---|
37 | my $i;
|
---|
38 | until ($i || !$self->{running}) {
|
---|
39 |
|
---|
40 | # Stop automatically if there is nothing to watch
|
---|
41 | return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
|
---|
42 |
|
---|
43 | # Calculate ideal timeout based on timers and round up to next millisecond
|
---|
44 | my $min = min map { $_->{time} } values %{$self->{timers}};
|
---|
45 | my $timeout = defined $min ? $min - steady_time : 0.5;
|
---|
46 | $timeout = $timeout <= 0 ? 0 : int($timeout * 1000) + 1;
|
---|
47 |
|
---|
48 | # I/O
|
---|
49 | if (keys %{$self->{io}}) {
|
---|
50 | my @poll = map { $_ => $self->{io}{$_}{mode} } keys %{$self->{io}};
|
---|
51 |
|
---|
52 | # This may break in the future, but is worth it for performance
|
---|
53 | if (IO::Poll::_poll($timeout, @poll) > 0) {
|
---|
54 | while (my ($fd, $mode) = splice @poll, 0, 2) {
|
---|
55 |
|
---|
56 | if ($mode & (POLLIN | POLLPRI | POLLNVAL | POLLHUP | POLLERR)) {
|
---|
57 | next unless my $io = $self->{io}{$fd};
|
---|
58 | ++$i and $self->_try('I/O watcher', $io->{cb}, 0);
|
---|
59 | }
|
---|
60 | next unless $mode & POLLOUT && (my $io = $self->{io}{$fd});
|
---|
61 | ++$i and $self->_try('I/O watcher', $io->{cb}, 1);
|
---|
62 | }
|
---|
63 | }
|
---|
64 | }
|
---|
65 |
|
---|
66 | # Wait for timeout if poll can't be used
|
---|
67 | elsif ($timeout) { usleep($timeout * 1000) }
|
---|
68 |
|
---|
69 | # Timers (time should not change in between timers)
|
---|
70 | my $now = steady_time;
|
---|
71 | for my $id (keys %{$self->{timers}}) {
|
---|
72 | next unless my $t = $self->{timers}{$id};
|
---|
73 | next unless $t->{time} <= $now;
|
---|
74 |
|
---|
75 | # Recurring timer
|
---|
76 | if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} }
|
---|
77 |
|
---|
78 | # Normal timer
|
---|
79 | else { $self->remove($id) }
|
---|
80 |
|
---|
81 | ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
|
---|
82 | }
|
---|
83 | }
|
---|
84 | }
|
---|
85 |
|
---|
86 | sub recurring { shift->_timer(1, @_) }
|
---|
87 |
|
---|
88 | sub remove {
|
---|
89 | my ($self, $remove) = @_;
|
---|
90 | return !!delete $self->{timers}{$remove} unless ref $remove;
|
---|
91 | return !!delete $self->{io}{fileno($remove) // croak 'Handle is closed'};
|
---|
92 | }
|
---|
93 |
|
---|
94 | sub reset { delete @{shift()}{qw(events io next_tick next_timer timers)} }
|
---|
95 |
|
---|
96 | sub start {
|
---|
97 | my $self = shift;
|
---|
98 | local $self->{running} = ($self->{running} || 0) + 1;
|
---|
99 | $self->one_tick while $self->{running};
|
---|
100 | }
|
---|
101 |
|
---|
102 | sub stop { delete shift->{running} }
|
---|
103 |
|
---|
104 | sub timer { shift->_timer(0, @_) }
|
---|
105 |
|
---|
106 | sub watch {
|
---|
107 | my ($self, $handle, $read, $write) = @_;
|
---|
108 |
|
---|
109 | croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
|
---|
110 | $io->{mode} = 0;
|
---|
111 | $io->{mode} |= POLLIN | POLLPRI if $read;
|
---|
112 | $io->{mode} |= POLLOUT if $write;
|
---|
113 |
|
---|
114 | return $self;
|
---|
115 | }
|
---|
116 |
|
---|
117 | sub _id {
|
---|
118 | my $self = shift;
|
---|
119 | my $id;
|
---|
120 | do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
|
---|
121 | return $id;
|
---|
122 | }
|
---|
123 |
|
---|
124 | sub _next {
|
---|
125 | my $self = shift;
|
---|
126 | delete $self->{next_timer};
|
---|
127 | while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
|
---|
128 | }
|
---|
129 |
|
---|
130 | sub _timer {
|
---|
131 | my ($self, $recurring, $after, $cb) = @_;
|
---|
132 |
|
---|
133 | my $id = $self->_id;
|
---|
134 | my $timer = $self->{timers}{$id}
|
---|
135 | = {cb => $cb, after => $after, time => steady_time + $after};
|
---|
136 | $timer->{recurring} = $after if $recurring;
|
---|
137 |
|
---|
138 | return $id;
|
---|
139 | }
|
---|
140 |
|
---|
141 | sub _try {
|
---|
142 | my ($self, $what, $cb) = (shift, shift, shift);
|
---|
143 | eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
|
---|
144 | }
|
---|
145 |
|
---|
146 | 1;
|
---|
147 |
|
---|
148 | =encoding utf8
|
---|
149 |
|
---|
150 | =head1 NAME
|
---|
151 |
|
---|
152 | Mojo::Reactor::Poll - Low-level event reactor with poll support
|
---|
153 |
|
---|
154 | =head1 SYNOPSIS
|
---|
155 |
|
---|
156 | use Mojo::Reactor::Poll;
|
---|
157 |
|
---|
158 | # Watch if handle becomes readable or writable
|
---|
159 | my $reactor = Mojo::Reactor::Poll->new;
|
---|
160 | $reactor->io($first => sub {
|
---|
161 | my ($reactor, $writable) = @_;
|
---|
162 | say $writable ? 'First handle is writable' : 'First handle is readable';
|
---|
163 | });
|
---|
164 |
|
---|
165 | # Change to watching only if handle becomes writable
|
---|
166 | $reactor->watch($first, 0, 1);
|
---|
167 |
|
---|
168 | # Turn file descriptor into handle and watch if it becomes readable
|
---|
169 | my $second = IO::Handle->new_from_fd($fd, 'r');
|
---|
170 | $reactor->io($second => sub {
|
---|
171 | my ($reactor, $writable) = @_;
|
---|
172 | say $writable ? 'Second handle is writable' : 'Second handle is readable';
|
---|
173 | })->watch($second, 1, 0);
|
---|
174 |
|
---|
175 | # Add a timer
|
---|
176 | $reactor->timer(15 => sub {
|
---|
177 | my $reactor = shift;
|
---|
178 | $reactor->remove($first);
|
---|
179 | $reactor->remove($second);
|
---|
180 | say 'Timeout!';
|
---|
181 | });
|
---|
182 |
|
---|
183 | # Start reactor if necessary
|
---|
184 | $reactor->start unless $reactor->is_running;
|
---|
185 |
|
---|
186 | =head1 DESCRIPTION
|
---|
187 |
|
---|
188 | L<Mojo::Reactor::Poll> is a low-level event reactor based on L<IO::Poll>.
|
---|
189 |
|
---|
190 | =head1 EVENTS
|
---|
191 |
|
---|
192 | L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>.
|
---|
193 |
|
---|
194 | =head1 METHODS
|
---|
195 |
|
---|
196 | L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and
|
---|
197 | implements the following new ones.
|
---|
198 |
|
---|
199 | =head2 again
|
---|
200 |
|
---|
201 | $reactor->again($id);
|
---|
202 |
|
---|
203 | Restart timer. Note that this method requires an active timer.
|
---|
204 |
|
---|
205 | =head2 io
|
---|
206 |
|
---|
207 | $reactor = $reactor->io($handle => sub {...});
|
---|
208 |
|
---|
209 | Watch handle for I/O events, invoking the callback whenever handle becomes
|
---|
210 | readable or writable.
|
---|
211 |
|
---|
212 | # Callback will be executed twice if handle becomes readable and writable
|
---|
213 | $reactor->io($handle => sub {
|
---|
214 | my ($reactor, $writable) = @_;
|
---|
215 | say $writable ? 'Handle is writable' : 'Handle is readable';
|
---|
216 | });
|
---|
217 |
|
---|
218 | =head2 is_running
|
---|
219 |
|
---|
220 | my $bool = $reactor->is_running;
|
---|
221 |
|
---|
222 | Check if reactor is running.
|
---|
223 |
|
---|
224 | =head2 next_tick
|
---|
225 |
|
---|
226 | my $undef = $reactor->next_tick(sub {...});
|
---|
227 |
|
---|
228 | Execute callback as soon as possible, but not before returning or other
|
---|
229 | callbacks that have been registered with this method, always returns C<undef>.
|
---|
230 |
|
---|
231 | =head2 one_tick
|
---|
232 |
|
---|
233 | $reactor->one_tick;
|
---|
234 |
|
---|
235 | Run reactor until an event occurs or no events are being watched anymore.
|
---|
236 |
|
---|
237 | # Don't block longer than 0.5 seconds
|
---|
238 | my $id = $reactor->timer(0.5 => sub {});
|
---|
239 | $reactor->one_tick;
|
---|
240 | $reactor->remove($id);
|
---|
241 |
|
---|
242 | =head2 recurring
|
---|
243 |
|
---|
244 | my $id = $reactor->recurring(0.25 => sub {...});
|
---|
245 |
|
---|
246 | Create a new recurring timer, invoking the callback repeatedly after a given
|
---|
247 | amount of time in seconds.
|
---|
248 |
|
---|
249 | =head2 remove
|
---|
250 |
|
---|
251 | my $bool = $reactor->remove($handle);
|
---|
252 | my $bool = $reactor->remove($id);
|
---|
253 |
|
---|
254 | Remove handle or timer.
|
---|
255 |
|
---|
256 | =head2 reset
|
---|
257 |
|
---|
258 | $reactor->reset;
|
---|
259 |
|
---|
260 | Remove all handles and timers.
|
---|
261 |
|
---|
262 | =head2 start
|
---|
263 |
|
---|
264 | $reactor->start;
|
---|
265 |
|
---|
266 | Start watching for I/O and timer events, this will block until L</"stop"> is
|
---|
267 | called or no events are being watched anymore.
|
---|
268 |
|
---|
269 | # Start reactor only if it is not running already
|
---|
270 | $reactor->start unless $reactor->is_running;
|
---|
271 |
|
---|
272 | =head2 stop
|
---|
273 |
|
---|
274 | $reactor->stop;
|
---|
275 |
|
---|
276 | Stop watching for I/O and timer events.
|
---|
277 |
|
---|
278 | =head2 timer
|
---|
279 |
|
---|
280 | my $id = $reactor->timer(0.5 => sub {...});
|
---|
281 |
|
---|
282 | Create a new timer, invoking the callback after a given amount of time in
|
---|
283 | seconds.
|
---|
284 |
|
---|
285 | =head2 watch
|
---|
286 |
|
---|
287 | $reactor = $reactor->watch($handle, $readable, $writable);
|
---|
288 |
|
---|
289 | Change I/O events to watch handle for with true and false values. Note that
|
---|
290 | this method requires an active I/O watcher.
|
---|
291 |
|
---|
292 | # Watch only for readable events
|
---|
293 | $reactor->watch($handle, 1, 0);
|
---|
294 |
|
---|
295 | # Watch only for writable events
|
---|
296 | $reactor->watch($handle, 0, 1);
|
---|
297 |
|
---|
298 | # Watch for readable and writable events
|
---|
299 | $reactor->watch($handle, 1, 1);
|
---|
300 |
|
---|
301 | # Pause watching for events
|
---|
302 | $reactor->watch($handle, 0, 0);
|
---|
303 |
|
---|
304 | =head1 SEE ALSO
|
---|
305 |
|
---|
306 | L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
|
---|
307 |
|
---|
308 | =cut
|
---|