Merge remote-tracking branch 'origin/Krang_v1.9' into Krang_v1.10

This commit is contained in:
Drew Tingen
2021-10-05 10:17:58 -04:00
17 changed files with 1103 additions and 394 deletions

View File

@@ -8,6 +8,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed
- Fixed an issue in IcdUriBuilder where relative pathes were not being built into a valid URI.
## [16.0.0] 2021-10-04
### Added
- Added IcdAutoResetEvent and IcdManaualResetEvent
### Changed
- EnumUtils - Fixed bug where not all values were returned for GetValueExceptNone
- ThreadedWorkerQueue - Added BetweenTime property, to wait between process callbacks
- ThreadedWorkerQueue - Added RunProcess option to stop queue from processing items
- ThreadedWorkerQueue - Added WaitForFlush events to wait until the queue is empty
- ThreadedWorkerQueue - Added count property
- ThreadedWorkerQueue - Now implements IDisposable
### Removed
- Removed RateLimitedEventQueue and tests - features added to ThreadedWorkerQueue
- Removed TryEnter from SafeCriticalSection - incorrect behavior on Crestron systems
## [15.2.0] - 2021-08-18
### Added
- TryParse overload in StringUtils, attempts to parse a GUID from a string

View File

@@ -1,100 +0,0 @@
using System.Collections.Generic;
using System.Linq;
using ICD.Common.Utils.Collections;
using ICD.Common.Utils.EventArguments;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests.Collections
{
[TestFixture]
public sealed class RateLimitedEventQueueTest
{
[Test]
public void ItemDequeuedFeedbackTest()
{
List<GenericEventArgs<int>> callbacks = new List<GenericEventArgs<int>>();
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 1000 })
{
queue.OnItemDequeued += (sender, args) => callbacks.Add(args);
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue");
queue.OnItemDequeued += (sender, args) => { ThreadingUtils.Sleep(1000); };
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(3, callbacks.Count);
}
}
#region Properties
[TestCase(1000)]
public void BetweenMillisecondsTest(long milliseconds)
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = milliseconds })
Assert.AreEqual(milliseconds, queue.BetweenMilliseconds);
}
[Test]
public void CountTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
Assert.AreEqual(3, queue.Count);
}
}
#endregion
#region Methods
[Test]
public void EnqueueTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
Assert.True(queue.SequenceEqual(new[] { 10, 20, 30 }));
}
}
[Test]
public void ClearTest()
{
using (RateLimitedEventQueue<int> queue = new RateLimitedEventQueue<int> { BetweenMilliseconds = 100 * 1000 })
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
queue.Clear();
Assert.AreEqual(0, queue.Count);
}
}
#endregion
}
}

View File

@@ -22,7 +22,8 @@ namespace ICD.Common.Utils.Tests
A = 1,
B = 2,
C = 4,
D = 32
D = 32,
BandC = B | C
}
[Test]

View File

