feat: enhance routing and messaging logic with improved source handling and batch processing capabilities

This commit is contained in:
Neil Dorin 2026-06-24 20:10:52 -06:00
parent a732c5e08e
commit f4fe8eff90
10 changed files with 214 additions and 76 deletions

View file

@ -411,6 +411,20 @@ public static class Extensions
audioOrSingleRoute.ExecuteRoutes(); audioOrSingleRoute.ExecuteRoutes();
videoRoute?.ExecuteRoutes(); videoRoute?.ExecuteRoutes();
// Update ICurrentSources on the destination if it implements the interface
if (request.Destination is ICurrentSources currentSourcesDevice && request.Source is IRoutingSource routingSource)
{
if (request.SignalType.HasFlag(eRoutingSignalType.Audio) || request.SignalType.HasFlag(eRoutingSignalType.AudioVideo))
{
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Audio, routingSource);
}
if (request.SignalType.HasFlag(eRoutingSignalType.Video) || request.SignalType.HasFlag(eRoutingSignalType.AudioVideo))
{
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Video, routingSource);
}
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -445,6 +459,31 @@ public static class Extensions
Debug.LogMessage(LogEventLevel.Information, "Releasing current route: {0}", destination, current.Source.Key); Debug.LogMessage(LogEventLevel.Information, "Releasing current route: {0}", destination, current.Source.Key);
current.ReleaseRoutes(clearRoute); current.ReleaseRoutes(clearRoute);
} }
// Clear ICurrentSources on the destination if clearing the route
if (clearRoute && destination is ICurrentSources currentSourcesDevice)
{
if (current != null)
{
var signalType = current.SignalType;
if (signalType.HasFlag(eRoutingSignalType.Audio) || signalType.HasFlag(eRoutingSignalType.AudioVideo))
{
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Audio, null);
}
if (signalType.HasFlag(eRoutingSignalType.Video) || signalType.HasFlag(eRoutingSignalType.AudioVideo))
{
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Video, null);
}
}
else
{
// No route descriptor found, clear all signal types
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Audio, null);
currentSourcesDevice.SetCurrentSource(eRoutingSignalType.Video, null);
}
}
} }
catch (Exception ex) catch (Exception ex)
{ {

View file

@ -74,14 +74,17 @@ public class GenericAudioOut : EssentialsDevice, IRoutingSinkWithFeedback
continue; continue;
} }
if (!signalType.HasFlag(type))
{
this.LogDebug("Skipping {type}", type);
continue;
}
this.LogDebug("setting {type}", type); this.LogDebug("setting {type}", type);
var previousSource = CurrentSources[type]; CurrentSources.TryGetValue(type, out var previousSource);
if (signalType.HasFlag(type)) UpdateCurrentSources(type, previousSource, sourceDevice);
{
UpdateCurrentSources(type, previousSource, sourceDevice);
}
} }
} }
@ -99,11 +102,11 @@ public class GenericAudioOut : EssentialsDevice, IRoutingSinkWithFeedback
// Update the current source key for the specified signal type // Update the current source key for the specified signal type
if (CurrentSourceKeys.ContainsKey(signalType)) if (CurrentSourceKeys.ContainsKey(signalType))
{ {
CurrentSourceKeys[signalType] = sourceDevice.Key; CurrentSourceKeys[signalType] = sourceDevice?.Key;
} }
else else
{ {
CurrentSourceKeys.Add(signalType, sourceDevice.Key); CurrentSourceKeys.Add(signalType, sourceDevice?.Key);
} }
// Raise the CurrentSourcesChanged event // Raise the CurrentSourcesChanged event

View file

