Merge pull request #105 from PepperDash/hotfix/udp-server-queue

Hotfix/udp server queue
This commit is contained in:
Neil Dorin
2021-07-23 15:01:01 -06:00
committed by GitHub

View File

@@ -1,428 +1,393 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using Crestron.SimplSharp; using Crestron.SimplSharp;
using Crestron.SimplSharp.CrestronSockets; using Crestron.SimplSharp.CrestronSockets;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
namespace PepperDash.Core namespace PepperDash.Core
{ {
public class GenericUdpServer : Device, ISocketStatusWithStreamDebugging public class GenericUdpServer : Device, ISocketStatusWithStreamDebugging
{ {
private const string SplusKey = "Uninitialized Udp Server"; private const string SplusKey = "Uninitialized Udp Server";
public CommunicationStreamDebugging StreamDebugging { get; private set; } public CommunicationStreamDebugging StreamDebugging { get; private set; }
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public event EventHandler<GenericCommMethodReceiveBytesArgs> BytesReceived; public event EventHandler<GenericCommMethodReceiveBytesArgs> BytesReceived;
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public event EventHandler<GenericCommMethodReceiveTextArgs> TextReceived; public event EventHandler<GenericCommMethodReceiveTextArgs> TextReceived;
/// <summary> /// <summary>
/// This event will fire when a message is dequeued that includes the source IP and Port info if needed to determine the source of the received data. /// This event will fire when a message is dequeued that includes the source IP and Port info if needed to determine the source of the received data.
/// </summary> /// </summary>
public event EventHandler<GenericUdpReceiveTextExtraArgs> DataRecievedExtra; public event EventHandler<GenericUdpReceiveTextExtraArgs> DataRecievedExtra;
/// <summary> /// <summary>
/// Queue to temporarily store received messages with the source IP and Port info ///
/// </summary> /// </summary>
private CrestronQueue<GenericUdpReceiveTextExtraArgs> MessageQueue; public event EventHandler<GenericSocketStatusChageEventArgs> ConnectionChange;
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public event EventHandler<GenericSocketStatusChageEventArgs> ConnectionChange; public event EventHandler<GenericUdpConnectedEventArgs> UpdateConnectionStatus;
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public event EventHandler<GenericUdpConnectedEventArgs> UpdateConnectionStatus; public SocketStatus ClientStatus
{
/// <summary> get
/// {
/// </summary> return Server.ServerStatus;
public SocketStatus ClientStatus }
{ }
get
{ /// <summary>
return Server.ServerStatus; ///
} /// </summary>
} public ushort UStatus
{
/// <summary> get { return (ushort)Server.ServerStatus; }
/// }
/// </summary>
public ushort UStatus /// <summary>
{ /// Address of server
get { return (ushort)Server.ServerStatus; } /// </summary>
} public string Hostname { get; set; }
/// <summary>
CCriticalSection DequeueLock; /// IP Address of the sender of the last recieved message
/// <summary> /// </summary>
/// Address of server
/// </summary>
public string Hostname { get; set; } /// <summary>
/// Port on server
/// <summary> /// </summary>
/// IP Address of the sender of the last recieved message public int Port { get; set; }
/// </summary>
/// <summary>
/// Another damn S+ helper because S+ seems to treat large port nums as signed ints
/// <summary> /// which screws up things
/// Port on server /// </summary>
/// </summary> public ushort UPort
public int Port { get; set; } {
get { return Convert.ToUInt16(Port); }
/// <summary> set { Port = Convert.ToInt32(value); }
/// Another damn S+ helper because S+ seems to treat large port nums as signed ints }
/// which screws up things
/// </summary> /// <summary>
public ushort UPort /// Indicates that the UDP Server is enabled
{ /// </summary>
get { return Convert.ToUInt16(Port); } public bool IsConnected
set { Port = Convert.ToInt32(value); } {
} get;
private set;
/// <summary> }
/// Indicates that the UDP Server is enabled
/// </summary> public ushort UIsConnected
public bool IsConnected {
{ get { return IsConnected ? (ushort)1 : (ushort)0; }
get; }
private set;
} /// <summary>
/// Defaults to 2000
public ushort UIsConnected /// </summary>
{ public int BufferSize { get; set; }
get { return IsConnected ? (ushort)1 : (ushort)0; }
} public UDPServer Server { get; private set; }
/// <summary> /// <summary>
/// Defaults to 2000 /// Constructor for S+. Make sure to set key, address, port, and buffersize using init method
/// </summary> /// </summary>
public int BufferSize { get; set; } public GenericUdpServer()
: base(SplusKey)
public UDPServer Server { get; private set; } {
StreamDebugging = new CommunicationStreamDebugging(SplusKey);
/// <summary> BufferSize = 5000;
/// Constructor for S+. Make sure to set key, address, port, and buffersize using init method
/// </summary> CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
public GenericUdpServer() CrestronEnvironment.EthernetEventHandler += new EthernetEventHandler(CrestronEnvironment_EthernetEventHandler);
: base(SplusKey) }
{
StreamDebugging = new CommunicationStreamDebugging(SplusKey); /// <summary>
BufferSize = 5000; ///
DequeueLock = new CCriticalSection(); /// </summary>
MessageQueue = new CrestronQueue<GenericUdpReceiveTextExtraArgs>(); /// <param name="key"></param>
/// <param name="address"></param>
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); /// <param name="port"></param>
CrestronEnvironment.EthernetEventHandler += new EthernetEventHandler(CrestronEnvironment_EthernetEventHandler); /// <param name="buffefSize"></param>
} public GenericUdpServer(string key, string address, int port, int buffefSize)
: base(key)
/// <summary> {
/// StreamDebugging = new CommunicationStreamDebugging(key);
/// </summary> Hostname = address;
/// <param name="key"></param> Port = port;
/// <param name="address"></param> BufferSize = buffefSize;
/// <param name="port"></param>
/// <param name="buffefSize"></param> CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
public GenericUdpServer(string key, string address, int port, int buffefSize) CrestronEnvironment.EthernetEventHandler += new EthernetEventHandler(CrestronEnvironment_EthernetEventHandler);
: base(key) }
{
StreamDebugging = new CommunicationStreamDebugging(key); /// <summary>
Hostname = address; /// Call from S+ to initialize values
Port = port; /// </summary>
BufferSize = buffefSize; /// <param name="key"></param>
/// <param name="address"></param>
DequeueLock = new CCriticalSection(); /// <param name="port"></param>
MessageQueue = new CrestronQueue<GenericUdpReceiveTextExtraArgs>(); public void Initialize(string key, string address, ushort port)
{
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); Key = key;
CrestronEnvironment.EthernetEventHandler += new EthernetEventHandler(CrestronEnvironment_EthernetEventHandler); Hostname = address;
} UPort = port;
}
/// <summary>
/// Call from S+ to initialize values /// <summary>
/// </summary> ///
/// <param name="key"></param> /// </summary>
/// <param name="address"></param> /// <param name="ethernetEventArgs"></param>
/// <param name="port"></param> void CrestronEnvironment_EthernetEventHandler(EthernetEventArgs ethernetEventArgs)
public void Initialize(string key, string address, ushort port) {
{ // Re-enable the server if the link comes back up and the status should be connected
Key = key; if (ethernetEventArgs.EthernetEventType == eEthernetEventType.LinkUp
Hostname = address; && IsConnected)
UPort = port; {
} Connect();
}
/// <summary> }
///
/// </summary> /// <summary>
/// <param name="ethernetEventArgs"></param> ///
void CrestronEnvironment_EthernetEventHandler(EthernetEventArgs ethernetEventArgs) /// </summary>
{ /// <param name="programEventType"></param>
// Re-enable the server if the link comes back up and the status should be connected void CrestronEnvironment_ProgramStatusEventHandler(eProgramStatusEventType programEventType)
if (ethernetEventArgs.EthernetEventType == eEthernetEventType.LinkUp {
&& IsConnected) if (programEventType != eProgramStatusEventType.Stopping)
{ return;
Connect();
} Debug.Console(1, this, "Program stopping. Disabling Server");
} Disconnect();
}
/// <summary>
/// /// <summary>
/// </summary> /// Enables the UDP Server
/// <param name="programEventType"></param> /// </summary>
void CrestronEnvironment_ProgramStatusEventHandler(eProgramStatusEventType programEventType) public void Connect()
{ {
if (programEventType == eProgramStatusEventType.Stopping) if (Server == null)
{ {
Debug.Console(1, this, "Program stopping. Disabling Server"); Server = new UDPServer();
Disconnect(); }
}
} if (string.IsNullOrEmpty(Hostname))
{
/// <summary> Debug.Console(1, Debug.ErrorLogLevel.Warning, "GenericUdpServer '{0}': No address set", Key);
/// Enables the UDP Server return;
/// </summary> }
public void Connect() if (Port < 1 || Port > 65535)
{ {
if (Server == null) {
{ Debug.Console(1, Debug.ErrorLogLevel.Warning, "GenericUdpServer '{0}': Invalid port", Key);
Server = new UDPServer(); return;
} }
}
if (string.IsNullOrEmpty(Hostname))
{ var status = Server.EnableUDPServer(Hostname, Port);
Debug.Console(1, Debug.ErrorLogLevel.Warning, "GenericUdpServer '{0}': No address set", Key);
return; Debug.Console(2, this, "SocketErrorCode: {0}", status);
} if (status == SocketErrorCodes.SOCKET_OK)
if (Port < 1 || Port > 65535) IsConnected = true;
{
{ var handler = UpdateConnectionStatus;
Debug.Console(1, Debug.ErrorLogLevel.Warning, "GenericUdpServer '{0}': Invalid port", Key); if (handler != null)
return; handler(this, new GenericUdpConnectedEventArgs(UIsConnected));
}
} // Start receiving data
Server.ReceiveDataAsync(Receive);
var status = Server.EnableUDPServer(Hostname, Port); }
Debug.Console(2, this, "SocketErrorCode: {0}", status); /// <summary>
if (status == SocketErrorCodes.SOCKET_OK) /// Disabled the UDP Server
IsConnected = true; /// </summary>
public void Disconnect()
var handler = UpdateConnectionStatus; {
if (handler != null) if(Server != null)
handler(this, new GenericUdpConnectedEventArgs(UIsConnected)); Server.DisableUDPServer();
// Start receiving data IsConnected = false;
Server.ReceiveDataAsync(Receive);
} var handler = UpdateConnectionStatus;
if (handler != null)
/// <summary> handler(this, new GenericUdpConnectedEventArgs(UIsConnected));
/// Disabled the UDP Server }
/// </summary>
public void Disconnect()
{ /// <summary>
if(Server != null) /// Recursive method to receive data
Server.DisableUDPServer(); /// </summary>
/// <param name="server"></param>
IsConnected = false; /// <param name="numBytes"></param>
void Receive(UDPServer server, int numBytes)
var handler = UpdateConnectionStatus; {
if (handler != null) Debug.Console(2, this, "Received {0} bytes", numBytes);
handler(this, new GenericUdpConnectedEventArgs(UIsConnected));
} try
{
if (numBytes <= 0)
/// <summary> return;
/// Recursive method to receive data
/// </summary> var sourceIp = Server.IPAddressLastMessageReceivedFrom;
/// <param name="server"></param> var sourcePort = Server.IPPortLastMessageReceivedFrom;
/// <param name="numBytes"></param> var bytes = server.IncomingDataBuffer.Take(numBytes).ToArray();
void Receive(UDPServer server, int numBytes) var str = Encoding.GetEncoding(28591).GetString(bytes, 0, bytes.Length);
{
Debug.Console(2, this, "Received {0} bytes", numBytes); var dataRecivedExtra = DataRecievedExtra;
if (dataRecivedExtra != null)
if (numBytes > 0) dataRecivedExtra(this, new GenericUdpReceiveTextExtraArgs(str, sourceIp, sourcePort, bytes));
{
var sourceIp = Server.IPAddressLastMessageReceivedFrom; Debug.Console(2, this, "Bytes: {0}", bytes.ToString());
var sourcePort = Server.IPPortLastMessageReceivedFrom; var bytesHandler = BytesReceived;
var bytes = server.IncomingDataBuffer.Take(numBytes).ToArray(); if (bytesHandler != null)
var str = Encoding.GetEncoding(28591).GetString(bytes, 0, bytes.Length); {
MessageQueue.TryToEnqueue(new GenericUdpReceiveTextExtraArgs(str, sourceIp, sourcePort, bytes)); if (StreamDebugging.RxStreamDebuggingIsEnabled)
{
Debug.Console(2, this, "Bytes: {0}", bytes.ToString()); Debug.Console(0, this, "Received {1} bytes: '{0}'", ComTextHelper.GetEscapedText(bytes), bytes.Length);
var bytesHandler = BytesReceived; }
if (bytesHandler != null) bytesHandler(this, new GenericCommMethodReceiveBytesArgs(bytes));
{ }
if (StreamDebugging.RxStreamDebuggingIsEnabled) var textHandler = TextReceived;
{ if (textHandler != null)
Debug.Console(0, this, "Received {1} bytes: '{0}'", ComTextHelper.GetEscapedText(bytes), bytes.Length); {
} if (StreamDebugging.RxStreamDebuggingIsEnabled)
bytesHandler(this, new GenericCommMethodReceiveBytesArgs(bytes)); Debug.Console(0, this, "Received {1} characters of text: '{0}'", ComTextHelper.GetDebugText(str), str.Length);
} textHandler(this, new GenericCommMethodReceiveTextArgs(str));
var textHandler = TextReceived; }
if (textHandler != null) }
{ catch (Exception ex)
if (StreamDebugging.RxStreamDebuggingIsEnabled) {
Debug.Console(0, this, "Received {1} characters of text: '{0}'", ComTextHelper.GetDebugText(str), str.Length); Debug.Console(0, "GenericUdpServer Receive error: {0}{1}", ex.Message, ex.StackTrace);
}
textHandler(this, new GenericCommMethodReceiveTextArgs(str)); finally
} {
} server.ReceiveDataAsync(Receive);
server.ReceiveDataAsync(Receive); }
}
// Attempt to enter the CCritical Secion and if we can, start the dequeue thread
var gotLock = DequeueLock.TryEnter(); /// <summary>
if (gotLock) /// General send method
CrestronInvoke.BeginInvoke((o) => DequeueEvent()); /// </summary>
} /// <param name="text"></param>
public void SendText(string text)
/// <summary> {
/// This method gets spooled up in its own thread an protected by a CCriticalSection to prevent multiple threads from running concurrently. var bytes = Encoding.GetEncoding(28591).GetBytes(text);
/// It will dequeue items as they are enqueued automatically.
/// </summary> if (IsConnected && Server != null)
void DequeueEvent() {
{ if (StreamDebugging.TxStreamDebuggingIsEnabled)
try Debug.Console(0, this, "Sending {0} characters of text: '{1}'", text.Length, ComTextHelper.GetDebugText(text));
{
while (true) Server.SendData(bytes, bytes.Length);
{ }
// Pull from Queue and fire an event. Block indefinitely until an item can be removed, similar to a Gather. }
var message = MessageQueue.Dequeue();
var dataRecivedExtra = DataRecievedExtra; /// <summary>
if (dataRecivedExtra != null) ///
{ /// </summary>
dataRecivedExtra(this, message); /// <param name="bytes"></param>
} public void SendBytes(byte[] bytes)
} {
} if (StreamDebugging.TxStreamDebuggingIsEnabled)
catch (Exception e) Debug.Console(0, this, "Sending {0} bytes: '{1}'", bytes.Length, ComTextHelper.GetEscapedText(bytes));
{
Debug.Console(0, "GenericUdpServer DequeueEvent error: {0}\r", e); if (IsConnected && Server != null)
} Server.SendData(bytes, bytes.Length);
// Make sure to leave the CCritical section in case an exception above stops this thread, or we won't be able to restart it. }
if (DequeueLock != null)
{ }
DequeueLock.Leave();
} /// <summary>
} ///
/// </summary>
/// <summary> public class GenericUdpReceiveTextExtraArgs : EventArgs
/// General send method {
/// </summary> /// <summary>
/// <param name="text"></param> ///
public void SendText(string text) /// </summary>
{ public string Text { get; private set; }
var bytes = Encoding.GetEncoding(28591).GetBytes(text); /// <summary>
///
if (IsConnected && Server != null) /// </summary>
{ public string IpAddress { get; private set; }
if (StreamDebugging.TxStreamDebuggingIsEnabled) /// <summary>
Debug.Console(0, this, "Sending {0} characters of text: '{1}'", text.Length, ComTextHelper.GetDebugText(text)); ///
/// </summary>
Server.SendData(bytes, bytes.Length); public int Port { get; private set; }
} /// <summary>
} ///
/// </summary>
/// <summary> public byte[] Bytes { get; private set; }
///
/// </summary> /// <summary>
/// <param name="bytes"></param> ///
public void SendBytes(byte[] bytes) /// </summary>
{ /// <param name="text"></param>
if (StreamDebugging.TxStreamDebuggingIsEnabled) /// <param name="ipAddress"></param>
Debug.Console(0, this, "Sending {0} bytes: '{1}'", bytes.Length, ComTextHelper.GetEscapedText(bytes)); /// <param name="port"></param>
/// <param name="bytes"></param>
if (IsConnected && Server != null) public GenericUdpReceiveTextExtraArgs(string text, string ipAddress, int port, byte[] bytes)
Server.SendData(bytes, bytes.Length); {
} Text = text;
IpAddress = ipAddress;
} Port = port;
Bytes = bytes;
/// <summary> }
///
/// </summary> /// <summary>
public class GenericUdpReceiveTextExtraArgs : EventArgs /// Stupid S+ Constructor
{ /// </summary>
/// <summary> public GenericUdpReceiveTextExtraArgs() { }
/// }
/// </summary>
public string Text { get; private set; } /// <summary>
/// <summary> ///
/// /// </summary>
/// </summary> public class UdpServerPropertiesConfig
public string IpAddress { get; private set; } {
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public int Port { get; private set; } [JsonProperty(Required = Required.Always)]
/// <summary> public string Address { get; set; }
///
/// </summary> /// <summary>
public byte[] Bytes { get; private set; } ///
/// </summary>
/// <summary> [JsonProperty(Required = Required.Always)]
/// public int Port { get; set; }
/// </summary>
/// <param name="text"></param> /// <summary>
/// <param name="ipAddress"></param> /// Defaults to 32768
/// <param name="port"></param> /// </summary>
/// <param name="bytes"></param> public int BufferSize { get; set; }
public GenericUdpReceiveTextExtraArgs(string text, string ipAddress, int port, byte[] bytes)
{ /// <summary>
Text = text; ///
IpAddress = ipAddress; /// </summary>
Port = port; public UdpServerPropertiesConfig()
Bytes = bytes; {
} BufferSize = 32768;
}
/// <summary> }
/// Stupid S+ Constructor
/// </summary>
public GenericUdpReceiveTextExtraArgs() { }
}
/// <summary>
///
/// </summary>
public class UdpServerPropertiesConfig
{
/// <summary>
///
/// </summary>
[JsonProperty(Required = Required.Always)]
public string Address { get; set; }
/// <summary>
///
/// </summary>
[JsonProperty(Required = Required.Always)]
public int Port { get; set; }
/// <summary>
/// Defaults to 32768
/// </summary>
public int BufferSize { get; set; }
/// <summary>
///
/// </summary>
public UdpServerPropertiesConfig()
{
BufferSize = 32768;
}
}
} }