@@ -0,0 +1,188 @@
using System;
using System.Collections.Generic;
using System.Text;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests
{
[TestFixture]
class IcdAutoResetEventTest
{
[TestCase(true)]
[TestCase(false)]
public void InitialStateTest(bool initialState)
{
using (var waitHandleEvent = new IcdAutoResetEvent(initialState))
{
Assert.AreEqual(initialState, waitHandleEvent.WaitOne(1), "Initial state incorrect");
}
}
[Test]
public void DefaultInitialStateTest()
{
using (var waitHandleEvent = new IcdAutoResetEvent())
{
Assert.False(waitHandleEvent.WaitOne(1), "Initial state incorrect");
}
}
[TestCase(1)]
[TestCase(5)]
[TestCase(15)]
public void MultipleThreadSetTest(int count)
{
int releasedCount = 0;
SafeCriticalSection countSection = new SafeCriticalSection();
IcdAutoResetEvent waitHandleEvent = new IcdAutoResetEvent(false);
for (int i = 0; i < count; i++)
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne();
countSection.Execute(() => releasedCount++);
;
});
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Threads released early");
///Auto reset should only release one thread at a time
for (int i = 1; i <= count; i++)
{
waitHandleEvent.Set();
ThreadingUtils.Sleep(100);
Assert.AreEqual(i, countSection.Execute(() => releasedCount),
"Incorrect number of threads released");
}
waitHandleEvent.Dispose();
}
[TestCase(1)]
[TestCase(5)]
[TestCase(15)]
public void MultipleThreadResetTest(int count)
{
int releasedCount = 0;
SafeCriticalSection countSection = new SafeCriticalSection();
IcdAutoResetEvent waitHandleEvent = new IcdAutoResetEvent(true);
Assert.True(waitHandleEvent.WaitOne(1), "Initial State Wrong");
waitHandleEvent.Reset();
for (int i = 0; i < count; i++)
ThreadingUtils.SafeInvoke(() =>
{
if (waitHandleEvent.WaitOne(100))
countSection.Execute(() => releasedCount++);
});
ThreadingUtils.Sleep(2000);
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Incorrect number of threads released");
waitHandleEvent.Set();
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Threads released after timeout");
waitHandleEvent.Dispose();
}
[TestCase(1)]
[TestCase(200)]
[TestCase(5000)]
public void WaitOneTest(int waitTime)
{
bool released = false;
IcdAutoResetEvent waitHandleEvent = new IcdAutoResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne();
released = true;
});
ThreadingUtils.Sleep(waitTime);
Assert.False(released, "Thread released when it shouldn't have");
waitHandleEvent.Set();
ThreadingUtils.Sleep(100);
Assert.True(released, "Thread didn't release after set event");
waitHandleEvent.Dispose();
}
[TestCase(200)]
[TestCase(500)]
[TestCase(5000)]
public void WaitOneTimeoutTest(int waitTime)
{
bool released = false;
IcdAutoResetEvent waitHandleEvent = new IcdAutoResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne(waitTime * 2);
released = true;
});
ThreadingUtils.Sleep(waitTime);
Assert.False(released, "Thread released when it shouldn't have");
waitHandleEvent.Set();
ThreadingUtils.Sleep(100);
Assert.True(released, "Thread didn't release after set event");
waitHandleEvent.Dispose();
}
[TestCase(200)]
[TestCase(500)]
[TestCase(5000)]
public void WaitOneTimedOutTest(int waitTime)
{
bool released = false;
bool? returned = null;
IcdAutoResetEvent waitHandleEvent = new IcdAutoResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
returned = waitHandleEvent.WaitOne(waitTime);
released = true;
});
ThreadingUtils.Sleep(100);
Assert.True(returned == null, "WaitOne returned when it shouldn't have");
Assert.False(released, "Thread released when it shouldn't have");
ThreadingUtils.Sleep(waitTime * 2);
Assert.True(released, "Thread didn't release after timeout");
Assert.True(returned.HasValue && returned.Value == false, "WaitOne timeout didn't return false");
waitHandleEvent.Set();
waitHandleEvent.Dispose();
}
}
}

View File

@@ -0,0 +1,183 @@
using System;
using System.Collections.Generic;
using System.Text;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests
{
[TestFixture]
class IcdManualResetEventTest
{
[TestCase(true)]
[TestCase(false)]
public void InitialStateTest(bool initialState)
{
using (var waitHandleEvent = new IcdManualResetEvent(initialState))
{
Assert.AreEqual(initialState, waitHandleEvent.WaitOne(1), "Initial state incorrect");
}
}
[Test]
public void DefaultInitialStateTest()
{
using (var waitHandleEvent = new IcdManualResetEvent())
{
Assert.False(waitHandleEvent.WaitOne(1), "Initial state incorrect");
}
}
[TestCase(1)]
[TestCase(5)]
[TestCase(15)]
public void MultipleThreadSetTest(int count)
{
int releasedCount = 0;
SafeCriticalSection countSection = new SafeCriticalSection();
IcdManualResetEvent waitHandleEvent = new IcdManualResetEvent(false);
for (int i = 0; i < count; i++)
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne();
countSection.Execute(() => releasedCount++);
;
});
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Threads released early");
waitHandleEvent.Set();
ThreadingUtils.Sleep(500);
Assert.AreEqual(count, countSection.Execute(() => releasedCount), "Incorrect number of threads released");
waitHandleEvent.Dispose();
}
[TestCase(1)]
[TestCase(5)]
[TestCase(15)]
public void MultipleThreadResetTest(int count)
{
int releasedCount = 0;
SafeCriticalSection countSection = new SafeCriticalSection();
IcdManualResetEvent waitHandleEvent = new IcdManualResetEvent(true);
Assert.True(waitHandleEvent.WaitOne(1), "Initial State Wrong");
waitHandleEvent.Reset();
for (int i = 0; i < count; i++)
ThreadingUtils.SafeInvoke(() =>
{
if (waitHandleEvent.WaitOne(100))
countSection.Execute(() => releasedCount++);
});
ThreadingUtils.Sleep(2000);
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Incorrect number of threads released");
waitHandleEvent.Set();
Assert.AreEqual(0, countSection.Execute(() => releasedCount), "Threads released after timeout");
waitHandleEvent.Dispose();
}
[TestCase(1)]
[TestCase(200)]
[TestCase(5000)]
public void WaitOneTest(int waitTime)
{
bool released = false;
IcdManualResetEvent waitHandleEvent = new IcdManualResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne();
released = true;
});
ThreadingUtils.Sleep(waitTime);
Assert.False(released, "Thread released when it shouldn't have");
waitHandleEvent.Set();
ThreadingUtils.Sleep(100);
Assert.True(released, "Thread didn't release after set event");
waitHandleEvent.Dispose();
}
[TestCase(200)]
[TestCase(500)]
[TestCase(5000)]
public void WaitOneTimeoutTest(int waitTime)
{
bool released = false;
IcdManualResetEvent waitHandleEvent = new IcdManualResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
waitHandleEvent.WaitOne(waitTime * 2);
released = true;
});
ThreadingUtils.Sleep(waitTime);
Assert.False(released, "Thread released when it shouldn't have");
waitHandleEvent.Set();
ThreadingUtils.Sleep(100);
Assert.True(released, "Thread didn't release after set event");
waitHandleEvent.Dispose();
}
[TestCase(200)]
[TestCase(500)]
[TestCase(5000)]
public void WaitOneTimedOutTest(int waitTime)
{
bool released = false;
bool? returned = null;
IcdManualResetEvent waitHandleEvent = new IcdManualResetEvent(false);
ThreadingUtils.SafeInvoke(() =>
{
returned = waitHandleEvent.WaitOne(waitTime);
released = true;
});
ThreadingUtils.Sleep(100);
Assert.True(returned == null, "WaitOne returned when it shouldn't have");
Assert.False(released, "Thread released when it shouldn't have");
ThreadingUtils.Sleep(waitTime * 2);
Assert.True(released, "Thread didn't release after timeout");
Assert.True(returned.HasValue && returned.Value == false, "WaitOne timeout didn't return false");
waitHandleEvent.Set();
waitHandleEvent.Dispose();
}
}
}