@ -373,14 +373,17 @@ public abstract class DisplayBase : EssentialsDevice, IDisplay, ICurrentSources,
continue; continue;
} }
if (!signalType.HasFlag(type))
{
this.LogDebug("Skipping {type}", type);
continue;
}
this.LogDebug("setting {type}", type); this.LogDebug("setting {type}", type);
var previousSource = CurrentSources[type]; CurrentSources.TryGetValue(type, out var previousSource);
if (signalType.HasFlag(type)) UpdateCurrentSources(type, previousSource, sourceDevice);
{
UpdateCurrentSources(type, previousSource, sourceDevice);
}
} }
} }
@ -398,11 +401,11 @@ public abstract class DisplayBase : EssentialsDevice, IDisplay, ICurrentSources,
// Update the current source key for the specified signal type // Update the current source key for the specified signal type
if (CurrentSourceKeys.ContainsKey(signalType)) if (CurrentSourceKeys.ContainsKey(signalType))
{ {
CurrentSourceKeys[signalType] = sourceDevice.Key; CurrentSourceKeys[signalType] = sourceDevice?.Key;
} }
else else
{ {
CurrentSourceKeys.Add(signalType, sourceDevice.Key); CurrentSourceKeys.Add(signalType, sourceDevice?.Key);
} }
// Raise the CurrentSourcesChanged event // Raise the CurrentSourcesChanged event

View file

@ -63,14 +63,17 @@ public class GenericSink : EssentialsDevice, IRoutingSinkWithFeedback
continue; continue;
} }
if (!signalType.HasFlag(type))
{
this.LogDebug("Skipping {type}", type);
continue;
}
this.LogDebug("setting {type}", type); this.LogDebug("setting {type}", type);
var previousSource = CurrentSources[type]; CurrentSources.TryGetValue(type, out var previousSource);
if (signalType.HasFlag(type)) UpdateCurrentSources(type, previousSource, sourceDevice);
{
UpdateCurrentSources(type, previousSource, sourceDevice);
}
} }
} }
@ -88,11 +91,11 @@ public class GenericSink : EssentialsDevice, IRoutingSinkWithFeedback
// Update the current source key for the specified signal type // Update the current source key for the specified signal type
if (CurrentSourceKeys.ContainsKey(signalType)) if (CurrentSourceKeys.ContainsKey(signalType))
{ {
CurrentSourceKeys[signalType] = sourceDevice.Key; CurrentSourceKeys[signalType] = sourceDevice?.Key;
} }
else else
{ {
CurrentSourceKeys.Add(signalType, sourceDevice.Key); CurrentSourceKeys.Add(signalType, sourceDevice?.Key);
} }
// Raise the CurrentSourcesChanged event // Raise the CurrentSourcesChanged event

View file

@ -93,14 +93,17 @@ public class BlueJeansPc : InRoomPc, IRunRouteAction, IRoutingSinkWithFeedback
continue; continue;
} }
if (!signalType.HasFlag(type))
{
this.LogDebug("Skipping {type}", type);
continue;
}
this.LogDebug("setting {type}", type); this.LogDebug("setting {type}", type);
var previousSource = CurrentSources[type]; CurrentSources.TryGetValue(type, out var previousSource);
if (signalType.HasFlag(type)) UpdateCurrentSources(type, previousSource, sourceDevice);
{
UpdateCurrentSources(type, previousSource, sourceDevice);
}
} }
} }
@ -118,11 +121,11 @@ public class BlueJeansPc : InRoomPc, IRunRouteAction, IRoutingSinkWithFeedback
// Update the current source key for the specified signal type // Update the current source key for the specified signal type
if (CurrentSourceKeys.ContainsKey(signalType)) if (CurrentSourceKeys.ContainsKey(signalType))
{ {
CurrentSourceKeys[signalType] = sourceDevice.Key; CurrentSourceKeys[signalType] = sourceDevice?.Key;
} }
else else
{ {
CurrentSourceKeys.Add(signalType, sourceDevice.Key); CurrentSourceKeys.Add(signalType, sourceDevice?.Key);
} }
// Raise the CurrentSourcesChanged event // Raise the CurrentSourcesChanged event

View file

