diff --git a/CHANGELOG.md b/CHANGELOG.md index 0611331..2a8aaec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added - Adding SequenceComparer for ordering collections of lists, arrays, etc + - Added RateLimitedEventQueue collection for throttling events ### Changed - Potential fix for timer disposal on Net Standard - Added workaround for older RPC servers where the typestring being broadcast would stil include _SimplSharp, now will be stripped + - Fixing bug where Timer.Reset() would continue repeating on an interval in Net Standard ## [3.6.0] - 2018-06-19 ### Added diff --git a/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs b/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs new file mode 100644 index 0000000..62c20b2 --- /dev/null +++ b/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs @@ -0,0 +1,100 @@ +using System.Collections.Generic; +using System.Linq; +using ICD.Common.Utils.Collections; +using ICD.Common.Utils.EventArguments; +using NUnit.Framework; + +namespace ICD.Common.Utils.Tests.Collections +{ + [TestFixture] + public sealed class RateLimitedEventQueueTest + { + [Test] + public void ItemDequeuedFeedbackTest() + { + List> callbacks = new List>(); + + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 1000 }) + { + queue.OnItemDequeued += (sender, args) => callbacks.Add(args); + + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue"); + + queue.OnItemDequeued += (sender, args) => { ThreadingUtils.Sleep(1000); }; + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(3, callbacks.Count); + } + } + + #region Properties + + [TestCase(1000)] + public void BetweenMillisecondsTest(long milliseconds) + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = milliseconds }) + Assert.AreEqual(milliseconds, queue.BetweenMilliseconds); + } + + [Test] + public void CountTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(1); + queue.Enqueue(1); + queue.Enqueue(1); + + Assert.AreEqual(3, queue.Count); + } + } + + #endregion + + #region Methods + + [Test] + public void EnqueueTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + Assert.True(queue.SequenceEqual(new[] { 10, 20, 30 })); + } + } + + [Test] + public void ClearTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(1); + queue.Enqueue(1); + queue.Enqueue(1); + + queue.Clear(); + + Assert.AreEqual(0, queue.Count); + } + } + + #endregion + } +} diff --git a/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs b/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs new file mode 100644 index 0000000..62eee2b --- /dev/null +++ b/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs @@ -0,0 +1,182 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using ICD.Common.Utils.EventArguments; +using ICD.Common.Utils.Extensions; +using ICD.Common.Utils.Timers; + +namespace ICD.Common.Utils.Collections +{ + /// + /// RateLimitedEventQueue provides features for enqueing items to be raised via an event at a controlled interval. + /// + public sealed class RateLimitedEventQueue : IEnumerable, ICollection, IDisposable + { + /// + /// Raised to handle to the next item in the queue. + /// + public event EventHandler> OnItemDequeued; + + private readonly SafeTimer m_DequeueTimer; + private readonly Queue m_Queue; + private readonly SafeCriticalSection m_QueueSection; + + #region Properties + + /// + /// Gets/sets the time between dequeues in milliseconds. + /// + public long BetweenMilliseconds { get; set; } + + public int Count { get { return m_QueueSection.Execute(() => m_Queue.Count); } } + + bool ICollection.IsSynchronized { get { return true; } } + + object ICollection.SyncRoot { get { return this; } } + + #endregion + + /// + /// Constructor. + /// + public RateLimitedEventQueue() + { + m_Queue = new Queue(); + m_QueueSection = new SafeCriticalSection(); + + m_DequeueTimer = SafeTimer.Stopped(DequeueTimerCallback); + } + + #region Methods + + /// + /// Release resources. + /// + public void Dispose() + { + OnItemDequeued = null; + + m_DequeueTimer.Dispose(); + } + + /// + /// Enqueues the given item. + /// + /// + public void Enqueue(T item) + { + m_QueueSection.Enter(); + + try + { + m_Queue.Enqueue(item); + + if (m_Queue.Count == 1) + SendNext(); + } + finally + { + m_QueueSection.Leave(); + } + } + + /// + /// Clears the queued items. + /// + public void Clear() + { + m_QueueSection.Enter(); + + try + { + m_DequeueTimer.Stop(); + m_Queue.Clear(); + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + + #region Private Methods + + /// + /// Sends the next pulse in the queue. + /// + private void SendNext() + { + if (!m_QueueSection.TryEnter()) + return; + + try + { + if (m_Queue.Count == 0) + return; + + T item = m_Queue.Peek(); + + OnItemDequeued.Raise(this, new GenericEventArgs(item)); + + m_DequeueTimer.Reset(BetweenMilliseconds); + } + finally + { + m_QueueSection.Leave(); + } + } + + /// + /// Called when the dequeue timer elapses. + /// + private void DequeueTimerCallback() + { + m_QueueSection.Enter(); + + try + { + m_Queue.Dequeue(); + SendNext(); + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + + #region IEnumerable/ICollection + + public IEnumerator GetEnumerator() + { + return m_QueueSection.Execute(() => m_Queue.ToList(m_Queue.Count).GetEnumerator()); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + void ICollection.CopyTo(Array array, int index) + { + m_QueueSection.Enter(); + + try + { + foreach (T item in this) + { + array.SetValue(item, index); + index++; + } + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + } +} diff --git a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj index 3b766dc..dcc20bd 100644 --- a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj +++ b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj @@ -77,6 +77,7 @@ + diff --git a/ICD.Common.Utils/Timers/SafeTimer.cs b/ICD.Common.Utils/Timers/SafeTimer.cs index 533d30c..747c092 100644 --- a/ICD.Common.Utils/Timers/SafeTimer.cs +++ b/ICD.Common.Utils/Timers/SafeTimer.cs @@ -117,7 +117,7 @@ namespace ICD.Common.Utils.Timers } /// - /// Callback is called after the dueTime milliseconds. + /// Callback is called once after the dueTime milliseconds. /// /// public void Reset(long dueTime) @@ -125,7 +125,7 @@ namespace ICD.Common.Utils.Timers #if SIMPLSHARP m_Timer.Reset(dueTime); #else - m_Timer.Change((int)dueTime, m_RepeatPeriod); + m_Timer.Change((int)dueTime, Timeout.Infinite); #endif }