Added the client config object, and added the receive queue and thread with event to the Secure TCP server and client. Will need to duplicate to the unsecure at some point after testing.

This commit is contained in:
Joshua Gutenplan
2019-06-13 19:17:09 -07:00
parent fcc1ef3d1d
commit 214e1d215c
5 changed files with 234 additions and 20 deletions

View File

@@ -17,6 +17,7 @@ using System.Text;
using System.Text.RegularExpressions;
using Crestron.SimplSharp;
using Crestron.SimplSharp.CrestronSockets;
using PepperDash_Core.Comm;
namespace PepperDash.Core
{
@@ -33,6 +34,13 @@ namespace PepperDash.Core
public event EventHandler<GenericTcpServerCommMethodReceiveTextArgs> TextReceived;
/// <summary>
/// Event for Receiving text. Once subscribed to this event the receive callback will start a thread that dequeues the messages and invokes the event on a new thread.
/// It is not recommended to use both the TextReceived event and the TextReceivedQueueInvoke event.
/// </summary>
public event EventHandler<GenericTcpServerCommMethodReceiveTextArgs> TextReceivedQueueInvoke;
public event EventHandler<GenericTcpServerSocketStatusChangeEventArgs> ConnectionChange;
@@ -209,8 +217,20 @@ namespace PepperDash.Core
get { return (ushort)(HeartbeatEnabled ? 1 : 0); }
set { HeartbeatEnabled = value == 1; }
}
public string HeartbeatString = "heartbeat";
public int HeartbeatInterval = 50000;
public string HeartbeatString { get; set; }
//public int HeartbeatInterval = 50000;
/// <summary>
/// Milliseconds before server expects another heartbeat. Set by property HeartbeatRequiredIntervalInSeconds which is driven from S+
/// </summary>
public int HeartbeatInterval { get; set; }
/// <summary>
/// Simpl+ Heartbeat Analog value in seconds
/// </summary>
public ushort HeartbeatRequiredIntervalInSeconds { set { HeartbeatInterval = (value * 1000); } }
CTimer HeartbeatSendTimer;
CTimer HeartbeatAckTimer;
/// <summary>
@@ -226,6 +246,24 @@ namespace PepperDash.Core
bool ProgramIsStopping;
/// <summary>
/// Queue lock
/// </summary>
CCriticalSection DequeueLock = new CCriticalSection();
/// <summary>
/// Receive Queue size. Defaults to 20. Will set to 20 if QueueSize property is less than 20. Use constructor or set queue size property before
/// calling initialize.
/// </summary>
public int ReceiveQueueSize { get; set; }
/// <summary>
/// Queue to temporarily store received messages with the source IP and Port info. Defaults to size 20. Use constructor or set queue size property before
/// calling initialize.
/// </summary>
private CrestronQueue<GenericTcpServerCommMethodReceiveTextArgs> MessageQueue;
#endregion
#region Constructors
@@ -244,12 +282,24 @@ namespace PepperDash.Core
//base class constructor
public GenericSecureTcpIpClient_ForServer()
: base("Uninitialized DynamicTcpClient")
: base("Uninitialized Secure Tcp Client For Server")
{
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
AutoReconnectIntervalMs = 5000;
BufferSize = 2000;
}
/// <summary>
/// Contstructor that sets all properties by calling the initialize method with a config object.
/// </summary>
/// <param name="serverConfigObject"></param>
public GenericSecureTcpIpClient_ForServer(string key, TcpClientConfigObject clientConfigObject)
: base(key)
{
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
Initialize(clientConfigObject);
}
#endregion
#region Methods
@@ -262,6 +312,36 @@ namespace PepperDash.Core
Key = key;
}
public void Initialize(TcpClientConfigObject clientConfigObject)
{
try
{
if (clientConfigObject != null)
{
Hostname = clientConfigObject.Address;
AutoReconnect = clientConfigObject.AutoReconnect;
AutoReconnectIntervalMs = clientConfigObject.AutoReconnectIntervalMs > 1000 ? clientConfigObject.AutoReconnectIntervalMs : 5000;
SharedKey = clientConfigObject.SharedKey;
SharedKeyRequired = clientConfigObject.SharedKeyRequired;
HeartbeatEnabled = clientConfigObject.HeartbeatRequired;
HeartbeatRequiredIntervalInSeconds = clientConfigObject.HeartbeatRequiredIntervalInSeconds > 0 ? clientConfigObject.HeartbeatRequiredIntervalInSeconds : (ushort)15;
HeartbeatString = string.IsNullOrEmpty(clientConfigObject.HeartbeatStringToMatch) ? "heartbeat" : clientConfigObject.HeartbeatStringToMatch;
Port = clientConfigObject.Port;
BufferSize = clientConfigObject.BufferSize > 2000 ? clientConfigObject.BufferSize : 2000;
ReceiveQueueSize = clientConfigObject.ReceiveQueueSize > 20 ? clientConfigObject.ReceiveQueueSize : 20;
MessageQueue = new CrestronQueue<GenericTcpServerCommMethodReceiveTextArgs>(ReceiveQueueSize);
}
else
{
ErrorLog.Error("Could not initialize client with key: {0}", Key);
}
}
catch
{
ErrorLog.Error("Could not initialize client with key: {0}", Key);
}
}
/// <summary>
/// Handles closing this up when the program shuts down
/// </summary>
@@ -328,7 +408,7 @@ namespace PepperDash.Core
Client = new SecureTCPClient(Hostname, Port, BufferSize);
Client.SocketStatusChange += Client_SocketStatusChange;
if(HeartbeatEnabled)
if (HeartbeatEnabled)
Client.SocketSendOrReceiveTimeOutInMs = (HeartbeatInterval * 5);
Client.AddressClientConnectedTo = Hostname;
Client.PortNumber = Port;
@@ -487,7 +567,7 @@ namespace PepperDash.Core
if (numBytes > 0)
{
string str = string.Empty;
var handler = TextReceivedQueueInvoke;
try
{
var bytes = client.IncomingDataBuffer.Take(numBytes).ToArray();
@@ -517,6 +597,10 @@ namespace PepperDash.Core
var textHandler = TextReceived;
if (textHandler != null)
textHandler(this, new GenericTcpServerCommMethodReceiveTextArgs(str));
if (handler != null)
{
MessageQueue.TryToEnqueue(new GenericTcpServerCommMethodReceiveTextArgs(str));
}
}
}
}
@@ -524,9 +608,51 @@ namespace PepperDash.Core
{
Debug.Console(1, this, "Error receiving data: {1}. Error: {0}", ex.Message, str);
}
if (client.ClientStatus == SocketStatus.SOCKET_STATUS_CONNECTED)
client.ReceiveDataAsync(Receive);
//Check to see if there is a subscription to the TextReceivedQueueInvoke event. If there is start the dequeue thread.
if (handler != null)
{
var gotLock = DequeueLock.TryEnter();
if (gotLock)
CrestronInvoke.BeginInvoke((o) => DequeueEvent());
}
}
else //JAG added this as I believe the error return is 0 bytes like the server. See help when hover on ReceiveAsync
{
client.DisconnectFromServer();
}
}
/// <summary>
/// This method gets spooled up in its own thread an protected by a CCriticalSection to prevent multiple threads from running concurrently.
/// It will dequeue items as they are enqueued automatically.
/// </summary>
void DequeueEvent()
{
try
{
while (true)
{
// Pull from Queue and fire an event. Block indefinitely until an item can be removed, similar to a Gather.
var message = MessageQueue.Dequeue();
var handler = TextReceivedQueueInvoke;
if (handler != null)
{
handler(this, message);
}
}
}
catch (Exception e)
{
Debug.Console(0, "GenericUdpServer DequeueEvent error: {0}\r", e);
}
// Make sure to leave the CCritical section in case an exception above stops this thread, or we won't be able to restart it.
if (DequeueLock != null)
{
DequeueLock.Leave();
}
if (client.ClientStatus == SocketStatus.SOCKET_STATUS_CONNECTED)
client.ReceiveDataAsync(Receive);
}
void HeartbeatStart()

