fix: Fixing ThreadedWorkerQueue to not use TryEnter, and track the processing state with a bool

This commit is contained in:
Drew Tingen
2021-09-23 17:34:20 -04:00
committed by Chris Cameron
parent 63d76d8cef
commit 451cf08c0f
2 changed files with 523 additions and 24 deletions

View File

@@ -0,0 +1,205 @@
using System.Collections.Generic;
using System.Linq;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests
{
[TestFixture]
public sealed class ThreadedWorkerQueueTest
{
[Test]
public void BetweenTimeTest()
{
List<int> callbacks = new List<int>();
using (ThreadedWorkerQueue<int> queue = new ThreadedWorkerQueue<int>((d) => callbacks.Add(d), true, 1000))
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue");
ThreadingUtils.Sleep(100);
Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(3, callbacks.Count);
}
}
#region Properties
[TestCase(1000)]
[TestCase(0)]
[TestCase(long.MaxValue)]
public void BetweenMillisecondsTest(long milliseconds)
{
using (var queue = new ThreadedWorkerQueue<int>((d) => { }, true, milliseconds))
Assert.AreEqual(milliseconds, queue.BetweenTime);
}
[TestCase(5)]
[TestCase(0)]
[TestCase(30)]
public void CountTest(int count)
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, false))
{
for (int i = 0; i < count; i++)
queue.Enqueue(1);
Assert.AreEqual(count, queue.Count);
}
}
[Test]
public void ProcessBetweenTimeTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false, 1000))
{
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
ThreadingUtils.Sleep(100);
Assert.AreEqual(0, processed.Count, "Queue processed item early");
queue.SetRunProcess(true);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, processed.Count, "First item not processed");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, processed.Count, "Second item not processed");
queue.SetRunProcess(false);
ThreadingUtils.Sleep(2000);
Assert.AreEqual(2, processed.Count, "Item processed after stopping run process");
Assert.AreEqual(1, queue.Count, "Incorrect number of items in queue");
// Queue lower priority item
queue.Enqueue(5, 1);
queue.SetRunProcess(true);
ThreadingUtils.Sleep(100);
Assert.AreEqual(3, processed.Count, "Third item not processed");
Assert.AreEqual(5, processed[2], "Dequeued incorrect priority item");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(4, processed.Count, "Didn't process all items");
Assert.True(processed.SequenceEqual(new[] { 1, 2, 5, 3 }), "Processed sequence incorrect");
}
}
[Test]
public void FlushQueueBetweenTimeTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false, 1000))
{
Assert.True(queue.WaitForFlush(1), "WaitForFlush on empty queue failed");
queue.Enqueue(11);
queue.Enqueue(21);
queue.Enqueue(31);
queue.Enqueue(41);
queue.Enqueue(51);
queue.Enqueue(61);
queue.Enqueue(71);
queue.Enqueue(81);
queue.Enqueue(91);
queue.Enqueue(101);
queue.SetRunProcess(true);
Assert.False(queue.WaitForFlush(1250), "WaitForFlush didn't time out");
Assert.AreEqual(2, processed.Count, "Didn't process correct number of items in time frame");
Assert.True(queue.WaitForFlush(), "WaitForFlush failed");
Assert.AreEqual(10, processed.Count, "Not all items processed");
Assert.AreEqual(0, queue.Count, "Queue not empty");
}
}
#endregion
#region Methods
[TestCase(false)]
[TestCase(true)]
public void SetRunProcessTest(bool runProcess)
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, !runProcess))
{
Assert.AreEqual(!runProcess, queue.RunProcess, "Initial state wrong");
queue.SetRunProcess(runProcess);
Assert.AreEqual(runProcess, queue.RunProcess, "Didn't set to correct state 1st time");
queue.SetRunProcess(!runProcess);
Assert.AreEqual(!runProcess, queue.RunProcess, "Didn't set to correct state 2nd time");
}
}
[Test]
public void EnqueueTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false))
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
Assert.AreEqual(3, queue.Count, "First queue count wrong");
queue.Enqueue(40);
queue.Enqueue(50);
queue.Enqueue(60);
Assert.AreEqual(6, queue.Count, "Second queue count wrong");
queue.SetRunProcess(true);
Assert.True(queue.WaitForFlush(),"Queue didn't flush after processing");
Assert.True(processed.SequenceEqual(new[] { 10, 20, 30, 40, 50, 60 }), "Processed sequence wrong");
}
}
[Test]
public void ClearTest()
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, false))
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
queue.Clear();
Assert.AreEqual(0, queue.Count);
}
}
#endregion
}
}

View File

