feat: Added RateLimitedEventQueue collection for throttling events

This commit is contained in:
Chris Cameron
2018-06-29 16:37:45 -04:00
parent 588e3df86e
commit 621d83d8dc
4 changed files with 284 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
### Added
- Adding SequenceComparer for ordering collections of lists, arrays, etc
- Added RateLimitedEventQueue collection for throttling events
### Changed
- Potential fix for timer disposal on Net Standard

View File

@@ -0,0 +1,100 @@
using System.Collections.Generic;
using System.Linq;
using ICD.Common.Utils.Collections;
using ICD.Common.Utils.EventArguments;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests.Collections
{
[TestFixture]
public sealed class RateLimitedEventQueueTest
{
[Test]
public void ItemDequeuedFeedbackTest()
{
List<GenericEventArgs<int>> callbacks = new List<GenericEventArgs<int>>();
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 1000 })
{
queue.OnItemDequeued += (sender, args) => callbacks.Add(args);
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue");
queue.OnItemDequeued += (sender, args) => { ThreadingUtils.Sleep(1000); };
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(3, callbacks.Count);
}
}
#region Properties
[TestCase(1000)]
public void BetweenMillisecondsTest(long milliseconds)
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = milliseconds })
Assert.AreEqual(milliseconds, queue.BetweenMilliseconds);
}
[Test]
public void CountTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
Assert.AreEqual(3, queue.Count);
}
}
#endregion
#region Methods
[Test]
public void EnqueueTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
Assert.True(queue.SequenceEqual(new[] { 10, 20, 30 }));
}
}
[Test]
public void ClearTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
queue.Clear();
Assert.AreEqual(0, queue.Count);
}
}
#endregion
}
}

View File

@@ -0,0 +1,182 @@
using System;
using System.Collections;
using System.Collections.Generic;
using ICD.Common.Utils.EventArguments;
using ICD.Common.Utils.Extensions;
using ICD.Common.Utils.Timers;
namespace ICD.Common.Utils.Collections
{
/// <summary>
/// RateLimitedEventQueue provides features for enqueing items to be raised via an event at a controlled interval.
/// </summary>
public sealed class RateLimitedEventQueue<T> : IEnumerable<T>, ICollection, IDisposable
{
/// <summary>
/// Raised to handle to the next item in the queue.
/// </summary>
public event EventHandler<GenericEventArgs<T>> OnItemDequeued;
private readonly SafeTimer m_DequeueTimer;
private readonly Queue<T> m_Queue;
private readonly SafeCriticalSection m_QueueSection;
#region Properties
/// <summary>
/// Gets/sets the time between dequeues in milliseconds.
/// </summary>
public long BetweenMilliseconds { get; set; }
public int Count { get { return m_QueueSection.Execute(() => m_Queue.Count); } }
bool ICollection.IsSynchronized { get { return true; } }
object ICollection.SyncRoot { get { return this; } }
#endregion
/// <summary>
/// Constructor.
/// </summary>
public RateLimitedEventQueue()
{
m_Queue = new Queue<T>();
m_QueueSection = new SafeCriticalSection();
m_DequeueTimer = SafeTimer.Stopped(DequeueTimerCallback);
}
#region Methods
/// <summary>
/// Release resources.
/// </summary>
public void Dispose()
{
OnItemDequeued = null;
m_DequeueTimer.Dispose();
}
/// <summary>
/// Enqueues the given item.
/// </summary>
/// <param name="item"></param>
public void Enqueue(T item)
{
m_QueueSection.Enter();
try
{
m_Queue.Enqueue(item);
if (m_Queue.Count == 1)
SendNext();
}
finally
{
m_QueueSection.Leave();
}
}
/// <summary>
/// Clears the queued items.
/// </summary>
public void Clear()
{
m_QueueSection.Enter();
try
{
m_DequeueTimer.Stop();
m_Queue.Clear();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region Private Methods
/// <summary>
/// Sends the next pulse in the queue.
/// </summary>
private void SendNext()
{
if (!m_QueueSection.TryEnter())
return;
try
{
if (m_Queue.Count == 0)
return;
T item = m_Queue.Peek();
OnItemDequeued.Raise(this, new GenericEventArgs<T>(item));
m_DequeueTimer.Reset(BetweenMilliseconds);
}
finally
{
m_QueueSection.Leave();
}
}
/// <summary>
/// Called when the dequeue timer elapses.
/// </summary>
private void DequeueTimerCallback()
{
m_QueueSection.Enter();
try
{
m_Queue.Dequeue();
SendNext();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region IEnumerable/ICollection
public IEnumerator<T> GetEnumerator()
{
return m_QueueSection.Execute(() => m_Queue.ToList(m_Queue.Count).GetEnumerator());
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
void ICollection.CopyTo(Array array, int index)
{
m_QueueSection.Enter();
try
{
foreach (T item in this)
{
array.SetValue(item, index);
index++;
}
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
}
}

View File

@@ -77,6 +77,7 @@
<Compile Include="Collections\BiDictionary.cs" />
<Compile Include="Collections\IcdOrderedDictionary.cs" />
<Compile Include="Collections\PriorityQueue.cs" />
<Compile Include="Collections\RateLimitedEventQueue.cs" />
<Compile Include="Collections\WeakKeyDictionary.cs" />
<Compile Include="Comparers\FileNameEqualityComparer.cs" />
<Compile Include="Comparers\PredicateComparer.cs" />