using System;
using Crestron.SimplSharp;
using Crestron.SimplSharpPro.CrestronThread;
using PepperDash.Core;

namespace PepperDash.Essentials.Core.Queues
{
    /// <summary>
    /// Threadsafe processing of queued items with pacing if required
    /// </summary>
    public class GenericQueue : IQueue
    {
        private const Thread.eThreadPriority _defaultPriority = Thread.eThreadPriority.MediumPriority;

        /// <summary>
        /// Queue capacity used when the caller supplies a capacity of zero or less.
        /// </summary>
        private const int DefaultCapacity = 25;

        private readonly string _key;
        protected readonly CrestronQueue<IQueueMessage> _queue;
        protected readonly Thread _worker;
        protected readonly CEvent _waitHandle = new CEvent();

        private bool _delayEnabled;
        private int _delayTime;

        /// <summary>
        /// If the instance has been disposed.
        /// </summary>
        public bool Disposed { get; private set; }

        /// <summary>
        /// Returns the capacity of the CrestronQueue (fixed Size property)
        /// </summary>
        public int QueueCapacity
        {
            get { return _queue.Size; }
        }

        /// <summary>
        /// Returns the number of elements currently in the CrestronQueue
        /// </summary>
        public int QueueCount
        {
            get { return _queue.Count; }
        }

        /// <summary>
        /// Key
        /// </summary>
        public string Key
        {
            get { return _key; }
        }

        /// <summary>
        /// Constructor with default priority, default capacity and no pacing
        /// </summary>
        /// <param name="key">Key</param>
        public GenericQueue(string key)
            : this(key, _defaultPriority, 0, 0)
        {
        }

        /// <summary>
        /// Constructor with queue size
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int capacity)
            : this(key, _defaultPriority, capacity, 0)
        {
        }

        /// <summary>
        /// Constructor with pacing
        /// </summary>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="key">Key</param>
        public GenericQueue(int pacing, string key)
            : this(key, _defaultPriority, 0, pacing)
        {
        }

        /// <summary>
        /// Constructor with pacing and capacity
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int pacing, int capacity)
            : this(key, _defaultPriority, capacity, pacing)
        {
        }

        /// <summary>
        /// Constructor with pacing and priority
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="priority">Priority of the worker thread</param>
        public GenericQueue(string key, int pacing, Thread.eThreadPriority priority)
            : this(key, priority, 0, pacing)
        {
        }

        /// <summary>
        /// Constructor with priority and capacity (no pacing)
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, Thread.eThreadPriority priority, int capacity)
            : this(key, priority, capacity, 0)
        {
        }

        /// <summary>
        /// Constructor with pacing, priority and capacity
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int pacing, Thread.eThreadPriority priority, int capacity)
            : this(key, priority, capacity, pacing)
        {
        }

        /// <summary>
        /// Constructor that every public overload chains to.
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold; values of zero or less select the default of 25</param>
        /// <param name="pacing">Pacing in ms between actions; values of zero or less disable pacing</param>
        protected GenericQueue(string key, Thread.eThreadPriority priority, int capacity, int pacing)
        {
            _key = key;
            _queue = new CrestronQueue<IQueueMessage>(capacity > 0 ? capacity : DefaultCapacity);

            // Configure pacing before the worker is started so the worker never runs with unset values.
            SetDelayValues(pacing);

            _worker = new Thread(ProcessQueue, null, Thread.eThreadStartOptions.Running)
            {
                Priority = priority
            };

            // A named handler (rather than a lambda) so Dispose can unsubscribe it again.
            CrestronEnvironment.ProgramStatusEventHandler += OnProgramStatusChange;
        }

        /// <summary>
        /// Stores the pacing configuration used by the worker between dispatched items.
        /// </summary>
        /// <param name="pacing">Pacing in ms between actions; values of zero or less disable pacing</param>
        private void SetDelayValues(int pacing)
        {
            _delayEnabled = pacing > 0;
            _delayTime = pacing;
        }

        /// <summary>
        /// Disposes the queue when the program reports that it is stopping.
        /// </summary>
        /// <param name="programEvent">Program status reported by the Crestron environment</param>
        private void OnProgramStatusChange(eProgramStatusEventType programEvent)
        {
            if (programEvent != eProgramStatusEventType.Stopping)
            {
                return;
            }

            Dispose(true);
        }

        /// <summary>
        /// Thread callback. Dequeues and dispatches items until a null item is dequeued.
        /// </summary>
        /// <param name="obj">Unused; the worker is created with a null argument</param>
        /// <returns>Null when the thread is exited</returns>
        private object ProcessQueue(object obj)
        {
            while (true)
            {
                if (_queue.Count == 0)
                {
                    // Nothing to do; block until Enqueue signals the wait handle.
                    _waitHandle.Wait();
                    continue;
                }

                IQueueMessage item = _queue.Dequeue();

                // A null item is the shutdown signal enqueued by Dispose.
                if (item == null)
                {
                    break;
                }

                try
                {
                    item.Dispatch();

                    // Pacing is only applied after a successful dispatch.
                    if (_delayEnabled)
                    {
                        Thread.Sleep(_delayTime);
                    }
                }
                catch (Exception ex)
                {
                    // Log and keep the worker alive; one failing item must not stop the queue.
                    Debug.Console(0, this, Debug.ErrorLogLevel.Error, "Caught an exception in the Queue {0}\r{1}\r{2}", ex.Message, ex.InnerException, ex.StackTrace);
                }
            }

            return null;
        }

        /// <summary>
        /// Adds an item to the queue and wakes the worker. Ignored (with a debug message) once disposed.
        /// </summary>
        /// <param name="item">Item to dispatch; a null item causes the worker loop to exit</param>
        public void Enqueue(IQueueMessage item)
        {
            if (Disposed)
            {
                Debug.Console(1, this, "I've been disposed so you can't enqueue any messages. Are you trying to dispatch a message while the program is stopping?");
                return;
            }

            _queue.Enqueue(item);
            _waitHandle.Set();
        }

        /// <summary>
        /// Disposes the thread and cleans up resources. Thread cannot be restarted once
        /// disposed.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            CrestronEnvironment.GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Actually does the disposing. This method is not virtual; a derived class that needs
        /// extra cleanup must perform it separately and still call this method.
        /// </summary>
        /// <param name="disposing">set to true unless called from finalizer</param>
        protected void Dispose(bool disposing)
        {
            if (Disposed)
            {
                return;
            }

            if (disposing)
            {
                Debug.Console(2, this, "Disposing...");

                // Drop the static event's reference to this instance so it can be collected.
                CrestronEnvironment.ProgramStatusEventHandler -= OnProgramStatusChange;

                if (_queue != null && !_queue.Disposed)
                {
                    // Discard pending work, then enqueue the null item that ends the worker loop.
                    _queue.Clear();
                    Enqueue(null);
                }

                _worker.Abort();
                _waitHandle.Close();
            }

            Disposed = true;
        }

        /// <summary>
        /// Finalizer. Passes false so no managed members are touched from the finalizer thread.
        /// </summary>
        ~GenericQueue()
        {
            Dispose(false);
        }
    }
}

