//------------------------------------------------------------------------------
// <copyright file="WebSocketEventChannel.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
6 |
|
---|
7 | namespace Microsoft.Samples.Kinect.Webserver
|
---|
8 | {
|
---|
9 | using System;
|
---|
10 | using System.Collections.Generic;
|
---|
11 | using System.Diagnostics;
|
---|
12 | using System.Net;
|
---|
13 | using System.Net.WebSockets;
|
---|
14 | using System.Threading.Tasks;
|
---|
15 |
|
---|
/// <summary>
/// Web socket communication channel used for sending events to clients.
/// </summary>
/// <remarks>
/// Message batches passed to <see cref="SendMessagesAsync"/> are chained one behind the
/// other, so a batch only starts sending once the batch requested before it has finished.
/// </remarks>
public sealed class WebSocketEventChannel : WebSocketChannelBase
{
    /// <summary>
    /// If more than this number of send tasks are overlapping (i.e.: simultaneously
    /// awaiting to finish sending), it is considered as an indication that more
    /// events are happening at a faster pace than can be handled by the underlying
    /// web socket.
    /// </summary>
    private const int MaximumOverlappingTaskCount = 10;

    /// <summary>
    /// Keeps track of the task representing the last send request initiated.
    /// Null whenever no send request is pending.
    /// </summary>
    private Task lastSendTask;

    /// <summary>
    /// Number of overlapping send tasks, i.e. calls to <see cref="SendMessagesAsync"/>
    /// that have started but not yet finished.
    /// </summary>
    private int overlappingTaskCount;

    /// <summary>
    /// Initializes a new instance of the <see cref="WebSocketEventChannel"/> class.
    /// </summary>
    /// <param name="context">
    /// Web socket context.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket becomes closed.
    /// </param>
    internal WebSocketEventChannel(WebSocketContext context, Action<WebSocketChannelBase> closedAction)
        : base(context, closedAction)
    {
        // Always monitor for disconnections.
        this.StartDisconnectionMonitor();
    }

    /// <summary>
    /// Attempt to open a new event channel from the specified HTTP listener context.
    /// </summary>
    /// <param name="listenerContext">
    /// HTTP listener context.
    /// </param>
    /// <param name="openedAction">
    /// Action to perform when web socket is opened.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket is closed.
    /// Will never be called if web channel can't be opened.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="openedAction"/> or <paramref name="closedAction"/> is null.
    /// Thrown synchronously, before the web socket request is handled.
    /// </exception>
    /// <remarks>
    /// If <paramref name="listenerContext"/> does not represent a web socket request, or if
    /// web socket channel could not be established, an appropriate status code will be
    /// returned via <paramref name="listenerContext"/>'s Response property.
    /// This method returns as soon as the asynchronous open operation has been started;
    /// callers are notified of the outcome only through <paramref name="openedAction"/>.
    /// </remarks>
    public static void TryOpenAsync(
        HttpListenerContext listenerContext, Action<WebSocketEventChannel> openedAction, Action<WebSocketEventChannel> closedAction)
    {
        // Validate the callbacks up front. Both are invoked later from asynchronous
        // code paths where a null reference could not be observed by the caller.
        if (openedAction == null)
        {
            throw new ArgumentNullException("openedAction");
        }

        if (closedAction == null)
        {
            throw new ArgumentNullException("closedAction");
        }

        OpenChannelAsync(listenerContext, openedAction, closedAction);
    }

    /// <summary>
    /// Asynchronously sends a batch of messages over the web socket channel.
    /// </summary>
    /// <param name="messages">
    /// Batch of messages to be sent as an atomic block through the web socket.
    /// </param>
    /// <returns>
    /// true if the messages were sent successfully. false otherwise.
    /// An empty batch is reported as successful without touching the web socket.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="messages"/> is null. Surfaced through the returned task.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// More than <see cref="MaximumOverlappingTaskCount"/> send requests are pending at once.
    /// Surfaced through the returned task.
    /// </exception>
    public async Task<bool> SendMessagesAsync(params WebSocketMessage[] messages)
    {
        if (messages == null)
        {
            throw new ArgumentNullException("messages");
        }

        if (messages.Length == 0)
        {
            // No work to be done
            return true;
        }

        // NOTE(review): the counter and lastSendTask are updated without synchronization;
        // presumably all callers use the same thread or synchronization context - confirm.
        ++this.overlappingTaskCount;

        Task<Task<bool>> getSendTaskTask = null;

        try
        {
            if (this.overlappingTaskCount > MaximumOverlappingTaskCount)
            {
                throw new InvalidOperationException(@"Events are being generated faster than web socket channel can handle");
            }

            // Create a function whose only purpose is to return a task representing
            // the real work that needs to be done to send the messages, and a task
            // corresponding to this function.
            // We're basically creating a linked list of tasks, where each task waits
            // for the previous task (if it exists) to finish processing before starting
            // to do its own work.
            // We do things in this way rather than adding message data to a queue that
            // then processes each message in order, to avoid the additional data copy
            // (and potential allocation) that would be required to allow for deferred
            // processing. The current contract has message data be processed in the same
            // function stack frame in which an awaiting client called us.
            Func<object, Task<bool>> getSendTaskFunction = previousTask => this.SerializedSendMessages(previousTask, messages);
            if (this.lastSendTask == null)
            {
                // Nothing pending: start a fresh chain. The null state object tells
                // SerializedSendMessages that there is no previous batch to wait for.
                getSendTaskTask = new Task<Task<bool>>(getSendTaskFunction, null);
                getSendTaskTask.Start();
            }
            else
            {
                // Something is pending: run after it, receiving it as the antecedent.
                getSendTaskTask = this.lastSendTask.ContinueWith(getSendTaskFunction);
            }

            this.lastSendTask = getSendTaskTask;

            // After awaiting for this nested task we will have a task that represents
            // the real work of actually sending the messages.
            Task<bool> sendTask = await getSendTaskTask;
            return await sendTask;
        }
        finally
        {
            // If no other send tasks came in while we were waiting for this send batch
            // to complete, clear the last send task state so that future batches don't
            // think that they are overlapping a previous batch.
            if (this.lastSendTask == getSendTaskTask)
            {
                this.lastSendTask = null;
            }

            --this.overlappingTaskCount;

            Debug.Assert((this.overlappingTaskCount > 0) || (this.lastSendTask == null), "Whenever no more tasks overlap, there should be no pending send task to await");
        }
    }

    /// <summary>
    /// Asynchronous part of <see cref="TryOpenAsync"/>: handles the web socket request and,
    /// if a web socket context was obtained, creates the channel and reports it as opened.
    /// </summary>
    /// <param name="listenerContext">
    /// HTTP listener context.
    /// </param>
    /// <param name="openedAction">
    /// Action to perform when web socket is opened. Must not be null.
    /// </param>
    /// <param name="closedAction">
    /// Action to perform when web socket is closed. Must not be null.
    /// </param>
    /// <remarks>
    /// This is deliberately fire-and-forget, since <see cref="TryOpenAsync"/> returns void.
    /// Exceptions thrown here cannot be caught by the original caller.
    /// </remarks>
    private static async void OpenChannelAsync(
        HttpListenerContext listenerContext, Action<WebSocketEventChannel> openedAction, Action<WebSocketEventChannel> closedAction)
    {
        var socketContext = await HandleWebSocketRequestAsync(listenerContext);

        if (socketContext != null)
        {
            // The base class reports closure with a base-typed channel; hand it back to
            // the caller typed as an event channel.
            var channel = new WebSocketEventChannel(socketContext, closedChannel => closedAction(closedChannel as WebSocketEventChannel));
            openedAction(channel);
        }
    }

    /// <summary>
    /// Asynchronously sends a batch of messages over the web socket channel after ensuring
    /// that all previous message batches have completed the sending process.
    /// </summary>
    /// <param name="previous">
    /// Previous message-batch-sending task that needs to complete before we start
    /// sending new messages. May be null, if this batch was requested after all previous
    /// batches had already finished sending.
    /// </param>
    /// <param name="messages">
    /// Batch of messages to send.
    /// </param>
    /// <returns>
    /// true if the messages were sent successfully. false otherwise.
    /// Sending stops at the first message that fails; later messages in the batch are not sent.
    /// </returns>
    private async Task<bool> SerializedSendMessages(object previous, IEnumerable<WebSocketMessage> messages)
    {
        var previousTask = previous as Task<Task<bool>>;
        if (previousTask != null)
        {
            // The previous task, if non-null, is a task that returns the task that does
            // the real work of sending the previous batch of messages, so we need to wait
            // for the result task to finish to ensure that messages are serialized in the
            // expected order.
            // When this runs as a continuation the outer task has already completed, so
            // reading Result does not block. If the previous batch's send task faulted,
            // awaiting it rethrows that exception here and this batch is not sent.
            await previousTask.Result;
        }

        foreach (var message in messages)
        {
            if (!await this.SendAsync(message.Content, message.MessageType))
            {
                return false;
            }
        }

        return true;
    }
}
|
---|
197 | }
|
---|