View File

@@ -37,30 +37,5 @@ namespace ICD.Common.Utils.Tests
Assert.Inconclusive();
}
[Test]
public void TryEnterTest()
{
int result = 0;
SafeCriticalSection section = new SafeCriticalSection();
section.Enter();
// ReSharper disable once NotAccessedVariable
ThreadingUtils.SafeInvoke(() => { result = section.TryEnter() ? 0 : 1; });
Assert.IsTrue(ThreadingUtils.Wait(() => result == 1, 1000));
section.Leave();
// ReSharper disable once RedundantAssignment
ThreadingUtils.SafeInvoke(() =>
{
result = section.TryEnter() ? 2 : 0;
section.Leave();
});
Assert.IsTrue(ThreadingUtils.Wait(() => result == 2, 1000));
}
}
}
}

View File

@@ -0,0 +1,205 @@
using System.Collections.Generic;
using System.Linq;
using NUnit.Framework;
namespace ICD.Common.Utils.Tests
{
[TestFixture]
public sealed class ThreadedWorkerQueueTest
{
[Test]
public void BetweenTimeTest()
{
List<int> callbacks = new List<int>();
using (ThreadedWorkerQueue<int> queue = new ThreadedWorkerQueue<int>((d) => callbacks.Add(d), true, 1000))
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, callbacks.Count, "Initial enqueue did not trigger a dequeue");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, callbacks.Count, "Second enqueue did not dequeue");
ThreadingUtils.Sleep(100);
Assert.AreEqual(2, callbacks.Count, "Third enqueue did not wait for process to complete");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(3, callbacks.Count);
}
}
#region Properties
[TestCase(1000)]
[TestCase(0)]
[TestCase(long.MaxValue)]
public void BetweenMillisecondsTest(long milliseconds)
{
using (var queue = new ThreadedWorkerQueue<int>((d) => { }, true, milliseconds))
Assert.AreEqual(milliseconds, queue.BetweenTime);
}
[TestCase(5)]
[TestCase(0)]
[TestCase(30)]
public void CountTest(int count)
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, false))
{
for (int i = 0; i < count; i++)
queue.Enqueue(1);
Assert.AreEqual(count, queue.Count);
}
}
[Test]
public void ProcessBetweenTimeTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false, 1000))
{
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
ThreadingUtils.Sleep(100);
Assert.AreEqual(0, processed.Count, "Queue processed item early");
queue.SetRunProcess(true);
ThreadingUtils.Sleep(100);
Assert.AreEqual(1, processed.Count, "First item not processed");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(2, processed.Count, "Second item not processed");
queue.SetRunProcess(false);
ThreadingUtils.Sleep(2000);
Assert.AreEqual(2, processed.Count, "Item processed after stopping run process");
Assert.AreEqual(1, queue.Count, "Incorrect number of items in queue");
// Queue lower priority item
queue.Enqueue(5, 1);
queue.SetRunProcess(true);
ThreadingUtils.Sleep(100);
Assert.AreEqual(3, processed.Count, "Third item not processed");
Assert.AreEqual(5, processed[2], "Dequeued incorrect priority item");
ThreadingUtils.Sleep(1000);
Assert.AreEqual(4, processed.Count, "Didn't process all items");
Assert.True(processed.SequenceEqual(new[] { 1, 2, 5, 3 }), "Processed sequence incorrect");
}
}
[Test]
public void FlushQueueBetweenTimeTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false, 1000))
{
Assert.True(queue.WaitForFlush(1), "WaitForFlush on empty queue failed");
queue.Enqueue(11);
queue.Enqueue(21);
queue.Enqueue(31);
queue.Enqueue(41);
queue.Enqueue(51);
queue.Enqueue(61);
queue.Enqueue(71);
queue.Enqueue(81);
queue.Enqueue(91);
queue.Enqueue(101);
queue.SetRunProcess(true);
Assert.False(queue.WaitForFlush(1250), "WaitForFlush didn't time out");
Assert.AreEqual(2, processed.Count, "Didn't process correct number of items in time frame");
Assert.True(queue.WaitForFlush(), "WaitForFlush failed");
Assert.AreEqual(10, processed.Count, "Not all items processed");
Assert.AreEqual(0, queue.Count, "Queue not empty");
}
}
#endregion
#region Methods
[TestCase(false)]
[TestCase(true)]
public void SetRunProcessTest(bool runProcess)
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, !runProcess))
{
Assert.AreEqual(!runProcess, queue.RunProcess, "Initial state wrong");
queue.SetRunProcess(runProcess);
Assert.AreEqual(runProcess, queue.RunProcess, "Didn't set to correct state 1st time");
queue.SetRunProcess(!runProcess);
Assert.AreEqual(!runProcess, queue.RunProcess, "Didn't set to correct state 2nd time");
}
}
[Test]
public void EnqueueTest()
{
var processed = new List<int>();
using (var queue = new ThreadedWorkerQueue<int>(d => processed.Add(d), false))
{
queue.Enqueue(10);
queue.Enqueue(20);
queue.Enqueue(30);
Assert.AreEqual(3, queue.Count, "First queue count wrong");
queue.Enqueue(40);
queue.Enqueue(50);
queue.Enqueue(60);
Assert.AreEqual(6, queue.Count, "Second queue count wrong");
queue.SetRunProcess(true);
Assert.True(queue.WaitForFlush(),"Queue didn't flush after processing");
Assert.True(processed.SequenceEqual(new[] { 10, 20, 30, 40, 50, 60 }), "Processed sequence wrong");
}
}
[Test]
public void ClearTest()
{
using (var queue = new ThreadedWorkerQueue<int>(d => { }, false))
{
queue.Enqueue(1);
queue.Enqueue(1);
queue.Enqueue(1);
queue.Clear();
Assert.AreEqual(0, queue.Count);
}
}
#endregion
}
}

