Skip to content

Commit

Permalink
Merge branch 'main' into tug-of-war-flask-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee authored Jan 31, 2024
2 parents 1fbc400 + fe343ac commit f8ad2a4
Show file tree
Hide file tree
Showing 26 changed files with 961 additions and 34 deletions.
4 changes: 3 additions & 1 deletion examples/C/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
* [Furuta Pendulum](src/modal_models/FurutaPendulum/README.md): A controller and simulation illustrating a modal reactor.
* [Rhythm](src/rhythm/README.md): Sound generation and terminal user interface demos.
* [SDV](src/sdv/README.md): Software defined vehicle sketch integrating user input, a web display, and sound.
* [Shared Memory](src/shared-memory/README.md): Using shared memory to exchange large data objects between federates.
* [Train Door](src/train-door/README.md): Train door controller from a verification paper.
* [Distributed](src/distributed/README.md): Basic federated hello-world examples.
* [Distributed](src/distributed/README.md): Basic federated hello-world examples.
* [Leader Election](src/leader-election/README.md): Federated fault-tolerant system with leader election.
186 changes: 186 additions & 0 deletions examples/C/src/leader-election/HeartbeatBully.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/**
* This program models a redundant fault tolerant system where a primary node, if and when it fails,
* is replaced by one of several backup nodes. The protocol is described in this paper:
*
* Bjarne Johansson; Mats Rågberger; Alessandro V. Papadopoulos; Thomas Nolte, "Heartbeat Bully:
* Failure Detection and Redundancy Role Selection for Network-Centric Controller," Proc. of the
* 46th Annual Conference of the IEEE Industrial Electronics Society (IECON), 8-21 October 2020.
* https://doi.org/10.1109/IECON43393.2020.9254494
*
* The program has a bank of redundant nodes where exactly one is the primary node and the rest are
* backups. The primary node is always the one with the highest bank index that has not failed. The
* primary sends a heartbeat message once per second (by default). When the primary fails, a leader
* election protocol selects a new primary which then starts sending heartbeat messages. The program
* is set so that each primary fails after sending three heartbeat messages. When all nodes have
* failed, then the program exits.
*
* This example is designed to be run as a federated program.
*
* @author Edward A. Lee
* @author Marjan Sirjani
*/
target C {
timeout: 30 s
}

preamble {=
#include "platform.h" // Defines PRINTF_TIME
enum message_type {
heartbeat,
reveal,
sorry
};
typedef struct message_t {
enum message_type type;
int id;
} message_t;
=}

