source: main/trunk/greenstone2/perllib/cpan/Mojo/IOLoop/Stream.pm@ 32205

Last change on this file since 32205 was 32205, checked in by ak19, 6 years ago

First set of commits to do with implementing the new 'paged_html' output option of PDFPlugin that uses xpdftools' new pdftohtml. So far tested only on Linux (64 bit), but things work there so I'm optimistically committing the changes since they work. 2. Committing the pre-built Linux binaries of XPDFtools for both 32 and 64 bit built by the XPDF group. 3. To use the correct bitness variant of xpdftools, setup.bash now exports the BITNESS env var, consulted by gsConvert.pl. 4. All the perl code changes to do with using xpdf tools' pdftohtml to generate paged_html and feed it in the desired form into GS(3): gsConvert.pl, PDFPlugin.pm and its parent ConvertBinaryPFile.pm have been modified to make it all work. xpdftools' pdftohtml generates a folder containing an html file and a screenshot for each page in a PDF (as well as an index.html linking to each page's html). However, we want a single html file that contains each individual 'page' html's content in a div, and need to do some further HTML style, attribute and structure modifications to massage the xpdftool output to what we want for GS. In order to parse and manipulate the HTML 'DOM' to do this, we're using the Mojo::DOM package that Dr Bainbridge found and which he's compiled up. Mojo::DOM is therefore also committed in this revision. Some further changes and some display fixes are required, but need to check with the others about that.

File size: 6.6 KB
Line 
1package Mojo::IOLoop::Stream;
2use Mojo::Base 'Mojo::EventEmitter';
3
4use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
5use Mojo::IOLoop;
6use Mojo::Util;
7use Scalar::Util 'weaken';
8
# Low-level event reactor, by default the one owned by the Mojo::IOLoop
# singleton
has reactor => sub {
  my $loop = Mojo::IOLoop->singleton;
  return $loop->reactor;
};
10
# Close the stream when the object goes away, except while the interpreter is
# in global destruction
sub DESTROY {
  my $self = shift;
  return Mojo::Util::_global_destruction() || $self->close;
}
12
# Close the stream immediately: disable the timeout, detach the handle from
# the reactor and emit "close". Does nothing if there is no reactor or the
# handle is already gone.
sub close {
  my $self = shift;

  my $reactor = $self->reactor;
  return unless $reactor;

  # "timeout(0)" removes a pending timer and returns the stream itself
  my $handle = delete $self->timeout(0)->{handle};
  return unless $handle;

  $reactor->remove($handle);
  $self->emit('close');
}
20
# Close right away when idle, otherwise only flag the stream so "_write"
# closes it once it is no longer writing
sub close_gracefully {
  my $self = $_[0];
  return $self->{graceful}++ if $self->is_writing;
  return $self->close;
}
22
# Handle wrapped by this stream, undef once it has been closed or stolen
sub handle {
  my $self = shift;
  return $self->{handle};
}
24
# Quick non-blocking check if the handle has data waiting to be read
sub is_readable {
  my $self = shift;

  # Restart the inactivity timer before checking
  $self->_again;

  my $handle = $self->{handle};
  return $handle unless $handle;
  return Mojo::Util::_readable(0, fileno $handle);
}
30
# True while there is buffered data or a pending "drain" subscriber, undef
# once the handle is gone
sub is_writing {
  my $self = shift;
  return undef unless $self->{handle};
  return !!1 if length $self->{buffer};
  return $self->has_subscribers('drain');
}
36
# Construct a stream around a handle, with an empty write buffer and a
# 15 second inactivity timeout
sub new {
  my ($class, $handle) = @_;
  return $class->SUPER::new(handle => $handle, buffer => '', timeout => 15);
}
38
# Start watching the handle for I/O events, or resume watching after "stop".
# Does nothing if the stream no longer has a handle (closed or stolen), so no
# inactivity timer gets armed and the reactor is never given an undefined
# handle.
sub start {
  my $self = shift;

  # Nothing to watch without a handle
  return unless $self->{handle};

  # Resume
  my $reactor = $self->reactor;
  return $reactor->watch($self->{handle}, 1, $self->is_writing)
    if delete $self->{paused};

  # The I/O callback only holds a weak reference to the stream; its last
  # argument selects between the write and the read handler
  weaken $self;
  my $cb = sub { pop() ? $self->_write : $self->_read };

  # "timeout" (re)arms the inactivity timer and returns the stream itself
  $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
}
51
# Detach the handle from the reactor and hand it to the caller; the stream
# forgets about it afterwards
sub steal_handle {
  my $self   = shift;
  my $handle = $self->{handle};
  $self->reactor->remove($handle);
  delete $self->{handle};
  return $handle;
}
57
# Stop watching the handle for readability. Every call bumps the "paused"
# counter, but only the first one touches the reactor.
sub stop {
  my $self = shift;

  my $already_paused = $self->{paused}++;
  return $already_paused if $already_paused;

  $self->reactor->watch($self->{handle}, 0, $self->is_writing);
}
63
# Get the inactivity timeout, or set it and (re)arm the timer. A false value
# only removes the pending timer without arming a new one.
sub timeout {
  my ($self, @args) = @_;

  # Getter
  return $self->{timeout} unless @args;

  # Drop the timer that is still pending, if any
  my $reactor = $self->reactor;
  if ($self->{timer}) {
    my $pending = delete $self->{timer};
    $reactor->remove($pending);
  }

  my $seconds = $self->{timeout} = $args[0];
  return $self unless $seconds;

  # The timer callback only holds a weak reference to the stream
  weaken $self;
  my $expire = sub { $self->emit('timeout')->close };
  $self->{timer} = $reactor->timer($seconds => $expire);

  return $self;
}
78
# Append a chunk to the write buffer and start watching the handle for
# writability. The optional callback is subscribed once to the "drain" event.
sub write {
  my ($self, $chunk, $cb) = @_;

  # IO::Socket::SSL will corrupt data with the wrong internal representation
  utf8::downgrade $chunk;
  $self->{buffer} .= $chunk;

  # Without a callback and with an empty buffer there is nothing to do
  return $self if !$cb && !length($self->{buffer});
  $self->once(drain => $cb) if $cb;

  my $handle = $self->{handle};
  $self->reactor->watch($handle, !$self->{paused}, 1) if $handle;

  return $self;
}
92
# Restart the inactivity timer, if one is armed
sub _again {
  my $self  = $_[0];
  my $timer = $self->{timer};
  return $timer unless $timer;
  return $self->reactor->again($timer);
}
94
# Read handler: emit "read" with up to 131072 new bytes, close on a
# zero-length read, and sort failures into retries, resets and real errors
sub _read {
  my $self = shift;

  my $bytes = $self->{handle}->sysread(my $chunk, 131072, 0);

  if (defined $bytes) {

    # A read of zero bytes closes the stream
    return $self->close if $bytes == 0;

    # Data arrived, so also restart the inactivity timer
    return $self->emit(read => $chunk)->_again;
  }

  # Retry
  for my $transient (EAGAIN, EINTR, EWOULDBLOCK) {
    return if $! == $transient;
  }

  # Closed (maybe real error)
  return $self->close if $! == ECONNRESET;
  return $self->emit(error => $!)->close;
}
108
# Write handler: flush as much of the buffer as the handle accepts, emit
# "write" and "drain", then either finish a graceful close or stop watching
# for writability
sub _write {
  my $self = shift;

  # Handle errors only when reading (to avoid timing problems)
  my $handle = $self->{handle};
  if (length $self->{buffer}) {
    my $written = $handle->syswrite($self->{buffer});
    return unless defined $written;

    # Cut the written bytes off the front of the buffer and report them
    my $sent = substr $self->{buffer}, 0, $written, '';
    $self->emit(write => $sent)->_again;
  }

  # "is_writing" is only checked after the "drain" event has been emitted
  $self->emit('drain') if !length($self->{buffer});
  return if $self->is_writing;
  return $self->close if $self->{graceful};

  # Keep the current read state, but stop watching for writability
  return unless $self->{handle};
  $self->reactor->watch($handle, !$self->{paused}, 0);
}
124
1251;
126
127=encoding utf8
128
129=head1 NAME
130
131Mojo::IOLoop::Stream - Non-blocking I/O stream
132
133=head1 SYNOPSIS
134
135 use Mojo::IOLoop::Stream;
136
137 # Create stream
138 my $stream = Mojo::IOLoop::Stream->new($handle);
139 $stream->on(read => sub {
140 my ($stream, $bytes) = @_;
141 ...
142 });
143 $stream->on(close => sub {
144 my $stream = shift;
145 ...
146 });
147 $stream->on(error => sub {
148 my ($stream, $err) = @_;
149 ...
150 });
151
152 # Start and stop watching for new data
153 $stream->start;
154 $stream->stop;
155
156 # Start reactor if necessary
157 $stream->reactor->start unless $stream->reactor->is_running;
158
159=head1 DESCRIPTION
160
161L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>.
162
163=head1 EVENTS
164
165L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can
166emit the following new ones.
167
168=head2 close
169
170 $stream->on(close => sub {
171 my $stream = shift;
172 ...
173 });
174
175Emitted if the stream gets closed.
176
177=head2 drain
178
179 $stream->on(drain => sub {
180 my $stream = shift;
181 ...
182 });
183
184Emitted once all data has been written.
185
186=head2 error
187
188 $stream->on(error => sub {
189 my ($stream, $err) = @_;
190 ...
191 });
192
193Emitted if an error occurs on the stream, fatal if unhandled.
194
195=head2 read
196
197 $stream->on(read => sub {
198 my ($stream, $bytes) = @_;
199 ...
200 });
201
202Emitted if new data arrives on the stream.
203
204=head2 timeout
205
206 $stream->on(timeout => sub {
207 my $stream = shift;
208 ...
209 });
210
211Emitted if the stream has been inactive for too long and will get closed
212automatically.
213
214=head2 write
215
216 $stream->on(write => sub {
217 my ($stream, $bytes) = @_;
218 ...
219 });
220
221Emitted if new data has been written to the stream.
222
223=head1 ATTRIBUTES
224
225L<Mojo::IOLoop::Stream> implements the following attributes.
226
227=head2 reactor
228
229 my $reactor = $stream->reactor;
230 $stream = $stream->reactor(Mojo::Reactor::Poll->new);
231
232Low-level event reactor, defaults to the C<reactor> attribute value of the
233global L<Mojo::IOLoop> singleton.
234
235=head1 METHODS
236
237L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and
238implements the following new ones.
239
240=head2 close
241
242 $stream->close;
243
244Close stream immediately.
245
246=head2 close_gracefully
247
248 $stream->close_gracefully;
249
250Close stream gracefully.
251
252=head2 handle
253
254 my $handle = $stream->handle;
255
256Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL>
257object.
258
259=head2 is_readable
260
261 my $bool = $stream->is_readable;
262
263Quick non-blocking check if stream is readable, useful for identifying tainted
264sockets.
265
266=head2 is_writing
267
268 my $bool = $stream->is_writing;
269
270Check if stream is writing.
271
272=head2 new
273
274 my $stream = Mojo::IOLoop::Stream->new($handle);
275
276Construct a new L<Mojo::IOLoop::Stream> object.
277
278=head2 start
279
280 $stream->start;
281
282Start or resume watching for new data on the stream.
283
284=head2 steal_handle
285
286 my $handle = $stream->steal_handle;
287
288Steal L</"handle"> and prevent it from getting closed automatically.
289
290=head2 stop
291
292 $stream->stop;
293
294Stop watching for new data on the stream.
295
296=head2 timeout
297
298 my $timeout = $stream->timeout;
299 $stream = $stream->timeout(45);
300
301Maximum amount of time in seconds stream can be inactive before getting closed
302automatically, defaults to C<15>. Setting the value to C<0> will allow this
303stream to be inactive indefinitely.
304
305=head2 write
306
307 $stream = $stream->write($bytes);
308 $stream = $stream->write($bytes => sub {...});
309
310Write data to stream, the optional drain callback will be executed once all data
311has been written.
312
313=head1 SEE ALSO
314
315L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
316
317=cut
Note: See TracBrowser for help on using the repository browser.