diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c6c3cd..8c6f25c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Added extensions to raise events with common event args using the data directly - Added property to IcdEnvironment to determine whether SSL communication is enabled - Added IcdTimeZoneInfo, a very light implementation of System.TimeZoneInfo for the .NET Compact Framework + - Added ThreadedWorkerQueue - a threadsafe way to enqueue items and have a worker thread process them one at a time ### Changed - Repeater changed to use configured callbacks instead of a dumb event diff --git a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj index d36e00c..22b20ea 100644 --- a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj +++ b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj @@ -113,6 +113,7 @@ + diff --git a/ICD.Common.Utils/ThreadedWorkerQueue.cs b/ICD.Common.Utils/ThreadedWorkerQueue.cs new file mode 100644 index 0000000..499f15c --- /dev/null +++ b/ICD.Common.Utils/ThreadedWorkerQueue.cs @@ -0,0 +1,166 @@ +using System; +using ICD.Common.Properties; +using ICD.Common.Utils.Collections; + +namespace ICD.Common.Utils +{ + /// + /// Utilizes a priority queue to store items + /// Dequeues items in priority order and processes + /// them in a worker thread one at a time + /// + /// + public sealed class ThreadedWorkerQueue + { + + private readonly PriorityQueue m_Queue; + + private readonly SafeCriticalSection m_QueueSection; + private readonly SafeCriticalSection m_ProcessSection; + + private readonly Action m_ProcessAction; + + /// + /// + /// + /// Action to process the dequeued items + public ThreadedWorkerQueue([NotNull] Action processItemAction) + { + if (processItemAction == null) + throw new ArgumentNullException("processItemAction"); + + m_Queue = new PriorityQueue(); + m_QueueSection = new SafeCriticalSection(); + m_ProcessSection = new SafeCriticalSection(); + + m_ProcessAction = processItemAction; + } + + + #region Queue Methods + + /// + /// Clears the collection. + /// + [PublicAPI] + public void Clear() + { + m_QueueSection.Execute(() => m_Queue.Clear()); + } + + /// + /// Adds the item to the end of the queue. + /// + /// + [PublicAPI] + public void Enqueue([CanBeNull] T item) + { + Enqueue(() => m_Queue.Enqueue(item)); + } + + /// + /// Adds the item to the queue with the given priority. + /// Lower values are dequeued first. + /// + /// + /// + [PublicAPI] + public void Enqueue([CanBeNull] T item, int priority) + { + Enqueue(() => m_Queue.Enqueue(item, priority)); + } + + /// + /// Adds the item to the queue with the given priority at the given index. + /// + /// + /// + /// + [PublicAPI] + public void Enqueue([CanBeNull] T item, int priority, int position) + { + Enqueue(() => m_Queue.Enqueue(item, priority, position)); + } + + /// + /// Enqueues the item at the beginning of the queue. + /// + /// + [PublicAPI] + public void EnqueueFirst([CanBeNull] T item) + { + Enqueue(() => m_Queue.EnqueueFirst(item)); + } + + /// + /// Removes any items in the queue matching the predicate. + /// Appends the given item at the end of the given priority level. + /// This is useful for reducing duplication, or replacing items with something more pertinent. + /// + /// + /// + [PublicAPI] + public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove) + { + Enqueue(() => m_Queue.EnqueueRemove(item, remove)); + } + + /// + /// Removes any items in the queue matching the predicate. + /// Appends the given item at the end of the given priority level. + /// This is useful for reducing duplication, or replacing items with something more pertinent. + /// + /// + /// + /// + [PublicAPI] + public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove, int priority) + { + Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority)); + } + + /// + /// Removes any items in the queue matching the predicate. + /// Appends the given item at the end of the given priority level. + /// This is useful for reducing duplication, or replacing items with something more pertinent. + /// + /// + /// + /// + /// + [PublicAPI] + public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove, int priority, bool deDuplicateToEndOfQueue) + { + Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue)); + } + + #endregion + + #region Private Methods + + private void Enqueue(Action enqueueAction) + { + m_QueueSection.Execute(() => enqueueAction()); + ThreadingUtils.SafeInvoke(ProcessQueue); + } + + private void ProcessQueue() + { + if (!m_ProcessSection.TryEnter()) + return; + + try + { + T item = default(T); + while (m_QueueSection.Execute(() => m_Queue.TryDequeue(out item))) + m_ProcessAction(item); + } + finally + { + m_ProcessSection.Leave(); + } + } + + #endregion + } +} \ No newline at end of file