Skip to content

Correct outgoing message queue threading #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SocketIO/obj/*
*.suo
*j.user
SocketIO/bin/*
SocketIO.userprefs
7 changes: 5 additions & 2 deletions SocketIO.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

Microsoft Visual Studio Solution File, Format Version 10.00
# Visual Studio 2008
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2012
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SocketIO", "SocketIO\SocketIO.csproj", "{2F18D000-F801-40FC-8B29-CA112E33536D}"
EndProject
Global
Expand All @@ -14,6 +14,9 @@ Global
{2F18D000-F801-40FC-8B29-CA112E33536D}.Release|x86.ActiveCfg = Release|x86
{2F18D000-F801-40FC-8B29-CA112E33536D}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
StartupItem = SocketIO\SocketIO.csproj
EndGlobalSection
Expand Down
12 changes: 0 additions & 12 deletions SocketIO.userprefs

This file was deleted.

38 changes: 35 additions & 3 deletions SocketIO/SocketIO.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="3.5" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">x86</Platform>
Expand All @@ -10,6 +10,26 @@
<RootNamespace>SocketIO</RootNamespace>
<AssemblyName>SocketIO</AssemblyName>
<TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileUpgradeFlags>
</FileUpgradeFlags>
<UpgradeBackupLocation>
</UpgradeBackupLocation>
<OldToolsVersion>3.5</OldToolsVersion>
<PublishUrl>publish\</PublishUrl>
<Install>true</Install>
<InstallFrom>Disk</InstallFrom>
<UpdateEnabled>false</UpdateEnabled>
<UpdateMode>Foreground</UpdateMode>
<UpdateInterval>7</UpdateInterval>
<UpdateIntervalUnits>Days</UpdateIntervalUnits>
<UpdatePeriodically>false</UpdatePeriodically>
<UpdateRequired>false</UpdateRequired>
<MapFileExtensions>true</MapFileExtensions>
<ApplicationRevision>0</ApplicationRevision>
<ApplicationVersion>1.0.0.%2a</ApplicationVersion>
<IsWebBootstrapper>false</IsWebBootstrapper>
<UseApplicationTrust>false</UseApplicationTrust>
<BootstrapperEnabled>true</BootstrapperEnabled>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
<DebugSymbols>true</DebugSymbols>
Expand All @@ -19,7 +39,7 @@
<DefineConstants>DEBUG;</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<PlatformTarget>x86</PlatformTarget>
<PlatformTarget>AnyCPU</PlatformTarget>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
Expand Down Expand Up @@ -79,4 +99,16 @@
<HintPath>bin\Debug\SocketIO.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<BootstrapperPackage Include="Microsoft.Net.Client.3.5">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName>
<Install>false</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Net.Framework.3.5.SP1">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1</ProductName>
<Install>true</Install>
</BootstrapperPackage>
</ItemGroup>
</Project>
Binary file removed SocketIO/bin/Debug/SocketIO.dll
Binary file not shown.
Binary file removed SocketIO/bin/Debug/SocketIO.pdb
Binary file not shown.
83 changes: 41 additions & 42 deletions SocketIO/socketio/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public class Client : IDisposable, SocketIOClient.IClient
public event EventHandler SocketConnectionClosed;
public event EventHandler<ErrorEventArgs> Error;

/// <summary>
/// ResetEvent for Outbound MessageQueue Empty Event - all pending messages have been sent
/// </summary>
public ManualResetEvent MessageQueueEmptyEvent = new ManualResetEvent(true);

/// <summary>
/// Connection Open Event
/// </summary>
Expand Down Expand Up @@ -128,15 +123,22 @@ public Client(string url, WebSocketVersion socketVersion)

this.registrationManager = new RegistrationManager();
this.outboundQueue = (new ConcurrentQueue<string>());
this.dequeuOutBoundMsgTask = new Thread(new ThreadStart(dequeuOutboundMessages));
//this.dequeuOutBoundMsgTask = Task.Factory.StartNew(() => dequeuOutboundMessages(), TaskCreationOptions.LongRunning);
this.dequeuOutBoundMsgTask.Start();
}

/// <summary>
/// Initiate the connection with Socket.IO service
/// </summary>
public void Connect()
{
Connect(null, null);
}

/// <summary>
/// Initiate the connection with Socket.IO service using provided handshake query string and headers
/// </summary>
/// <param name="query"></param>
/// <param name="headers"></param>
public void Connect(string query, string[] headers)
{
lock (padLock)
{
Expand All @@ -145,7 +147,7 @@ public void Connect()
try
{
this.ConnectionOpenEvent.Reset();
this.HandShake = this.requestHandshake(uri);// perform an initial HTTP request as a new, non-handshaken connection
this.HandShake = this.requestHandshake(uri, query, headers);// perform an initial HTTP request as a new, non-handshaken connection

if (this.HandShake == null || string.IsNullOrEmpty(this.HandShake.SID) || this.HandShake.HadError)
{
Expand All @@ -159,7 +161,8 @@ public void Connect()
string.Format("{0}://{1}:{2}/socket.io/1/websocket/{3}", wsScheme, uri.Host, uri.Port, this.HandShake.SID),
string.Empty,
this.socketVersion);
this.wsClient.EnableAutoSendPing = true; // #4 tkiley: Websocket4net client library initiates a websocket heartbeat, causes delivery problems
//this.wsClient.EnableAutoSendPing = true; // #4 tkiley: Websocket4net client library initiates a websocket heartbeat, causes delivery problems
this.wsClient.EnableAutoSendPing = false; // ANF: socket.io has it's own heartbeat, this should be unnecessary
this.wsClient.Opened += this.wsClient_OpenEvent;
this.wsClient.MessageReceived += this.wsClient_MessageReceived;
this.wsClient.Error += this.wsClient_Error;
Expand Down Expand Up @@ -303,9 +306,11 @@ public void Emit(string eventName, Object payload)
/// <param name="msg"></param>
public void Send(IMessage msg)
{
this.MessageQueueEmptyEvent.Reset();
if (this.outboundQueue != null)
{
this.outboundQueue.Enqueue(msg.Encoded);
ThreadPool.QueueUserWorkItem(new WaitCallback(dequeueOutboundMessages));
}
}

public void Send(string msg) {
Expand All @@ -315,9 +320,11 @@ public void Send(string msg) {

private void Send_backup(string rawEncodedMessageText)
{
this.MessageQueueEmptyEvent.Reset();
if (this.outboundQueue != null)
{
this.outboundQueue.Enqueue(rawEncodedMessageText);
ThreadPool.QueueUserWorkItem(new WaitCallback(dequeueOutboundMessages));
}
}

/// <summary>
Expand Down Expand Up @@ -404,7 +411,10 @@ protected void closeWebSocketClient()
// websocket client events - open, messages, errors, closing
private void wsClient_OpenEvent(object sender, EventArgs e)
{
this.socketHeartBeatTimer = new Timer(OnHeartBeatTimerCallback, new object(), HandShake.HeartbeatInterval, HandShake.HeartbeatInterval);
// ANF: only enable the heartbeat timer if the interval is non-zero
if(HandShake.HeartbeatInterval.TotalMilliseconds!=(double)0.0)
this.socketHeartBeatTimer = new Timer(OnHeartBeatTimerCallback, new object(), HandShake.HeartbeatInterval, HandShake.HeartbeatInterval);

this.ConnectionOpenEvent.Set();

this.OnMessageEvent(new EventMessage() { Event = "open" });
Expand All @@ -413,6 +423,9 @@ private void wsClient_OpenEvent(object sender, EventArgs e)
try { this.Opened(this, EventArgs.Empty); }
catch (Exception ex) { Trace.WriteLine(ex); }
}

// send any messages we might have queued while opening the connection
ThreadPool.QueueUserWorkItem(new WaitCallback(dequeueOutboundMessages));

}

Expand Down Expand Up @@ -547,34 +560,16 @@ private void EndAsyncEvent(IAsyncResult result)
}
}
/// <summary>
/// While connection is open, dequeue and send messages to the socket server
/// dequeue and send messages to the socket server
/// </summary>
protected void dequeuOutboundMessages()
protected void dequeueOutboundMessages(Object stateinfo)
{
while (this.outboundQueue != null)
if (this.ReadyState == WebSocketState.Open && this.outboundQueue != null)
{
if (this.ReadyState == WebSocketState.Open)
{
string msgString;
try
{
if (this.outboundQueue.TryDequeue(out msgString))
{
this.wsClient.Send(msgString);
}
else
this.MessageQueueEmptyEvent.Set();
}
catch(Exception ex)
{
Trace.WriteLine("The outboundQueue is no longer open...");
}
}
else
{
this.ConnectionOpenEvent.WaitOne(2000); // wait for connection event
}
}
string msgString;
while(this.outboundQueue.TryDequeue(out msgString))
this.wsClient.Send(msgString);
}
}

/// <summary>
Expand All @@ -583,19 +578,24 @@ protected void dequeuOutboundMessages()
/// <para>The tansport and sid are required as part of the ws: transport connection</para>
/// </summary>
/// <param name="uri">http://localhost:3000</param>
/// <param name="query">nullable, optional query string</param>
/// <param name="headers">nullable, Optional headers(ex: "name:value")</param>
/// <returns>Handshake object with sid value</returns>
/// <example>DownloadString: 13052140081337757257:15:25:websocket,htmlfile,xhr-polling,jsonp-polling</example>
protected SocketIOHandshake requestHandshake(Uri uri)
protected SocketIOHandshake requestHandshake(Uri uri, string query, string[] headers)
{
string value = string.Empty;
string errorText = string.Empty;
SocketIOHandshake handshake = null;

using (WebClient client = new WebClient())
{
{
if (headers != null && headers.Length > 0)
foreach (string s in headers)
client.Headers.Add(s);
try
{
value = client.DownloadString(string.Format("{0}://{1}:{2}/socket.io/1/{3}", uri.Scheme, uri.Host, uri.Port, uri.Query)); // #5 tkiley: The uri.Query is available in socket.io's handshakeData object during authorization
value = client.DownloadString(string.Format("{0}://{1}:{2}/socket.io/1/{3}", uri.Scheme, uri.Host, uri.Port, string.IsNullOrEmpty(query) ? uri.Query: query)); // #5 tkiley: The uri.Query is available in socket.io's handshakeData object during authorization
// 13052140081337757257:15:25:websocket,htmlfile,xhr-polling,jsonp-polling
if (string.IsNullOrEmpty(value))
errorText = "Did not receive handshake string from server";
Expand Down Expand Up @@ -630,7 +630,6 @@ protected virtual void Dispose(bool disposing)
{
// free managed resources
this.Close();
this.MessageQueueEmptyEvent.Close();
this.ConnectionOpenEvent.Close();
}

Expand Down