View File

@@ -28,6 +28,12 @@ namespace PepperDash.Core
/// </summary>
public event EventHandler<GenericTcpServerCommMethodReceiveTextArgs> TextReceived;
/// <summary>
/// Event for Receiving text. Once subscribed to this event the receive callback will start a thread that dequeues the messages and invokes the event on a new thread.
/// It is not recommended to use both the TextReceived event and the TextReceivedQueueInvoke event.
/// </summary>
public event EventHandler<GenericTcpServerCommMethodReceiveTextArgs> TextReceivedQueueInvoke;
/// <summary>
/// Event for client connection socket status change
/// </summary>
@@ -56,10 +62,26 @@ namespace PepperDash.Core
#region Properties/Variables
/// <summary>
///
/// Server listen lock
/// </summary>
CCriticalSection ServerCCSection = new CCriticalSection();
/// <summary>
/// Queue lock
/// </summary>
CCriticalSection DequeueLock = new CCriticalSection();
/// <summary>
/// Receive Queue size. Defaults to 20. Will set to 20 if QueueSize property is less than 20. Use constructor or set queue size property before
/// calling initialize.
/// </summary>
public int ReceiveQueueSize { get; set; }
/// <summary>
/// Queue to temporarily store received messages with the source IP and Port info. Defaults to size 20. Use constructor or set queue size property before
/// calling initialize.
/// </summary>
private CrestronQueue<GenericTcpServerCommMethodReceiveTextArgs> MessageQueue;
/// <summary>
/// A bandaid client that monitors whether the server is reachable
@@ -269,7 +291,7 @@ namespace PepperDash.Core
/// constructor S+ Does not accept a key. Use initialze with key to set the debug key on this device. If using with + make sure to set all properties manually.
/// </summary>
public GenericSecureTcpIpServer()
: base("Uninitialized Dynamic TCP Server")
: base("Uninitialized Secure TCP Server")
{
HeartbeatRequiredIntervalInSeconds = 15;
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
@@ -282,7 +304,7 @@ namespace PepperDash.Core
/// </summary>
/// <param name="key"></param>
public GenericSecureTcpIpServer(string key)
: base("Uninitialized Dynamic TCP Server")
: base("Uninitialized Secure TCP Server")
{
HeartbeatRequiredIntervalInSeconds = 15;
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
@@ -292,11 +314,11 @@ namespace PepperDash.Core
}
/// <summary>
/// Contstructor that sets all properties by calling the initialize method with a config object.
/// Contstructor that sets all properties by calling the initialize method with a config object. This does set Queue size.
/// </summary>
/// <param name="serverConfigObject"></param>
public GenericSecureTcpIpServer(TcpServerConfigObject serverConfigObject)
: base("Uninitialized Dynamic TCP Server")
: base("Uninitialized Secure TCP Server")
{
HeartbeatRequiredIntervalInSeconds = 15;
CrestronEnvironment.ProgramStatusEventHandler += new ProgramStatusEventHandler(CrestronEnvironment_ProgramStatusEventHandler);
@@ -345,7 +367,8 @@ namespace PepperDash.Core
HeartbeatRequiredIntervalInSeconds = serverConfigObject.HeartbeatRequiredIntervalInSeconds;
HeartbeatStringToMatch = serverConfigObject.HeartbeatStringToMatch;
BufferSize = serverConfigObject.BufferSize;
ReceiveQueueSize = serverConfigObject.ReceiveQueueSize > 20 ? serverConfigObject.ReceiveQueueSize : 20;
MessageQueue = new CrestronQueue<GenericTcpServerCommMethodReceiveTextArgs>(ReceiveQueueSize);
}
else
{
@@ -770,6 +793,7 @@ namespace PepperDash.Core
if (numberOfBytesReceived > 0)
{
string received = "Nothing";
var handler = TextReceivedQueueInvoke;
try
{
byte[] bytes = mySecureTCPServer.GetIncomingDataBufferForSpecificClient(clientIndex);
@@ -792,11 +816,16 @@ namespace PepperDash.Core
byte[] success = Encoding.GetEncoding(28591).GetBytes("Shared Key Match");
mySecureTCPServer.SendDataAsync(clientIndex, success, success.Length, null);
OnServerClientReadyForCommunications(clientIndex);
Debug.Console(1, this, Debug.ErrorLogLevel.Notice, "Client with index {0} provided the shared key and successfully connected to the server", clientIndex);
Debug.Console(1, this, Debug.ErrorLogLevel.Notice, "Client with index {0} provided the shared key and successfully connected to the server", clientIndex);
}
else if (!string.IsNullOrEmpty(checkHeartbeat(clientIndex, received)))
{
onTextReceived(received, clientIndex);
if (handler != null)
{
MessageQueue.TryToEnqueue(new GenericTcpServerCommMethodReceiveTextArgs(received, clientIndex));
}
}
else if (!string.IsNullOrEmpty(checkHeartbeat(clientIndex, received)))
onTextReceived(received, clientIndex);
}
catch (Exception ex)
{
@@ -804,13 +833,49 @@ namespace PepperDash.Core
}
if (mySecureTCPServer.GetServerSocketStatusForSpecificClient(clientIndex) == SocketStatus.SOCKET_STATUS_CONNECTED)
mySecureTCPServer.ReceiveDataAsync(clientIndex, SecureReceivedDataAsyncCallback);
//Check to see if there is a subscription to the TextReceivedQueueInvoke event. If there is start the dequeue thread.
if (handler != null)
{
var gotLock = DequeueLock.TryEnter();
if (gotLock)
CrestronInvoke.BeginInvoke((o) => DequeueEvent());
}
}
else
{
// If numberOfBytesReceived <= 0
mySecureTCPServer.Disconnect(clientIndex);
}
}
}
/// <summary>
/// This method gets spooled up in its own thread an protected by a CCriticalSection to prevent multiple threads from running concurrently.
/// It will dequeue items as they are enqueued automatically.
/// </summary>
void DequeueEvent()
{
try
{
while (true)
{
// Pull from Queue and fire an event. Block indefinitely until an item can be removed, similar to a Gather.
var message = MessageQueue.Dequeue();
var handler = TextReceivedQueueInvoke;
if (handler != null)
{
handler(this, message);
}
}
}
catch (Exception e)
{
Debug.Console(0, "GenericUdpServer DequeueEvent error: {0}\r", e);
}
// Make sure to leave the CCritical section in case an exception above stops this thread, or we won't be able to restart it.
if (DequeueLock != null)
{
DequeueLock.Leave();
}
}
#endregion

