From 621d83d8dcdaa465b7a80f0055c91bd9f02cb503 Mon Sep 17 00:00:00 2001 From: Chris Cameron Date: Fri, 29 Jun 2018 16:37:45 -0400 Subject: [PATCH 1/2] feat: Added RateLimitedEventQueue collection for throttling events --- CHANGELOG.md | 1 + .../Collections/RateLimitedEventQueueTest.cs | 100 ++++++++++ .../Collections/RateLimitedEventQueue.cs | 182 ++++++++++++++++++ .../ICD.Common.Utils_SimplSharp.csproj | 1 + 4 files changed, 284 insertions(+) create mode 100644 ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs create mode 100644 ICD.Common.Utils/Collections/RateLimitedEventQueue.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 0611331..1079385 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added - Adding SequenceComparer for ordering collections of lists, arrays, etc + - Added RateLimitedEventQueue collection for throttling events ### Changed - Potential fix for timer disposal on Net Standard diff --git a/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs b/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs new file mode 100644 index 0000000..62c20b2 --- /dev/null +++ b/ICD.Common.Utils.Tests/Collections/RateLimitedEventQueueTest.cs @@ -0,0 +1,100 @@ +using System.Collections.Generic; +using System.Linq; +using ICD.Common.Utils.Collections; +using ICD.Common.Utils.EventArguments; +using NUnit.Framework; + +namespace ICD.Common.Utils.Tests.Collections +{ + [TestFixture] + public sealed class RateLimitedEventQueueTest + { + [Test] + public void ItemDequeuedFeedbackTest() + { + List> callbacks = new List>(); + + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 1000 }) + { + queue.OnItemDequeued += (sender, args) => callbacks.Add(args); + + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + ThreadingUtils.Sleep(100); + + Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue"); + + queue.OnItemDequeued += (sender, args) => { ThreadingUtils.Sleep(1000); }; + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete"); + + ThreadingUtils.Sleep(1000); + + Assert.AreEqual(3, callbacks.Count); + } + } + + #region Properties + + [TestCase(1000)] + public void BetweenMillisecondsTest(long milliseconds) + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = milliseconds }) + Assert.AreEqual(milliseconds, queue.BetweenMilliseconds); + } + + [Test] + public void CountTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(1); + queue.Enqueue(1); + queue.Enqueue(1); + + Assert.AreEqual(3, queue.Count); + } + } + + #endregion + + #region Methods + + [Test] + public void EnqueueTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(10); + queue.Enqueue(20); + queue.Enqueue(30); + + Assert.True(queue.SequenceEqual(new[] { 10, 20, 30 })); + } + } + + [Test] + public void ClearTest() + { + using (RateLimitedEventQueue queue = new RateLimitedEventQueue { BetweenMilliseconds = 100 * 1000 }) + { + queue.Enqueue(1); + queue.Enqueue(1); + queue.Enqueue(1); + + queue.Clear(); + + Assert.AreEqual(0, queue.Count); + } + } + + #endregion + } +} diff --git a/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs b/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs new file mode 100644 index 0000000..62eee2b --- /dev/null +++ b/ICD.Common.Utils/Collections/RateLimitedEventQueue.cs @@ -0,0 +1,182 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using ICD.Common.Utils.EventArguments; +using ICD.Common.Utils.Extensions; +using ICD.Common.Utils.Timers; + +namespace ICD.Common.Utils.Collections +{ + /// + /// RateLimitedEventQueue provides features for enqueing items to be raised via an event at a controlled interval. + /// + public sealed class RateLimitedEventQueue : IEnumerable, ICollection, IDisposable + { + /// + /// Raised to handle to the next item in the queue. + /// + public event EventHandler> OnItemDequeued; + + private readonly SafeTimer m_DequeueTimer; + private readonly Queue m_Queue; + private readonly SafeCriticalSection m_QueueSection; + + #region Properties + + /// + /// Gets/sets the time between dequeues in milliseconds. + /// + public long BetweenMilliseconds { get; set; } + + public int Count { get { return m_QueueSection.Execute(() => m_Queue.Count); } } + + bool ICollection.IsSynchronized { get { return true; } } + + object ICollection.SyncRoot { get { return this; } } + + #endregion + + /// + /// Constructor. + /// + public RateLimitedEventQueue() + { + m_Queue = new Queue(); + m_QueueSection = new SafeCriticalSection(); + + m_DequeueTimer = SafeTimer.Stopped(DequeueTimerCallback); + } + + #region Methods + + /// + /// Release resources. + /// + public void Dispose() + { + OnItemDequeued = null; + + m_DequeueTimer.Dispose(); + } + + /// + /// Enqueues the given item. + /// + /// + public void Enqueue(T item) + { + m_QueueSection.Enter(); + + try + { + m_Queue.Enqueue(item); + + if (m_Queue.Count == 1) + SendNext(); + } + finally + { + m_QueueSection.Leave(); + } + } + + /// + /// Clears the queued items. + /// + public void Clear() + { + m_QueueSection.Enter(); + + try + { + m_DequeueTimer.Stop(); + m_Queue.Clear(); + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + + #region Private Methods + + /// + /// Sends the next pulse in the queue. + /// + private void SendNext() + { + if (!m_QueueSection.TryEnter()) + return; + + try + { + if (m_Queue.Count == 0) + return; + + T item = m_Queue.Peek(); + + OnItemDequeued.Raise(this, new GenericEventArgs(item)); + + m_DequeueTimer.Reset(BetweenMilliseconds); + } + finally + { + m_QueueSection.Leave(); + } + } + + /// + /// Called when the dequeue timer elapses. + /// + private void DequeueTimerCallback() + { + m_QueueSection.Enter(); + + try + { + m_Queue.Dequeue(); + SendNext(); + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + + #region IEnumerable/ICollection + + public IEnumerator GetEnumerator() + { + return m_QueueSection.Execute(() => m_Queue.ToList(m_Queue.Count).GetEnumerator()); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + void ICollection.CopyTo(Array array, int index) + { + m_QueueSection.Enter(); + + try + { + foreach (T item in this) + { + array.SetValue(item, index); + index++; + } + } + finally + { + m_QueueSection.Leave(); + } + } + + #endregion + } +} diff --git a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj index 3b766dc..dcc20bd 100644 --- a/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj +++ b/ICD.Common.Utils/ICD.Common.Utils_SimplSharp.csproj @@ -77,6 +77,7 @@ + From 514c0eaec5b155718ea1e92c45a7c6f2500d681a Mon Sep 17 00:00:00 2001 From: Chris Cameron Date: Fri, 29 Jun 2018 17:11:32 -0400 Subject: [PATCH 2/2] fix: Fixing bug where Timer.Reset() would continue repeating on an interval in Net Standard --- CHANGELOG.md | 1 + ICD.Common.Utils/Timers/SafeTimer.cs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1079385..2a8aaec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed - Potential fix for timer disposal on Net Standard - Added workaround for older RPC servers where the typestring being broadcast would stil include _SimplSharp, now will be stripped + - Fixing bug where Timer.Reset() would continue repeating on an interval in Net Standard ## [3.6.0] - 2018-06-19 ### Added diff --git a/ICD.Common.Utils/Timers/SafeTimer.cs b/ICD.Common.Utils/Timers/SafeTimer.cs index 533d30c..747c092 100644 --- a/ICD.Common.Utils/Timers/SafeTimer.cs +++ b/ICD.Common.Utils/Timers/SafeTimer.cs @@ -117,7 +117,7 @@ namespace ICD.Common.Utils.Timers } /// - /// Callback is called after the dueTime milliseconds. + /// Callback is called once after the dueTime milliseconds. /// /// public void Reset(long dueTime) @@ -125,7 +125,7 @@ namespace ICD.Common.Utils.Timers #if SIMPLSHARP m_Timer.Reset(dueTime); #else - m_Timer.Change((int)dueTime, m_RepeatPeriod); + m_Timer.Change((int)dueTime, Timeout.Infinite); #endif }