@@ -1,6 +1,7 @@
using System;
using ICD.Common.Properties;
using ICD.Common.Utils.Collections;
using ICD.Common.Utils.Timers;
namespace ICD.Common.Utils
{
@@ -10,30 +11,127 @@ namespace ICD.Common.Utils
/// them in a worker thread one at a time
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class ThreadedWorkerQueue<T>
public sealed class ThreadedWorkerQueue<T> : IDisposable
{
/// <summary>
/// Underlying Queue to hold items
/// </summary>
private readonly PriorityQueue<T> m_Queue;
/// <summary>
/// Bool to track if a thread is currently running to dequeue and process items
/// </summary>
private bool m_ProcessRunning;
/// <summary>
/// Critical section to lock the queue and the process running bool
/// </summary>
private readonly SafeCriticalSection m_QueueSection;
private readonly SafeCriticalSection m_ProcessSection;
/// <summary>
/// Action that should be run for every item as it is dequeued
/// </summary>
private readonly Action<T> m_ProcessAction;
/// <summary>
/// Event used to block and wait for the queue to empty
/// </summary>
private readonly IcdManualResetEvent m_FlushEvent;
/// <summary>
/// If true, the queue will be processed.
/// </summary>
private bool m_RunProcess;
private long m_BetweenTime;
/// <summary>
/// Time to wait between processing items
/// If less than or equal to 0, items will be processed immediately
/// </summary>
[PublicAPI]
public long BetweenTime
{
get { return m_BetweenTime; }
set
{
if (value < 0)
throw new InvalidOperationException("BetweenTime can't be negative");
m_BetweenTime = value;
}
}
/// <summary>
/// Timer used to wait the BetweenTime
/// </summary>
private readonly SafeTimer m_BetweenTimeTimer;
/// <summary>
/// While true, the queue will be processed
/// When set to false, the queue will be be stopped after any current processing is finished
/// When set to true, the queue processing will be started in a new thread if there are any items in the queue
///
/// </summary>
[PublicAPI]
public bool RunProcess { get { return m_RunProcess; } }
/// <summary>
/// Gets the current count of the items in the queue
/// </summary>
public int Count
{
get { return m_QueueSection.Execute(() => m_Queue.Count); }
}
#region Constructors
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction)
: this(processItemAction, true, 0)
{
}
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
/// <param name="runProcess">If true, queued items will be processed, if false, no processing will happen until runProcess is set</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction, bool runProcess)
: this(processItemAction, runProcess, 0)
{
}
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
/// <param name="runProcess">If true, queued items will be processed, if false, no processing will happen until runProcess is set</param>
/// <param name="betweenTime">Time to wait between processing items</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction, bool runProcess, long betweenTime)
{
if (processItemAction == null)
throw new ArgumentNullException("processItemAction");
if (betweenTime < 0)
throw new ArgumentOutOfRangeException("betweenTime", "Between time can't be negative");
m_Queue = new PriorityQueue<T>();
m_QueueSection = new SafeCriticalSection();
m_ProcessSection = new SafeCriticalSection();
m_FlushEvent = new IcdManualResetEvent(true);
m_RunProcess = runProcess;
m_ProcessAction = processItemAction;
m_BetweenTimeTimer = SafeTimer.Stopped(BetweenTimerCallback);
BetweenTime = betweenTime;
}
#region Queue Methods
#endregion
#region Methods
/// <summary>
/// Clears the collection.
@@ -44,6 +142,68 @@ namespace ICD.Common.Utils
m_QueueSection.Execute(() => m_Queue.Clear());
}
/// <summary>
/// Blocks until the queue is empty
/// </summary>
/// <returns>true if the queue empties</returns>
[PublicAPI]
public bool WaitForFlush()
{
return m_FlushEvent.WaitOne();
}
/// <summary>
/// Blocks until the queue is empty, or the timeout is reached
/// </summary>
/// <param name="timeout">Timeout in ms</param>
/// <returns>true if the queue empties, false if the timeout is reached</returns>
[PublicAPI]
public bool WaitForFlush(int timeout)
{
return m_FlushEvent.WaitOne(timeout);
}
public void SetRunProcess(bool runProcess)
{
m_QueueSection.Enter();
try
{
if (m_RunProcess == runProcess)
return;
m_RunProcess = runProcess;
if (runProcess)
EnableProcessQueue();
}
finally
{
m_QueueSection.Leave();
}
}
public void Dispose()
{
m_QueueSection.Enter();
try
{
SetRunProcess(false);
m_Queue.Clear();
m_BetweenTimeTimer.Stop();
m_BetweenTimeTimer.Dispose();
m_FlushEvent.Dispose();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region Enqueue Methods
/// <summary>
/// Adds the item to the end of the queue.
/// </summary>
@@ -51,7 +211,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item)
{
Enqueue(() => m_Queue.Enqueue(item));
Enqueue(item, () => m_Queue.Enqueue(item));
}
/// <summary>
@@ -63,7 +223,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item, int priority)
{
Enqueue(() => m_Queue.Enqueue(item, priority));
Enqueue(item, () => m_Queue.Enqueue(item, priority));
}
/// <summary>
@@ -75,7 +235,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item, int priority, int position)
{
Enqueue(() => m_Queue.Enqueue(item, priority, position));
Enqueue(item, () => m_Queue.Enqueue(item, priority, position));
}
/// <summary>
@@ -85,7 +245,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueFirst([CanBeNull] T item)
{
Enqueue(() => m_Queue.EnqueueFirst(item));
Enqueue(item, () => m_Queue.EnqueueFirst(item));
}
/// <summary>
@@ -98,7 +258,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove));
}
/// <summary>
@@ -112,7 +272,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove, int priority)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority));
}
/// <summary>
@@ -127,34 +287,168 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove, int priority, bool deDuplicateToEndOfQueue)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue));
}
#endregion
#region Private Methods
private void Enqueue(Action enqueueAction)
/// <summary>
/// Enqueues an item using the given enqueueAction
/// Starts processing the queue if it's not already processing
/// </summary>
/// <param name="item"></param>
/// <param name="enqueueAction"></param>
private void Enqueue(T item, Action enqueueAction)
{
m_QueueSection.Execute(enqueueAction);
ThreadingUtils.SafeInvoke(ProcessQueue);
}
bool startWorkerThread;
T nextItem = default(T);
private void ProcessQueue()
{
if (!m_ProcessSection.TryEnter())
return;
m_QueueSection.Enter();
try
{
T item = default(T);
while (m_QueueSection.Execute(() => m_Queue.TryDequeue(out item)))
m_ProcessAction(item);
// Reset the flush event, to block the flush thread while something is in the queue/being processed
m_FlushEvent.Reset();
// We'll start a worker thread if the process isn't running and should be
startWorkerThread = !m_ProcessRunning && RunProcess;
if (startWorkerThread)
m_ProcessRunning = true;
if (m_Queue.Count == 0 && startWorkerThread)
{
// If the queue is empty and we're starting the worker thread
// we won't bother adding an item just to dequeue it again
nextItem = item;
}
else if (startWorkerThread)
{
// If the queue isn't empty and we are starting the worker thread
// enqueue the item and dequeue the next item.
enqueueAction();
nextItem = m_Queue.Dequeue();
}
else
{
// If we're not starting the worker thread, just enqueue the item
enqueueAction();
}
}
finally
{
m_ProcessSection.Leave();
m_QueueSection.Leave();
}
if (startWorkerThread)
ProcessQueue(nextItem);
}
/// <summary>
/// Starts processing the queue by starting a thread
/// When running this, m_ProcessRunning MUST be set to true already,
/// and RunProcess should be checked before this
/// </summary>
/// <param name="item"></param>
private void ProcessQueue(T item)
{
ThreadingUtils.SafeInvoke(() => ProcessQueueThread(item));
}
/// <summary>
/// Processes the item, meant to be run in it's own thread
/// When running this, m_ProcessRunning MUST be set to true already,
/// and RunProcess should be checked before this
/// </summary>
/// <param name="item"></param>
private void ProcessQueueThread(T item)
{
while (true)
{
// Run the process action
m_ProcessAction(item);
m_QueueSection.Enter();
try
{
bool runProcess = RunProcess;
bool hasItem = m_Queue.Count > 0;
long betweenTime = BetweenTime;
// Stop processing and return if we don't have another item, or we aren't to keep running the process
if (!runProcess || !hasItem)
{
m_ProcessRunning = false;
if (!hasItem)
m_FlushEvent.Set();
return;
}
// If a between time is set, start the timer (and leave m_ProcessRunning set)
if (betweenTime > 0)
{
m_BetweenTimeTimer.Reset(betweenTime);
return;
}
item = m_Queue.Dequeue();
}
finally
{
m_QueueSection.Leave();
}
}
}
/// <summary>
/// Starts processing an item if we should be
/// Run by RunProcess being set to true
/// </summary>
private void EnableProcessQueue()
{
T item;
m_QueueSection.Enter();
try
{
if (!RunProcess || m_ProcessRunning || !m_Queue.TryDequeue(out item))
return;
m_ProcessRunning = true;
}
finally
{
m_QueueSection.Leave();
}
ProcessQueue(item);
}
private void BetweenTimerCallback()
{
T item;
m_QueueSection.Enter();
try
{
// Check to make sure RunProcess is still true
// Try to dequeue item
if (!RunProcess || !m_Queue.TryDequeue(out item))
{
m_ProcessRunning = false;
return;
}
}
finally
{
m_QueueSection.Leave();
}
// Process the dequeued item
ProcessQueueThread(item);
}
#endregion