@ -100,14 +100,17 @@ public class GenericSoftCodec : EssentialsDevice, IRoutingSource, IRoutingSinkWi
continue; continue;
} }
if (!signalType.HasFlag(type))
{
this.LogDebug("Skipping {type}", type);
continue;
}
this.LogDebug("setting {type}", type); this.LogDebug("setting {type}", type);
var previousSource = CurrentSources[type]; CurrentSources.TryGetValue(type, out var previousSource);
if (signalType.HasFlag(type)) UpdateCurrentSources(type, previousSource, sourceDevice);
{
UpdateCurrentSources(type, previousSource, sourceDevice);
}
} }
} }
@ -125,11 +128,11 @@ public class GenericSoftCodec : EssentialsDevice, IRoutingSource, IRoutingSinkWi
// Update the current source key for the specified signal type // Update the current source key for the specified signal type
if (CurrentSourceKeys.ContainsKey(signalType)) if (CurrentSourceKeys.ContainsKey(signalType))
{ {
CurrentSourceKeys[signalType] = sourceDevice.Key; CurrentSourceKeys[signalType] = sourceDevice?.Key;
} }
else else
{ {
CurrentSourceKeys.Add(signalType, sourceDevice.Key); CurrentSourceKeys.Add(signalType, sourceDevice?.Key);
} }
// Raise the CurrentSourcesChanged event // Raise the CurrentSourcesChanged event

View file

@ -42,12 +42,10 @@ namespace PepperDash.Essentials.AppServer.Messengers
{ {
// need to copy the dictionaries to avoid enumeration issues // need to copy the dictionaries to avoid enumeration issues
var currentSourceKeys = sourceDevice.CurrentSourceKeys.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); var currentSourceKeys = sourceDevice.CurrentSourceKeys.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
var currentSources = sourceDevice.CurrentSources.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
PostStatusMessage(JToken.FromObject(new PostStatusMessage(JToken.FromObject(new
{ {
currentSourceKeys, currentSourceKeys,
currentSources,
})); }));
}; };
} }
@ -57,7 +55,6 @@ namespace PepperDash.Essentials.AppServer.Messengers
var message = new CurrentSourcesStateMessage var message = new CurrentSourcesStateMessage
{ {
CurrentSourceKeys = sourceDevice.CurrentSourceKeys, CurrentSourceKeys = sourceDevice.CurrentSourceKeys,
CurrentSources = sourceDevice.CurrentSources
}; };
PostStatusMessage(message, id); PostStatusMessage(message, id);
@ -75,11 +72,5 @@ namespace PepperDash.Essentials.AppServer.Messengers
[JsonProperty("currentSourceKeys", NullValueHandling = NullValueHandling.Ignore)] [JsonProperty("currentSourceKeys", NullValueHandling = NullValueHandling.Ignore)]
public Dictionary<eRoutingSignalType, string> CurrentSourceKeys { get; set; } public Dictionary<eRoutingSignalType, string> CurrentSourceKeys { get; set; }
/// <summary>
/// Gets or sets the CurrentSource
/// </summary>
[JsonProperty("currentSources")]
public Dictionary<eRoutingSignalType, IRoutingSource> CurrentSources { get; set; }
} }
} }

View file

