source: other-projects/playing-in-the-street/summer-2013/trunk/Playing-in-the-Street-WPF/Microsoft.Samples.Kinect.Webserver/WebSocketRpcChannel.cs@ 28897

Last change on this file since 28897 was 28897, checked in by davidb, 10 years ago

GUI front-end to server base plus web page content

File size: 13.8 KB
Line 
1//------------------------------------------------------------------------------
2// <copyright file="WebSocketRpcChannel.cs" company="Microsoft">
3// Copyright (c) Microsoft Corporation. All rights reserved.
4// </copyright>
5//------------------------------------------------------------------------------
6
7namespace Microsoft.Samples.Kinect.Webserver
8{
9 using System;
10 using System.Diagnostics;
11 using System.IO;
12 using System.Net;
13 using System.Net.WebSockets;
14 using System.Runtime.Serialization;
15 using System.Threading.Tasks;
16 using System.Windows.Threading;
17
18 using Microsoft.Samples.Kinect.Webserver.Sensor.Serialization;
19
20 /// <summary>
21 /// Web socket communication channel used for server to call remote procedures exposed
22 /// by web client and synchronously wait for the call to return with results.
23 /// </summary>
24 /// <remarks>
25 /// <para>
26 /// All communications performed by this channel are sent/received as UTF8-encoded web
27 /// socket text messages.
28 /// </para>
29 /// <para>
30 /// We only support one single RPC call to be sent at a time, by any client.
31 /// Calls initiated before a previous call completes will simply fail.
32 /// </para>
33 /// </remarks>
34 public sealed class WebSocketRpcChannel : WebSocketChannelBase
35 {
36 /// <summary>
37 /// Default number of bytes in buffer used to receive RPC responses from client.
38 /// </summary>
39 private const int DefaultReceiveBufferSize = 2048;
40
41 /// <summary>
42 /// Function name used to verify that client connection is still fully operational.
43 /// </summary>
44 private const string PingFunctionName = "ping";
45
46 /// <summary>
47 /// Buffer used to receive RPC responses from client.
48 /// </summary>
49 private byte[] receiveBuffer;
50
51 /// <summary>
52 /// Sequence Id of last function call performed.
53 /// </summary>
54 /// <remarks>
55 /// This is incremented with each function call, to make sequence ids unique.
56 /// </remarks>
57 private int sequenceId;
58
59 /// <summary>
60 /// True if we're currently waiting for an RPC call to come back with a response.
61 /// </summary>
62 private bool isInCall;
63
64 /// <summary>
65 /// Initializes a new instance of the <see cref="WebSocketRpcChannel"/> class.
66 /// </summary>
67 /// <param name="context">
68 /// Web socket context.
69 /// </param>
70 /// <param name="closedAction">
71 /// Action to perform when web socket becomes closed.
72 /// </param>
73 /// <param name="receiveBufferSize">
74 /// Number of bytes in buffer used to receive RPC responses from client.
75 /// </param>
76 private WebSocketRpcChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction, int receiveBufferSize)
77 : base(context, closedAction)
78 {
79 this.receiveBuffer = new byte[receiveBufferSize];
80 }
81
82 /// <summary>
83 /// Attempt to open a new RPC channel from the specified HTTP listener context.
84 /// </summary>
85 /// <param name="listenerContext">
86 /// HTTP listener context.
87 /// </param>
88 /// <param name="openedAction">
89 /// Action to perform when web socket is opened.
90 /// Will never be called if web channel can't be opened.
91 /// </param>
92 /// <param name="closedAction">
93 /// Action to perform when web socket is closed.
94 /// Will never be called if web channel can't be opened.
95 /// </param>
96 /// <param name="receiveBufferSize">
97 /// Number of bytes in buffer used to receive RPC responses from client.
98 /// </param>
99 /// <remarks>
100 /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
101 /// web socket channel could not be established, an appropriate status code will be
102 /// returned via <paramref name="listenerContext"/>'s Response property.
103 /// </remarks>
104 public static async void TryOpenAsync(
105 HttpListenerContext listenerContext,
106 Action<WebSocketRpcChannel> openedAction,
107 Action<WebSocketRpcChannel> closedAction,
108 int receiveBufferSize)
109 {
110 var socketContext = await HandleWebSocketRequestAsync(listenerContext);
111
112 if (socketContext != null)
113 {
114 var channel = new WebSocketRpcChannel(
115 socketContext, closedChannel => closedAction(closedChannel as WebSocketRpcChannel), receiveBufferSize);
116 openedAction(channel);
117 }
118 }
119
120 /// <summary>
121 /// Attempt to open a new RPC channel from the specified HTTP listener context.
122 /// </summary>
123 /// <param name="listenerContext">
124 /// HTTP listener context.
125 /// </param>
126 /// <param name="openedAction">
127 /// Action to perform when web socket is opened.
128 /// Will never be called if web channel can't be opened.
129 /// </param>
130 /// <param name="closedAction">
131 /// Action to perform when web socket is closed.
132 /// Will never be called if web channel can't be opened.
133 /// </param>
134 /// <remarks>
135 /// <para>
136 /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
137 /// web socket channel could not be established, an appropriate status code will be
138 /// returned via <paramref name="listenerContext"/>'s Response property.
139 /// </para>
140 /// <para>
141 /// Will use default receive buffer size.
142 /// </para>
143 /// </remarks>
144 public static void TryOpenAsync(
145 HttpListenerContext listenerContext,
146 Action<WebSocketRpcChannel> openedAction,
147 Action<WebSocketRpcChannel> closedAction)
148 {
149 TryOpenAsync(listenerContext, openedAction, closedAction, DefaultReceiveBufferSize);
150 }
151
152 /// <summary>
153 /// Determine if this web socket channel is open for sending/receiving messages
154 /// or if it has been closed
155 /// </summary>
156 /// <returns>
157 /// True if this web socket channel is still open, false otherwise.
158 /// </returns>
159 /// <remarks>
160 /// This call is expected to perform more comprehensive connection state checks
161 /// than IsOpen property, which might include sending remote messages, if the
162 /// specific <see cref="WebSocketChannelBase"/> subclass warrants it, so callers
163 /// should be careful not to call this method too often.
164 /// </remarks>
165 public override bool CheckConnectionStatus()
166 {
167 if (!base.CheckConnectionStatus())
168 {
169 return false;
170 }
171
172 if (this.isInCall)
173 {
174 return true;
175 }
176
177 var result = this.CallFunction<bool>(PingFunctionName);
178 var isValidConnection = result.Success && result.Result;
179
180 // If client did not respond to our ping, but socket is still open, close it
181 // because we're dealing with a client that does not respect our expected
182 // communication contract.
183 if (this.IsOpen && !isValidConnection)
184 {
185 this.Dispose();
186 }
187
188 return isValidConnection;
189 }
190
191 /// <summary>
192 /// Perform synchronous RPC function call.
193 /// </summary>
194 /// <typeparam name="T">
195 /// Type of function call result.
196 /// </typeparam>
197 /// <param name="functionName">
198 /// Name of remote function to invoke.
199 /// </param>
200 /// <param name="args">
201 /// Function arguments.
202 /// </param>
203 /// <returns>
204 /// Result of RPC call.
205 /// </returns>
206 public RpcResult<T> CallFunction<T>(string functionName, params object[] args)
207 {
208 if (this.isInCall)
209 {
210 return new RpcResult<T>(false);
211 }
212
213 this.isInCall = true;
214
215 try
216 {
217 var frame = new DispatcherFrame { Continue = true };
218 RpcResult<T> result = null;
219 try
220 {
221 // Push dispatcher frame with a single posted message to process before
222 // breaking out of frame
223 Dispatcher.CurrentDispatcher.BeginInvoke(
224 (Action)(async () =>
225 {
226 try
227 {
228 result = await this.SendReceiveAsync<T>(functionName, args);
229 }
230 catch (Exception e)
231 {
232 Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
233 result = new RpcResult<T>(false);
234 }
235
236 frame.Continue = false;
237 }));
238 Dispatcher.PushFrame(frame);
239
240 return result;
241 }
242 catch (AggregateException e)
243 {
244 Trace.TraceError("Error while sending/receiving data during remote function call:\n{0}", e);
245 return new RpcResult<T>(false);
246 }
247 }
248 finally
249 {
250 this.isInCall = false;
251 }
252 }
253
254 /// <summary>
255 /// Asynchronously send RPC function call request and process response, ensuring that
256 /// response matches request.
257 /// </summary>
258 /// <typeparam name="T">
259 /// Type of function call result.
260 /// </typeparam>
261 /// <param name="functionName">
262 /// Name of remote function to invoke.
263 /// </param>
264 /// <param name="args">
265 /// Function arguments.
266 /// </param>
267 /// <returns>
268 /// Result of RPC call, as an await-able task.
269 /// </returns>
270 private async Task<RpcResult<T>> SendReceiveAsync<T>(string functionName, object[] args)
271 {
272 var call = new FunctionCallRequest(functionName, args, ++this.sequenceId);
273
274 using (var callStream = new MemoryStream())
275 {
276 call.ToJson(callStream);
277 var sendResult =
278 await
279 this.SendAsync(new ArraySegment<byte>(callStream.GetBuffer(), 0, (int)callStream.Length), WebSocketMessageType.Text);
280 if (!sendResult)
281 {
282 return new RpcResult<T>(false);
283 }
284 }
285
286 var receiveResult = await this.ReceiveCompleteMessageAsync();
287 if (receiveResult == null)
288 {
289 return new RpcResult<T>(false);
290 }
291
292 using (var responseStream = new MemoryStream(this.receiveBuffer, 0, receiveResult.Count))
293 {
294 FunctionCallResponse<T> callResponse;
295
296 try
297 {
298 callResponse = responseStream.FromJson<FunctionCallResponse<T>>();
299 }
300 catch (SerializationException)
301 {
302 return new RpcResult<T>(false);
303 }
304
305 if (callResponse.id != call.id)
306 {
307 // call and response sequence ids don't match, so call did not succeed
308 return new RpcResult<T>(false);
309 }
310
311 return new RpcResult<T>(true, callResponse.result);
312 }
313 }
314
315 /// <summary>
316 /// Asynchronously wait for RPC function call response, until a complete message has
317 /// been received.
318 /// </summary>
319 /// <returns>
320 /// WebSocketReceiveResult if complete, non-empty, text message has been received.
321 /// Null if any failure occurred while receiving message.
322 /// </returns>
323 private async Task<WebSocketReceiveResult> ReceiveCompleteMessageAsync()
324 {
325 int receiveCount = 0;
326 WebSocketReceiveResult receiveResult;
327
328 do
329 {
330 receiveResult =
331 await
332 this.ReceiveAsync(new ArraySegment<byte>(this.receiveBuffer, receiveCount, this.receiveBuffer.Length - receiveCount));
333
334 if ((receiveResult == null) || (receiveResult.MessageType != WebSocketMessageType.Text))
335 {
336 return null;
337 }
338
339 receiveCount += receiveResult.Count;
340
341 if (receiveResult.EndOfMessage)
342 {
343 break;
344 }
345
346 // This can only happen if we've filled the buffer and message is still not completely
347 // received, so we double the buffer size.
348 Debug.Assert(receiveCount == this.receiveBuffer.Length, "ReceiveAsync method should guarantee that incomplete messages are only returned when buffer is completely full");
349
350 var newBuffer = new byte[receiveCount * 2];
351 Array.Copy(this.receiveBuffer, newBuffer, receiveCount);
352 this.receiveBuffer = newBuffer;
353 }
354 while (!receiveResult.EndOfMessage);
355
356 if (receiveCount == 0)
357 {
358 return null;
359 }
360
361 return new WebSocketReceiveResult(receiveCount, WebSocketMessageType.Text, true);
362 }
363 }
364}
Note: See TracBrowser for help on using the repository browser.