added IQueueMessage and two implementations; GenericQueue now accepts type IQueueMessage

This commit is contained in:
Nick Genovese
2020-09-03 10:57:06 -04:00
parent 4f562e0d8e
commit b9fff95215
10 changed files with 146 additions and 164 deletions

View File

@@ -179,7 +179,7 @@ namespace PepperDash.Essentials.Core
{
public static eDeviceRegistrationUnRegistrationResponse RegisterWithLogging(this GenericBase device, string key)
{
var result = device.Register();
var result = device.Register();
var level = result == eDeviceRegistrationUnRegistrationResponse.Success ?
Debug.ErrorLogLevel.Notice : Debug.ErrorLogLevel.Error;
Debug.Console(0, level, "Register device result: '{0}', type '{1}', result {2}", key, device, result);

View File

@@ -158,7 +158,6 @@
<Compile Include="Bridges\JoinMaps\SetTopBoxControllerJoinMap.cs" />
<Compile Include="Bridges\JoinMaps\StatusSignControllerJoinMap.cs" />
<Compile Include="Bridges\JoinMaps\SystemMonitorJoinMap.cs" />
<Compile Include="Queues\BytesQueue.cs" />
<Compile Include="Comm and IR\CecPortController.cs" />
<Compile Include="Comm and IR\CommFactory.cs" />
<Compile Include="Comm and IR\CommunicationExtras.cs" />
@@ -223,6 +222,8 @@
<Compile Include="Fusion\FusionRviDataClasses.cs" />
<Compile Include="Gateways\CenRfgwController.cs" />
<Compile Include="Gateways\EssentialsRfGatewayConfig.cs" />
<Compile Include="Queues\ComsMessage.cs" />
<Compile Include="Queues\ProcessStringMessage.cs" />
<Compile Include="Queues\GenericQueue.cs" />
<Compile Include="Global\JobTimer.cs" />
<Compile Include="Global\Scheduler.cs" />
@@ -241,6 +242,7 @@
<Compile Include="Plugins\PluginLoader.cs" />
<Compile Include="Presets\PresetBase.cs" />
<Compile Include="Plugins\IPluginDeviceFactory.cs" />
<Compile Include="Queues\IQueueMessage.cs" />
<Compile Include="Ramps and Increments\ActionIncrementer.cs" />
<Compile Include="Config\BasicConfig.cs" />
<Compile Include="Config\ConfigPropertiesHelpers.cs" />
@@ -316,7 +318,6 @@
<Compile Include="SmartObjects\SmartObjectDPad.cs" />
<Compile Include="SmartObjects\SmartObjectHelperBase.cs" />
<Compile Include="Routing\TieLine.cs" />
<Compile Include="Queues\StringQueue.cs" />
<Compile Include="Queues\StringResponseProcessor.cs" />
<Compile Include="Timers\CountdownTimer.cs" />
<Compile Include="Touchpanels\CrestronTouchpanelPropertiesConfig.cs" />

View File

@@ -1,67 +0,0 @@
using System;
using PepperDash.Core;
namespace PepperDash_Essentials_Core.Queues
{
/// <summary>
/// Byte implementation of Action queue
/// </summary>
public class BytesQueue : IQueue<byte[]>
{
private readonly IQueue<byte[]> _queue;
/// <summary>
/// Constructor for BytesQueue
/// </summary>
/// <param name="key">Key</param>
/// <param name="processBytesAction">Action to process queued bytes</param>
public BytesQueue(string key, Action<byte[]> processBytesAction)
{
_queue = new GenericQueue<byte[]>(key, processBytesAction);
}
/// <summary>
/// Constructor for BytesQueue
/// </summary>
/// <param name="key">Key</param>
/// <param name="processBytesAction">Action to process queued bytes</param>
/// <param name="pacing">Delay in ms between actions being invoked</param>
public BytesQueue(string key, Action<byte[]> processBytesAction, int pacing)
{
_queue = new GenericQueue<byte[]>(key, processBytesAction, pacing);
}
/// <summary>
/// Enqueue a byte array to be processed
/// </summary>
/// <param name="item">Byte array to be processed</param>
public void Enqueue(byte[] item)
{
_queue.Enqueue(item);
}
/// <summary>
/// If the instance has been disposed
/// </summary>
public bool Disposed
{
get { return _queue.Disposed; }
}
/// <summary>
/// Key
/// </summary>
public string Key
{
get { return _queue.Key; }
}
/// <summary>
/// Disposes of resources
/// </summary>
public void Dispose()
{
_queue.Dispose();
}
}
}

View File

@@ -0,0 +1,73 @@
using System;
using PepperDash.Core;
namespace PepperDash_Essentials_Core.Queues
{
/// <summary>
/// IBasicCommunication Message for IQueue
/// </summary>
public class ComsMessage : IQueueMessage
{
private readonly byte[] _bytes;
private readonly IBasicCommunication _coms;
private readonly string _string;
private readonly bool _isByteMessage;
/// <summary>
/// Constructor for a string message
/// </summary>
/// <param name="coms">IBasicCommunication to send the message</param>
/// <param name="message">Message to send</param>
public ComsMessage(IBasicCommunication coms, string message)
{
Validate(coms, message);
_coms = coms;
_string = message;
}
/// <summary>
/// Constructor for a byte message
/// </summary>
/// <param name="coms">IBasicCommunication to send the message</param>
/// <param name="message">Message to send</param>
public ComsMessage(IBasicCommunication coms, byte[] message)
{
Validate(coms, message);
_coms = coms;
_bytes = message;
_isByteMessage = true;
}
private void Validate(IBasicCommunication coms, object message)
{
if (_coms == null)
throw new ArgumentNullException("coms");
if (message == null)
throw new ArgumentNullException("message");
}
/// <summary>
/// Dispatchs the string/byte[] to the IBasicCommunication specified
/// </summary>
public void Dispatch()
{
if (_isByteMessage)
{
_coms.SendBytes(_bytes);
}
else
{
_coms.SendText(_string);
}
}
/// <summary>
/// Shows either the byte[] or string to be sent
/// </summary>
public override string ToString()
{
return _bytes != null ? _bytes.ToString() : _string;
}
}
}

View File

@@ -8,17 +8,16 @@ namespace PepperDash_Essentials_Core.Queues
/// <summary>
/// Threadsafe processing of queued items with pacing if required
/// </summary>
/// <typeparam name="T">Type of item to be processed</typeparam>
public class GenericQueue<T> : IQueue<T> where T : class
public class GenericQueue : IQueue<IQueueMessage>
{
private readonly string _key;
protected readonly CrestronQueue<T> _queue;
protected readonly CrestronQueue<IQueueMessage> _queue;
protected readonly Thread _worker;
protected readonly CEvent _waitHandle = new CEvent();
private readonly bool _delayEnabled;
private readonly int _delayTime;
/// <summary>
/// If the instance has been disposed.
/// </summary>
@@ -28,12 +27,11 @@ namespace PepperDash_Essentials_Core.Queues
/// Constructor for generic queue with no pacing
/// </summary>
/// <param name="key">Key</param>
/// <param name="processQueueAction">Action to process items in the queue</param>
public GenericQueue(string key, Action<T> processQueueAction)
public GenericQueue(string key)
{
_key = key;
_queue = new CrestronQueue<T>();
_worker = new Thread(ProcessQueue, processQueueAction, Thread.eThreadStartOptions.Running);
_queue = new CrestronQueue<IQueueMessage>();
_worker = new Thread(ProcessQueue, null, Thread.eThreadStartOptions.Running);
CrestronEnvironment.ProgramStatusEventHandler += programEvent =>
{
@@ -48,10 +46,9 @@ namespace PepperDash_Essentials_Core.Queues
/// Constructor for generic queue with no pacing
/// </summary>
/// <param name="key">Key</param>
/// <param name="processQueueAction">Action to process items in the queue</param>
/// <param name="pacing">Pacing in ms between actions</param>
public GenericQueue(string key, Action<T> processQueueAction, int pacing)
: this(key, processQueueAction)
public GenericQueue(string key, int pacing)
: this(key)
{
_delayEnabled = pacing > 0;
_delayTime = pacing;
@@ -64,13 +61,9 @@ namespace PepperDash_Essentials_Core.Queues
/// <returns>Null when the thread is exited</returns>
private object ProcessQueue(object obj)
{
var action = obj as Action<T>;
if (action == null)
throw new ArgumentNullException("obj");
while (true)
{
T item = null;
IQueueMessage item = null;
if (_queue.Count > 0)
{
@@ -83,14 +76,14 @@ namespace PepperDash_Essentials_Core.Queues
try
{
Debug.Console(2, this, "Processing queue item: '{0}'", item.ToString());
action(item);
item.Dispatch();
if (_delayEnabled)
Thread.Sleep(_delayTime);
}
catch (Exception ex)
{
Debug.ConsoleWithLog(0, this, "Caught an exception in the ComsQueue {0}\r{1}\r{2}", ex.Message, ex.InnerException, ex.StackTrace);
Debug.ConsoleWithLog(0, this, "Caught an exception in the Queue {0}\r{1}\r{2}", ex.Message, ex.InnerException, ex.StackTrace);
}
}
else _waitHandle.Wait();
@@ -99,12 +92,7 @@ namespace PepperDash_Essentials_Core.Queues
return null;
}
/// <summary>
/// Enqueues an item and processes the queue. If 'null' is enqueued the thread will close and
/// the class must be reinstantiated.
/// </summary>
/// <param name="item">Item to be processed</param>
public virtual void Enqueue(T item)
public void Enqueue(IQueueMessage item)
{
_queue.Enqueue(item);
_waitHandle.Set();

View File

@@ -7,7 +7,7 @@ using PepperDash.Core;
namespace PepperDash_Essentials_Core.Queues
{
public interface IQueue<T> : IKeyed, IDisposable where T : class
public interface IQueue<T> : IKeyed, IDisposable where T : class
{
void Enqueue(T item);
bool Disposed { get; }

View File

@@ -0,0 +1,7 @@
namespace PepperDash_Essentials_Core.Queues
{
public interface IQueueMessage
{
void Dispatch();
}
}

View File

@@ -0,0 +1,44 @@
using System;
namespace PepperDash_Essentials_Core.Queues
{
/// <summary>
/// Message class for processing strings via an IQueue
/// </summary>
public class ProcessStringMessage : IQueueMessage
{
private readonly Action<string> _action;
private readonly string _message;
/// <summary>
/// Constructor
/// </summary>
/// <param name="message">Message to be processed</param>
/// <param name="action">Action to invoke on the message</param>
public ProcessStringMessage(string message, Action<string> action)
{
_message = message;
_action = action;
}
/// <summary>
/// Processes the string with the given action
/// </summary>
public void Dispatch()
{
if (_action == null || String.IsNullOrEmpty(_message))
return;
_action(_message);
}
/// <summary>
/// To string
/// </summary>
/// <returns>The current message</returns>
public override string ToString()
{
return _message ?? String.Empty;
}
}
}

View File

@@ -1,66 +0,0 @@
using System;
namespace PepperDash_Essentials_Core.Queues
{
/// <summary>
/// String implementation of Action Queue
/// </summary>
public class StringQueue : IQueue<string>
{
private readonly IQueue<string> _queue;
/// <summary>
/// Constructor for BytesQueue
/// </summary>
/// <param name="key">Key</param>
/// <param name="processStringAction">Action to process queued strings</param>
public StringQueue(string key, Action<string> processStringAction)
{
_queue = new GenericQueue<string>(key, processStringAction);
}
/// <summary>
/// Constructor for StringQueue
/// </summary>
/// <param name="key">Key</param>
/// <param name="processStringAction">Action to process queued strings</param>
/// <param name="pacing">Delay in ms between actions being invoked</param>
public StringQueue(string key, Action<string> processStringAction, int pacing)
{
_queue = new GenericQueue<string>(key, processStringAction, pacing);
}
/// <summary>
/// Enqueue a byte array to be processed
/// </summary>
/// <param name="item">Byte array to be processed</param>
public void Enqueue(string item)
{
_queue.Enqueue(item);
}
/// <summary>
/// Key
/// </summary>
public string Key
{
get { return _queue.Key; }
}
/// <summary>
/// Disposes of resources
/// </summary>
public void Dispose()
{
_queue.Dispose();
}
/// <summary>
/// If the instance has been disposed
/// </summary>
public bool Disposed
{
get { return _queue.Disposed; }
}
}
}

View File

@@ -6,13 +6,15 @@ namespace PepperDash_Essentials_Core.Queues
{
public sealed class StringResponseProcessor : IKeyed, IDisposable
{
private readonly IQueue<string> _queue;
private readonly Action<string> _processStringAction;
private readonly IQueue<IQueueMessage> _queue;
private readonly IBasicCommunication _coms;
private readonly CommunicationGather _gather;
private StringResponseProcessor(string key, Action<string> processStringAction)
{
_queue = new StringQueue(key, processStringAction);
_processStringAction = processStringAction;
_queue = new GenericQueue(key);
CrestronEnvironment.ProgramStatusEventHandler += programEvent =>
{
@@ -49,7 +51,7 @@ namespace PepperDash_Essentials_Core.Queues
private void OnResponseReceived(object sender, GenericCommMethodReceiveTextArgs args)
{
_queue.Enqueue(args.Text);
_queue.Enqueue(new ProcessStringMessage(args.Text, _processStringAction));
}
/// <summary>