diff --git a/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpClient_ForServer.cs b/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpClient_ForServer.cs index df61965..6551f40 100644 --- a/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpClient_ForServer.cs +++ b/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpClient_ForServer.cs @@ -17,6 +17,7 @@ using System.Text; using System.Text.RegularExpressions; using Crestron.SimplSharp; using Crestron.SimplSharp.CrestronSockets; +using PepperDash_Core.Comm; namespace PepperDash.Core { @@ -33,6 +34,13 @@ namespace PepperDash.Core public event EventHandler TextReceived; + /// + /// Event for Receiving text. Once subscribed to this event the receive callback will start a thread that dequeues the messages and invokes the event on a new thread. + /// It is not recommended to use both the TextReceived event and the TextReceivedQueueInvoke event. + /// + public event EventHandler TextReceivedQueueInvoke; + + public event EventHandler ConnectionChange; @@ -209,8 +217,20 @@ namespace PepperDash.Core get { return (ushort)(HeartbeatEnabled ? 1 : 0); } set { HeartbeatEnabled = value == 1; } } - public string HeartbeatString = "heartbeat"; - public int HeartbeatInterval = 50000; + + public string HeartbeatString { get; set; } + //public int HeartbeatInterval = 50000; + + /// + /// Milliseconds before server expects another heartbeat. Set by property HeartbeatRequiredIntervalInSeconds which is driven from S+ + /// + public int HeartbeatInterval { get; set; } + + /// + /// Simpl+ Heartbeat Analog value in seconds + /// + public ushort HeartbeatRequiredIntervalInSeconds { set { HeartbeatInterval = (value * 1000); } } + CTimer HeartbeatSendTimer; CTimer HeartbeatAckTimer; /// @@ -226,6 +246,24 @@ namespace PepperDash.Core bool ProgramIsStopping; + /// + /// Queue lock + /// + CCriticalSection DequeueLock = new CCriticalSection(); + + /// + /// Receive Queue size. Defaults to 20. Will set to 20 if QueueSize property is less than 20. Use constructor or set queue size property before + /// calling initialize. + /// + public int ReceiveQueueSize { get; set; } + + /// + /// Queue to temporarily store received messages with the source IP and Port info. Defaults to size 20. Use constructor or set queue size property before + /// calling initialize. + /// + private CrestronQueue MessageQueue; + + #endregion #region Constructors @@ -244,12 +282,24 @@ namespace PepperDash.Core //base class constructor public GenericSecureTcpIpClient_ForServer() - : base("Uninitialized DynamicTcpClient") + : base("Uninitialized Secure Tcp Client For Server") { CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); AutoReconnectIntervalMs = 5000; BufferSize = 2000; } + + /// + /// Contstructor that sets all properties by calling the initialize method with a config object. + /// + /// + public GenericSecureTcpIpClient_ForServer(string key, TcpClientConfigObject clientConfigObject) + : base(key) + { + CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); + Initialize(clientConfigObject); + } + #endregion #region Methods @@ -262,6 +312,36 @@ namespace PepperDash.Core Key = key; } + public void Initialize(TcpClientConfigObject clientConfigObject) + { + try + { + if (clientConfigObject != null) + { + Hostname = clientConfigObject.Address; + AutoReconnect = clientConfigObject.AutoReconnect; + AutoReconnectIntervalMs = clientConfigObject.AutoReconnectIntervalMs > 1000 ? clientConfigObject.AutoReconnectIntervalMs : 5000; + SharedKey = clientConfigObject.SharedKey; + SharedKeyRequired = clientConfigObject.SharedKeyRequired; + HeartbeatEnabled = clientConfigObject.HeartbeatRequired; + HeartbeatRequiredIntervalInSeconds = clientConfigObject.HeartbeatRequiredIntervalInSeconds > 0 ? clientConfigObject.HeartbeatRequiredIntervalInSeconds : (ushort)15; + HeartbeatString = string.IsNullOrEmpty(clientConfigObject.HeartbeatStringToMatch) ? "heartbeat" : clientConfigObject.HeartbeatStringToMatch; + Port = clientConfigObject.Port; + BufferSize = clientConfigObject.BufferSize > 2000 ? clientConfigObject.BufferSize : 2000; + ReceiveQueueSize = clientConfigObject.ReceiveQueueSize > 20 ? clientConfigObject.ReceiveQueueSize : 20; + MessageQueue = new CrestronQueue(ReceiveQueueSize); + } + else + { + ErrorLog.Error("Could not initialize client with key: {0}", Key); + } + } + catch + { + ErrorLog.Error("Could not initialize client with key: {0}", Key); + } + } + /// /// Handles closing this up when the program shuts down /// @@ -328,7 +408,7 @@ namespace PepperDash.Core Client = new SecureTCPClient(Hostname, Port, BufferSize); Client.SocketStatusChange += Client_SocketStatusChange; - if(HeartbeatEnabled) + if (HeartbeatEnabled) Client.SocketSendOrReceiveTimeOutInMs = (HeartbeatInterval * 5); Client.AddressClientConnectedTo = Hostname; Client.PortNumber = Port; @@ -487,7 +567,7 @@ namespace PepperDash.Core if (numBytes > 0) { string str = string.Empty; - + var handler = TextReceivedQueueInvoke; try { var bytes = client.IncomingDataBuffer.Take(numBytes).ToArray(); @@ -517,6 +597,10 @@ namespace PepperDash.Core var textHandler = TextReceived; if (textHandler != null) textHandler(this, new GenericTcpServerCommMethodReceiveTextArgs(str)); + if (handler != null) + { + MessageQueue.TryToEnqueue(new GenericTcpServerCommMethodReceiveTextArgs(str)); + } } } } @@ -524,9 +608,51 @@ namespace PepperDash.Core { Debug.Console(1, this, "Error receiving data: {1}. Error: {0}", ex.Message, str); } + if (client.ClientStatus == SocketStatus.SOCKET_STATUS_CONNECTED) + client.ReceiveDataAsync(Receive); + + //Check to see if there is a subscription to the TextReceivedQueueInvoke event. If there is start the dequeue thread. + if (handler != null) + { + var gotLock = DequeueLock.TryEnter(); + if (gotLock) + CrestronInvoke.BeginInvoke((o) => DequeueEvent()); + } + } + else //JAG added this as I believe the error return is 0 bytes like the server. See help when hover on ReceiveAsync + { + client.DisconnectFromServer(); + } + } + + /// + /// This method gets spooled up in its own thread an protected by a CCriticalSection to prevent multiple threads from running concurrently. + /// It will dequeue items as they are enqueued automatically. + /// + void DequeueEvent() + { + try + { + while (true) + { + // Pull from Queue and fire an event. Block indefinitely until an item can be removed, similar to a Gather. + var message = MessageQueue.Dequeue(); + var handler = TextReceivedQueueInvoke; + if (handler != null) + { + handler(this, message); + } + } + } + catch (Exception e) + { + Debug.Console(0, "GenericUdpServer DequeueEvent error: {0}\r", e); + } + // Make sure to leave the CCritical section in case an exception above stops this thread, or we won't be able to restart it. + if (DequeueLock != null) + { + DequeueLock.Leave(); } - if (client.ClientStatus == SocketStatus.SOCKET_STATUS_CONNECTED) - client.ReceiveDataAsync(Receive); } void HeartbeatStart() diff --git a/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpServer.cs b/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpServer.cs index 5554764..13b3cef 100644 --- a/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpServer.cs +++ b/Pepperdash Core/Pepperdash Core/Comm/GenericSecureTcpIpServer.cs @@ -28,6 +28,12 @@ namespace PepperDash.Core /// public event EventHandler TextReceived; + /// + /// Event for Receiving text. Once subscribed to this event the receive callback will start a thread that dequeues the messages and invokes the event on a new thread. + /// It is not recommended to use both the TextReceived event and the TextReceivedQueueInvoke event. + /// + public event EventHandler TextReceivedQueueInvoke; + /// /// Event for client connection socket status change /// @@ -56,10 +62,26 @@ namespace PepperDash.Core #region Properties/Variables /// - /// + /// Server listen lock /// CCriticalSection ServerCCSection = new CCriticalSection(); + /// + /// Queue lock + /// + CCriticalSection DequeueLock = new CCriticalSection(); + + /// + /// Receive Queue size. Defaults to 20. Will set to 20 if QueueSize property is less than 20. Use constructor or set queue size property before + /// calling initialize. + /// + public int ReceiveQueueSize { get; set; } + + /// + /// Queue to temporarily store received messages with the source IP and Port info. Defaults to size 20. Use constructor or set queue size property before + /// calling initialize. + /// + private CrestronQueue MessageQueue; /// /// A bandaid client that monitors whether the server is reachable @@ -269,7 +291,7 @@ namespace PepperDash.Core /// constructor S+ Does not accept a key. Use initialze with key to set the debug key on this device. If using with + make sure to set all properties manually. /// public GenericSecureTcpIpServer() - : base("Uninitialized Dynamic TCP Server") + : base("Uninitialized Secure TCP Server") { HeartbeatRequiredIntervalInSeconds = 15; CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); @@ -282,7 +304,7 @@ namespace PepperDash.Core /// /// public GenericSecureTcpIpServer(string key) - : base("Uninitialized Dynamic TCP Server") + : base("Uninitialized Secure TCP Server") { HeartbeatRequiredIntervalInSeconds = 15; CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); @@ -292,11 +314,11 @@ namespace PepperDash.Core } /// - /// Contstructor that sets all properties by calling the initialize method with a config object. + /// Contstructor that sets all properties by calling the initialize method with a config object. This does set Queue size. /// /// public GenericSecureTcpIpServer(TcpServerConfigObject serverConfigObject) - : base("Uninitialized Dynamic TCP Server") + : base("Uninitialized Secure TCP Server") { HeartbeatRequiredIntervalInSeconds = 15; CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler); @@ -345,7 +367,8 @@ namespace PepperDash.Core HeartbeatRequiredIntervalInSeconds = serverConfigObject.HeartbeatRequiredIntervalInSeconds; HeartbeatStringToMatch = serverConfigObject.HeartbeatStringToMatch; BufferSize = serverConfigObject.BufferSize; - + ReceiveQueueSize = serverConfigObject.ReceiveQueueSize > 20 ? serverConfigObject.ReceiveQueueSize : 20; + MessageQueue = new CrestronQueue(ReceiveQueueSize); } else { @@ -770,6 +793,7 @@ namespace PepperDash.Core if (numberOfBytesReceived > 0) { string received = "Nothing"; + var handler = TextReceivedQueueInvoke; try { byte[] bytes = mySecureTCPServer.GetIncomingDataBufferForSpecificClient(clientIndex); @@ -792,11 +816,16 @@ namespace PepperDash.Core byte[] success = Encoding.GetEncoding(28591).GetBytes("Shared Key Match"); mySecureTCPServer.SendDataAsync(clientIndex, success, success.Length, null); OnServerClientReadyForCommunications(clientIndex); - Debug.Console(1, this, Debug.ErrorLogLevel.Notice, "Client with index {0} provided the shared key and successfully connected to the server", clientIndex); - + Debug.Console(1, this, Debug.ErrorLogLevel.Notice, "Client with index {0} provided the shared key and successfully connected to the server", clientIndex); + } + else if (!string.IsNullOrEmpty(checkHeartbeat(clientIndex, received))) + { + onTextReceived(received, clientIndex); + if (handler != null) + { + MessageQueue.TryToEnqueue(new GenericTcpServerCommMethodReceiveTextArgs(received, clientIndex)); + } } - else if (!string.IsNullOrEmpty(checkHeartbeat(clientIndex, received))) - onTextReceived(received, clientIndex); } catch (Exception ex) { @@ -804,13 +833,49 @@ namespace PepperDash.Core } if (mySecureTCPServer.GetServerSocketStatusForSpecificClient(clientIndex) == SocketStatus.SOCKET_STATUS_CONNECTED) mySecureTCPServer.ReceiveDataAsync(clientIndex, SecureReceivedDataAsyncCallback); + + //Check to see if there is a subscription to the TextReceivedQueueInvoke event. If there is start the dequeue thread. + if (handler != null) + { + var gotLock = DequeueLock.TryEnter(); + if (gotLock) + CrestronInvoke.BeginInvoke((o) => DequeueEvent()); + } } else { - // If numberOfBytesReceived <= 0 mySecureTCPServer.Disconnect(clientIndex); - } + } + } + /// + /// This method gets spooled up in its own thread an protected by a CCriticalSection to prevent multiple threads from running concurrently. + /// It will dequeue items as they are enqueued automatically. + /// + void DequeueEvent() + { + try + { + while (true) + { + // Pull from Queue and fire an event. Block indefinitely until an item can be removed, similar to a Gather. + var message = MessageQueue.Dequeue(); + var handler = TextReceivedQueueInvoke; + if (handler != null) + { + handler(this, message); + } + } + } + catch (Exception e) + { + Debug.Console(0, "GenericUdpServer DequeueEvent error: {0}\r", e); + } + // Make sure to leave the CCritical section in case an exception above stops this thread, or we won't be able to restart it. + if (DequeueLock != null) + { + DequeueLock.Leave(); + } } #endregion diff --git a/Pepperdash Core/Pepperdash Core/Comm/TcpClientConfigObject.cs b/Pepperdash Core/Pepperdash Core/Comm/TcpClientConfigObject.cs new file mode 100644 index 0000000..c0c7f45 --- /dev/null +++ b/Pepperdash Core/Pepperdash Core/Comm/TcpClientConfigObject.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Crestron.SimplSharp; +using PepperDash.Core; +using Newtonsoft.Json; + +namespace PepperDash_Core.Comm +{ + public class TcpClientConfigObject : TcpSshPropertiesConfig + { + public bool Secure { get; set; } + public bool SharedKeyRequired { get; set; } + public string SharedKey { get; set; } + public bool HeartbeatRequired { get; set; } + public ushort HeartbeatRequiredIntervalInSeconds { get; set; } + public string HeartbeatStringToMatch { get; set; } + public int ReceiveQueueSize { get; set; } + } +} \ No newline at end of file diff --git a/Pepperdash Core/Pepperdash Core/Comm/TcpServerConfigObject.cs b/Pepperdash Core/Pepperdash Core/Comm/TcpServerConfigObject.cs index c7c1998..af455f4 100644 --- a/Pepperdash Core/Pepperdash Core/Comm/TcpServerConfigObject.cs +++ b/Pepperdash Core/Pepperdash Core/Comm/TcpServerConfigObject.cs @@ -9,8 +9,8 @@ namespace PepperDash.Core public class TcpServerConfigObject { public string Key { get; set; } - public bool Secure { get; set; } public ushort MaxClients { get; set; } + public bool Secure { get; set; } public int Port { get; set; } public bool SharedKeyRequired { get; set; } public string SharedKey { get; set; } @@ -18,5 +18,6 @@ namespace PepperDash.Core public ushort HeartbeatRequiredIntervalInSeconds { get; set; } public string HeartbeatStringToMatch { get; set; } public int BufferSize { get; set; } + public int ReceiveQueueSize { get; set; } } } \ No newline at end of file diff --git a/Pepperdash Core/Pepperdash Core/PepperDash_Core.csproj b/Pepperdash Core/Pepperdash Core/PepperDash_Core.csproj index 8a033a2..c0a4f03 100644 --- a/Pepperdash Core/Pepperdash Core/PepperDash_Core.csproj +++ b/Pepperdash Core/Pepperdash Core/PepperDash_Core.csproj @@ -80,6 +80,7 @@ +