1 | //------------------------------------------------------------------------------
|
---|
2 | // <copyright file="WebSocketRpcChannel.cs" company="Microsoft">
|
---|
3 | // Copyright (c) Microsoft Corporation. All rights reserved.
|
---|
4 | // </copyright>
|
---|
5 | //------------------------------------------------------------------------------
|
---|
6 |
|
---|
7 | namespace Microsoft.Samples.Kinect.Webserver
|
---|
8 | {
|
---|
9 | using System;
|
---|
10 | using System.Diagnostics;
|
---|
11 | using System.IO;
|
---|
12 | using System.Net;
|
---|
13 | using System.Net.WebSockets;
|
---|
14 | using System.Runtime.Serialization;
|
---|
15 | using System.Threading.Tasks;
|
---|
16 | using System.Windows.Threading;
|
---|
17 |
|
---|
18 | using Microsoft.Samples.Kinect.Webserver.Sensor.Serialization;
|
---|
19 |
|
---|
20 | /// <summary>
|
---|
21 | /// Web socket communication channel used for server to call remote procedures exposed
|
---|
22 | /// by web client and synchronously wait for the call to return with results.
|
---|
23 | /// </summary>
|
---|
24 | /// <remarks>
|
---|
25 | /// <para>
|
---|
26 | /// All communications performed by this channel are sent/received as UTF8-encoded web
|
---|
27 | /// socket text messages.
|
---|
28 | /// </para>
|
---|
29 | /// <para>
|
---|
30 | /// We only support one single RPC call to be sent at a time, by any client.
|
---|
31 | /// Calls initiated before a previous call completes will simply fail.
|
---|
32 | /// </para>
|
---|
33 | /// </remarks>
|
---|
34 | public sealed class WebSocketRpcChannel : WebSocketChannelBase
|
---|
35 | {
|
---|
36 | /// <summary>
|
---|
37 | /// Default number of bytes in buffer used to receive RPC responses from client.
|
---|
38 | /// </summary>
|
---|
39 | private const int DefaultReceiveBufferSize = 2048;
|
---|
40 |
|
---|
41 | /// <summary>
|
---|
42 | /// Function name used to verify that client connection is still fully operational.
|
---|
43 | /// </summary>
|
---|
44 | private const string PingFunctionName = "ping";
|
---|
45 |
|
---|
46 | /// <summary>
|
---|
47 | /// Buffer used to receive RPC responses from client.
|
---|
48 | /// </summary>
|
---|
49 | private byte[] receiveBuffer;
|
---|
50 |
|
---|
51 | /// <summary>
|
---|
52 | /// Sequence Id of last function call performed.
|
---|
53 | /// </summary>
|
---|
54 | /// <remarks>
|
---|
55 | /// This is incremented with each function call, to make sequence ids unique.
|
---|
56 | /// </remarks>
|
---|
57 | private int sequenceId;
|
---|
58 |
|
---|
59 | /// <summary>
|
---|
60 | /// True if we're currently waiting for an RPC call to come back with a response.
|
---|
61 | /// </summary>
|
---|
62 | private bool isInCall;
|
---|
63 |
|
---|
64 | /// <summary>
|
---|
65 | /// Initializes a new instance of the <see cref="WebSocketRpcChannel"/> class.
|
---|
66 | /// </summary>
|
---|
67 | /// <param name="context">
|
---|
68 | /// Web socket context.
|
---|
69 | /// </param>
|
---|
70 | /// <param name="closedAction">
|
---|
71 | /// Action to perform when web socket becomes closed.
|
---|
72 | /// </param>
|
---|
73 | /// <param name="receiveBufferSize">
|
---|
74 | /// Number of bytes in buffer used to receive RPC responses from client.
|
---|
75 | /// </param>
|
---|
76 | private WebSocketRpcChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction, int receiveBufferSize)
|
---|
77 | : base(context, closedAction)
|
---|
78 | {
|
---|
79 | this.receiveBuffer = new byte[receiveBufferSize];
|
---|
80 | }
|
---|
81 |
|
---|
82 | /// <summary>
|
---|
83 | /// Attempt to open a new RPC channel from the specified HTTP listener context.
|
---|
84 | /// </summary>
|
---|
85 | /// <param name="listenerContext">
|
---|
86 | /// HTTP listener context.
|
---|
87 | /// </param>
|
---|
88 | /// <param name="openedAction">
|
---|
89 | /// Action to perform when web socket is opened.
|
---|
90 | /// Will never be called if web channel can't be opened.
|
---|
91 | /// </param>
|
---|
92 | /// <param name="closedAction">
|
---|
93 | /// Action to perform when web socket is closed.
|
---|
94 | /// Will never be called if web channel can't be opened.
|
---|
95 | /// </param>
|
---|
96 | /// <param name="receiveBufferSize">
|
---|
97 | /// Number of bytes in buffer used to receive RPC responses from client.
|
---|
98 | /// </param>
|
---|
99 | /// <remarks>
|
---|
100 | /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
|
---|
101 | /// web socket channel could not be established, an appropriate status code will be
|
---|
102 | /// returned via <paramref name="listenerContext"/>'s Response property.
|
---|
103 | /// </remarks>
|
---|
104 | public static async void TryOpenAsync(
|
---|
105 | HttpListenerContext listenerContext,
|
---|
106 | Action<WebSocketRpcChannel> openedAction,
|
---|
107 | Action<WebSocketRpcChannel> closedAction,
|
---|
108 | int receiveBufferSize)
|
---|
109 | {
|
---|
110 | var socketContext = await HandleWebSocketRequestAsync(listenerContext);
|
---|
111 |
|
---|
112 | if (socketContext != null)
|
---|
113 | {
|
---|
114 | var channel = new WebSocketRpcChannel(
|
---|
115 | socketContext, closedChannel => closedAction(closedChannel as WebSocketRpcChannel), receiveBufferSize);
|
---|
116 | openedAction(channel);
|
---|
117 | }
|
---|
118 | }
|
---|
119 |
|
---|
120 | /// <summary>
|
---|
121 | /// Attempt to open a new RPC channel from the specified HTTP listener context.
|
---|
122 | /// </summary>
|
---|
123 | /// <param name="listenerContext">
|
---|
124 | /// HTTP listener context.
|
---|
125 | /// </param>
|
---|
126 | /// <param name="openedAction">
|
---|
127 | /// Action to perform when web socket is opened.
|
---|
128 | /// Will never be called if web channel can't be opened.
|
---|
129 | /// </param>
|
---|
130 | /// <param name="closedAction">
|
---|
131 | /// Action to perform when web socket is closed.
|
---|
132 | /// Will never be called if web channel can't be opened.
|
---|
133 | /// </param>
|
---|
134 | /// <remarks>
|
---|
135 | /// <para>
|
---|
136 | /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
|
---|
137 | /// web socket channel could not be established, an appropriate status code will be
|
---|
138 | /// returned via <paramref name="listenerContext"/>'s Response property.
|
---|
139 | /// </para>
|
---|
140 | /// <para>
|
---|
141 | /// Will use default receive buffer size.
|
---|
142 | /// </para>
|
---|
143 | /// </remarks>
|
---|
144 | public static void TryOpenAsync(
|
---|
145 | HttpListenerContext listenerContext,
|
---|
146 | Action<WebSocketRpcChannel> openedAction,
|
---|
147 | Action<WebSocketRpcChannel> closedAction)
|
---|
148 | {
|
---|
149 | TryOpenAsync(listenerContext, openedAction, closedAction, DefaultReceiveBufferSize);
|
---|
150 | }
|
---|
151 |
|
---|
152 | /// <summary>
|
---|
153 | /// Determine if this web socket channel is open for sending/receiving messages
|
---|
154 | /// or if it has been closed
|
---|
155 | /// </summary>
|
---|
156 | /// <returns>
|
---|
157 | /// True if this web socket channel is still open, false otherwise.
|
---|
158 | /// </returns>
|
---|
159 | /// <remarks>
|
---|
160 | /// This call is expected to perform more comprehensive connection state checks
|
---|
161 | /// than IsOpen property, which might include sending remote messages, if the
|
---|
162 | /// specific <see cref="WebSocketChannelBase"/> subclass warrants it, so callers
|
---|
163 | /// should be careful not to call this method too often.
|
---|
164 | /// </remarks>
|
---|
165 | public override bool CheckConnectionStatus()
|
---|
166 | {
|
---|
167 | if (!base.CheckConnectionStatus())
|
---|
168 | {
|
---|
169 | return false;
|
---|
170 | }
|
---|
171 |
|
---|
172 | if (this.isInCall)
|
---|
173 | {
|
---|
174 | return true;
|
---|
175 | }
|
---|
176 |
|
---|
177 | var result = this.CallFunction<bool>(PingFunctionName);
|
---|
178 | var isValidConnection = result.Success && result.Result;
|
---|
179 |
|
---|
180 | // If client did not respond to our ping, but socket is still open, close it
|
---|
181 | // because we're dealing with a client that does not respect our expected
|
---|
182 | // communication contract.
|
---|
183 | if (this.IsOpen && !isValidConnection)
|
---|
184 | {
|
---|
185 | this.Dispose();
|
---|
186 | }
|
---|
187 |
|
---|
188 | return isValidConnection;
|
---|
189 | }
|
---|
190 |
|
---|
191 | /// <summary>
|
---|
192 | /// Perform synchronous RPC function call.
|
---|
193 | /// </summary>
|
---|
194 | /// <typeparam name="T">
|
---|
195 | /// Type of function call result.
|
---|
196 | /// </typeparam>
|
---|
197 | /// <param name="functionName">
|
---|
198 | /// Name of remote function to invoke.
|
---|
199 | /// </param>
|
---|
200 | /// <param name="args">
|
---|
201 | /// Function arguments.
|
---|
202 | /// </param>
|
---|
203 | /// <returns>
|
---|
204 | /// Result of RPC call.
|
---|
205 | /// </returns>
|
---|
206 | public RpcResult<T> CallFunction<T>(string functionName, params object[] args)
|
---|
207 | {
|
---|
208 | if (this.isInCall)
|
---|
209 | {
|
---|
210 | return new RpcResult<T>(false);
|
---|
211 | }
|
---|
212 |
|
---|
213 | this.isInCall = true;
|
---|
214 |
|
---|
215 | try
|
---|
216 | {
|
---|
217 | var frame = new DispatcherFrame { Continue = true };
|
---|
218 | RpcResult<T> result = null;
|
---|
219 | try
|
---|
220 | {
|
---|
221 | // Push dispatcher frame with a single posted message to process before
|
---|
222 | // breaking out of frame
|
---|
223 | Dispatcher.CurrentDispatcher.BeginInvoke(
|
---|
224 | (Action)(async () =>
|
---|
225 | {
|
---|
226 | try
|
---|
227 | {
|
---|
228 | result = await this.SendReceiveAsync<T>(functionName, args);
|
---|
229 | }
|
---|
230 | catch (Exception e)
|
---|
231 | {
|
---|
232 | Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
|
---|
233 | result = new RpcResult<T>(false);
|
---|
234 | }
|
---|
235 |
|
---|
236 | frame.Continue = false;
|
---|
237 | }));
|
---|
238 | Dispatcher.PushFrame(frame);
|
---|
239 |
|
---|
240 | return result;
|
---|
241 | }
|
---|
242 | catch (AggregateException e)
|
---|
243 | {
|
---|
244 | Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
|
---|
245 | return new RpcResult<T>(false);
|
---|
246 | }
|
---|
247 | }
|
---|
248 | finally
|
---|
249 | {
|
---|
250 | this.isInCall = false;
|
---|
251 | }
|
---|
252 | }
|
---|
253 |
|
---|
254 | /// <summary>
|
---|
255 | /// Asynchronously send RPC function call request and process response, ensuring that
|
---|
256 | /// response matches request.
|
---|
257 | /// </summary>
|
---|
258 | /// <typeparam name="T">
|
---|
259 | /// Type of function call result.
|
---|
260 | /// </typeparam>
|
---|
261 | /// <param name="functionName">
|
---|
262 | /// Name of remote function to invoke.
|
---|
263 | /// </param>
|
---|
264 | /// <param name="args">
|
---|
265 | /// Function arguments.
|
---|
266 | /// </param>
|
---|
267 | /// <returns>
|
---|
268 | /// Result of RPC call, as an await-able task.
|
---|
269 | /// </returns>
|
---|
270 | private async Task<RpcResult<T>> SendReceiveAsync<T>(string functionName, object[] args)
|
---|
271 | {
|
---|
272 | var call = new FunctionCallRequest(functionName, args, ++this.sequenceId);
|
---|
273 |
|
---|
274 | using (var callStream = new MemoryStream())
|
---|
275 | {
|
---|
276 | call.ToJson(callStream);
|
---|
277 | var sendResult =
|
---|
278 | await
|
---|
279 | this.SendAsync(new ArraySegment<byte>(callStream.GetBuffer(), 0, (int)callStream.Length), WebSocketMessageType.Text);
|
---|
280 | if (!sendResult)
|
---|
281 | {
|
---|
282 | return new RpcResult<T>(false);
|
---|
283 | }
|
---|
284 | }
|
---|
285 |
|
---|
286 | var receiveResult = await this.ReceiveCompleteMessageAsync();
|
---|
287 | if (receiveResult == null)
|
---|
288 | {
|
---|
289 | return new RpcResult<T>(false);
|
---|
290 | }
|
---|
291 |
|
---|
292 | using (var responseStream = new MemoryStream(this.receiveBuffer, 0, receiveResult.Count))
|
---|
293 | {
|
---|
294 | FunctionCallResponse<T> callResponse;
|
---|
295 |
|
---|
296 | try
|
---|
297 | {
|
---|
298 | callResponse = responseStream.FromJson<FunctionCallResponse<T>>();
|
---|
299 | }
|
---|
300 | catch (SerializationException)
|
---|
301 | {
|
---|
302 | return new RpcResult<T>(false);
|
---|
303 | }
|
---|
304 |
|
---|
305 | if (callResponse.id != call.id)
|
---|
306 | {
|
---|
307 | // call and response sequence ids don't match, so call did not succeed
|
---|
308 | return new RpcResult<T>(false);
|
---|
309 | }
|
---|
310 |
|
---|
311 | return new RpcResult<T>(true, callResponse.result);
|
---|
312 | }
|
---|
313 | }
|
---|
314 |
|
---|
315 | /// <summary>
|
---|
316 | /// Asynchronously wait for RPC function call response, until a complete message has
|
---|
317 | /// been received.
|
---|
318 | /// </summary>
|
---|
319 | /// <returns>
|
---|
320 | /// WebSocketReceiveResult if complete, non-empty, text message has been received.
|
---|
321 | /// Null if any failure occurred while receiving message.
|
---|
322 | /// </returns>
|
---|
323 | private async Task<WebSocketReceiveResult> ReceiveCompleteMessageAsync()
|
---|
324 | {
|
---|
325 | int receiveCount = 0;
|
---|
326 | WebSocketReceiveResult receiveResult;
|
---|
327 |
|
---|
328 | do
|
---|
329 | {
|
---|
330 | receiveResult =
|
---|
331 | await
|
---|
332 | this.ReceiveAsync(new ArraySegment<byte>(this.receiveBuffer, receiveCount, this.receiveBuffer.Length - receiveCount));
|
---|
333 |
|
---|
334 | if ((receiveResult == null) || (receiveResult.MessageType != WebSocketMessageType.Text))
|
---|
335 | {
|
---|
336 | return null;
|
---|
337 | }
|
---|
338 |
|
---|
339 | receiveCount += receiveResult.Count;
|
---|
340 |
|
---|
341 | if (receiveResult.EndOfMessage)
|
---|
342 | {
|
---|
343 | break;
|
---|
344 | }
|
---|
345 |
|
---|
346 | // This can only happen if we've filled the buffer and message is still not completely
|
---|
347 | // received, so we double the buffer size.
|
---|
348 | Debug.Assert(receiveCount == this.receiveBuffer.Length, "ReceiveAsync method should guarantee that incomplete messages are only returned when buffer is completely full");
|
---|
349 |
|
---|
350 | var newBuffer = new byte[receiveCount * 2];
|
---|
351 | Array.Copy(this.receiveBuffer, newBuffer, receiveCount);
|
---|
352 | this.receiveBuffer = newBuffer;
|
---|
353 | }
|
---|
354 | while (!receiveResult.EndOfMessage);
|
---|
355 |
|
---|
356 | if (receiveCount == 0)
|
---|
357 | {
|
---|
358 | return null;
|
---|
359 | }
|
---|
360 |
|
---|
361 | return new WebSocketReceiveResult(receiveCount, WebSocketMessageType.Text, true);
|
---|
362 | }
|
---|
363 | }
|
---|
364 | }
|
---|