reactor Node(
bank_index: int = 0,
num_nodes: int = 3,
heartbeat_period: time = 1 s,
max_missed_heartbeats: int = 2,
primary_fails_after_heartbeats: int = 3) {
input[num_nodes] in: message_t
output[num_nodes] out: message_t

state heartbeats_missed: int = 0
state primary_heartbeats_counter: int = 0

initial mode Idle {
reaction(startup) -> reset(Backup), reset(Primary) {=
lf_print(PRINTF_TIME ": Starting node %d", lf_time_logical_elapsed(), self->bank_index);
if (self->bank_index == self->num_nodes - 1) {
lf_set_mode(Primary);
} else {
lf_set_mode(Backup);
}
=}
}

mode Backup {
timer t(heartbeat_period, heartbeat_period)
reaction(in) -> out, reset(Prospect) {=
int primary_id = -1;
for (int i = 0; i < in_width; i++) {
if (in[i]->is_present && in[i]->value.id != self->bank_index) {
if (in[i]->value.type == heartbeat) {
if (primary_id >= 0) {
lf_print_error("Multiple primaries detected!!");
}
primary_id = in[i]->value.id;
lf_print(PRINTF_TIME ": Node %d received heartbeat from node %d.", lf_time_logical_elapsed(), self->bank_index, primary_id);
self->heartbeats_missed = 0;
} else if (in[i]->value.type == reveal && in[i]->value.id < self->bank_index) {
// NOTE: This will not occur if the LF semantics are followed because
// all nodes will (logically) simultaneously detect heartbeat failure and
// transition to the Prospect mode. But we include this anyway in case
// a federated version experiences a fault.

// Send a sorry message.
message_t message;
message.type = sorry;
message.id = self->bank_index;
lf_set(out[in[i]->value.id], message);
lf_print(PRINTF_TIME ": Node %d sends sorry to node %d", lf_time_logical_elapsed(), self->bank_index, in[i]->value.id);
// Go to Prospect mode to send reveal to any higher-priority nodes.
lf_set_mode(Prospect);
}
}
}
=}

reaction(t) -> reset(Prospect) {=
if (self->heartbeats_missed > self->max_missed_heartbeats) {
lf_set_mode(Prospect);
}
// Increment the counter so if it's not reset to 0 by the next time,
// we detect the missed heartbeat.
self->heartbeats_missed++;
=}
}

mode Primary {
timer heartbeat(0, heartbeat_period)
reaction(heartbeat) -> out, reset(Failed) {=
if (self->primary_heartbeats_counter++ >= self->primary_fails_after_heartbeats) {
// Stop sending heartbeats.
lf_print(PRINTF_TIME ": **** Primary node %d fails.", lf_time_logical_elapsed(), self->bank_index);
lf_set_mode(Failed);
} else {
lf_print(PRINTF_TIME ": Primary node %d sends heartbeat.", lf_time_logical_elapsed(), self->bank_index);
for (int i = 0; i < out_width; i++) {
if (i != self->bank_index) {
message_t message;
message.type = heartbeat;
message.id = self->bank_index;
lf_set(out[i], message);
}
}
}
=}
}

mode Failed {
}

mode Prospect {
logical action wait_for_sorry
reaction(reset) -> out, wait_for_sorry {=
lf_print(PRINTF_TIME ": ***** Node %d entered Prospect mode.", lf_time_logical_elapsed(), self->bank_index);
// Send a reveal message with my ID in a bid to become primary.
// NOTE: It is not necessary to send to nodes that have a lower
// priority than this node, but the connection is broadcast, so
// we send to all.
message_t message;
message.type = reveal;
message.id = self->bank_index;
for (int i = self->bank_index + 1; i < self->num_nodes; i++) {
lf_print(PRINTF_TIME ": Node %d sends reveal to node %d", lf_time_logical_elapsed(), self->bank_index, i);
lf_set(out[i], message);
}
// The reveal message is delayed by heartbeat_period, and if
// there is a sorry response, it too will be delayed by heartbeat_period,
// so the total logical delay is twice heartbeat_period.
lf_schedule(wait_for_sorry, 2 * self->heartbeat_period);
=}

reaction(in) -> out {=
for (int i = 0; i < in_width; i++) {
if (in[i]->value.type == reveal && in[i]->value.id < self->bank_index) {
// Send a sorry message.
message_t message;
message.type = sorry;
message.id = self->bank_index;
lf_set(out[in[i]->value.id], message);
lf_print(PRINTF_TIME ": Node %d sends sorry to node %d", lf_time_logical_elapsed(), self->bank_index, in[i]->value.id);
}
}
=}

reaction(wait_for_sorry) in -> reset(Backup), reset(Primary) {=
// Check for sorry messages.
// Sorry messages are guaranteed to be logically simultaneous
// with the wait_for_sorry event, so we just need to check for
// presence of sorry inputs.
int i;
for (i = 0; i < in_width; i++) {
if (in[i]->is_present && in[i]->value.type == sorry) {
// A sorry message arrived. Go to Backup mode.
lf_set_mode(Backup);
break;
}
}
if (i == in_width) {
// No sorry message arrived. Go to Primary mode.
lf_set_mode(Primary);
}
=}
}
}

federated reactor(num_nodes: int = 4, heartbeat_period: time = 1 s) {
nodes = new[num_nodes] Node(num_nodes=num_nodes, heartbeat_period=heartbeat_period)
nodes.out -> interleaved(nodes.in) after heartbeat_period
}
Loading

0 comments on commit f8ad2a4

Please sign in to comment.