Removed all HTTP Post logic from CotijaSystemController and switched to websockets for sending data to server

This commit is contained in:
Neil Dorin
2018-03-21 12:59:41 -06:00
parent 8d03e81431
commit 82fad55c1e
5 changed files with 142 additions and 208 deletions

View File

@@ -21,12 +21,8 @@ namespace PepperDash.Essentials
{
public class CotijaSystemController : Device
{
int SseMessageLengthBeforeFailureCount;
WebSocketClient WSClient;
//GenericHttpSseClient SseClient;
/// <summary>
/// Prevents post operations from stomping on each other and getting lost
/// </summary>
@@ -36,8 +32,6 @@ namespace PepperDash.Essentials
public CotijaConfig Config { get; private set; }
HttpClient Client;
Dictionary<string, Object> ActionDictionary = new Dictionary<string, Object>(StringComparer.InvariantCultureIgnoreCase);
Dictionary<string, CTimer> PushedActions = new Dictionary<string, CTimer>();
@@ -55,13 +49,6 @@ namespace PepperDash.Essentials
List<CotijaBridgeBase> RoomBridges = new List<CotijaBridgeBase>();
long ButtonHeartbeatInterval = 1000;
bool NeedNewClient;
/// <summary>
/// Used to count retries in PostToServer
/// </summary>
int RetryCounter;
/// <summary>
///
@@ -278,88 +265,21 @@ namespace PepperDash.Essentials
}
/// <summary>
/// Posts a message to the server from a room
/// Sends a message to the server from a room
/// </summary>
/// <param name="room">room from which the message originates</param>
/// <param name="o">object to be serialized and sent in post body</param>
public void PostToServer(JObject o)
public void SendMessageToServer(JObject o)
{
CrestronInvoke.BeginInvoke(oo =>
{
if (string.IsNullOrEmpty(SystemUuid))
{
Debug.Console(1, this, "Status post attempt before UUID is set. Ignoring.");
return;
}
var ready = PostLockEvent.Wait(2000);
if (!ready)
{
Debug.Console(1, this, "PostToServer failed to enter after 2 seconds. Ignoring");
return;
}
PostLockEvent.Reset();
try
{
if (Client == null || NeedNewClient)
{
NeedNewClient = false;
Client = new HttpClient();
}
Client.Verbose = false;
Client.KeepAlive = true;
if (WSClient != null && WSClient.Connected)
{
string message = JsonConvert.SerializeObject(o, Formatting.None, new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore });
HttpClientRequest request = new HttpClientRequest();
request.RequestType = RequestType.Post;
string url = string.Format("http://{0}/api/system/{1}/status", Config.ServerUrl, SystemUuid);
request.Url.Parse(url);
request.KeepAlive = true;
request.Header.ContentType = "application/json";
// Ignore any null objects when serializing and remove formatting
string ignored = JsonConvert.SerializeObject(o, Formatting.None, new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore });
Debug.Console(1, this, "Posting to '{0}':\n{1}", url, ignored);
request.ContentString = ignored;
request.FinalizeHeader();
Client.DispatchAsync(request, (r, err) =>
{
Debug.Console(1, this, "POST result: {0}", err);
var messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
if (err == HTTP_CALLBACK_ERROR.COMPLETED)
{
Debug.Console(1, this, "Status Response Code: {0}", r.Code);
PostLockEvent.Set();
RetryCounter = 0;
}
else
{
// Try again. This client is hosed.
NeedNewClient = true;
RetryCounter++;
// instant retry on first try.
if (RetryCounter >= 2 && RetryCounter < 5)
CrestronEnvironment.Sleep(1000);
else if (RetryCounter >= 5 && RetryCounter <= 10)
CrestronEnvironment.Sleep(5000);
// give up
else if (RetryCounter > 10)
{
Debug.Console(1, this, "Giving up on server POST");
RetryCounter = 0;
return;
}
Debug.Console(1, this, "POST retry #{0}", RetryCounter);
PostLockEvent.Set();
PostToServer(o);
}
});
}
catch (Exception e)
{
Debug.Console(1, this, "Error Posting to Server: {0}", e);
PostLockEvent.Set();
}
});
WSClient.SendAsync(messageBytes, (uint)messageBytes.Length, WebSocketClient.WEBSOCKET_PACKET_TYPES.LWS_WS_OPCODE_07__TEXT_FRAME);
}
}
@@ -410,7 +330,6 @@ namespace PepperDash.Essentials
else
{
Debug.Console(1, this, "Null response received from server.");
NeedNewClient = true;
}
}
}
@@ -485,7 +404,8 @@ namespace PepperDash.Essentials
WSClient.URL = string.Format("wss://{0}/system/join/{1}", Config.ServerUrl, this.SystemUuid);
WSClient.Connect();
Debug.Console(0, this, "Websocket connected");
WSClient.ReceiveCallBack = WebsocketReceive;
WSClient.ReceiveCallBack = WebsocketReceiveCallback;
WSClient.SendCallBack = WebsocketSendCallback;
WSClient.ReceiveAsync();
@@ -516,7 +436,6 @@ namespace PepperDash.Essentials
// ***********************************
}
/// <summary>
/// Resets reconnect timer and updates usercode
/// </summary>
@@ -541,7 +460,7 @@ namespace PepperDash.Essentials
/// <param name="length"></param>
/// <param name="opcode"></param>
/// <param name="err"></param>
int WebsocketReceive(byte[] data, uint length, WebSocketClient.WEBSOCKET_PACKET_TYPES opcode,
int WebsocketReceiveCallback(byte[] data, uint length, WebSocketClient.WEBSOCKET_PACKET_TYPES opcode,
WebSocketClient.WEBSOCKET_RESULT_CODES err)
{
var rx = System.Text.Encoding.UTF8.GetString(data, 0, (int)length);
@@ -551,6 +470,18 @@ namespace PepperDash.Essentials
return 1;
}
/// <summary>
/// Callback to catch possible errors in sending via the websocket
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
int WebsocketSendCallback(Crestron.SimplSharp.CrestronWebSocketClient.WebSocketClient.WEBSOCKET_RESULT_CODES result)
{
Debug.Console(2, "SendCallback result: {0}", result);
return 1;
}
/// <summary>
///
/// </summary>
@@ -558,119 +489,122 @@ namespace PepperDash.Essentials
/// <param name="e"></param>
void ParseStreamRx(string message)
{
Debug.Console(1, this, "Message RX: '{0}'", message);
try
if(string.IsNullOrEmpty(message))
return;
Debug.Console(1, this, "Message RX: '{0}'", message);
try
{
var messageObj = JObject.Parse(message);
var type = messageObj["type"].Value<string>();
if (type == "hello")
{
var messageObj = JObject.Parse(message);
ResetOrStartHearbeatTimer();
}
else if (type == "/system/heartbeat")
{
HandleHeartBeat(messageObj["content"]);
}
else if (type == "close")
{
WSClient.Disconnect();
var type = messageObj["type"].Value<string>();
if (type == "hello")
{
ResetOrStartHearbeatTimer();
}
else if (type == "/system/heartbeat")
ServerHeartbeatCheckTimer.Stop();
// Start the reconnect timer
StartReconnectTimer();
}
else
{
// Check path against Action dictionary
if (ActionDictionary.ContainsKey(type))
{
HandleHeartBeat(messageObj["content"]);
}
else if (type == "close")
{
WSClient.Disconnect();
var action = ActionDictionary[type];
ServerHeartbeatCheckTimer.Stop();
// Start the reconnect timer
StartReconnectTimer();
if (action is Action)
{
(action as Action)();
}
else if (action is PressAndHoldAction)
{
var stateString = messageObj["content"]["state"].Value<string>();
// Look for a button press event
if (!string.IsNullOrEmpty(stateString))
{
switch (stateString)
{
case "true":
{
if (!PushedActions.ContainsKey(type))
{
PushedActions.Add(type, new CTimer(o =>
{
(action as PressAndHoldAction)(false);
PushedActions.Remove(type);
}, null, ButtonHeartbeatInterval, ButtonHeartbeatInterval));
}
// Maybe add an else to reset the timer
break;
}
case "held":
{
if (!PushedActions.ContainsKey(type))
{
PushedActions[type].Reset(ButtonHeartbeatInterval, ButtonHeartbeatInterval);
}
return;
}
case "false":
{
if (PushedActions.ContainsKey(type))
{
PushedActions[type].Stop();
PushedActions.Remove(type);
}
break;
}
}
(action as PressAndHoldAction)(stateString == "true");
}
}
else if (action is Action<bool>)
{
var stateString = messageObj["content"]["state"].Value<string>();
if (!string.IsNullOrEmpty(stateString))
{
(action as Action<bool>)(stateString == "true");
}
}
else if (action is Action<ushort>)
{
(action as Action<ushort>)(messageObj["content"]["value"].Value<ushort>());
}
else if (action is Action<string>)
{
(action as Action<string>)(messageObj["content"]["value"].Value<string>());
}
else if (action is Action<SourceSelectMessageContent>)
{
(action as Action<SourceSelectMessageContent>)(messageObj["content"]
.ToObject<SourceSelectMessageContent>());
}
}
else
{
// Check path against Action dictionary
if (ActionDictionary.ContainsKey(type))
{
var action = ActionDictionary[type];
if (action is Action)
{
(action as Action)();
}
else if (action is PressAndHoldAction)
{
var stateString = messageObj["content"]["state"].Value<string>();
// Look for a button press event
if (!string.IsNullOrEmpty(stateString))
{
switch (stateString)
{
case "true":
{
if (!PushedActions.ContainsKey(type))
{
PushedActions.Add(type, new CTimer(o =>
{
(action as PressAndHoldAction)(false);
PushedActions.Remove(type);
}, null, ButtonHeartbeatInterval, ButtonHeartbeatInterval));
}
// Maybe add an else to reset the timer
break;
}
case "held":
{
if (!PushedActions.ContainsKey(type))
{
PushedActions[type].Reset(ButtonHeartbeatInterval, ButtonHeartbeatInterval);
}
return;
}
case "false":
{
if (PushedActions.ContainsKey(type))
{
PushedActions[type].Stop();
PushedActions.Remove(type);
}
break;
}
}
(action as PressAndHoldAction)(stateString == "true");
}
}
else if (action is Action<bool>)
{
var stateString = messageObj["content"]["state"].Value<string>();
if (!string.IsNullOrEmpty(stateString))
{
(action as Action<bool>)(stateString == "true");
}
}
else if (action is Action<ushort>)
{
(action as Action<ushort>)(messageObj["content"]["value"].Value<ushort>());
}
else if (action is Action<string>)
{
(action as Action<string>)(messageObj["content"]["value"].Value<string>());
}
else if (action is Action<SourceSelectMessageContent>)
{
(action as Action<SourceSelectMessageContent>)(messageObj["content"]
.ToObject<SourceSelectMessageContent>());
}
}
else
{
Debug.Console(1, this, "-- Warning: Incoming message has no registered handler");
}
Debug.Console(1, this, "-- Warning: Incoming message has no registered handler");
}
}
catch (Exception err)
{
Debug.Console(1, "SseMessageLengthBeforeFailureCount: {0}", SseMessageLengthBeforeFailureCount);
SseMessageLengthBeforeFailureCount = 0;
Debug.Console(1, this, "Unable to parse message: {0}", err);
}
}
}
catch (Exception err)
{
//Debug.Console(1, "SseMessageLengthBeforeFailureCount: {0}", SseMessageLengthBeforeFailureCount);
//SseMessageLengthBeforeFailureCount = 0;
Debug.Console(1, this, "Unable to parse message: {0}", err);
}
}
}
}

