source: other-projects/playing-in-the-street/summer-2013/trunk/Playing-in-the-Street-WPF/Microsoft.Samples.Kinect.Webserver/WebSocketEventChannel.cs@ 28897

Last change on this file since 28897 was 28897, checked in by davidb, 10 years ago

GUI front-end to server base plus web page content

File size: 8.5 KB
Line 
//------------------------------------------------------------------------------
// <copyright file="WebSocketEventChannel.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
6
namespace Microsoft.Samples.Kinect.Webserver
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.Net.WebSockets;
    using System.Threading.Tasks;

    /// <summary>
    /// Web socket communication channel used for sending events to clients.
    /// </summary>
    /// <remarks>
    /// Message batches handed to <see cref="SendMessagesAsync"/> are chained so that each
    /// batch starts sending only after the previously requested batch has finished.
    /// NOTE(review): the bookkeeping fields of this class are read and written without
    /// any synchronization; presumably all callers use a single thread or synchronization
    /// context. Confirm against the callers before using this class from multiple threads.
    /// </remarks>
    public sealed class WebSocketEventChannel : WebSocketChannelBase
    {
        /// <summary>
        /// If more than this number of send tasks are overlapping (i.e.: simultaneously
        /// awaiting to finish sending), it is considered as an indication that more
        /// events are happening at a faster pace than can be handled by the underlying
        /// web socket.
        /// </summary>
        private const int MaximumOverlappingTaskCount = 10;

        /// <summary>
        /// Keeps track of task representing the last send request initiated.
        /// Null whenever no send request is pending.
        /// </summary>
        private Task lastSendTask;

        /// <summary>
        /// Number of overlapping send tasks, i.e. calls to <see cref="SendMessagesAsync"/>
        /// that have started but not yet finished.
        /// </summary>
        private int overlappingTaskCount;

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketEventChannel"/> class.
        /// </summary>
        /// <param name="context">
        /// Web socket context.
        /// </param>
        /// <param name="closedAction">
        /// Action to perform when web socket becomes closed.
        /// </param>
        internal WebSocketEventChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction)
            : base(context, closedAction)
        {
            // Always monitor for disconnections.
            this.StartDisconnectionMonitor();
        }

        /// <summary>
        /// Attempt to open a new event channel from the specified HTTP listener context.
        /// </summary>
        /// <param name="listenerContext">
        /// HTTP listener context.
        /// </param>
        /// <param name="openedAction">
        /// Action to perform when web socket is opened.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <param name="closedAction">
        /// Action to perform when web socket is closed.
        /// Will never be called if web channel can't be opened.
        /// </param>
        /// <remarks>
        /// <para>
        /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
        /// web socket channel could not be established, an appropriate status code will be
        /// returned via <paramref name="listenerContext"/>'s Response property.
        /// </para>
        /// <para>
        /// This method is <c>async void</c>: it cannot be awaited, and exceptions thrown by
        /// it (including exceptions thrown by <paramref name="openedAction"/>) cannot be
        /// observed through a returned task. The signature is kept as is so that existing
        /// callers keep working.
        /// </para>
        /// </remarks>
        public static async void TryOpenAsync(
            HttpListenerContext listenerContext, Action<WebSocketEventChannel> openedAction, Action<WebSocketEventChannel> closedAction)
        {
            var socketContext = await HandleWebSocketRequestAsync(listenerContext);

            // A null context means no channel is created, so neither callback is invoked.
            if (socketContext != null)
            {
                // The base class reports the closed channel as a WebSocketChannelBase, so
                // hand it back to the caller as the concrete event channel type.
                var channel = new WebSocketEventChannel(socketContext, closedChannel => closedAction(closedChannel as WebSocketEventChannel));
                openedAction(channel);
            }
        }

        /// <summary>
        /// Asynchronously sends a batch of messages over the web socket channel.
        /// </summary>
        /// <param name="messages">
        /// Batch of messages to be sent as an atomic block through the web socket.
        /// </param>
        /// <returns>
        /// true if the messages were sent successfully. false otherwise.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="messages"/> is null. Reported through the returned task.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// More than <see cref="MaximumOverlappingTaskCount"/> send requests are pending
        /// at the same time. Reported through the returned task.
        /// </exception>
        public async Task<bool> SendMessagesAsync(params WebSocketMessage[] messages)
        {
            // A params array is null when the caller explicitly passes null. Fail with a
            // descriptive exception before any bookkeeping state has been touched.
            if (messages == null)
            {
                throw new ArgumentNullException("messages");
            }

            if (messages.Length == 0)
            {
                // No work to be done
                return true;
            }

            ++this.overlappingTaskCount;

            Task<Task<bool>> getSendTaskTask = null;

            try
            {
                if (this.overlappingTaskCount > MaximumOverlappingTaskCount)
                {
                    throw new InvalidOperationException(@"Events are being generated faster than web socket channel can handle");
                }

                // Create a function whose only purpose is to return a task representing
                // the real work that needs to be done to send the messages, and a task
                // corresponding to this function.
                // We're basically creating a linked list of tasks, where each task waits
                // for the previous task (if it exists) to finish processing before starting
                // to do its own work.
                // We do things in this way rather than adding message data to a queue that
                // then processes each message in order, to avoid the additional data copy
                // (and potential allocation) that would be required to allow for deferred
                // processing. The current contract has message data be processed in the same
                // function stack frame in which an awaiting client called us.
                Func<object, Task<bool>> getSendTaskFunction = previousTask => this.SerializedSendMessages(previousTask, messages);
                if (this.lastSendTask == null)
                {
                    // Nothing pending: start a fresh chain, with null as the "previous" task.
                    getSendTaskTask = new Task<Task<bool>>(getSendTaskFunction, null);
                    getSendTaskTask.Start();
                }
                else
                {
                    // Something pending: run after it, receiving it as the "previous" task.
                    getSendTaskTask = this.lastSendTask.ContinueWith(getSendTaskFunction);
                }

                this.lastSendTask = getSendTaskTask;

                // After awaiting for this nested task we will have a task that represents
                // the real work of actually sending the messages.
                Task<bool> sendTask = await getSendTaskTask;
                return await sendTask;
            }
            finally
            {
                // If no other send tasks came in while we were waiting for this send batch
                // to complete, clear the last send task state so that future batches don't
                // think that they are overlapping a previous batch.
                // (When the overlap limit was exceeded above, getSendTaskTask is still null
                // and therefore never matches a pending task.)
                if (this.lastSendTask == getSendTaskTask)
                {
                    this.lastSendTask = null;
                }

                --this.overlappingTaskCount;

                Debug.Assert((this.overlappingTaskCount > 0) || (this.lastSendTask == null), "Whenever no more tasks overlap, there should be no pending send task to await");
            }
        }

        /// <summary>
        /// Asynchronously sends a batch of messages over the web socket channel after ensuring
        /// that all previous message batches have completed the sending process.
        /// </summary>
        /// <param name="previous">
        /// Previous message-batch-sending task that needs to complete before we start
        /// sending new messages. May be null, if this batch was requested after all previous
        /// batches had already finished sending.
        /// </param>
        /// <param name="messages">
        /// Batch of messages to send.
        /// </param>
        /// <returns>
        /// true if the messages were sent successfully. false otherwise.
        /// </returns>
        /// <remarks>
        /// If the previous batch ended with an exception, awaiting it here rethrows that
        /// exception, so this batch is not sent and its task fails with the same exception.
        /// </remarks>
        private async Task<bool> SerializedSendMessages(object previous, IEnumerable<WebSocketMessage> messages)
        {
            var previousTask = previous as Task<Task<bool>>;
            if (previousTask != null)
            {
                // The previous task, if non-null, is a task that returns the task that does
                // the real work of sending the previous batch of messages, so we need to wait
                // for the result task to finish to ensure that messages are serialized in the
                // expected order.
                // Both levels are awaited rather than reading Task.Result, so this never
                // blocks a thread and does not depend on the outer task already being complete.
                Task<bool> previousSendTask = await previousTask;
                await previousSendTask;
            }

            foreach (var message in messages)
            {
                // Stop at the first message that fails to send; the rest of the batch is dropped.
                if (!await this.SendAsync(message.Content, message.MessageType))
                {
                    return false;
                }
            }

            return true;
        }
    }
}
Note: See TracBrowser for help on using the repository browser.