View File

@@ -0,0 +1,106 @@
using System;
using ICD.Common.Properties;
#if SIMPLSHARP
using Crestron.SimplSharp;
#else
using System.Threading;
#endif
namespace ICD.Common.Utils
{
public abstract class AbstractIcdResetEvent : IDisposable
{
#if SIMPLSHARP
private readonly CEvent m_Event;
#else
private readonly EventWaitHandle m_Event;
#endif
#if SIMPLSHARP
/// <summary>
/// Initializes a new instance of the IcdManualResetEvent class with the CEvent
/// </summary>
[PublicAPI]
protected AbstractIcdResetEvent(CEvent eventHandle)
{
m_Event = eventHandle;
}
#else
/// <summary>
/// Initializes a new instance of the IcdManualResetEvent class with the CEvent
/// </summary>
[PublicAPI]
protected AbstractIcdResetEvent(EventWaitHandle eventHandle)
{
m_Event = eventHandle;
}
#endif
/// <summary>
/// Sets the state of the event to signaled, allowing one or more waiting threads to proceed.
/// </summary>
/// <returns>true if the operation succeeds; otherwise, false.</returns>
[PublicAPI]
public bool Set()
{
return m_Event.Set();
}
/// <summary>
/// Sets the state of the event to nonsignaled, causing threads to block.
/// </summary>
/// <returns>true if the operation succeeds; otherwise, false.</returns>
[PublicAPI]
public bool Reset()
{
return m_Event.Reset();
}
/// <summary>
/// Function to wait for the event to be signaled. This will block indefinitely until the event is signaled.
/// </summary>
/// <returns>True if the current instance receives a signal otherwise false.</returns>
[PublicAPI]
public bool WaitOne()
{
#if SIMPLSHARP
return m_Event.Wait();
#else
return m_Event.WaitOne();
#endif
}
/// <summary>
/// Function to wait for the event to be signaled.
/// </summary>
/// <param name="timeout">Timeout in milliseconds or Timeout.Infinite to wait indefinitely.</param>
/// <returns>True if the current instance receives a signal otherwise false.</returns>
public bool WaitOne(int timeout)
{
#if SIMPLSHARP
return m_Event.Wait(timeout);
#else
return m_Event.WaitOne(timeout);
#endif
}
/// <summary>
/// Clean up of resources.
/// </summary>
public void Dispose()
{
m_Event.Dispose();
}
/// <summary>
/// Close the event to release all resources used by this instance.
/// </summary>
[PublicAPI]
public void Close()
{
m_Event.Close();
}
}
}