namespace PepperDash_Essentials_Core.Queues
{
    /// <summary>
    /// Threadsafe processing of queued items with pacing if required
    /// </summary>
    [Obsolete("Use PepperDash.Essentials.Core.Queues")]
    public class GenericQueue : IQueue
    {
        private const Thread.eThreadPriority _defaultPriority = Thread.eThreadPriority.MediumPriority;

        /// <summary>
        /// Queue capacity used when the caller supplies a capacity of zero or less.
        /// </summary>
        private const int DefaultCapacity = 25;

        private readonly string _key;
        protected readonly CrestronQueue<IQueueMessage> _queue;
        protected readonly Thread _worker;
        protected readonly CEvent _waitHandle = new CEvent();

        private bool _delayEnabled;
        private int _delayTime;

        /// <summary>
        /// If the instance has been disposed.
        /// </summary>
        public bool Disposed { get; private set; }

        /// <summary>
        /// Returns the capacity of the CrestronQueue (fixed Size property)
        /// </summary>
        public int QueueCapacity
        {
            get { return _queue.Size; }
        }

        /// <summary>
        /// Returns the number of elements currently in the CrestronQueue
        /// </summary>
        public int QueueCount
        {
            get { return _queue.Count; }
        }

        /// <summary>
        /// Key
        /// </summary>
        public string Key
        {
            get { return _key; }
        }

        /// <summary>
        /// Constructor with default priority, default capacity and no pacing
        /// </summary>
        /// <param name="key">Key</param>
        public GenericQueue(string key)
            : this(key, _defaultPriority, 0, 0)
        {
        }

        /// <summary>
        /// Constructor with queue size
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int capacity)
            : this(key, _defaultPriority, capacity, 0)
        {
        }

        /// <summary>
        /// Constructor with pacing
        /// </summary>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="key">Key</param>
        public GenericQueue(int pacing, string key)
            : this(key, _defaultPriority, 0, pacing)
        {
        }

