Skip to content

Commit

Permalink
feat: 2-way heartbeat mechanism (now for host as well)
Browse files Browse the repository at this point in the history
  • Loading branch information
therealPaulPlay committed Feb 9, 2025
1 parent f09ff64 commit 03959de
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 42 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "playpeerjs",
"version": "1.2.6",
"version": "1.2.7",
"description": "WebRTC-based wrapper for creating robust peer-2-peer multiplayer systems with ease.",
"type": "module",
"main": "dist/playpeer.js",
Expand Down
105 changes: 64 additions & 41 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ export default class PlayPeer {
// Logic properties
#storage = {};
#isHost = false;
#hostConnections = new Set(); // Host-side array containing all peers connected to current host, send out IDs to clients
#hostConnections = []; // Host-side array containing all peers connected to current host, send out IDs to clients
#hostConnectionsIdArray = []; // Client-side array to store the host's connections' IDs.
#outgoingConnection;

// Heartbeat variables
#heartbeatSendInterval;
#heartbeatHostCheckInterval;
#heartbeatReceived;

/**
Expand Down Expand Up @@ -161,43 +162,65 @@ export default class PlayPeer {
}

/**
* Handle incoming peer connections (Host code)
* Handle incoming peer connections (Host function)
* @private
*/
#handleIncomingConnections(incomingConnection) {
// Check if room is full
if (this.#isHost && this.#maxSize && (this.#hostConnections?.size + 1) >= this.#maxSize) {
if (this.#isHost && this.#maxSize && (this.#hostConnections?.length + 1) >= this.#maxSize) {
console.warn(WARNING_PREFIX + `Connection ${incomingConnection.peer} rejected - room is full.`);
this.#triggerEvent("status", "Rejected connection - room is full.");
incomingConnection.close();
try { incomingConnection.close(); } catch (error) {
console.error(ERROR_PREFIX + "Failed to close incoming connection (room full):", error);
this.#triggerEvent("error", "Failed to close incoming connection (room full): " + error);
}
return; // Don't continue with the rest of events
}

// Close broken connections that don't open in time or address the wrong host
setTimeout(() => {
if (!incomingConnection.open || !this.#isHost) {
try {
if (this.#isHost) console.warn(WARNING_PREFIX + `Connection ${incomingConnection.peer} closed - no response.`);
if (!this.#isHost) console.warn(WARNING_PREFIX + `Connection ${incomingConnection.peer} closed - you are not hosting.`);
incomingConnection.close();
this.#hostConnections.delete(incomingConnection);
} catch (error) {
console.error(ERROR_PREFIX + "Error closing invalid connection:", error);
console.warn(WARNING_PREFIX + `Connection ${incomingConnection.peer} closed - ${this.#isHost ? "no response" : "not hosting"}.`);
try { incomingConnection.close(); } catch (error) {
console.error(ERROR_PREFIX + "Failed to close incoming connection (invalid):", error);
this.#triggerEvent("error", "Failed to close incoming connection (invalid): " + error);
}
return;
}
}, 3 * 1000);

// Only process incoming connections if hosting
if (this.#isHost) {
this.#triggerEvent("status", "New peer connected.");
this.#hostConnections.add(incomingConnection);
this.#triggerEvent("status", "New peer connecting...");

// Set up host heartbeat check if not already
if (!this.#heartbeatHostCheckInterval) {
this.#heartbeatHostCheckInterval = setInterval(() => {
if (!this.#isHost) {
clearInterval(this.#heartbeatHostCheckInterval);
this.#heartbeatHostCheckInterval = undefined;
return;
}
this.#hostConnections?.forEach((e) => {
if (e[1] < Date.now() - 2000) {
console.warn(WARNING_PREFIX + "Peer did not respond to heartbeat - closing connection.");
this.#triggerEvent("status", "Peer did not respond to heartbeat - closting connection.");
try { e[0]?.close(); } catch (error) {
console.error(ERROR_PREFIX + "Failed to close incoming connection (no heartbeat):", error);
this.#triggerEvent("error", "Failed to close incoming connection (no heartbeat): " + error);
}
}
});
}, 1000);
}

incomingConnection.on('open', () => {
this.#triggerEvent("status", "Incoming connection opened.");
this.#triggerEvent("incomingPeerConnected", incomingConnection.peer);

// Sync host's connections with all peers
const peerList = Array.from(this.#hostConnections).map((conn) => conn.peer);
if (this.#hostConnections.findIndex(c => c[0] === incomingConnection) == -1) this.#hostConnections.push([incomingConnection, Date.now()]); // Add new peer
const peerList = Array.from(this.#hostConnections).map(c => c[0]?.peer);
this.#broadcastMessage("peer_list", { peers: peerList });

// Send current storage state to new peer
Expand All @@ -223,6 +246,8 @@ export default class PlayPeer {
// Respond to peers requesting heartbeat
try {
incomingConnection.send({ type: "heartbeat_response" });
const index = this.#hostConnections.findIndex(e => e[0] == incomingConnection);
if (index !== -1) this.#hostConnections[index][1] = Date.now(); // Update last heartbeat to now
} catch (error) {
console.error(ERROR_PREFIX + "Error responding to heartbeat:", error);
this.#triggerEvent("error", "Error responding to heartbeat: " + error);
Expand All @@ -239,11 +264,12 @@ export default class PlayPeer {
});

incomingConnection.on('close', () => {
this.#hostConnections.delete(incomingConnection);
const removeIndex = this.#hostConnections.findIndex(c => c[0] === incomingConnection);
if (removeIndex !== -1) this.#hostConnections.splice(removeIndex, 1);
this.#triggerEvent("incomingPeerDisconnected", incomingConnection.peer);
this.#triggerEvent("status", "Incoming connection closed.");

const peerList = Array.from(this.#hostConnections).map((conn) => conn.peer);
const peerList = Array.from(this.#hostConnections).map(c => c[0]?.peer);
this.#broadcastMessage("peer_list", { peers: peerList });
});

Expand All @@ -252,8 +278,6 @@ export default class PlayPeer {
console.error(ERROR_PREFIX + `Connection ${incomingConnection.peer} error:`, error);
this.#triggerEvent("error", "Error in incoming connection: " + error);
});
} else {
console.warn(WARNING_PREFIX + "Incoming connection ignored as peer is not hosting.");
}
}

Expand Down Expand Up @@ -322,28 +346,26 @@ export default class PlayPeer {
// Regularly check if host responds to heartbeat
this.#heartbeatReceived = true;
this.#heartbeatSendInterval = setInterval(() => {
if (!this.#isHost) {
if (!this.#heartbeatReceived) {
console.warn(WARNING_PREFIX + "Host did not respond to heartbeat - disconnecting from host.");
this.#triggerEvent("status", "Host did not respond to heartbeat - disconnecting.");
this.#outgoingConnection?.close();
return;
}
if (!this.#heartbeatReceived) {
console.warn(WARNING_PREFIX + "Host did not respond to heartbeat - disconnecting from host.");
this.#triggerEvent("status", "Host did not respond to heartbeat - disconnecting from host.");
this.#outgoingConnection?.close();
return;
}

// Ping host
if (this.#outgoingConnection?.open) {
this.#outgoingConnection?.send({
type: 'heartbeat_request'
});
}
} else {
clearInterval(this.#heartbeatSendInterval);
// Ping host
this.#heartbeatReceived = false; // Reset received status to false
if (this.#outgoingConnection?.open) {
this.#outgoingConnection?.send({
type: 'heartbeat_request'
});
}
}, 1000);

// Only migrate host if the connection was initially open
this.#outgoingConnection.on('close', () => {
this.#migrateHost();
clearInterval(this.#heartbeatSendInterval);
if (!this.#isHost) this.#migrateHost();
});

resolve();
Expand All @@ -365,7 +387,6 @@ export default class PlayPeer {
case 'heartbeat_response':
this.#heartbeatReceived = true;
break;

}
});

Expand Down Expand Up @@ -528,13 +549,14 @@ export default class PlayPeer {
*/
#broadcastMessage(type, payload = {}) {
const message = { type, ...payload };
this.#hostConnections.forEach((connection) => {
if (connection.open) {
this.#hostConnections.forEach((element) => {
const connection = element[0];
if (connection?.open) {
try {
connection.send(message);
} catch (error) {
console.error(ERROR_PREFIX + `Failed to send broadcast message to peer ${connection.peer}:`, error);
this.#triggerEvent("error", `Failed to send broadcast message to peer ${connection.peer}: ${error}`);
console.error(ERROR_PREFIX + `Failed to send broadcast message to peer ${connection?.peer}:`, error);
this.#triggerEvent("error", `Failed to send broadcast message to peer ${connection?.peer}: ${error}`);
}
}
});
Expand Down Expand Up @@ -590,12 +612,13 @@ export default class PlayPeer {

// Clear intervals
clearInterval(this.#heartbeatSendInterval);
clearInterval(this.#heartbeatHostCheckInterval);

// Resets
this.#peer = undefined;
this.#storage = {};
this.#isHost = false;
this.#hostConnections.clear();
this.#hostConnections = [];
this.#hostConnectionsIdArray = [];
this.#initialized = false;
this.#maxSize = undefined;
Expand All @@ -610,7 +633,7 @@ export default class PlayPeer {
* @returns {number} Number of active connections to the host
*/
get connectionCount() {
if (this.#isHost) return this.#hostConnections?.size || 0;
if (this.#isHost) return this.#hostConnections?.length || 0;
return this.#hostConnectionsIdArray?.length || 0;
}

Expand Down

0 comments on commit 03959de

Please sign in to comment.