@ -16,35 +16,46 @@ namespace PepperDash.Essentials
/// </summary> /// </summary>
public class MessageToClients : IQueueMessage public class MessageToClients : IQueueMessage
{ {
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Converters = { new IsoDateTimeConverter() }
};
private readonly MobileControlWebsocketServer _server; private readonly MobileControlWebsocketServer _server;
private readonly object msgToSend; private readonly string _serializedMessage;
private readonly string _clientId;
/// <summary> /// <summary>
/// Message to send to Direct Server Clients /// Message to send to Direct Server Clients.
/// Serialization occurs here in the caller's thread context (parallel) rather than on the queue thread (sequential).
/// </summary> /// </summary>
/// <param name="msg">message object to send</param> /// <param name="msg">message object to send</param>
/// <param name="server">WebSocket server instance</param> /// <param name="server">WebSocket server instance</param>
public MessageToClients(object msg, MobileControlWebsocketServer server) public MessageToClients(object msg, MobileControlWebsocketServer server)
{ {
_server = server; _server = server;
msgToSend = msg; _serializedMessage = JsonConvert.SerializeObject(msg, Formatting.None, SerializerSettings);
_clientId = (msg as MobileControlMessage)?.ClientId;
} }
/// <summary> /// <summary>
/// Message to send to Direct Server Clients /// Message to send to Direct Server Clients.
/// Serialization occurs here in the caller's thread context (parallel) rather than on the queue thread (sequential).
/// </summary> /// </summary>
/// <param name="msg">message object to send</param> /// <param name="msg">message object to send</param>
/// <param name="server">WebSocket server instance</param> /// <param name="server">WebSocket server instance</param>
public MessageToClients(DeviceStateMessageBase msg, MobileControlWebsocketServer server) public MessageToClients(DeviceStateMessageBase msg, MobileControlWebsocketServer server)
{ {
_server = server; _server = server;
msgToSend = msg; _serializedMessage = JsonConvert.SerializeObject(msg, Formatting.None, SerializerSettings);
_clientId = null;
} }
#region Implementation of IQueueMessage #region Implementation of IQueueMessage
/// <summary> /// <summary>
/// Dispatch method /// Dispatch method - only handles WebSocket send since serialization was done at construction time
/// </summary> /// </summary>
public void Dispatch() public void Dispatch()
{ {
@ -56,24 +67,18 @@ namespace PepperDash.Essentials
return; return;
} }
var message = JsonConvert.SerializeObject(msgToSend, Formatting.None, if (_clientId != null)
new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore, Converters = { new IsoDateTimeConverter() } });
var clientSpecificMessage = msgToSend as MobileControlMessage;
if (clientSpecificMessage.ClientId != null)
{ {
var clientId = clientSpecificMessage.ClientId; _server.LogVerbose("Message TX To client {clientId}: {message}", _clientId, _serializedMessage);
_server.LogVerbose("Message TX To client {clientId}: {message}", clientId, message); _server.SendMessageToClient(_clientId, _serializedMessage);
_server.SendMessageToClient(clientId, message);
return; return;
} }
_server.SendMessageToAllClients(message); _server.SendMessageToAllClients(_serializedMessage);
_server.LogVerbose("Message TX To all clients: {message}", message); _server.LogVerbose("Message TX To all clients: {message}", _serializedMessage);
} }
catch (ThreadAbortException) catch (ThreadAbortException)
{ {

View file

@ -196,14 +196,14 @@ namespace PepperDash.Essentials
_receiveQueue = new GenericQueue( _receiveQueue = new GenericQueue(
key + "-rxqueue", key + "-rxqueue",
System.Threading.ThreadPriority.Highest, System.Threading.ThreadPriority.Highest,
25 100
); );
// The queue that will collect the outgoing messages in the order they are received // The queue that will collect the outgoing messages in the order they are received
_transmitToServerQueue = new GenericQueue( _transmitToServerQueue = new GenericQueue(
key + "-txqueue", key + "-txqueue",
System.Threading.ThreadPriority.Highest, System.Threading.ThreadPriority.Highest,
25 100
); );
if (Config.DirectServer != null && Config.DirectServer.EnableDirectServer) if (Config.DirectServer != null && Config.DirectServer.EnableDirectServer)
@ -218,7 +218,7 @@ namespace PepperDash.Essentials
_transmitToClientsQueue = new GenericQueue( _transmitToClientsQueue = new GenericQueue(
key + "-clienttxqueue", key + "-clienttxqueue",
System.Threading.ThreadPriority.Highest, System.Threading.ThreadPriority.Highest,
25 100
); );
} }
@ -1500,6 +1500,87 @@ namespace PepperDash.Essentials
}); });
} }
/// <summary>
/// Handles a batch request for full status of multiple devices.
/// Triggers all registered messengers for each device key in parallel,
/// then sends an /system/initialSyncComplete message after all have been processed.
/// </summary>
private void HandleBatchDeviceFullStatus(string clientId, JToken content)
{
if (content == null)
{
this.LogWarning("BatchDeviceFullStatus: content is null");
return;
}
var deviceKeys = content.SelectToken("deviceKeys")?.ToObject<List<string>>();
if (deviceKeys == null || deviceKeys.Count == 0)
{
this.LogWarning("BatchDeviceFullStatus: No device keys provided");
SendMessageObject(new MobileControlMessage
{
Type = "/system/initialSyncComplete",
ClientId = clientId
});
return;
}
this.LogInformation("BatchDeviceFullStatus: Processing {count} device keys", deviceKeys.Count);
var tasks = new List<Task>();
foreach (var deviceKey in deviceKeys)
{
var fullStatusPath = $"/device/{deviceKey}/fullStatus";
var handlers = _actionDictionary
.Where(kv => fullStatusPath.StartsWith(kv.Key + "/"))
.SelectMany(kv => kv.Value)
.ToList();
if (handlers.Count == 0)
{
this.LogDebug("BatchDeviceFullStatus: No handlers for {deviceKey}", deviceKey);
continue;
}
foreach (var handler in handlers)
{
tasks.Add(Task.Run(() =>
{
try
{
handler.Action(fullStatusPath, clientId, JToken.FromObject(new { deviceKey }));
}
catch (Exception ex)
{
this.LogError("BatchDeviceFullStatus: Exception in handler for {deviceKey}: {message}", deviceKey, ex.Message);
}
}));
}
}
// After all handlers have completed and enqueued their responses, send sync complete
Task.Run(async () =>
{
try
{
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
this.LogError("BatchDeviceFullStatus: Exception waiting for tasks: {message}", ex.Message);
}
SendMessageObject(new MobileControlMessage
{
Type = "/system/initialSyncComplete",
ClientId = clientId
});
});
}
private void SendDeviceInterfaces(string clientId) private void SendDeviceInterfaces(string clientId)
{ {
this.LogDebug("Sending Device interfaces"); this.LogDebug("Sending Device interfaces");
@ -1602,6 +1683,9 @@ namespace PepperDash.Essentials
case "/system/clientJoined": case "/system/clientJoined":
HandleClientJoined(message.Content); HandleClientJoined(message.Content);
break; break;
case "/system/batchDeviceFullStatus":
HandleBatchDeviceFullStatus(message.ClientId, message.Content);
break;
case "/system/reboot": case "/system/reboot":
SystemMonitorController.ProcessorReboot(); SystemMonitorController.ProcessorReboot();
break; break;

View file

@ -13,35 +13,43 @@ namespace PepperDash.Essentials
/// </summary> /// </summary>
public class TransmitMessage : IQueueMessage public class TransmitMessage : IQueueMessage
{ {
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Converters = { new IsoDateTimeConverter() }
};
private readonly WebSocket _ws; private readonly WebSocket _ws;
private readonly object msgToSend; private readonly string _serializedMessage;
/// <summary> /// <summary>
/// Initialize a message to send /// Initialize a message to send.
/// Serialization occurs here in the caller's thread context rather than on the queue thread.
/// </summary> /// </summary>
/// <param name="msg">message object to send</param> /// <param name="msg">message object to send</param>
/// <param name="ws">WebSocket instance</param> /// <param name="ws">WebSocket instance</param>
public TransmitMessage(object msg, WebSocket ws) public TransmitMessage(object msg, WebSocket ws)
{ {
_ws = ws; _ws = ws;
msgToSend = msg; _serializedMessage = JsonConvert.SerializeObject(msg, Formatting.None, SerializerSettings);
} }
/// <summary> /// <summary>
/// Initialize a message to send /// Initialize a message to send.
/// Serialization occurs here in the caller's thread context rather than on the queue thread.
/// </summary> /// </summary>
/// <param name="msg">message object to send</param> /// <param name="msg">message object to send</param>
/// <param name="ws">WebSocket instance</param> /// <param name="ws">WebSocket instance</param>
public TransmitMessage(DeviceStateMessageBase msg, WebSocket ws) public TransmitMessage(DeviceStateMessageBase msg, WebSocket ws)
{ {
_ws = ws; _ws = ws;
msgToSend = msg; _serializedMessage = JsonConvert.SerializeObject(msg, Formatting.None, SerializerSettings);
} }
#region Implementation of IQueueMessage #region Implementation of IQueueMessage
/// <summary> /// <summary>
/// Dispatch method /// Dispatch method - only handles WebSocket send since serialization was done at construction time
/// </summary> /// </summary>
public void Dispatch() public void Dispatch()
{ {
@ -59,13 +67,9 @@ namespace PepperDash.Essentials
return; return;
} }
Debug.LogVerbose("Message TX: {0}", _serializedMessage);
var message = JsonConvert.SerializeObject(msgToSend, Formatting.None, _ws.Send(_serializedMessage);
new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore, Converters = { new IsoDateTimeConverter() } });
Debug.LogVerbose("Message TX: {0}", message);
_ws.Send(message);
} }
catch (Exception ex) catch (Exception ex)
{ {