        /// <summary>
        /// Constructor with pacing and capacity
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int pacing, int capacity)
            : this(key, _defaultPriority, capacity, pacing)
        {
        }

        /// <summary>
        /// Constructor with pacing and priority
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="priority">Priority of the worker thread</param>
        public GenericQueue(string key, int pacing, Thread.eThreadPriority priority)
            : this(key, priority, 0, pacing)
        {
        }

        /// <summary>
        /// Constructor with priority and capacity (no pacing)
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, Thread.eThreadPriority priority, int capacity)
            : this(key, priority, capacity, 0)
        {
        }

        /// <summary>
        /// Constructor with pacing, priority and capacity
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="pacing">Pacing in ms between actions</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold</param>
        public GenericQueue(string key, int pacing, Thread.eThreadPriority priority, int capacity)
            : this(key, priority, capacity, pacing)
        {
        }

        /// <summary>
        /// Constructor that every public overload chains to.
        /// </summary>
        /// <param name="key">Key</param>
        /// <param name="priority">Priority of the worker thread</param>
        /// <param name="capacity">Fixed size for the queue to hold; values of zero or less select the default of 25</param>
        /// <param name="pacing">Pacing in ms between actions; values of zero or less disable pacing</param>
        protected GenericQueue(string key, Thread.eThreadPriority priority, int capacity, int pacing)
        {
            _key = key;
            _queue = new CrestronQueue<IQueueMessage>(capacity > 0 ? capacity : DefaultCapacity);

            // Configure pacing before the worker is started so the worker never runs with unset values.
            SetDelayValues(pacing);

            _worker = new Thread(ProcessQueue, null, Thread.eThreadStartOptions.Running)
            {
                Priority = priority
            };

            // A named handler (rather than a lambda) so Dispose can unsubscribe it again.
            CrestronEnvironment.ProgramStatusEventHandler += OnProgramStatusChange;
        }

        /// <summary>
        /// Stores the pacing configuration used by the worker between dispatched items.
        /// </summary>
        /// <param name="pacing">Pacing in ms between actions; values of zero or less disable pacing</param>
        private void SetDelayValues(int pacing)
        {
            _delayEnabled = pacing > 0;
            _delayTime = pacing;
        }

        /// <summary>
        /// Disposes the queue when the program reports that it is stopping.
        /// </summary>
        /// <param name="programEvent">Program status reported by the Crestron environment</param>
        private void OnProgramStatusChange(eProgramStatusEventType programEvent)
        {
            if (programEvent != eProgramStatusEventType.Stopping)
            {
                return;
            }

            Dispose(true);
        }

        /// <summary>
        /// Thread callback. Dequeues and dispatches items until a null item is dequeued.
        /// </summary>
        /// <param name="obj">Unused; the worker is created with a null argument</param>
        /// <returns>Null when the thread is exited</returns>
        private object ProcessQueue(object obj)
        {
            while (true)
            {
                if (_queue.Count == 0)
                {
                    // Nothing to do; block until Enqueue signals the wait handle.
                    _waitHandle.Wait();
                    continue;
                }

                IQueueMessage item = _queue.Dequeue();

                // A null item is the shutdown signal enqueued by Dispose.
                if (item == null)
                {
                    break;
                }

                try
                {
                    Debug.Console(2, this, "Processing queue item: '{0}'", item.ToString());
                    item.Dispatch();

                    // Pacing is only applied after a successful dispatch.
                    if (_delayEnabled)
                    {
                        Thread.Sleep(_delayTime);
                    }
                }
                catch (Exception ex)
                {
                    // Log and keep the worker alive; one failing item must not stop the queue.
                    Debug.Console(0, this, Debug.ErrorLogLevel.Error, "Caught an exception in the Queue {0}\r{1}\r{2}", ex.Message, ex.InnerException, ex.StackTrace);
                }
            }

            return null;
        }

        /// <summary>
        /// Adds an item to the queue and wakes the worker. Ignored (with a debug message) once disposed.
        /// </summary>
        /// <param name="item">Item to dispatch; a null item causes the worker loop to exit</param>
        public void Enqueue(IQueueMessage item)
        {
            // Dispose closes the wait handle, so signalling it afterwards is not valid.
            if (Disposed)
            {
                Debug.Console(1, this, "I've been disposed so you can't enqueue any messages. Are you trying to dispatch a message while the program is stopping?");
                return;
            }

            _queue.Enqueue(item);
            _waitHandle.Set();
        }

        /// <summary>
        /// Disposes the thread and cleans up resources. Thread cannot be restarted once
        /// disposed.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            CrestronEnvironment.GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Actually does the disposing. This method is not virtual; a derived class that needs
        /// extra cleanup must perform it separately and still call this method.
        /// </summary>
        /// <param name="disposing">set to true unless called from finalizer</param>
        protected void Dispose(bool disposing)
        {
            if (Disposed)
            {
                return;
            }

            if (disposing)
            {
                Debug.Console(2, this, "Disposing...");

                // Drop the static event's reference to this instance so it can be collected.
                CrestronEnvironment.ProgramStatusEventHandler -= OnProgramStatusChange;

                if (_queue != null && !_queue.Disposed)
                {
                    // Discard pending work, then enqueue the null item that ends the worker loop.
                    _queue.Clear();
                    Enqueue(null);
                }

                _worker.Abort();
                _waitHandle.Close();
            }

            Disposed = true;
        }

        /// <summary>
        /// Finalizer. Passes false so no managed members are touched from the finalizer thread.
        /// </summary>
        ~GenericQueue()
        {
            Dispose(false);
        }
    }
}