From 451cf08c0f8351fd2fc8dcd9a332fa81c0337dff Mon Sep 17 00:00:00 2001 From: Drew Tingen Date: Thu, 23 Sep 2021 17:34:20 -0400 Subject: [PATCH] fix: Fixing ThreadedWorkerQueue to not use TryEnter, and track the processing state with a bool --- .../ThreadedWorkerQueueTest.cs | 205 +++++++++++ ICD.Common.Utils/ThreadedWorkerQueue.cs | 342 ++++++++++++++++-- 2 files changed, 523 insertions(+), 24 deletions(-) create mode 100644 ICD.Common.Utils.Tests/ThreadedWorkerQueueTest.cs diff --git a/ICD.Common.Utils.Tests/ThreadedWorkerQueueTest.cs b/ICD.Common.Utils.Tests/ThreadedWorkerQueueTest.cs new file mode 100644 index 0000000..f7ccfd9 --- /dev/null +++ b/ICD.Common.Utils.Tests/ThreadedWorkerQueueTest.cs @@ -0,0 +1,205 @@ +using System.Collections.Generic; +using System.Linq; +using NUnit.Framework; + +namespace ICD.Common.Utils.Tests +{ + [TestFixture] + public sealed class ThreadedWorkerQueueTest + { + [Test] + public void BetweenTimeTest() + { + List callbacks = new List(); + + using (ThreadedWorkerQueue queue = new ThreadedWorkerQueue((d) => callbacks.Add(d), true, 1000)) + { + + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue"); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(3, callbacks.Count); + } + } + + #region Properties + + [TestCase(1000)] + [TestCase(0)] + [TestCase(long.MaxValue)] + public void BetweenMillisecondsTest(long milliseconds) + { + using (var queue = new ThreadedWorkerQueue((d) => { }, true, milliseconds)) + Assert.AreEqual(milliseconds, queue.BetweenTime); + } + + [TestCase(5)] + [TestCase(0)] + [TestCase(30)] + public void CountTest(int count) + { + using (var queue = new ThreadedWorkerQueue(d => { }, false)) + { + for (int i = 0; i < count; i++) + queue.Enqueue(1); + + Assert.AreEqual(count, queue.Count); + } + } + + [Test] + public void ProcessBetweenTimeTest() + { + var processed = new List(); + + using (var queue = new ThreadedWorkerQueue(d => processed.Add(d), false, 1000)) + { + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + ThreadingUtils.Sleep(100); + Assert.AreEqual(0, processed.Count, "Queue processed item early"); + + queue.SetRunProcess(true); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(1, processed.Count, "First item not processed"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, processed.Count, "Second item not processed"); + + queue.SetRunProcess(false); + + ThreadingUtils.Sleep(2000); + + Assert.AreEqual(2, processed.Count, "Item processed after stopping run process"); + Assert.AreEqual(1, queue.Count, "Incorrect number of items in queue"); + + // Queue lower priority item + queue.Enqueue(5, 1); + queue.SetRunProcess(true); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(3, processed.Count, "Third item not processed"); + Assert.AreEqual(5, processed[2], "Dequeued incorrect priority item"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(4, processed.Count, "Didn't process all items"); + Assert.True(processed.SequenceEqual(new[] { 1, 2, 5, 3 }), "Processed sequence incorrect"); + } + } + + [Test] + public void FlushQueueBetweenTimeTest() + { + var processed = new List(); + + using (var queue = new ThreadedWorkerQueue(d => processed.Add(d), false, 1000)) + { + Assert.True(queue.WaitForFlush(1), "WaitForFlush on empty queue failed"); + + queue.Enqueue(11); + queue.Enqueue(21); + queue.Enqueue(31); + queue.Enqueue(41); + queue.Enqueue(51); + queue.Enqueue(61); + queue.Enqueue(71); + queue.Enqueue(81); + queue.Enqueue(91); + queue.Enqueue(101); + + queue.SetRunProcess(true); + + Assert.False(queue.WaitForFlush(1250), "WaitForFlush didn't time out"); + Assert.AreEqual(2, processed.Count, "Didn't process correct number of items in time frame"); + + Assert.True(queue.WaitForFlush(), "WaitForFlush failed"); + Assert.AreEqual(10, processed.Count, "Not all items processed"); + Assert.AreEqual(0, queue.Count, "Queue not empty"); + } + } + + #endregion + + #region Methods + + [TestCase(false)] + [TestCase(true)] + public void SetRunProcessTest(bool runProcess) + { + using (var queue = new ThreadedWorkerQueue(d => { }, !runProcess)) + { + Assert.AreEqual(!runProcess, queue.RunProcess, "Initial state wrong"); + queue.SetRunProcess(runProcess); + Assert.AreEqual(runProcess, queue.RunProcess, "Didn't set to correct state 1st time"); + queue.SetRunProcess(!runProcess); + Assert.AreEqual(!runProcess, queue.RunProcess, "Didn't set to correct state 2nd time"); + } + } + + [Test] + public void EnqueueTest() + { + var processed = new List(); + + using (var queue = new ThreadedWorkerQueue(d => processed.Add(d), false)) + { + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + Assert.AreEqual(3, queue.Count, "First queue count wrong"); + + queue.Enqueue(40); + queue.Enqueue(50); + queue.Enqueue(60); + + Assert.AreEqual(6, queue.Count, "Second queue count wrong"); + + queue.SetRunProcess(true); + Assert.True(queue.WaitForFlush(),"Queue didn't flush after processing"); + + + Assert.True(processed.SequenceEqual(new[] { 10, 20, 30, 40, 50, 60 }), "Processed sequence wrong"); + } + } + + [Test] + public void ClearTest() + { + using (var queue = new ThreadedWorkerQueue(d => { }, false)) + { + queue.Enqueue(1); + queue.Enqueue(1); + queue.Enqueue(1); + + queue.Clear(); + + Assert.AreEqual(0, queue.Count); + } + } + + #endregion + } +} diff --git a/ICD.Common.Utils/ThreadedWorkerQueue.cs b/ICD.Common.Utils/ThreadedWorkerQueue.cs index 34db9e9..2adf16a 100644 --- a/ICD.Common.Utils/ThreadedWorkerQueue.cs +++ b/ICD.Common.Utils/ThreadedWorkerQueue.cs @@ -1,6 +1,7 @@ using System; using ICD.Common.Properties; using ICD.Common.Utils.Collections; +using ICD.Common.Utils.Timers; namespace ICD.Common.Utils { @@ -10,30 +11,127 @@ namespace ICD.Common.Utils /// them in a worker thread one at a time /// /// - public sealed class ThreadedWorkerQueue + public sealed class ThreadedWorkerQueue : IDisposable { + /// + /// Underlying Queue to hold items + /// private readonly PriorityQueue m_Queue; + + /// + /// Bool to track if a thread is currently running to dequeue and process items + /// + private bool m_ProcessRunning; + + /// + /// Critical section to lock the queue and the process running bool + /// private readonly SafeCriticalSection m_QueueSection; - private readonly SafeCriticalSection m_ProcessSection; + + /// + /// Action that should be run for every item as it is dequeued + /// private readonly Action m_ProcessAction; + /// + /// Event used to block and wait for the queue to empty + /// + private readonly IcdManualResetEvent m_FlushEvent; + + /// + /// If true, the queue will be processed. + /// + private bool m_RunProcess; + + private long m_BetweenTime; + + /// + /// Time to wait between processing items + /// If less than or equal to 0, items will be processed immediately + /// + [PublicAPI] + public long BetweenTime + { + get { return m_BetweenTime; } + set + { + if (value < 0) + throw new InvalidOperationException("BetweenTime can't be negative"); + + m_BetweenTime = value; + } + } + + /// + /// Timer used to wait the BetweenTime + /// + private readonly SafeTimer m_BetweenTimeTimer; + + /// + /// While true, the queue will be processed + /// When set to false, the queue will be be stopped after any current processing is finished + /// When set to true, the queue processing will be started in a new thread if there are any items in the queue + /// + /// + [PublicAPI] + public bool RunProcess { get { return m_RunProcess; } } + + /// + /// Gets the current count of the items in the queue + /// + public int Count + { + get { return m_QueueSection.Execute(() => m_Queue.Count); } + } + + #region Constructors + /// /// Constructor. /// /// Action to process the dequeued items public ThreadedWorkerQueue([NotNull] Action processItemAction) + : this(processItemAction, true, 0) + { + } + + /// + /// Constructor. + /// + /// Action to process the dequeued items + /// If true, queued items will be processed, if false, no processing will happen until runProcess is set + public ThreadedWorkerQueue([NotNull] Action processItemAction, bool runProcess) + : this(processItemAction, runProcess, 0) + { + } + + /// + /// Constructor. + /// + /// Action to process the dequeued items + /// If true, queued items will be processed, if false, no processing will happen until runProcess is set + /// Time to wait between processing items + public ThreadedWorkerQueue([NotNull] Action processItemAction, bool runProcess, long betweenTime) { if (processItemAction == null) throw new ArgumentNullException("processItemAction"); + if (betweenTime < 0) + throw new ArgumentOutOfRangeException("betweenTime", "Between time can't be negative"); + m_Queue = new PriorityQueue(); m_QueueSection = new SafeCriticalSection(); - m_ProcessSection = new SafeCriticalSection(); - + m_FlushEvent = new IcdManualResetEvent(true); + m_RunProcess = runProcess; m_ProcessAction = processItemAction; + m_BetweenTimeTimer = SafeTimer.Stopped(BetweenTimerCallback); + + BetweenTime = betweenTime; } - #region Queue Methods + #endregion + + #region Methods /// /// Clears the collection. @@ -44,6 +142,68 @@ namespace ICD.Common.Utils m_QueueSection.Execute(() => m_Queue.Clear()); } + /// + /// Blocks until the queue is empty + /// + /// true if the queue empties + [PublicAPI] + public bool WaitForFlush() + { + return m_FlushEvent.WaitOne(); + } + + /// + /// Blocks until the queue is empty, or the timeout is reached + /// + /// Timeout in ms + /// true if the queue empties, false if the timeout is reached + [PublicAPI] + public bool WaitForFlush(int timeout) + { + return m_FlushEvent.WaitOne(timeout); + } + + public void SetRunProcess(bool runProcess) + { + m_QueueSection.Enter(); + try + { + if (m_RunProcess == runProcess) + return; + + m_RunProcess = runProcess; + + if (runProcess) + EnableProcessQueue(); + + } + finally + { + m_QueueSection.Leave(); + } + } + + public void Dispose() + { + m_QueueSection.Enter(); + try + { + SetRunProcess(false); + m_Queue.Clear(); + m_BetweenTimeTimer.Stop(); + m_BetweenTimeTimer.Dispose(); + m_FlushEvent.Dispose(); + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + + #region Enqueue Methods + /// /// Adds the item to the end of the queue. /// @@ -51,7 +211,7 @@ namespace ICD.Common.Utils [PublicAPI] public void Enqueue([CanBeNull] T item) { - Enqueue(() => m_Queue.Enqueue(item)); + Enqueue(item, () => m_Queue.Enqueue(item)); } /// @@ -63,7 +223,7 @@ namespace ICD.Common.Utils [PublicAPI] public void Enqueue([CanBeNull] T item, int priority) { - Enqueue(() => m_Queue.Enqueue(item, priority)); + Enqueue(item, () => m_Queue.Enqueue(item, priority)); } /// @@ -75,7 +235,7 @@ namespace ICD.Common.Utils [PublicAPI] public void Enqueue([CanBeNull] T item, int priority, int position) { - Enqueue(() => m_Queue.Enqueue(item, priority, position)); + Enqueue(item, () => m_Queue.Enqueue(item, priority, position)); } /// @@ -85,7 +245,7 @@ namespace ICD.Common.Utils [PublicAPI] public void EnqueueFirst([CanBeNull] T item) { - Enqueue(() => m_Queue.EnqueueFirst(item)); + Enqueue(item, () => m_Queue.EnqueueFirst(item)); } /// @@ -98,7 +258,7 @@ namespace ICD.Common.Utils [PublicAPI] public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove) { - Enqueue(() => m_Queue.EnqueueRemove(item, remove)); + Enqueue(item, () => m_Queue.EnqueueRemove(item, remove)); } /// @@ -112,7 +272,7 @@ namespace ICD.Common.Utils [PublicAPI] public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove, int priority) { - Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority)); + Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority)); } /// @@ -127,34 +287,168 @@ namespace ICD.Common.Utils [PublicAPI] public void EnqueueRemove([CanBeNull] T item, [NotNull] Func remove, int priority, bool deDuplicateToEndOfQueue) { - Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue)); + Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue)); } #endregion #region Private Methods - private void Enqueue(Action enqueueAction) + /// + /// Enqueues an item using the given enqueueAction + /// Starts processing the queue if it's not already processing + /// + /// + /// + private void Enqueue(T item, Action enqueueAction) { - m_QueueSection.Execute(enqueueAction); - ThreadingUtils.SafeInvoke(ProcessQueue); - } + bool startWorkerThread; + T nextItem = default(T); - private void ProcessQueue() - { - if (!m_ProcessSection.TryEnter()) - return; + m_QueueSection.Enter(); try { - T item = default(T); - while (m_QueueSection.Execute(() => m_Queue.TryDequeue(out item))) - m_ProcessAction(item); + // Reset the flush event, to block the flush thread while something is in the queue/being processed + m_FlushEvent.Reset(); + + // We'll start a worker thread if the process isn't running and should be + startWorkerThread = !m_ProcessRunning && RunProcess; + + if (startWorkerThread) + m_ProcessRunning = true; + + + if (m_Queue.Count == 0 && startWorkerThread) + { + // If the queue is empty and we're starting the worker thread + // we won't bother adding an item just to dequeue it again + nextItem = item; + } + else if (startWorkerThread) + { + // If the queue isn't empty and we are starting the worker thread + // enqueue the item and dequeue the next item. + enqueueAction(); + nextItem = m_Queue.Dequeue(); + } + else + { + // If we're not starting the worker thread, just enqueue the item + enqueueAction(); + } } finally { - m_ProcessSection.Leave(); + m_QueueSection.Leave(); } + + if (startWorkerThread) + ProcessQueue(nextItem); + } + + /// + /// Starts processing the queue by starting a thread + /// When running this, m_ProcessRunning MUST be set to true already, + /// and RunProcess should be checked before this + /// + /// + private void ProcessQueue(T item) + { + ThreadingUtils.SafeInvoke(() => ProcessQueueThread(item)); + } + + /// + /// Processes the item, meant to be run in it's own thread + /// When running this, m_ProcessRunning MUST be set to true already, + /// and RunProcess should be checked before this + /// + /// + private void ProcessQueueThread(T item) + { + + while (true) + { + // Run the process action + m_ProcessAction(item); + + m_QueueSection.Enter(); + try + { + bool runProcess = RunProcess; + bool hasItem = m_Queue.Count > 0; + long betweenTime = BetweenTime; + + // Stop processing and return if we don't have another item, or we aren't to keep running the process + if (!runProcess || !hasItem) + { + m_ProcessRunning = false; + if (!hasItem) + m_FlushEvent.Set(); + return; + } + + // If a between time is set, start the timer (and leave m_ProcessRunning set) + if (betweenTime > 0) + { + m_BetweenTimeTimer.Reset(betweenTime); + return; + } + + item = m_Queue.Dequeue(); + } + finally + { + m_QueueSection.Leave(); + } + } + } + + /// + /// Starts processing an item if we should be + /// Run by RunProcess being set to true + /// + private void EnableProcessQueue() + { + T item; + m_QueueSection.Enter(); + try + { + if (!RunProcess || m_ProcessRunning || !m_Queue.TryDequeue(out item)) + return; + + m_ProcessRunning = true; + } + finally + { + m_QueueSection.Leave(); + } + + ProcessQueue(item); + } + + private void BetweenTimerCallback() + { + T item; + + m_QueueSection.Enter(); + try + { + // Check to make sure RunProcess is still true + // Try to dequeue item + if (!RunProcess || !m_Queue.TryDequeue(out item)) + { + m_ProcessRunning = false; + return; + } + } + finally + { + m_QueueSection.Leave(); + } + + // Process the dequeued item + ProcessQueueThread(item); } #endregion