1 | package Mojo::Transaction::WebSocket;
|
---|
2 | use Mojo::Base 'Mojo::Transaction';
|
---|
3 |
|
---|
4 | use Compress::Raw::Zlib 'Z_SYNC_FLUSH';
|
---|
5 | use List::Util 'first';
|
---|
6 | use Mojo::JSON qw(encode_json j);
|
---|
7 | use Mojo::Util qw(decode encode trim);
|
---|
8 | use Mojo::WebSocket
|
---|
9 | qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
|
---|
10 |
|
---|
# Plain read/write attributes, declared without a default value
has ['compressed', 'established', 'handshake', 'masked'];

# Message size limit: a true MOJO_MAX_WEBSOCKET_SIZE environment value wins,
# otherwise 262144 is used
has max_websocket_size => sub {
  my $limit = $ENV{MOJO_MAX_WEBSOCKET_SIZE};
  return $limit ? $limit : 262144;
};
|
---|
13 |
|
---|
# Turn a message description into a single final frame (array reference of
# FIN, RSV1, RSV2, RSV3, opcode and payload). Accepts a hash reference with a
# "text", "binary" or "json" key; anything else is treated as characters and
# UTF-8 encoded. The payload is deflated and RSV1 set when "compressed" is true.
sub build_message {
  my ($self, $msg) = @_;

  # Everything that is not a hash reference is text
  $msg = {text => encode('UTF-8', $msg)} unless ref $msg eq 'HASH';

  # JSON is serialized into the "text" slot of the given hash
  $msg->{text} = encode_json($msg->{json}) if exists $msg->{json};

  # Text takes precedence over binary
  my ($op, $payload)
    = exists $msg->{text}
    ? (WS_TEXT, $msg->{text})
    : (WS_BINARY, $msg->{binary});
  my $frame = [1, 0, 0, 0, $op, $payload];

  # Without "permessage-deflate" the frame is already complete
  return $frame unless $self->compressed;

  # The deflate context is created once and reused for later messages
  $self->{deflate} ||= Compress::Raw::Zlib::Deflate->new(
    AppendOutput => 1,
    MemLevel     => 8,
    WindowBits   => -15
  );
  my $deflate = $self->{deflate};

  my $compressed;
  $deflate->deflate($payload, $compressed);
  $deflate->flush($compressed, Z_SYNC_FLUSH);

  # Set RSV1 and drop the last four bytes of the flushed output
  $frame->[1] = 1;
  $frame->[5] = substr($compressed, 0, length($compressed) - 4);

  return $frame;
}
|
---|
40 |
|
---|
# Client-side reading is handled exactly like server-side reading
sub client_read {
  my $self = shift;
  return $self->server_read(@_);
}
|
---|
# Client-side writing is handled exactly like server-side writing
sub client_write {
  my $self = shift;
  return $self->server_write(@_);
}
|
---|
43 |
|
---|
# Mark the transaction as completed and emit "finish" with the status stored
# by "finish", or with 1006 if no close status has been recorded
sub closed {
  my $self = shift;
  my $tx   = $self->completed;

  return $tx->emit('finish', @{$tx->{close}}) if $tx->{close};
  return $tx->emit('finish', 1006);
}
|
---|
48 |
|
---|
# Connection identifier, taken from the handshake transaction
sub connection {
  my $self = shift;
  return $self->handshake->connection;
}
|
---|
50 |
|
---|
# Start a graceful shutdown: remember the close status, queue a Close frame
# and flag the transaction as closing. Takes an optional status code and an
# optional reason in characters, returns the invocant.
sub finish {
  my $self   = shift;
  my @status = @_;

  # Remembered so that "closed" can report it with the "finish" event
  $self->{close} = \@status;

  # Payload is the 16-bit status code followed by the UTF-8 encoded reason
  my ($code, $reason) = @status;
  my $payload = '';
  $payload = pack('n', $code) if $code;
  $payload .= encode('UTF-8', $reason) if defined $reason;

  # Status reported locally when no code has been given
  $status[0] //= 1005;

  my $tx = $self->send([1, 0, 0, 0, WS_CLOSE, $payload]);
  $tx->{closing} = 1;

  return $self;
}
|
---|
62 |
|
---|
# Always true for this class
sub is_websocket { return 1 }
|
---|
64 |
|
---|
# Keep-alive state, taken from the handshake transaction
sub kept_alive {
  my $self = shift;
  return $self->handshake->kept_alive;
}
|
---|
# Local interface address, taken from the handshake transaction
sub local_address {
  my $self = shift;
  return $self->handshake->local_address;
}
|
---|
# Local interface port, taken from the handshake transaction
sub local_port {
  my $self = shift;
  return $self->handshake->local_port;
}
|
---|
68 |
|
---|
# Process one parsed frame (array reference of FIN, RSV1, RSV2, RSV3, opcode
# and payload). Emits "frame" for every frame, answers Ping with Pong, ignores
# Pong, answers Close by finishing, and otherwise collects data frames until
# the FIN bit is seen, then emits "json" (if subscribed), "text" or "binary",
# and "message" (if subscribed).
sub parse_message {
  my ($self, $frame) = @_;

  $self->emit(frame => $frame);

  # Ping/Pong
  my $op = $frame->[4];
  return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
  return if $op == WS_PONG;

  # Close (status code is the first two bytes, the rest is the reason)
  if ($op == WS_CLOSE) {
    return $self->finish unless length($frame->[5]) >= 2;
    return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')),
      decode('UTF-8', $frame->[5]));
  }

  # First frame of a message decides opcode and compression. RSV1 has to be
  # remembered here because "permessage-deflate" sets it on the first
  # fragment only, continuation frames (including the final one) carry 0
  @$self{qw(op rsv1)} = ($op, $frame->[1]) unless exists $self->{op};

  # Append chunk and check message size
  $self->{message} .= $frame->[5];
  my $max = $self->max_websocket_size;
  return $self->finish(1009) if length($self->{message}) > $max;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # Message is complete, clear all per-message state in one place so that a
  # failed inflate below does not leave a stale opcode behind
  my $msg = delete $self->{message};
  my ($type, $rsv1) = delete @$self{qw(op rsv1)};

  # "permessage-deflate" extension (handshake and RSV1 of the first frame)
  if ($self->compressed && $rsv1) {
    my $inflate = $self->{inflate} ||= Compress::Raw::Zlib::Inflate->new(
      Bufsize     => $max,
      LimitOutput => 1,
      WindowBits  => -15
    );
    $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);

    # Input left over after inflating is answered with 1009
    return $self->finish(1009) if length $msg;
    $msg = $out;
  }

  $self->emit(json => j($msg)) if $self->has_subscribers('json');
  $self->emit($type == WS_TEXT ? 'text' : 'binary' => $msg);
  $self->emit(message => $type == WS_TEXT ? decode('UTF-8', $msg) : $msg)
    if $self->has_subscribers('message');
}
|
---|
114 |
|
---|
# Value of the Sec-WebSocket-Protocol header of the handshake response
sub protocol {
  my $self = shift;
  return $self->res->headers->sec_websocket_protocol;
}
|
---|
116 |
|
---|
# Remote interface address, taken from the handshake transaction
sub remote_address {
  my $self = shift;
  return $self->handshake->remote_address;
}
|
---|
# Remote interface port, taken from the handshake transaction
sub remote_port {
  my $self = shift;
  return $self->handshake->remote_port;
}
|
---|
# Request of the handshake transaction
sub req {
  my $self = shift;
  return $self->handshake->req;
}
|
---|
# Response of the handshake transaction
sub res {
  my $self = shift;
  return $self->handshake->res;
}
|
---|
121 |
|
---|
# Resume the handshake transaction. Returns the invocant when that call
# yields a true value, otherwise the false value it produced.
sub resume {
  my $self    = $_[0];
  my $resumed = $self->handshake->resume;
  return $self if $resumed;
  return $resumed;
}
|
---|
123 |
|
---|
# Queue a message or a raw frame for writing and emit "resume". An array
# reference is taken as a ready-made frame, everything else goes through
# "build_message". The optional callback is subscribed to the next "drain"
# event only.
sub send {
  my ($self, $msg, $cb) = @_;

  $self->once(drain => $cb) if $cb;

  # Frames are passed through untouched, messages are converted first
  my $frame = ref $msg eq 'ARRAY' ? $msg : $self->build_message($msg);

  # Serialize and append to the write buffer
  my $bytes = Mojo::WebSocket::build_frame($self->masked, @$frame);
  $self->{write} .= $bytes;

  return $self->emit('resume');
}
|
---|
131 |
|
---|
# Append incoming bytes to the read buffer, hand every complete frame to
# "parse_message" and emit "resume" afterwards
sub server_read {
  my ($self, $chunk) = @_;

  $self->{read} .= $chunk;
  my $limit = $self->max_websocket_size;

  while (1) {
    my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $limit);

    # Nothing (more) to parse yet
    last unless $frame;

    # A true value that is not a reference is answered with 1009
    unless (ref $frame) {
      last if $self->finish(1009);
    }

    $self->parse_message($frame);
  }

  return $self->emit('resume');
}
|
---|
144 |
|
---|
# Hand over everything queued for writing and empty the buffer. Emits "drain"
# when the buffer is empty on entry, and marks the transaction as completed
# when it is still empty afterwards and a close is in progress.
sub server_write {
  my $self = shift;

  # Make sure the buffer is defined before checking its length
  my $pending = length($self->{write} //= '');
  $self->emit('drain') unless $pending;

  # Checked again, since "drain" subscribers may have queued more data
  if ($self->{closing}) {
    $self->completed unless length $self->{write};
  }

  return delete $self->{write};
}
|
---|
151 |
|
---|
# Enable "permessage-deflate" if the request's Sec-WebSocket-Extensions header
# mentions it: turns on "compressed" and sets the response header
sub with_compression {
  my $self = shift;

  # A missing header is treated like an empty one
  my $offered = $self->req->headers->sec_websocket_extensions // '';

  return $offered =~ /permessage-deflate/
    && $self->compressed(1)
    && $self->res->headers->sec_websocket_extensions('permessage-deflate');
}
|
---|
161 |
|
---|
# Pick the first of the given subprotocols that also appears in the request's
# Sec-WebSocket-Protocol header, store it in the response header and return
# it. Returns undef if none of them matches.
sub with_protocols {
  my ($self, @supported) = @_;

  # Comma-separated list from the request, surrounding whitespace removed
  my $header = $self->req->headers->sec_websocket_protocol // '';
  my %offered;
  $offered{trim($_)} = 1 for split ',', $header;

  # Order of the arguments decides the preference
  my $proto = first { $offered{$_} } @supported;
  return undef unless defined $proto;

  $self->res->headers->sec_websocket_protocol($proto);
  return $proto;
}
|
---|
172 |
|
---|
173 | 1;
|
---|
174 |
|
---|
175 | =encoding utf8
|
---|
176 |
|
---|
177 | =head1 NAME
|
---|
178 |
|
---|
179 | Mojo::Transaction::WebSocket - WebSocket transaction
|
---|
180 |
|
---|
181 | =head1 SYNOPSIS
|
---|
182 |
|
---|
183 | use Mojo::Transaction::WebSocket;
|
---|
184 |
|
---|
185 | # Send and receive WebSocket messages
|
---|
186 | my $ws = Mojo::Transaction::WebSocket->new;
|
---|
187 | $ws->send('Hello World!');
|
---|
188 | $ws->on(message => sub {
|
---|
189 | my ($ws, $msg) = @_;
|
---|
190 | say "Message: $msg";
|
---|
191 | });
|
---|
192 | $ws->on(finish => sub {
|
---|
193 | my ($ws, $code, $reason) = @_;
|
---|
194 | say "WebSocket closed with status $code.";
|
---|
195 | });
|
---|
196 |
|
---|
197 | =head1 DESCRIPTION
|
---|
198 |
|
---|
199 | L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based
|
---|
200 | on L<RFC 6455|http://tools.ietf.org/html/rfc6455> and
|
---|
201 | L<RFC 7692|http://tools.ietf.org/html/rfc7692>.
|
---|
202 |
|
---|
203 | =head1 EVENTS
|
---|
204 |
|
---|
205 | L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction>
|
---|
206 | and can emit the following new ones.
|
---|
207 |
|
---|
208 | =head2 binary
|
---|
209 |
|
---|
210 | $ws->on(binary => sub {
|
---|
211 | my ($ws, $bytes) = @_;
|
---|
212 | ...
|
---|
213 | });
|
---|
214 |
|
---|
215 | Emitted when a complete WebSocket binary message has been received.
|
---|
216 |
|
---|
217 | $ws->on(binary => sub {
|
---|
218 | my ($ws, $bytes) = @_;
|
---|
219 | say "Binary: $bytes";
|
---|
220 | });
|
---|
221 |
|
---|
222 | =head2 drain
|
---|
223 |
|
---|
224 | $ws->on(drain => sub {
|
---|
225 | my $ws = shift;
|
---|
226 | ...
|
---|
227 | });
|
---|
228 |
|
---|
229 | Emitted once all data has been sent.
|
---|
230 |
|
---|
231 | $ws->on(drain => sub {
|
---|
232 | my $ws = shift;
|
---|
233 | $ws->send(time);
|
---|
234 | });
|
---|
235 |
|
---|
236 | =head2 finish
|
---|
237 |
|
---|
238 | $ws->on(finish => sub {
|
---|
239 | my ($ws, $code, $reason) = @_;
|
---|
240 | ...
|
---|
241 | });
|
---|
242 |
|
---|
243 | Emitted when the WebSocket connection has been closed.
|
---|
244 |
|
---|
245 | =head2 frame
|
---|
246 |
|
---|
247 | $ws->on(frame => sub {
|
---|
248 | my ($ws, $frame) = @_;
|
---|
249 | ...
|
---|
250 | });
|
---|
251 |
|
---|
252 | Emitted when a WebSocket frame has been received.
|
---|
253 |
|
---|
254 | $ws->on(frame => sub {
|
---|
255 | my ($ws, $frame) = @_;
|
---|
256 | say "FIN: $frame->[0]";
|
---|
257 | say "RSV1: $frame->[1]";
|
---|
258 | say "RSV2: $frame->[2]";
|
---|
259 | say "RSV3: $frame->[3]";
|
---|
260 | say "Opcode: $frame->[4]";
|
---|
261 | say "Payload: $frame->[5]";
|
---|
262 | });
|
---|
263 |
|
---|
264 | =head2 json
|
---|
265 |
|
---|
266 | $ws->on(json => sub {
|
---|
267 | my ($ws, $json) = @_;
|
---|
268 | ...
|
---|
269 | });
|
---|
270 |
|
---|
271 | Emitted when a complete WebSocket message has been received. All text and
|
---|
272 | binary messages will be automatically JSON decoded. Note that this event only
|
---|
273 | gets emitted when it has at least one subscriber.
|
---|
274 |
|
---|
275 | $ws->on(json => sub {
|
---|
276 | my ($ws, $hash) = @_;
|
---|
277 | say "Message: $hash->{msg}";
|
---|
278 | });
|
---|
279 |
|
---|
280 | =head2 message
|
---|
281 |
|
---|
282 | $ws->on(message => sub {
|
---|
283 | my ($ws, $msg) = @_;
|
---|
284 | ...
|
---|
285 | });
|
---|
286 |
|
---|
287 | Emitted when a complete WebSocket message has been received. Text messages will
|
---|
288 | be automatically decoded. Note that this event only gets emitted when it has at
|
---|
289 | least one subscriber.
|
---|
290 |
|
---|
291 | $ws->on(message => sub {
|
---|
292 | my ($ws, $msg) = @_;
|
---|
293 | say "Message: $msg";
|
---|
294 | });
|
---|
295 |
|
---|
296 | =head2 resume
|
---|
297 |
|
---|
298 | $tx->on(resume => sub {
|
---|
299 | my $tx = shift;
|
---|
300 | ...
|
---|
301 | });
|
---|
302 |
|
---|
303 | Emitted when transaction is resumed.
|
---|
304 |
|
---|
305 | =head2 text
|
---|
306 |
|
---|
307 | $ws->on(text => sub {
|
---|
308 | my ($ws, $bytes) = @_;
|
---|
309 | ...
|
---|
310 | });
|
---|
311 |
|
---|
312 | Emitted when a complete WebSocket text message has been received.
|
---|
313 |
|
---|
314 | $ws->on(text => sub {
|
---|
315 | my ($ws, $bytes) = @_;
|
---|
316 | say "Text: $bytes";
|
---|
317 | });
|
---|
318 |
|
---|
319 | =head1 ATTRIBUTES
|
---|
320 |
|
---|
321 | L<Mojo::Transaction::WebSocket> inherits all attributes from
|
---|
322 | L<Mojo::Transaction> and implements the following new ones.
|
---|
323 |
|
---|
324 | =head2 compressed
|
---|
325 |
|
---|
326 | my $bool = $ws->compressed;
|
---|
327 | $ws = $ws->compressed($bool);
|
---|
328 |
|
---|
329 | Compress messages with C<permessage-deflate> extension.
|
---|
330 |
|
---|
331 | =head2 established
|
---|
332 |
|
---|
333 | my $bool = $ws->established;
|
---|
334 | $ws = $ws->established($bool);
|
---|
335 |
|
---|
336 | WebSocket connection established.
|
---|
337 |
|
---|
338 | =head2 handshake
|
---|
339 |
|
---|
340 | my $handshake = $ws->handshake;
|
---|
341 | $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
|
---|
342 |
|
---|
343 | The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
|
---|
344 |
|
---|
345 | =head2 masked
|
---|
346 |
|
---|
347 | my $bool = $ws->masked;
|
---|
348 | $ws = $ws->masked($bool);
|
---|
349 |
|
---|
350 | Mask outgoing frames with XOR cipher and a random 32-bit key.
|
---|
351 |
|
---|
352 | =head2 max_websocket_size
|
---|
353 |
|
---|
354 | my $size = $ws->max_websocket_size;
|
---|
355 | $ws = $ws->max_websocket_size(1024);
|
---|
356 |
|
---|
357 | Maximum WebSocket message size in bytes, defaults to the value of the
|
---|
358 | C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144> (256KiB).
|
---|
359 |
|
---|
360 | =head1 METHODS
|
---|
361 |
|
---|
362 | L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction>
|
---|
363 | and implements the following new ones.
|
---|
364 |
|
---|
365 | =head2 build_message
|
---|
366 |
|
---|
367 | my $frame = $ws->build_message({binary => $bytes});
|
---|
368 | my $frame = $ws->build_message({text => $bytes});
|
---|
369 | my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
|
---|
370 | my $frame = $ws->build_message($chars);
|
---|
371 |
|
---|
372 | Build WebSocket message.
|
---|
373 |
|
---|
374 | =head2 client_read
|
---|
375 |
|
---|
376 | $ws->client_read($data);
|
---|
377 |
|
---|
378 | Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
|
---|
379 |
|
---|
380 | =head2 client_write
|
---|
381 |
|
---|
382 | my $bytes = $ws->client_write;
|
---|
383 |
|
---|
384 | Write data client-side, used to implement user agents such as
|
---|
385 | L<Mojo::UserAgent>.
|
---|
386 |
|
---|
387 | =head2 closed
|
---|
388 |
|
---|
389 | $tx = $tx->closed;
|
---|
390 |
|
---|
391 | Same as L<Mojo::Transaction/"completed">, but also indicates that all
|
---|
392 | transaction data has been sent.
|
---|
393 |
|
---|
394 | =head2 connection
|
---|
395 |
|
---|
396 | my $id = $ws->connection;
|
---|
397 |
|
---|
398 | Connection identifier.
|
---|
399 |
|
---|
400 | =head2 finish
|
---|
401 |
|
---|
402 | $ws = $ws->finish;
|
---|
403 | $ws = $ws->finish(1000);
|
---|
404 | $ws = $ws->finish(1003 => 'Cannot accept data!');
|
---|
405 |
|
---|
406 | Close WebSocket connection gracefully.
|
---|
407 |
|
---|
408 | =head2 is_websocket
|
---|
409 |
|
---|
410 | my $bool = $ws->is_websocket;
|
---|
411 |
|
---|
412 | True, this is a L<Mojo::Transaction::WebSocket> object.
|
---|
413 |
|
---|
414 | =head2 kept_alive
|
---|
415 |
|
---|
416 | my $bool = $ws->kept_alive;
|
---|
417 |
|
---|
418 | Connection has been kept alive.
|
---|
419 |
|
---|
420 | =head2 local_address
|
---|
421 |
|
---|
422 | my $address = $ws->local_address;
|
---|
423 |
|
---|
424 | Local interface address.
|
---|
425 |
|
---|
426 | =head2 local_port
|
---|
427 |
|
---|
428 | my $port = $ws->local_port;
|
---|
429 |
|
---|
430 | Local interface port.
|
---|
431 |
|
---|
432 | =head2 parse_message
|
---|
433 |
|
---|
434 | $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
|
---|
435 |
|
---|
436 | Parse WebSocket message.
|
---|
437 |
|
---|
438 | =head2 protocol
|
---|
439 |
|
---|
440 | my $proto = $ws->protocol;
|
---|
441 |
|
---|
442 | Return negotiated subprotocol or C<undef>.
|
---|
443 |
|
---|
444 | =head2 remote_address
|
---|
445 |
|
---|
446 | my $address = $ws->remote_address;
|
---|
447 |
|
---|
448 | Remote interface address.
|
---|
449 |
|
---|
450 | =head2 remote_port
|
---|
451 |
|
---|
452 | my $port = $ws->remote_port;
|
---|
453 |
|
---|
454 | Remote interface port.
|
---|
455 |
|
---|
456 | =head2 req
|
---|
457 |
|
---|
458 | my $req = $ws->req;
|
---|
459 |
|
---|
460 | Handshake request, usually a L<Mojo::Message::Request> object.
|
---|
461 |
|
---|
462 | =head2 res
|
---|
463 |
|
---|
464 | my $res = $ws->res;
|
---|
465 |
|
---|
466 | Handshake response, usually a L<Mojo::Message::Response> object.
|
---|
467 |
|
---|
468 | =head2 resume
|
---|
469 |
|
---|
470 | $ws = $ws->resume;
|
---|
471 |
|
---|
472 | Resume L</"handshake"> transaction.
|
---|
473 |
|
---|
474 | =head2 send
|
---|
475 |
|
---|
476 | $ws = $ws->send({binary => $bytes});
|
---|
477 | $ws = $ws->send({text => $bytes});
|
---|
478 | $ws = $ws->send({json => {test => [1, 2, 3]}});
|
---|
479 | $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
|
---|
480 | $ws = $ws->send($chars);
|
---|
481 | $ws = $ws->send($chars => sub {...});
|
---|
482 |
|
---|
483 | Send message or frame non-blocking via WebSocket. The optional drain callback
|
---|
484 | will be executed once all data has been written.
|
---|
485 |
|
---|
486 | # Send "Ping" frame
|
---|
487 | use Mojo::WebSocket 'WS_PING';
|
---|
488 | $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
|
---|
489 |
|
---|
490 | =head2 server_read
|
---|
491 |
|
---|
492 | $ws->server_read($data);
|
---|
493 |
|
---|
494 | Read data server-side, used to implement web servers such as
|
---|
495 | L<Mojo::Server::Daemon>.
|
---|
496 |
|
---|
497 | =head2 server_write
|
---|
498 |
|
---|
499 | my $bytes = $ws->server_write;
|
---|
500 |
|
---|
501 | Write data server-side, used to implement web servers such as
|
---|
502 | L<Mojo::Server::Daemon>.
|
---|
503 |
|
---|
504 | =head2 with_compression
|
---|
505 |
|
---|
506 | $ws->with_compression;
|
---|
507 |
|
---|
508 | Negotiate C<permessage-deflate> extension for this WebSocket connection.
|
---|
509 |
|
---|
510 | =head2 with_protocols
|
---|
511 |
|
---|
512 | my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
|
---|
513 |
|
---|
514 | Negotiate subprotocol for this WebSocket connection.
|
---|
515 |
|
---|
516 | =head1 SEE ALSO
|
---|
517 |
|
---|
518 | L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
|
---|
519 |
|
---|
520 | =cut
|
---|