View File

@@ -1,204 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
using ICD.Common.Properties;
using ICD.Common.Utils.EventArguments;
using ICD.Common.Utils.Extensions;
using ICD.Common.Utils.Timers;
#if !SIMPLSHARP
using System.Diagnostics;
#endif
namespace ICD.Common.Utils.Collections
{
/// <summary>
/// RateLimitedEventQueue provides features for enqueing items to be raised via an event at a controlled interval.
/// </summary>
#if !SIMPLSHARP
[DebuggerDisplay("Count = {Count}")]
#endif
public sealed class RateLimitedEventQueue<T> : IEnumerable<T>, ICollection, IDisposable
{
/// <summary>
/// Raised to handle to the next item in the queue.
/// </summary>
public event EventHandler<GenericEventArgs<T>> OnItemDequeued;
private readonly SafeTimer m_DequeueTimer;
private readonly Queue<T> m_Queue;
private readonly SafeCriticalSection m_QueueSection;
#region Properties
/// <summary>
/// Gets/sets the time between dequeues in milliseconds.
/// </summary>
public long BetweenMilliseconds { get; set; }
public int Count { get { return m_QueueSection.Execute(() => m_Queue.Count); } }
bool ICollection.IsSynchronized { get { return true; } }
[NotNull]
object ICollection.SyncRoot { get { return this; } }
#endregion
/// <summary>
/// Constructor.
/// </summary>
public RateLimitedEventQueue()
{
m_Queue = new Queue<T>();
m_QueueSection = new SafeCriticalSection();
m_DequeueTimer = SafeTimer.Stopped(DequeueTimerCallback);
}
#region Methods
/// <summary>
/// Release resources.
/// </summary>
public void Dispose()
{
OnItemDequeued = null;
m_QueueSection.Enter();
try
{
m_DequeueTimer.Dispose();
}
finally
{
m_QueueSection.Leave();
}
}
/// <summary>
/// Enqueues the given item.
/// </summary>
/// <param name="item"></param>
public void Enqueue([CanBeNull] T item)
{
m_QueueSection.Enter();
try
{
m_Queue.Enqueue(item);
if (m_Queue.Count == 1)
SendNext();
}
finally
{
m_QueueSection.Leave();
}
}
/// <summary>
/// Clears the queued items.
/// </summary>
public void Clear()
{
m_QueueSection.Enter();
try
{
m_DequeueTimer.Stop();
m_Queue.Clear();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region Private Methods
/// <summary>
/// Sends the next pulse in the queue.
/// </summary>
private void SendNext()
{
if (!m_QueueSection.TryEnter())
return;
try
{
if (m_Queue.Count == 0)
return;
T item = m_Queue.Peek();
OnItemDequeued.Raise(this, new GenericEventArgs<T>(item));
m_DequeueTimer.Reset(BetweenMilliseconds);
}
finally
{
m_QueueSection.Leave();
}
}
/// <summary>
/// Called when the dequeue timer elapses.
/// </summary>
private void DequeueTimerCallback()
{
m_QueueSection.Enter();
try
{
m_Queue.Dequeue();
SendNext();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region IEnumerable/ICollection
[NotNull]
public IEnumerator<T> GetEnumerator()
{
return m_QueueSection.Execute(() => m_Queue.ToList(m_Queue.Count).GetEnumerator());
}
[NotNull]
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
void ICollection.CopyTo([NotNull] Array array, int index)
{
if (array == null)
throw new ArgumentNullException("array");
m_QueueSection.Enter();
try
{
foreach (T item in this)
{
array.SetValue(item, index);
index++;
}
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
}
}

View File

@@ -198,7 +198,7 @@ namespace ICD.Common.Utils
if (type == null)
throw new ArgumentNullException("type");
return GetFlagsExceptNone(type);
return GetValues(type).Where(v => (int)v != 0);
}
/// <summary>

View File

@@ -84,7 +84,6 @@
<Compile Include="Collections\IcdSortedDictionary.cs" />
<Compile Include="Collections\INotifyCollectionChanged.cs" />
<Compile Include="Collections\PriorityQueue.cs" />
<Compile Include="Collections\RateLimitedEventQueue.cs" />
<Compile Include="Collections\WeakKeyDictionary.cs" />
<Compile Include="Comparers\FileNameEqualityComparer.cs" />
<Compile Include="Comparers\PredicateComparer.cs" />
@@ -121,6 +120,9 @@
<Compile Include="Extensions\CollectionExtensions.cs" />
<Compile Include="Extensions\RegistryExtensions.cs" />
<Compile Include="Extensions\VersionExtensions.cs" />
<Compile Include="AbstractIcdResetEvent.cs" />
<Compile Include="IcdAutoResetEvent.cs" />
<Compile Include="IcdManualResetEvent.cs" />
<Compile Include="ThreadedWorkerQueue.cs" />
<Compile Include="TimeZoneInfo\IcdTimeZoneInfo.cs" />
<Compile Include="ObfuscationSettings.cs" />

View File

@@ -0,0 +1,39 @@
using ICD.Common.Properties;
#if SIMPLSHARP
using Crestron.SimplSharp;
# else
using System.Threading;
#endif
namespace ICD.Common.Utils
{
/// <summary>
/// Notifies one or more waiting threads that an event has occurred. Every thread that passes WaitOne causes the event to be reset
/// </summary>
[PublicAPI]
public sealed class IcdAutoResetEvent : AbstractIcdResetEvent
{
/// <summary>
/// Initializes a new instance of the IcdAutoResetEvent class with the initial state to nonsignaled.
/// </summary>
[PublicAPI]
public IcdAutoResetEvent() : this(false)
{
}
/// <summary>
/// Initializes a new instance of the IcdManualResetEvent class with a Boolean value indicating whether to set the initial state to signaled.
/// </summary>
/// <param name="initialState">true to set the initial state signaled; false to set the initial state to nonsignaled.</param>
[PublicAPI]
public IcdAutoResetEvent(bool initialState) :
#if SIMPLSHARP
base(new CEvent(true, initialState))
#else
base(new AutoResetEvent(initialState))
#endif
{
}
}
}

View File

@@ -0,0 +1,40 @@
using ICD.Common.Properties;
#if SIMPLSHARP
using Crestron.SimplSharp;
#else
using System.Threading;
#endif
namespace ICD.Common.Utils
{
/// <summary>
/// Notifies one or more waiting threads that an event has occurred.
/// </summary>
[PublicAPI]
public sealed class IcdManualResetEvent : AbstractIcdResetEvent
{
/// <summary>
/// Initializes a new instance of the IcdManualResetEvent class with the initial state to nonsignaled.
/// </summary>
[PublicAPI]
public IcdManualResetEvent() : this(false)
{
}
/// <summary>
/// Initializes a new instance of the IcdManualResetEvent class with a Boolean value indicating whether to set the initial state to signaled.
/// </summary>
/// <param name="initialState">true to set the initial state signaled; false to set the initial state to nonsignaled.</param>
[PublicAPI]
public IcdManualResetEvent(bool initialState) :
#if SIMPLSHARP
base(new CEvent(false, initialState))
#else
base(new ManualResetEvent(initialState))
#endif
{
}
}
}

View File

@@ -4,4 +4,4 @@ using System.Reflection;
[assembly: AssemblyCompany("ICD Systems")]
[assembly: AssemblyProduct("ICD.Common.Utils")]
[assembly: AssemblyCopyright("Copyright © ICD Systems 2021")]
[assembly: AssemblyVersion("15.2.0.0")]
[assembly: AssemblyVersion("16.0.0.0")]

View File

@@ -90,31 +90,6 @@ namespace ICD.Common.Utils
}
}
/// <summary>
/// Attempt to enter the critical section without blocking.
/// </summary>
/// <returns>
/// True, calling thread has ownership of the critical section; otherwise, false.
/// </returns>
public bool TryEnter()
{
if (m_CriticalSection == null)
return false;
try
{
#if DEBUG
return m_CriticalSection.WaitForMutex(0);
#else
return m_CriticalSection.TryEnter();
#endif
}
catch (ObjectDisposedException)
{
return false;
}
}
#endregion
}
}

View File

@@ -24,17 +24,6 @@ namespace ICD.Common.Utils
Monitor.Exit(this);
}
/// <summary>
/// Attempt to enter the critical section without blocking.
/// </summary>
/// <returns>
/// True, calling thread has ownership of the critical section; otherwise, false.
/// </returns>
public bool TryEnter()
{
return Monitor.TryEnter(this);
}
#endregion
}
}