View File

@@ -456,7 +456,7 @@ namespace PepperDash.Essentials.Room.Cotija
/// <param name="contentObject">The contents of the content object</param>
void PostStatusMessage(object contentObject)
{
Parent.PostToServer(JObject.FromObject(new
Parent.SendMessageToServer(JObject.FromObject(new
{
type = "/room/status/",
content = contentObject
@@ -470,7 +470,7 @@ namespace PepperDash.Essentials.Room.Cotija
/// <param name="contentObject"></param>
void PostMessage(string messageType, object contentObject)
{
Parent.PostToServer(JObject.FromObject(new
Parent.SendMessageToServer(JObject.FromObject(new
{
type = messageType,
content = contentObject

View File

@@ -102,7 +102,7 @@ namespace PepperDash.Essentials
JObject message = new JObject();
message.Add("type", "/room/shutdown/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
/// <summary>
@@ -117,7 +117,7 @@ namespace PepperDash.Essentials
JObject message = new JObject();
message.Add("type", "/room/shutdown/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
/// <summary>
@@ -133,7 +133,7 @@ namespace PepperDash.Essentials
JObject message = new JObject();
message.Add("type", "/room/shutdown/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
// equivalent JS message:
// Post( { type: '/room/status/', content: { shutdown: 'hasStarted', duration: Room.ShutdownPromptTimer.SecondsToCount })
}
@@ -150,7 +150,7 @@ namespace PepperDash.Essentials
JObject message = new JObject();
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
/// <summary>
@@ -165,7 +165,7 @@ namespace PepperDash.Essentials
JObject message = new JObject();
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
/// <summary>
@@ -193,7 +193,7 @@ namespace PepperDash.Essentials
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
void Room_CurrentVolumeDeviceChange(object sender, VolumeDeviceChangeEventArgs e)
@@ -245,7 +245,7 @@ namespace PepperDash.Essentials
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
}
@@ -297,7 +297,7 @@ namespace PepperDash.Essentials
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}
else
{
@@ -362,7 +362,7 @@ namespace PepperDash.Essentials
message.Add("type", "/room/status/");
message.Add("content", roomStatus);
Parent.PostToServer(message);
Parent.SendMessageToServer(message);
}