source: main/trunk/greenstone2/perllib/cpan/Mojo/Transaction/WebSocket.pm@ 32205

Last change on this file since 32205 was 32205, checked in by ak19, 6 years ago

First set of commits to do with implementing the new 'paged_html' output option of PDFPlugin that uses xpdftools' new pdftohtml. So far tested only on Linux (64 bit), but things work there so I'm optimistically committing the changes since they work. 2. Committing the pre-built Linux binaries of XPDFtools for both 32 and 64 bit built by the XPDF group. 3. To use the correct bitness variant of xpdftools, setup.bash now exports the BITNESS env var, consulted by gsConvert.pl. 4. All the perl code changes to do with using xpdf tools' pdftohtml to generate paged_html and feed it in the desired form into GS(3): gsConvert.pl, PDFPlugin.pm and its parent ConvertBinaryPFile.pm have been modified to make it all work. xpdftools' pdftohtml generates a folder containing an html file and a screenshot for each page in a PDF (as well as an index.html linking to each page's html). However, we want a single html file that contains each individual 'page' html's content in a div, and need to do some further HTML style, attribute and structure modifications to massage the xpdftool output to what we want for GS. In order to parse and manipulate the HTML 'DOM' to do this, we're using the Mojo::DOM package that Dr Bainbridge found and which he's compiled up. Mojo::DOM is therefore also committed in this revision. Some further changes and some display fixes are required, but need to check with the others about that.

File size: 11.7 KB
Line 
1package Mojo::Transaction::WebSocket;
2use Mojo::Base 'Mojo::Transaction';
3
4use Compress::Raw::Zlib 'Z_SYNC_FLUSH';
5use List::Util 'first';
6use Mojo::JSON qw(encode_json j);
7use Mojo::Util qw(decode encode trim);
8use Mojo::WebSocket
9 qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
10
# Plain attributes without a default value
has ['compressed', 'established', 'handshake', 'masked'];

# Maximum message size in bytes, the environment variable overrides 262144
has max_websocket_size => sub {
  return $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144;
};
13
# Build a WebSocket message frame from characters or from a hash reference
# with a "text", "binary" or "json" key. Returns an array reference of the
# form [$fin, $rsv1, $rsv2, $rsv3, $op, $payload]; the payload is deflated and
# RSV1 is set when the "compressed" attribute is true.
sub build_message {
  my ($self, $frame) = @_;

  # Anything but a hash reference is treated as characters
  $frame = {text => encode('UTF-8', $frame)} unless ref $frame eq 'HASH';

  # JSON is serialized into the "text" slot of the hash
  if (exists $frame->{json}) { $frame->{text} = encode_json($frame->{json}) }

  # An existing "text" slot takes precedence over "binary"
  my ($opcode, $payload)
    = exists $frame->{text}
    ? (WS_TEXT, $frame->{text})
    : (WS_BINARY, $frame->{binary});
  my $message = [1, 0, 0, 0, $opcode, $payload];

  # Without the "permessage-deflate" extension the frame is already complete
  return $message unless $self->compressed;

  # The deflate stream is created once and reused for later messages
  my $deflate = $self->{deflate} ||= Compress::Raw::Zlib::Deflate->new(
    AppendOutput => 1,
    MemLevel     => 8,
    WindowBits   => -15
  );
  my $compressed;
  $deflate->deflate($payload, $compressed);
  $deflate->flush($compressed, Z_SYNC_FLUSH);

  # Set RSV1 and drop the last four bytes of the flushed output
  # ("parse_message" appends "\x00\x00\xff\xff" again before inflating)
  $message->[1] = 1;
  $message->[5] = substr($compressed, 0, length($compressed) - 4);

  return $message;
}
40
# Read data client-side, handled exactly like "server_read"
sub client_read {
  my $self = shift;
  return $self->server_read(@_);
}
# Write data client-side, handled exactly like "server_write"
sub client_write {
  my $self = shift;
  return $self->server_write(@_);
}
43
# Mark the transaction as completed and emit "finish" with the status stored
# by "finish" (code and optional reason), or with 1006 if there is none.
sub closed {
  my $self = shift;
  $self = $self->completed;

  my @status = $self->{close} ? @{$self->{close}} : (1006);
  return $self->emit(finish => @status);
}
48
# Connection identifier, taken from the handshake transaction
sub connection {
  my $self = shift;
  return $self->handshake->connection;
}
50
# Close the WebSocket connection gracefully by sending a "Close" frame.
# Takes an optional status code and an optional reason (characters), which
# are also remembered in $self->{close}; the stored code defaults to 1005
# when none was given. Returns the invocant.
sub finish {
  my ($self, @status) = @_;

  my $close = $self->{close} = [@status];
  my ($code, $reason) = @$close;

  # Payload is the 16-bit status code followed by the UTF-8 encoded reason
  my $payload = '';
  $payload = pack('n', $code) if $code;
  $payload .= encode('UTF-8', $reason) if defined $reason;

  # Remember "no status received" if the caller did not provide a code
  $close->[0] //= 1005;

  # Flag the transaction as closing once the frame has been queued
  my $tx = $self->send([1, 0, 0, 0, WS_CLOSE, $payload]);
  $tx->{closing} = 1;

  return $self;
}
62
# True, this is a WebSocket transaction
sub is_websocket { return 1 }
64
# Whether the connection has been kept alive, as reported by the handshake
sub kept_alive {
  my $self = shift;
  return $self->handshake->kept_alive;
}
# Local interface address, taken from the handshake transaction
sub local_address {
  my $self = shift;
  return $self->handshake->local_address;
}
# Local interface port, taken from the handshake transaction
sub local_port {
  my $self = shift;
  return $self->handshake->local_port;
}
68
# Process one parsed WebSocket frame of the form
# [$fin, $rsv1, $rsv2, $rsv3, $op, $payload].
#
# Emits "frame" for every frame, answers "Ping" with "Pong", ignores "Pong",
# and turns a "Close" frame into a call to "finish". Data frames are appended
# to the pending message; once a frame with the FIN bit arrives the message
# is inflated if necessary and the "json", "text"/"binary" and "message"
# events are emitted. Messages larger than "max_websocket_size" close the
# connection with status 1009.
sub parse_message {
  my ($self, $frame) = @_;

  $self->emit(frame => $frame);

  # Ping/Pong
  my $op = $frame->[4];
  return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
  return if $op == WS_PONG;

  # Close (status code is the first two bytes, the rest is the reason)
  if ($op == WS_CLOSE) {
    return $self->finish unless length $frame->[5] >= 2;
    return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')),
      decode('UTF-8', $frame->[5]));
  }

  # First frame of a message: remember the opcode and whether the message is
  # compressed. RSV1 is only set on the first fragment of a compressed
  # message (RFC 7692), so it must not be read from the final frame, which
  # for a fragmented message is a continuation frame with RSV1 cleared.
  @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1])
    unless exists $self->{op};

  # Append chunk and check message size
  $self->{message} .= $frame->[5];
  my $max = $self->max_websocket_size;
  return $self->finish(1009) if length $self->{message} > $max;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # "permessage-deflate" extension (handshake and RSV1 of the first frame)
  my $msg = delete $self->{message};
  if ($self->{pmc}) {
    my $inflate = $self->{inflate} ||= Compress::Raw::Zlib::Inflate->new(
      Bufsize     => $max,
      LimitOutput => 1,
      WindowBits  => -15
    );

    # Restore the four bytes "build_message" strips from the deflated output
    $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);

    # Input that was not consumed means the output limit has been reached
    return $self->finish(1009) if length $msg;
    $msg = $out;
  }

  $self->emit(json => j($msg)) if $self->has_subscribers('json');
  $op = delete $self->{op};
  $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
  $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg)
    if $self->has_subscribers('message');
}
114
# Value of the "Sec-WebSocket-Protocol" header of the handshake response
sub protocol {
  my $self    = shift;
  my $headers = $self->res->headers;
  return $headers->sec_websocket_protocol;
}
116
# Remote interface address, taken from the handshake transaction
sub remote_address {
  my $self = shift;
  return $self->handshake->remote_address;
}
# Remote interface port, taken from the handshake transaction
sub remote_port {
  my $self = shift;
  return $self->handshake->remote_port;
}
# Handshake request
sub req {
  my $self = shift;
  return $self->handshake->req;
}
# Handshake response
sub res {
  my $self = shift;
  return $self->handshake->res;
}
121
# Resume the handshake transaction. Returns the invocant if that call
# returned a true value, otherwise the false value it returned.
sub resume {
  my ($self) = @_;
  my $resumed = $self->handshake->resume;
  return $resumed ? $self : $resumed;
}
123
# Queue a message or frame for sending. $msg is either a frame (array
# reference) or anything "build_message" accepts; the optional callback is
# subscribed once to the "drain" event. Returns the result of emitting
# "resume".
sub send {
  my ($self, $msg, $cb) = @_;

  if ($cb) { $self->once(drain => $cb) }

  # Array references are already frames, everything else needs to be built
  my $frame = ref $msg eq 'ARRAY' ? $msg : $self->build_message($msg);
  $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$frame);

  return $self->emit('resume');
}
131
# Append a chunk to the read buffer and process every complete frame in it,
# then emit "resume".
sub server_read {
  my ($self, $chunk) = @_;

  $self->{read} .= $chunk;
  my $limit = $self->max_websocket_size;

  while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $limit)) {

    # A true value that is not a reference is answered with status 1009
    last if !ref $frame && $self->finish(1009);

    $self->parse_message($frame);
  }

  return $self->emit('resume');
}
144
# Hand over (and clear) the write buffer. Emits "drain" when the buffer is
# empty, and marks the transaction as completed when it is still empty
# afterwards and a "Close" frame has been sent before.
sub server_write {
  my $self = shift;

  $self->{write} //= '';
  $self->emit('drain') unless length $self->{write};

  # Subscribers of "drain" may have queued new data in the meantime
  if ($self->{closing} && !length $self->{write}) { $self->completed }

  return delete $self->{write};
}
151
# Negotiate the "permessage-deflate" extension: if the handshake request
# mentions it, enable compression and announce it in the handshake response.
sub with_compression {
  my $self = shift;

  my $offered = $self->req->headers->sec_websocket_extensions // '';
  $offered =~ /permessage-deflate/
    and $self->compressed(1)
    and $self->res->headers->sec_websocket_extensions('permessage-deflate');
}
161
# Negotiate a subprotocol. Takes the list of supported protocols in order of
# preference and returns the first one that also appears in the
# "Sec-WebSocket-Protocol" header of the handshake request (after storing it
# in the response header), or undef if there is no match.
sub with_protocols {
  my ($self, @supported) = @_;

  # Protocols requested by the peer, comma separated
  my $requested = $self->req->headers->sec_websocket_protocol // '';
  my %protos;
  $protos{trim($_)} = 1 for split ',', $requested;

  my $proto = first { $protos{$_} } @supported;
  return undef unless defined $proto;

  $self->res->headers->sec_websocket_protocol($proto);
  return $proto;
}
172
1731;
174
175=encoding utf8
176
177=head1 NAME
178
179Mojo::Transaction::WebSocket - WebSocket transaction
180
181=head1 SYNOPSIS
182
183 use Mojo::Transaction::WebSocket;
184
185 # Send and receive WebSocket messages
186 my $ws = Mojo::Transaction::WebSocket->new;
187 $ws->send('Hello World!');
188 $ws->on(message => sub {
189 my ($ws, $msg) = @_;
190 say "Message: $msg";
191 });
192 $ws->on(finish => sub {
193 my ($ws, $code, $reason) = @_;
194 say "WebSocket closed with status $code.";
195 });
196
197=head1 DESCRIPTION
198
199L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based
200on L<RFC 6455|http://tools.ietf.org/html/rfc6455> and
201L<RFC 7692|http://tools.ietf.org/html/rfc7692>.
202
203=head1 EVENTS
204
205L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction>
206and can emit the following new ones.
207
208=head2 binary
209
210 $ws->on(binary => sub {
211 my ($ws, $bytes) = @_;
212 ...
213 });
214
215Emitted when a complete WebSocket binary message has been received.
216
217 $ws->on(binary => sub {
218 my ($ws, $bytes) = @_;
219 say "Binary: $bytes";
220 });
221
222=head2 drain
223
224 $ws->on(drain => sub {
225 my $ws = shift;
226 ...
227 });
228
229Emitted once all data has been sent.
230
231 $ws->on(drain => sub {
232 my $ws = shift;
233 $ws->send(time);
234 });
235
236=head2 finish
237
238 $ws->on(finish => sub {
239 my ($ws, $code, $reason) = @_;
240 ...
241 });
242
243Emitted when the WebSocket connection has been closed.
244
245=head2 frame
246
247 $ws->on(frame => sub {
248 my ($ws, $frame) = @_;
249 ...
250 });
251
252Emitted when a WebSocket frame has been received.
253
254 $ws->on(frame => sub {
255 my ($ws, $frame) = @_;
256 say "FIN: $frame->[0]";
257 say "RSV1: $frame->[1]";
258 say "RSV2: $frame->[2]";
259 say "RSV3: $frame->[3]";
260 say "Opcode: $frame->[4]";
261 say "Payload: $frame->[5]";
262 });
263
264=head2 json
265
266 $ws->on(json => sub {
267 my ($ws, $json) = @_;
268 ...
269 });
270
Emitted when a complete WebSocket message has been received. All text and
binary messages will be automatically JSON decoded. Note that this event only
gets emitted when it has at least one subscriber.
274
275 $ws->on(json => sub {
276 my ($ws, $hash) = @_;
277 say "Message: $hash->{msg}";
278 });
279
280=head2 message
281
282 $ws->on(message => sub {
283 my ($ws, $msg) = @_;
284 ...
285 });
286
Emitted when a complete WebSocket message has been received. Text messages will
be automatically decoded. Note that this event only gets emitted when it has at
least one subscriber.
290
291 $ws->on(message => sub {
292 my ($ws, $msg) = @_;
293 say "Message: $msg";
294 });
295
296=head2 resume
297
298 $tx->on(resume => sub {
299 my $tx = shift;
300 ...
301 });
302
303Emitted when transaction is resumed.
304
305=head2 text
306
307 $ws->on(text => sub {
308 my ($ws, $bytes) = @_;
309 ...
310 });
311
312Emitted when a complete WebSocket text message has been received.
313
314 $ws->on(text => sub {
315 my ($ws, $bytes) = @_;
316 say "Text: $bytes";
317 });
318
319=head1 ATTRIBUTES
320
321L<Mojo::Transaction::WebSocket> inherits all attributes from
322L<Mojo::Transaction> and implements the following new ones.
323
324=head2 compressed
325
326 my $bool = $ws->compressed;
327 $ws = $ws->compressed($bool);
328
329Compress messages with C<permessage-deflate> extension.
330
331=head2 established
332
333 my $bool = $ws->established;
334 $ws = $ws->established($bool);
335
336WebSocket connection established.
337
338=head2 handshake
339
340 my $handshake = $ws->handshake;
341 $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
342
343The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
344
345=head2 masked
346
347 my $bool = $ws->masked;
348 $ws = $ws->masked($bool);
349
350Mask outgoing frames with XOR cipher and a random 32-bit key.
351
352=head2 max_websocket_size
353
354 my $size = $ws->max_websocket_size;
355 $ws = $ws->max_websocket_size(1024);
356
357Maximum WebSocket message size in bytes, defaults to the value of the
358C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144> (256KiB).
359
360=head1 METHODS
361
362L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction>
363and implements the following new ones.
364
365=head2 build_message
366
367 my $frame = $ws->build_message({binary => $bytes});
368 my $frame = $ws->build_message({text => $bytes});
369 my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
370 my $frame = $ws->build_message($chars);
371
372Build WebSocket message.
373
374=head2 client_read
375
376 $ws->client_read($data);
377
378Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
379
380=head2 client_write
381
382 my $bytes = $ws->client_write;
383
384Write data client-side, used to implement user agents such as
385L<Mojo::UserAgent>.
386
387=head2 closed
388
389 $tx = $tx->closed;
390
391Same as L<Mojo::Transaction/"completed">, but also indicates that all
392transaction data has been sent.
393
394=head2 connection
395
396 my $id = $ws->connection;
397
398Connection identifier.
399
400=head2 finish
401
402 $ws = $ws->finish;
403 $ws = $ws->finish(1000);
404 $ws = $ws->finish(1003 => 'Cannot accept data!');
405
406Close WebSocket connection gracefully.
407
408=head2 is_websocket
409
410 my $bool = $ws->is_websocket;
411
412True, this is a L<Mojo::Transaction::WebSocket> object.
413
414=head2 kept_alive
415
416 my $bool = $ws->kept_alive;
417
418Connection has been kept alive.
419
420=head2 local_address
421
422 my $address = $ws->local_address;
423
424Local interface address.
425
426=head2 local_port
427
428 my $port = $ws->local_port;
429
430Local interface port.
431
432=head2 parse_message
433
434 $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
435
436Parse WebSocket message.
437
438=head2 protocol
439
440 my $proto = $ws->protocol;
441
442Return negotiated subprotocol or C<undef>.
443
444=head2 remote_address
445
446 my $address = $ws->remote_address;
447
448Remote interface address.
449
450=head2 remote_port
451
452 my $port = $ws->remote_port;
453
454Remote interface port.
455
456=head2 req
457
458 my $req = $ws->req;
459
460Handshake request, usually a L<Mojo::Message::Request> object.
461
462=head2 res
463
464 my $res = $ws->res;
465
466Handshake response, usually a L<Mojo::Message::Response> object.
467
468=head2 resume
469
470 $ws = $ws->resume;
471
472Resume L</"handshake"> transaction.
473
474=head2 send
475
476 $ws = $ws->send({binary => $bytes});
477 $ws = $ws->send({text => $bytes});
478 $ws = $ws->send({json => {test => [1, 2, 3]}});
479 $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
480 $ws = $ws->send($chars);
481 $ws = $ws->send($chars => sub {...});
482
483Send message or frame non-blocking via WebSocket, the optional drain callback
484will be executed once all data has been written.
485
486 # Send "Ping" frame
487 use Mojo::WebSocket 'WS_PING';
488 $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
489
490=head2 server_read
491
492 $ws->server_read($data);
493
494Read data server-side, used to implement web servers such as
495L<Mojo::Server::Daemon>.
496
497=head2 server_write
498
499 my $bytes = $ws->server_write;
500
501Write data server-side, used to implement web servers such as
502L<Mojo::Server::Daemon>.
503
504=head2 with_compression
505
506 $ws->with_compression;
507
508Negotiate C<permessage-deflate> extension for this WebSocket connection.
509
510=head2 with_protocols
511
512 my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
513
514Negotiate subprotocol for this WebSocket connection.
515
516=head1 SEE ALSO
517
518L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
519
520=cut
Note: See TracBrowser for help on using the repository browser.