View File

@@ -1,6 +1,7 @@
using System;
using ICD.Common.Properties;
using ICD.Common.Utils.Collections;
using ICD.Common.Utils.Timers;
namespace ICD.Common.Utils
{
@@ -10,30 +11,127 @@ namespace ICD.Common.Utils
/// them in a worker thread one at a time
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class ThreadedWorkerQueue<T>
public sealed class ThreadedWorkerQueue<T> : IDisposable
{
/// <summary>
/// Underlying Queue to hold items
/// </summary>
private readonly PriorityQueue<T> m_Queue;
/// <summary>
/// Bool to track if a thread is currently running to dequeue and process items
/// </summary>
private bool m_ProcessRunning;
/// <summary>
/// Critical section to lock the queue and the process running bool
/// </summary>
private readonly SafeCriticalSection m_QueueSection;
private readonly SafeCriticalSection m_ProcessSection;
/// <summary>
/// Action that should be run for every item as it is dequeued
/// </summary>
private readonly Action<T> m_ProcessAction;
/// <summary>
/// Event used to block and wait for the queue to empty
/// </summary>
private readonly IcdManualResetEvent m_FlushEvent;
/// <summary>
/// If true, the queue will be processed.
/// </summary>
private bool m_RunProcess;
private long m_BetweenTime;
/// <summary>
/// Time to wait between processing items
/// If less than or equal to 0, items will be processed immediately
/// </summary>
[PublicAPI]
public long BetweenTime
{
get { return m_BetweenTime; }
set
{
if (value < 0)
throw new InvalidOperationException("BetweenTime can't be negative");
m_BetweenTime = value;
}
}
/// <summary>
/// Timer used to wait the BetweenTime
/// </summary>
private readonly SafeTimer m_BetweenTimeTimer;
/// <summary>
/// While true, the queue will be processed
/// When set to false, the queue will be be stopped after any current processing is finished
/// When set to true, the queue processing will be started in a new thread if there are any items in the queue
///
/// </summary>
[PublicAPI]
public bool RunProcess { get { return m_RunProcess; } }
/// <summary>
/// Gets the current count of the items in the queue
/// </summary>
public int Count
{
get { return m_QueueSection.Execute(() => m_Queue.Count); }
}
#region Constructors
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction)
: this(processItemAction, true, 0)
{
}
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
/// <param name="runProcess">If true, queued items will be processed, if false, no processing will happen until runProcess is set</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction, bool runProcess)
: this(processItemAction, runProcess, 0)
{
}
/// <summary>
/// Constructor.
/// </summary>
/// <param name="processItemAction">Action to process the dequeued items</param>
/// <param name="runProcess">If true, queued items will be processed, if false, no processing will happen until runProcess is set</param>
/// <param name="betweenTime">Time to wait between processing items</param>
public ThreadedWorkerQueue([NotNull] Action<T> processItemAction, bool runProcess, long betweenTime)
{
if (processItemAction == null)
throw new ArgumentNullException("processItemAction");
if (betweenTime < 0)
throw new ArgumentOutOfRangeException("betweenTime", "Between time can't be negative");
m_Queue = new PriorityQueue<T>();
m_QueueSection = new SafeCriticalSection();
m_ProcessSection = new SafeCriticalSection();
m_FlushEvent = new IcdManualResetEvent(true);
m_RunProcess = runProcess;
m_ProcessAction = processItemAction;
m_BetweenTimeTimer = SafeTimer.Stopped(BetweenTimerCallback);
BetweenTime = betweenTime;
}
#region Queue Methods
#endregion
#region Methods
/// <summary>
/// Clears the collection.
@@ -44,6 +142,68 @@ namespace ICD.Common.Utils
m_QueueSection.Execute(() => m_Queue.Clear());
}
/// <summary>
/// Blocks until the queue is empty
/// </summary>
/// <returns>true if the queue empties</returns>
[PublicAPI]
public bool WaitForFlush()
{
return m_FlushEvent.WaitOne();
}
/// <summary>
/// Blocks until the queue is empty, or the timeout is reached
/// </summary>
/// <param name="timeout">Timeout in ms</param>
/// <returns>true if the queue empties, false if the timeout is reached</returns>
[PublicAPI]
public bool WaitForFlush(int timeout)
{
return m_FlushEvent.WaitOne(timeout);
}
public void SetRunProcess(bool runProcess)
{
m_QueueSection.Enter();
try
{
if (m_RunProcess == runProcess)
return;
m_RunProcess = runProcess;
if (runProcess)
EnableProcessQueue();
}
finally
{
m_QueueSection.Leave();
}
}
public void Dispose()
{
m_QueueSection.Enter();
try
{
SetRunProcess(false);
m_Queue.Clear();
m_BetweenTimeTimer.Stop();
m_BetweenTimeTimer.Dispose();
m_FlushEvent.Dispose();
}
finally
{
m_QueueSection.Leave();
}
}
#endregion
#region Enqueue Methods
/// <summary>
/// Adds the item to the end of the queue.
/// </summary>
@@ -51,7 +211,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item)
{
Enqueue(() => m_Queue.Enqueue(item));
Enqueue(item, () => m_Queue.Enqueue(item));
}
/// <summary>
@@ -63,7 +223,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item, int priority)
{
Enqueue(() => m_Queue.Enqueue(item, priority));
Enqueue(item, () => m_Queue.Enqueue(item, priority));
}
/// <summary>
@@ -75,7 +235,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void Enqueue([CanBeNull] T item, int priority, int position)
{
Enqueue(() => m_Queue.Enqueue(item, priority, position));
Enqueue(item, () => m_Queue.Enqueue(item, priority, position));
}
/// <summary>
@@ -85,7 +245,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueFirst([CanBeNull] T item)
{
Enqueue(() => m_Queue.EnqueueFirst(item));
Enqueue(item, () => m_Queue.EnqueueFirst(item));
}
/// <summary>
@@ -98,7 +258,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove));
}
/// <summary>
@@ -112,7 +272,7 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove, int priority)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority));
}
/// <summary>
@@ -127,34 +287,168 @@ namespace ICD.Common.Utils
[PublicAPI]
public void EnqueueRemove([CanBeNull] T item, [NotNull] Func<T, bool> remove, int priority, bool deDuplicateToEndOfQueue)
{
Enqueue(() => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue));
Enqueue(item, () => m_Queue.EnqueueRemove(item, remove, priority, deDuplicateToEndOfQueue));
}
#endregion
#region Private Methods
private void Enqueue(Action enqueueAction)
/// <summary>
/// Enqueues an item using the given enqueueAction
/// Starts processing the queue if it's not already processing
/// </summary>
/// <param name="item"></param>
/// <param name="enqueueAction"></param>
private void Enqueue(T item, Action enqueueAction)
{
m_QueueSection.Execute(enqueueAction);
ThreadingUtils.SafeInvoke(ProcessQueue);
}
bool startWorkerThread;
T nextItem = default(T);
private void ProcessQueue()
{
if (!m_ProcessSection.TryEnter())
return;
m_QueueSection.Enter();
try
{
T item = default(T);
while (m_QueueSection.Execute(() => m_Queue.TryDequeue(out item)))
m_ProcessAction(item);
// Reset the flush event, to block the flush thread while something is in the queue/being processed
m_FlushEvent.Reset();
// We'll start a worker thread if the process isn't running and should be
startWorkerThread = !m_ProcessRunning && RunProcess;
if (startWorkerThread)
m_ProcessRunning = true;
if (m_Queue.Count == 0 && startWorkerThread)
{
// If the queue is empty and we're starting the worker thread
// we won't bother adding an item just to dequeue it again
nextItem = item;
}
else if (startWorkerThread)
{
// If the queue isn't empty and we are starting the worker thread
// enqueue the item and dequeue the next item.
enqueueAction();
nextItem = m_Queue.Dequeue();
}
else
{
// If we're not starting the worker thread, just enqueue the item
enqueueAction();
}
}
finally
{
m_ProcessSection.Leave();
m_QueueSection.Leave();
}
if (startWorkerThread)
ProcessQueue(nextItem);
}
/// <summary>
/// Starts processing the queue by starting a thread
/// When running this, m_ProcessRunning MUST be set to true already,
/// and RunProcess should be checked before this
/// </summary>
/// <param name="item"></param>
private void ProcessQueue(T item)
{
ThreadingUtils.SafeInvoke(() => ProcessQueueThread(item));
}
/// <summary>
/// Processes the item, meant to be run in it's own thread
/// When running this, m_ProcessRunning MUST be set to true already,
/// and RunProcess should be checked before this
/// </summary>
/// <param name="item"></param>
private void ProcessQueueThread(T item)
{
while (true)
{
// Run the process action
m_ProcessAction(item);
m_QueueSection.Enter();
try
{
bool runProcess = RunProcess;
bool hasItem = m_Queue.Count > 0;
long betweenTime = BetweenTime;
// Stop processing and return if we don't have another item, or we aren't to keep running the process
if (!runProcess || !hasItem)
{
m_ProcessRunning = false;
if (!hasItem)
m_FlushEvent.Set();
return;
}
// If a between time is set, start the timer (and leave m_ProcessRunning set)
if (betweenTime > 0)
{
m_BetweenTimeTimer.Reset(betweenTime);
return;
}
item = m_Queue.Dequeue();
}
finally
{
m_QueueSection.Leave();
}
}
}
/// <summary>
/// Starts processing an item if we should be
/// Run by RunProcess being set to true
/// </summary>
private void EnableProcessQueue()
{
T item;
m_QueueSection.Enter();
try
{
if (!RunProcess || m_ProcessRunning || !m_Queue.TryDequeue(out item))
return;
m_ProcessRunning = true;
}
finally
{
m_QueueSection.Leave();
}
ProcessQueue(item);
}
private void BetweenTimerCallback()
{
T item;
m_QueueSection.Enter();
try
{
// Check to make sure RunProcess is still true
// Try to dequeue item
if (!RunProcess || !m_Queue.TryDequeue(out item))
{
m_ProcessRunning = false;
return;
}
}
finally
{
m_QueueSection.Leave();
}
// Process the dequeued item
ProcessQueueThread(item);
}
#endregion