View File

@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Crestron.SimplSharp;
using PepperDash.Core;
using Newtonsoft.Json;
namespace PepperDash_Core.Comm
{
public class TcpClientConfigObject : TcpSshPropertiesConfig
{
public bool Secure { get; set; }
public bool SharedKeyRequired { get; set; }
public string SharedKey { get; set; }
public bool HeartbeatRequired { get; set; }
public ushort HeartbeatRequiredIntervalInSeconds { get; set; }
public string HeartbeatStringToMatch { get; set; }
public int ReceiveQueueSize { get; set; }
}
}

View File

@@ -9,8 +9,8 @@ namespace PepperDash.Core
public class TcpServerConfigObject
{
public string Key { get; set; }
public bool Secure { get; set; }
public ushort MaxClients { get; set; }
public bool Secure { get; set; }
public int Port { get; set; }
public bool SharedKeyRequired { get; set; }
public string SharedKey { get; set; }
@@ -18,5 +18,6 @@ namespace PepperDash.Core
public ushort HeartbeatRequiredIntervalInSeconds { get; set; }
public string HeartbeatStringToMatch { get; set; }
public int BufferSize { get; set; }
public int ReceiveQueueSize { get; set; }
}
}

View File

@@ -80,6 +80,7 @@
<Compile Include="Comm\EventArgs.cs" />
<Compile Include="Comm\GenericSshClient.cs" />
<Compile Include="Comm\GenericUdpServer.cs" />
<Compile Include="Comm\TcpClientConfigObject.cs" />
<Compile Include="Comm\TcpServerConfigObject.cs" />
<Compile Include="Config\PortalConfigReader.cs" />
<Compile Include="CoreInterfaces.cs" />