feat: Initial commit of AsyncEventQueue

This commit is contained in:
Chris Cameron
2018-08-14 15:02:18 -04:00
parent fd931b0b53
commit 54fbd7eaa8
3 changed files with 226 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
using System.Collections.Generic;
using ICD.Common.Utils.Collections;
using ICD.Common.Utils.EventArguments;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests.Collections
{
[TestFixture]
public sealed class AsyncEventQueueTest
{
[Test]
public void ItemDequeuedFeedbackTest()
{
Assert.Inconclusive();
}
[Test]
public void CountTest()
{
using (AsyncEventQueue<int> queue = new AsyncEventQueue<int>())
{
queue.OnItemDequeued +=
(sender, args) =>
{
ThreadingUtils.Sleep(200);
};
Assert.AreEqual(0, queue.Count);
for (int index = 0; index < 5; index++)
queue.Enqueue(index);
Assert.AreEqual(5, queue.Count);
}
}
[Test]
public void EnqueueTest()
{
using (AsyncEventQueue<int> queue = new AsyncEventQueue<int>())
{
List<GenericEventArgs<int>> eventArgs = new List<GenericEventArgs<int>>();
queue.OnItemDequeued +=
(sender, args) =>
{
eventArgs.Add(args);
};
Assert.AreEqual(0, eventArgs.Count);
for (int index = 0; index < 5; index++)
queue.Enqueue(index);
ThreadingUtils.Sleep(500);
Assert.AreEqual(5, eventArgs.Count);
Assert.AreEqual(0, eventArgs[0].Data);
Assert.AreEqual(1, eventArgs[1].Data);
Assert.AreEqual(2, eventArgs[2].Data);
Assert.AreEqual(3, eventArgs[3].Data);
Assert.AreEqual(4, eventArgs[4].Data);
}
}
[Test]
public void ClearTest()
{
using (AsyncEventQueue<int> queue = new AsyncEventQueue<int>())
{
queue.OnItemDequeued +=
(sender, args) =>
{
ThreadingUtils.Sleep(200);
};
Assert.AreEqual(0, queue.Count);
for (int index = 0; index < 5; index++)
queue.Enqueue(index);
Assert.AreEqual(5, queue.Count);
queue.Clear();
Assert.AreEqual(0, queue.Count);
}
}
}
}

View File

@@ -0,0 +1,135 @@
using System;
using System.Collections;
using System.Collections.Generic;
using ICD.Common.Utils.EventArguments;
using ICD.Common.Utils.Extensions;
namespace ICD.Common.Utils.Collections
{
/// <summary>
/// Provides a buffer for storing items to be raised in order in a new thread.
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class AsyncEventQueue<T> : IEnumerable<T>, ICollection, IDisposable
{
/// <summary>
/// Raised to handle to the next item in the queue.
/// </summary>
public event EventHandler<GenericEventArgs<T>> OnItemDequeued;
private readonly Queue<T> m_Queue;
private readonly SafeCriticalSection m_QueueSection;
private readonly SafeCriticalSection m_ProcessSection;
#region Properties
public int Count { get { return m_QueueSection.Execute(() => m_Queue.Count); } }
bool ICollection.IsSynchronized { get { return true; } }
object ICollection.SyncRoot { get { return this; } }
#endregion
/// <summary>
/// Constructor.
/// </summary>
public AsyncEventQueue()
{
m_Queue = new Queue<T>();
m_QueueSection = new SafeCriticalSection();
m_ProcessSection = new SafeCriticalSection();
}
#region Methods
/// <summary>
/// Release resources.
/// </summary>
public void Dispose()
{
OnItemDequeued = null;
Clear();
}
/// <summary>
/// Enqueues the given item and begins processing the queue.
/// </summary>
/// <param name="item"></param>
public void Enqueue(T item)
{
m_QueueSection.Execute(() => m_Queue.Enqueue(item));
ThreadingUtils.SafeInvoke(ProcessQueue);
}
/// <summary>
/// Clears the queued items.
/// </summary>
public void Clear()
{
m_QueueSection.Execute(() => m_Queue.Clear());
}
#endregion
#region Private Methods
/// <summary>
/// Dequeues and raises each item in sequence.
/// </summary>
private void ProcessQueue()
{
if (!m_ProcessSection.TryEnter())
return;
try
{
T item;
while (m_Queue.Dequeue(out item))
OnItemDequeued.Raise(this, new GenericEventArgs<T>(item));
}
finally
{
m_ProcessSection.Leave();
}
}
#endregion
#region IEnumerable/ICollection
public IEnumerator<T> GetEnumerator()
{
return m_QueueSection.Execute(() => m_Queue.ToList(m_Queue.Count).GetEnumerator());
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
void ICollection.CopyTo(Array array, int index)
{
m_QueueSection.Enter();
try
{
foreach (T item in this)
{
array.SetValue(item, index);
index++;
}
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
}
}

View File

@@ -75,6 +75,7 @@
<ItemGroup>
<Compile Include="Attributes\AbstractIcdAttribute.cs" />
<Compile Include="Collections\BiDictionary.cs" />
<Compile Include="Collections\AsyncEventQueue.cs" />
<Compile Include="Collections\IcdOrderedDictionary.cs" />
<Compile Include="Collections\PriorityQueue.cs" />
<Compile Include="Collections\RateLimitedEventQueue.cs" />