diff --git a/examples/C/README.md b/examples/C/README.md
index e9fd4742..fbc1fab4 100644
--- a/examples/C/README.md
+++ b/examples/C/README.md
@@ -9,5 +9,7 @@
* [Furuta Pendulum](src/modal_models/FurutaPendulum/README.md): A controller and simulation illustrating a modal reactor.
* [Rhythm](src/rhythm/README.md): Sound generation and terminal user interface demos.
* [SDV](src/sdv/README.md): Software defined vehicle sketch integrating user input, a web display, and sound.
+* [Shared Memory](src/shared-memory/README.md): Using shared memory to exchange large data objects between federates.
* [Train Door](src/train-door/README.md): Train door controller from a verification paper.
-* [Distributed](src/distributed/README.md): Basic federated hello-world examples.
\ No newline at end of file
+* [Distributed](src/distributed/README.md): Basic federated hello-world examples.
+* [Leader Election](src/leader-election/README.md): Federated fault-tolerant system with leader election.
\ No newline at end of file
diff --git a/examples/C/src/leader-election/HeartbeatBully.lf b/examples/C/src/leader-election/HeartbeatBully.lf
new file mode 100644
index 00000000..4e692d29
--- /dev/null
+++ b/examples/C/src/leader-election/HeartbeatBully.lf
@@ -0,0 +1,186 @@
+/**
+ * This program models a redundant fault tolerant system where a primary node, if and when it fails,
+ * is replaced by one of several backup nodes. The protocol is described in this paper:
+ *
+ * Bjarne Johansson; Mats Rågberger; Alessandro V. Papadopoulos; Thomas Nolte, "Heartbeat Bully:
+ * Failure Detection and Redundancy Role Selection for Network-Centric Controller," Proc. of the
+ * 46th Annual Conference of the IEEE Industrial Electronics Society (IECON), 8-21 October 2020.
+ * https://doi.org/10.1109/IECON43393.2020.9254494
+ *
+ * The program has a bank of redundant nodes where exactly one is the primary node and the rest are
+ * backups. The primary node is always the one with the highest bank index that has not failed. The
+ * primary sends a heartbeat message once per second (by default). When the primary fails, a leader
+ * election protocol selects a new primary which then starts sending heartbeat messages. The program
+ * is set so that each primary fails after sending three heartbeat messages. When all nodes have
+ * failed, then the program exits.
+ *
+ * This example is designed to be run as a federated program.
+ *
+ * @author Edward A. Lee
+ * @author Marjan Sirjani
+ */
+target C {
+ timeout: 30 s
+}
+
+preamble {=
+ #include "platform.h" // Defines PRINTF_TIME
+ enum message_type {
+ heartbeat,
+ reveal,
+ sorry
+ };
+ typedef struct message_t {
+ enum message_type type;
+ int id;
+ } message_t;
+=}
+
+reactor Node(
+ bank_index: int = 0,
+ num_nodes: int = 3,
+ heartbeat_period: time = 1 s,
+ max_missed_heartbeats: int = 2,
+ primary_fails_after_heartbeats: int = 3) {
+ input[num_nodes] in: message_t
+ output[num_nodes] out: message_t
+
+ state heartbeats_missed: int = 0
+ state primary_heartbeats_counter: int = 0
+
+ initial mode Idle {
+ reaction(startup) -> reset(Backup), reset(Primary) {=
+ lf_print(PRINTF_TIME ": Starting node %d", lf_time_logical_elapsed(), self->bank_index);
+ if (self->bank_index == self->num_nodes - 1) {
+ lf_set_mode(Primary);
+ } else {
+ lf_set_mode(Backup);
+ }
+ =}
+ }
+
+ mode Backup {
+ timer t(heartbeat_period, heartbeat_period)
+ reaction(in) -> out, reset(Prospect) {=
+ int primary_id = -1;
+ for (int i = 0; i < in_width; i++) {
+ if (in[i]->is_present && in[i]->value.id != self->bank_index) {
+ if (in[i]->value.type == heartbeat) {
+ if (primary_id >= 0) {
+ lf_print_error("Multiple primaries detected!!");
+ }
+ primary_id = in[i]->value.id;
+ lf_print(PRINTF_TIME ": Node %d received heartbeat from node %d.", lf_time_logical_elapsed(), self->bank_index, primary_id);
+ self->heartbeats_missed = 0;
+ } else if (in[i]->value.type == reveal && in[i]->value.id < self->bank_index) {
+ // NOTE: This will not occur if the LF semantics are followed because
+ // all nodes will (logically) simultaneously detect heartbeat failure and
+ // transition to the Prospect mode. But we include this anyway in case
+ // a federated version experiences a fault.
+
+ // Send a sorry message.
+ message_t message;
+ message.type = sorry;
+ message.id = self->bank_index;
+ lf_set(out[in[i]->value.id], message);
+ lf_print(PRINTF_TIME ": Node %d sends sorry to node %d", lf_time_logical_elapsed(), self->bank_index, in[i]->value.id);
+ // Go to Prospect mode to send reveal to any higher-priority nodes.
+ lf_set_mode(Prospect);
+ }
+ }
+ }
+ =}
+
+ reaction(t) -> reset(Prospect) {=
+ if (self->heartbeats_missed > self->max_missed_heartbeats) {
+ lf_set_mode(Prospect);
+ }
+ // Increment the counter so if it's not reset to 0 by the next time,
+ // we detect the missed heartbeat.
+ self->heartbeats_missed++;
+ =}
+ }
+
+ mode Primary {
+ timer heartbeat(0, heartbeat_period)
+ reaction(heartbeat) -> out, reset(Failed) {=
+ if (self->primary_heartbeats_counter++ >= self->primary_fails_after_heartbeats) {
+ // Stop sending heartbeats.
+ lf_print(PRINTF_TIME ": **** Primary node %d fails.", lf_time_logical_elapsed(), self->bank_index);
+ lf_set_mode(Failed);
+ } else {
+ lf_print(PRINTF_TIME ": Primary node %d sends heartbeat.", lf_time_logical_elapsed(), self->bank_index);
+ for (int i = 0; i < out_width; i++) {
+ if (i != self->bank_index) {
+ message_t message;
+ message.type = heartbeat;
+ message.id = self->bank_index;
+ lf_set(out[i], message);
+ }
+ }
+ }
+ =}
+ }
+
+ mode Failed {
+ }
+
+ mode Prospect {
+ logical action wait_for_sorry
+ reaction(reset) -> out, wait_for_sorry {=
+ lf_print(PRINTF_TIME ": ***** Node %d entered Prospect mode.", lf_time_logical_elapsed(), self->bank_index);
+ // Send a reveal message with my ID in a bid to become primary.
+ // NOTE: It is not necessary to send to nodes that have a lower
+ // priority than this node, but the connection is broadcast, so
+ // we send to all.
+ message_t message;
+ message.type = reveal;
+ message.id = self->bank_index;
+ for (int i = self->bank_index + 1; i < self->num_nodes; i++) {
+ lf_print(PRINTF_TIME ": Node %d sends reveal to node %d", lf_time_logical_elapsed(), self->bank_index, i);
+ lf_set(out[i], message);
+ }
+ // The reveal message is delayed by heartbeat_period, and if
+ // there is a sorry response, it too will be delayed by heartbeat_period,
+ // so the total logical delay is twice heartbeat_period.
+ lf_schedule(wait_for_sorry, 2 * self->heartbeat_period);
+ =}
+
+ reaction(in) -> out {=
+ for (int i = 0; i < in_width; i++) {
+ if (in[i]->value.type == reveal && in[i]->value.id < self->bank_index) {
+ // Send a sorry message.
+ message_t message;
+ message.type = sorry;
+ message.id = self->bank_index;
+ lf_set(out[in[i]->value.id], message);
+ lf_print(PRINTF_TIME ": Node %d sends sorry to node %d", lf_time_logical_elapsed(), self->bank_index, in[i]->value.id);
+ }
+ }
+ =}
+
+ reaction(wait_for_sorry) in -> reset(Backup), reset(Primary) {=
+ // Check for sorry messages.
+ // Sorry messages are guaranteed to be logically simultaneous
+ // with the wait_for_sorry event, so we just need to check for
+ // presence of sorry inputs.
+ int i;
+ for (i = 0; i < in_width; i++) {
+ if (in[i]->is_present && in[i]->value.type == sorry) {
+ // A sorry message arrived. Go to Backup mode.
+ lf_set_mode(Backup);
+ break;
+ }
+ }
+ if (i == in_width) {
+ // No sorry message arrived. Go to Primary mode.
+ lf_set_mode(Primary);
+ }
+ =}
+ }
+}
+
+federated reactor(num_nodes: int = 4, heartbeat_period: time = 1 s) {
+ nodes = new[num_nodes] Node(num_nodes=num_nodes, heartbeat_period=heartbeat_period)
+ nodes.out -> interleaved(nodes.in) after heartbeat_period
+}
diff --git a/examples/C/src/leader-election/NRP_FD.lf b/examples/C/src/leader-election/NRP_FD.lf
new file mode 100644
index 00000000..9fa28e55
--- /dev/null
+++ b/examples/C/src/leader-election/NRP_FD.lf
@@ -0,0 +1,522 @@
+/**
+ * This program implements a redundant fault-tolerant system where a primary node, if and when it
+ * fails, is replaced by a backup node. The protocol is described in this paper:
+ *
+ * Bjarne Johansson; Mats Rågberger; Alessandro V. Papadopoulos; Thomas Nolte, "Consistency Before
+ * Availability: Network Reference Point based Failure Detection for Controller Redundancy,"
+ * Emerging Technologies and Factory Automation (ETFA), 12-15 September 2023, DOI:
+ * 10.1109/ETFA54631.2023.10275664
+ *
+ * The key idea in this protocol is that when a backup fails to detect the heartbeats of a primary
+ * node, it becomes primary only if it has access to Network Reference Point (NRP), which is a point
+ * in the network. This way, if the network becomes partitioned, only a backup that is on the side
+ * of the partition that still has access to the NRP can become a primary. If a primary loses access
+ * to the NRP, then it relinquishes its primary role because it is now on the wrong side of a
+ * network partition. A backup on the right side of the partition will take over.
+ *
+ * This implementation omits some details in the paper. See NOTEs in the comments.
+ *
+ * This version has switch1 failing at 3s, node1 failing at 10s, and node2 failing at 15s.
+ *
+ * @author Edward A. Lee
+ * @author Marjan Sirjani
+ */
+target C {
+ tracing: true,
+ timeout: 20 s
+}
+
+preamble {=
+ #ifndef NRF_FD
+ #define NRF_FD
+ #include "platform.h" // Defines PRINTF_TIME
+
+ // Paper calls for manual intervention to set initial primary ID and NRP network.
+ // Here, we just hardwire this choice using #define.
+ #define INITIAL_PRIMARY_ID 1
+ #define INITIAL_NRP_NETWORK 0
+
+ enum message_type {
+ heartbeat,
+ ping_NRP,
+ ping_NRP_response,
+ request_new_NRP,
+ new_NRP
+ };
+ typedef struct message_t {
+ enum message_type type;
+ int source;
+ int destination;
+ int payload;
+ } message_t;
+ #endif // NRF_FD
+=}
+
+reactor Node(
+ id: int = 0,
+ heartbeat_period: time = 1 s,
+ routine_ping_offset: time = 1 ms, // Time after heartbeat to ping NRP.
+ max_missed_heartbeats: int = 2,
+ fails_at_time: time = 0, // For testing. 0 for no failure.
+ ping_timeout: time = 500 ms, // Time until ping is deemed to have failed.
+ // Time until new NRP request is deemed to have failed.
+ nrp_timeout: time = 500 ms) {
+ // There are two network interfaces:
+ @side("east")
+ input[2] in: message_t
+ output[2] out: message_t
+
+ timer node_fails(fails_at_time)
+
+ state heartbeats_missed: int[2] = {0}
+ state primary: int = 0 // The known primary node.
+ state ping_pending: bool = false // Ping has been issued and not responded to.
+ state ping_timeout_pending: bool = false // Ping timeout timer hasn't expired.
+ state become_primary_on_ping_response: bool = false
+ state NRP_network: int = {= INITIAL_NRP_NETWORK =}
+ state NRP_switch_id: int = 0 // 0 means not known.
+
+ logical action ping_timed_out(ping_timeout)
+ logical action new_NRP_request_timed_out(nrp_timeout)
+
+ initial mode Waiting {
+ reaction(startup) -> out {=
+ // If I am the initial primary, broadcast a ping on network 1.
+ // The first switch to get this will respond.
+ if (self->id == INITIAL_PRIMARY_ID) {
+ message_t ping_message = {ping_NRP, self->id, 0, 0};
+ lf_set(out[INITIAL_NRP_NETWORK], ping_message);
+ // Instead of scheduling ping_timed_out, we just continue waiting until a ping response arrives.
+ }
+ =}
+
+ reaction(in) -> out, reset(Backup), reset(Primary) {=
+ // Iterate over input channels.
+ for (int c = 0; c < in_width; c++) {
+ if (in[c]->is_present) {
+ // In this mode, primary is waiting for a ping response and backup for a new NRP.
+ if (self->id == INITIAL_PRIMARY_ID && in[c]->value.type == ping_NRP_response) {
+ // Become primary.
+ self->primary = self->id;
+ lf_set_mode(Primary);
+
+ lf_print(PRINTF_TIME ": Initial primary node %d received ping response on network %d. "
+ "Making switch %d the NRP.", lf_time_logical_elapsed(), self->id, c, in[c]->value.source
+ );
+ self->NRP_network = c;
+ self->NRP_switch_id = in[c]->value.source;
+ // Notify the backup of the NRP. Destination 0 here means broadcast.
+ message_t message = {new_NRP, self->id, 0, in[c]->value.source};
+ // Send new NRP message on all networks.
+ for (int i = 0; i < out_width; i++) lf_set(out[i], message);
+ } else if (in[c]->value.type == new_NRP) {
+ if (in[c]->value.payload != self->NRP_switch_id) {
+ // Message is not redundant (new_NRP sent on both networks).
+ // Become backup. Source of the message is the primary.
+ lf_print(PRINTF_TIME ": Waiting node %d received new NRP %d on network %d. "
+ "Becoming backup.", lf_time_logical_elapsed(), self->id, in[c]->value.payload,
+ c, in[c]->value.source
+ );
+ self->primary = in[c]->value.source;
+ self->NRP_switch_id = in[c]->value.payload;
+ self->NRP_network = c;
+ lf_set_mode(Backup);
+ }
+ }
+ }
+ }
+ =}
+ }
+
+ // mode Waiting
+ mode Primary {
+ timer heartbeat(0, heartbeat_period)
+ timer ping_NRP_timer(routine_ping_offset, heartbeat_period)
+ reaction(reset) {=
+ lf_print(PRINTF_TIME ": ---- Node %d becomes primary.", lf_time_logical_elapsed(), self->id);
+ =}
+
+ reaction(node_fails) -> reset(Failed) {=
+ if(lf_time_logical_elapsed() > 0LL) lf_set_mode(Failed);
+ =}
+
+ reaction(heartbeat) -> out {=
+ lf_print(PRINTF_TIME ": Primary node %d sends heartbeat on both networks.",
+ lf_time_logical_elapsed(), self->id
+ );
+ message_t message = {heartbeat, self->id, 0, 0};
+ for (int i = 0; i < out_width; i++) lf_set(out[i], message);
+ =}
+
+ reaction(ping_NRP_timer) -> out, ping_timed_out {=
+ // Ping the NRP if there is one and there isn't a ping timeout pending.
+ if (self->NRP_switch_id != 0 && !self->ping_timeout_pending) {
+ lf_print(PRINTF_TIME ": Primary node %d pings NRP %d (routine).",
+ lf_time_logical_elapsed(), self->id, self->NRP_switch_id
+ );
+ message_t ping = {ping_NRP, self->id, self->NRP_switch_id, 0};
+ lf_set(out[self->NRP_network], ping);
+ self->ping_pending = true;
+ self->ping_timeout_pending = true;
+ lf_schedule(ping_timed_out, 0);
+ }
+ =}
+
+ reaction(in) -> out, ping_timed_out {=
+ // Iterate over input channels.
+ for (int c = 0; c < in_width; c++) {
+ if (in[c]->is_present) {
+ if (in[c]->value.type == request_new_NRP) {
+ // Backup is asking for a new NRP. Invalidate current NRP.
+ self->NRP_switch_id = 0;
+
+ // Switch networks.
+ if (self->NRP_network == 0) self->NRP_network = 1;
+ else self->NRP_network = 0;
+
+ lf_print(PRINTF_TIME ": Primary node %d looking for new NRP on network %d.",
+ lf_time_logical_elapsed(), self->id, self->NRP_network
+ );
+ message_t message = {ping_NRP, self->id, 0, 0};
+ lf_set(out[self->NRP_network], message);
+ self->ping_pending = true;
+ self->ping_timeout_pending = true;
+ lf_schedule(ping_timed_out, 0);
+ } else if (in[c]->value.type == ping_NRP_response) {
+ lf_print(PRINTF_TIME ": Primary node %d received ping response on network %d. NRP is %d.",
+ lf_time_logical_elapsed(), self->id, c, in[c]->value.source
+ );
+ self->ping_pending = false;
+ if (self->NRP_switch_id == 0) {
+ // This is a new NRP.
+ self->NRP_switch_id = in[c]->value.source;
+ self->NRP_network = c;
+ // Notify the backup of the NRP on the NRP's network.
+ message_t message = {new_NRP, self->id, 0, self->NRP_switch_id};
+ lf_set(out[c], message);
+ lf_print(PRINTF_TIME ": Primary node %d notifies backup of new NRP %d on network %d.",
+ lf_time_logical_elapsed(), self->id, self->NRP_switch_id, c
+ );
+ // NOTE: Should the primary get some confirmation from the backup?
+ }
+ }
+ }
+ }
+ =}
+
+ reaction(ping_timed_out) -> out, ping_timed_out, reset(Failed) {=
+ self->ping_timeout_pending = false;
+ if (self->ping_pending) {
+ // Ping timed out.
+ self->ping_pending = false;
+ lf_print(PRINTF_TIME ": Primary node %d gets no response from ping.",
+ lf_time_logical_elapsed(), self->id
+ );
+ if (self->NRP_switch_id == 0) {
+ // Failed to get a new NRP. Declare failure.
+ lf_set_mode(Failed);
+ } else {
+ // Invalidate current NRP.
+ self->NRP_switch_id = 0;
+
+ // Switch networks.
+ if (self->NRP_network == 0) self->NRP_network = 1;
+ else self->NRP_network = 0;
+
+ lf_print(PRINTF_TIME ": Primary node %d looking for new NRP on network %d.",
+ lf_time_logical_elapsed(), self->id, self->NRP_network
+ );
+ message_t message = {ping_NRP, self->id, 0, 0};
+ lf_set(out[self->NRP_network], message);
+ self->ping_pending = true;
+ self->ping_timeout_pending = true;
+ lf_schedule(ping_timed_out, 0);
+ }
+ }
+ =}
+ }
+
+ // mode Primary
+ mode Backup {
+ timer t(heartbeat_period, heartbeat_period)
+ // NOTE: Paper says to SENDIMHERETOPRIMARY with "longer interval".
+ // Is this really necessary?
+ reaction(reset) {=
+ lf_print(PRINTF_TIME ": ---- Node %d becomes backup.", lf_time_logical_elapsed(), self->id);
+ =}
+
+ reaction(node_fails) -> reset(Failed) {=
+ if(lf_time_logical_elapsed() > 0LL) lf_set_mode(Failed);
+ =}
+
+ reaction(in) -> reset(Primary) {=
+ // Iterate over input channels.
+ for (int c = 0; c < in_width; c++) {
+ if (in[c]->is_present) {
+ if (in[c]->value.type == heartbeat) {
+ lf_print(PRINTF_TIME ": Backup node %d received heartbeat from node %d on network %d.",
+ lf_time_logical_elapsed(), self->id, in[c]->value.source, c
+ );
+ self->heartbeats_missed[c] = 0;
+ } else if (in[c]->value.type == ping_NRP_response && in[c]->value.destination == self->id) {
+ // Got a response from the NRP to a ping we sent.
+ lf_print(PRINTF_TIME ": Backup node %d received ping response on network %d from NRP on switch %d.",
+ lf_time_logical_elapsed(), self->id, c, in[c]->value.source
+ );
+ self->NRP_switch_id = in[c]->value.source;
+ // If there was a timeout on both networks that was not simultaneous, then
+ // we tried pinging the NRP before becoming primary.
+ if (self->become_primary_on_ping_response) {
+ lf_set_mode(Primary);
+ self->become_primary_on_ping_response = false;
+ }
+ self->ping_pending = false;
+ } else if (in[c]->value.type == new_NRP) {
+ // NOTE: Should ping the new NRP and send confirmation back to primary.
+ lf_print(PRINTF_TIME ": Backup node %d received new NRP %d on network %d.",
+ lf_time_logical_elapsed(), self->id, in[c]->value.payload, c
+ );
+ self->NRP_network = c;
+ self->NRP_switch_id = in[c]->value.payload;
+ }
+ }
+ }
+ =}
+
+ reaction(t) -> reset(Primary), out, ping_timed_out {=
+ if (self->heartbeats_missed[0] > self->max_missed_heartbeats
+ && self->heartbeats_missed[1] > self->max_missed_heartbeats) {
+ // Simultaneous heartbeat misses.
+ // In the paper, this is tmoAllNotSimul.
+ // For the tmoAllSimul optimization in the paper, we assume that if
+ // self->heartbeats_missed[0] == self->heartbeats_missed[1], then most likely, it is
+ // the primary that failed, and not the network, so can immediately become the primary.
+ // Otherwise, it is possible that one network failed, and then the other failed, in which
+ // case, we may have a partitioned network.
+ lf_print(PRINTF_TIME ": **** Backup node %d detects missing heartbeats on both networks.",
+ lf_time_logical_elapsed(), self->id
+ );
+ if (self->heartbeats_missed[0] == self->heartbeats_missed[1]) {
+ lf_print(PRINTF_TIME ": **** Missing heartbeats on both networks were simultaneous. "
+ "Assume the primary failed.",
+ lf_time_logical_elapsed()
+ );
+ lf_set_mode(Primary);
+ } else if (self->NRP_switch_id != 0) {
+ // Ping the NRP because if we can't access it, we are on the wrong side of
+ // a network partition and could end up with two primaries.
+ message_t message = {ping_NRP, self->id, self->NRP_switch_id, 0};
+ lf_set(out[self->NRP_network], message);
+ // Wait for a response before becoming primary.
+ self->become_primary_on_ping_response = true;
+ self->ping_pending = true;
+ self->ping_timeout_pending = true;
+ lf_schedule(ping_timed_out, 0);
+ } else {
+ lf_print_warning(PRINTF_TIME "**** Do not know which switch is the NRP! Cannot become primary.",
+ lf_time_logical_elapsed()
+ );
+ }
+ self->heartbeats_missed[0] = 0; // Prevent detecting again immediately.
+ self->heartbeats_missed[1] = 0;
+ } else if (self->heartbeats_missed[0] > self->max_missed_heartbeats
+ || self->heartbeats_missed[1] > self->max_missed_heartbeats) {
+ // Heartbeat missed on one network but not yet on the other.
+ // Ping the NRP to make sure we retain access to it so that we can be an effective backup.
+ // This corresponds to tmoSomeNotAll in the paper.
+ lf_print(PRINTF_TIME ": **** Backup node %d detects missing heartbeats on one network.",
+ lf_time_logical_elapsed(), self->id
+ );
+ // Ping the NRP.
+ message_t message = {ping_NRP, self->id, self->NRP_switch_id, 0};
+ if (!self->ping_pending && !self->ping_timeout_pending && self->NRP_switch_id != 0) {
+ lf_set(out[self->NRP_network], message);
+ lf_print(PRINTF_TIME ": Backup node %d pings NRP on network %d, switch %d",
+ lf_time_logical_elapsed(), self->id, self->NRP_network, self->NRP_switch_id
+ );
+ self->ping_pending = true;
+ self->ping_timeout_pending = true;
+ lf_schedule(ping_timed_out, 0);
+ }
+ }
+ // Increment the counters so if they are not reset to 0 by the next time,
+ // we detect the missed heartbeat.
+ self->heartbeats_missed[0]++;
+ self->heartbeats_missed[1]++;
+ =}
+
+ reaction(ping_timed_out) -> out, new_NRP_request_timed_out, reset(Failed) {=
+ self->ping_timeout_pending = false;
+ if (self->ping_pending) {
+ // Ping timed out.
+ lf_print(PRINTF_TIME ": Backup node %d gets no response from ping.", lf_time_logical_elapsed(), self->id);
+ if (self->NRP_switch_id != 0) {
+ // Send request for new NRP on the other network.
+ lf_print(PRINTF_TIME ": Backup node %d requests new NRP.", lf_time_logical_elapsed(), self->id);
+
+ // Invalidate current NRP.
+ self->NRP_switch_id = 0;
+
+ // Switch networks.
+ if (self->NRP_network == 0) self->NRP_network = 1;
+ else self->NRP_network = 0;
+
+ message_t message = {request_new_NRP, self->id, self->primary, 0};
+ lf_set(out[self->NRP_network], message);
+
+ lf_schedule(new_NRP_request_timed_out, 0);
+ } else {
+ // Failed to connect to new NRP.
+ lf_set_mode(Failed);
+ }
+ self->ping_pending = false;
+ }
+ =}
+
+ reaction(new_NRP_request_timed_out) {=
+ if (self->NRP_switch_id == 0) {
+ lf_print(PRINTF_TIME ": Backup node %d new NRP request timed out. Will not function as backup.",
+ lf_time_logical_elapsed(), self->id
+ );
+ if (self->become_primary_on_ping_response) {
+ lf_print(PRINTF_TIME ": Network is likely partitioned. Remaining as (non-functional) backup.",
+ lf_time_logical_elapsed()
+ );
+ self->become_primary_on_ping_response = false;
+ }
+ }
+ =}
+ }
+
+ mode Failed {
+ reaction(reset) {=
+ lf_print(PRINTF_TIME ": #### Node %d fails.", lf_time_logical_elapsed(), self->id);
+ =}
+ }
+}
+
+/**
+ * Switch with two interfaces. When a ping_NRP message arrives on either interface, if the
+ * destination matches the ID of this switch or the destination is 0, then the switch responds on
+ * the same interface with a ping_NRP_response message. When any other message arrives on either
+ * interface, the switch forwards a copy of the message to the other interface. If any two messages
+ * would be simultaneous on an output, one will be sent one microstep later.
+ */
+reactor Switch(
+ id: int = 0,
+ // For testing. 0 for no failure.
+ fails_at_time: time = 0) {
+ input in1: message_t
+ @side("east")
+ input in2: message_t
+ @side("west")
+ output out1: message_t
+ output out2: message_t
+
+ logical action pending_out1: message_t
+ logical action pending_out2: message_t
+
+ timer switch_fails(fails_at_time)
+
+ initial mode Working {
+ reaction(switch_fails) -> reset(Failed) {=
+ if(lf_time_logical_elapsed() > 0LL) lf_set_mode(Failed);
+ =}
+
+ reaction(pending_out1) -> out1 {=
+ lf_set(out1, pending_out1->value);
+ =}
+
+ reaction(pending_out2) -> out2 {=
+ lf_set(out2, pending_out2->value);
+ =}
+
+ reaction(in1, in2) -> out1, out2, pending_out1, pending_out2 {=
+ if (in1->is_present) {
+ if (in1->value.type == ping_NRP) {
+ if (in1->value.destination == self->id || in1->value.destination == 0) {
+ lf_print(PRINTF_TIME ": ==== Switch %d pinged by node %d. Responding.", lf_time_logical_elapsed(), self->id, in1->value.source);
+ // Respond to the ping.
+ message_t message = {ping_NRP_response, self->id, in1->value.source};
+ if (!out1->is_present) {
+ lf_set(out1, message);
+ } else {
+ lf_schedule_copy(pending_out1, 0, &message, 1);
+ }
+ } else {
+ // Forward the ping.
+ if (!out2->is_present) {
+ lf_set(out2, in1->value);
+ } else {
+ lf_schedule_copy(pending_out2, 0, &in1->value, 1);
+ }
+ }
+ } else {
+ // Forward the message.
+ if (!out2->is_present) {
+ lf_set(out2, in1->value);
+ } else {
+ lf_schedule_copy(pending_out2, 0, &in1->value, 1);
+ }
+ }
+ }
+ if (in2->is_present) {
+ if (in2->value.type == ping_NRP) {
+ if (in2->value.destination == self->id) {
+ lf_print(PRINTF_TIME ": ==== Switch %d pinged by node %d. Responding.", lf_time_logical_elapsed(), self->id, in2->value.source);
+ // Construct a response to the ping.
+ message_t message = {ping_NRP_response, self->id, in2->value.source};
+ // Respond to the ping if out2 is available.
+ if (!out2->is_present) {
+ lf_set(out2, message);
+ } else {
+ lf_schedule_copy(pending_out2, 0, &message, 1);
+ }
+ } else {
+ // Forward the ping to out1 if out1 is available.
+ if (!out1->is_present) {
+ lf_set(out1, in2->value);
+ } else {
+ lf_schedule_copy(pending_out1, 0, &in2->value, 1);
+ }
+ }
+ } else {
+ // Forward the message if out1 is available.
+ if (!out1->is_present) {
+ lf_set(out1, in2->value);
+ } else {
+ lf_schedule_copy(pending_out1, 0, &in2->value, 1);
+ }
+ }
+ }
+ =}
+ }
+
+ mode Failed {
+ reaction(reset) {=
+ lf_print(PRINTF_TIME ": ==== Switch %d fails.", lf_time_logical_elapsed(), self->id);
+ =}
+ }
+}
+
+federated reactor(heartbeat_period: time = 1 s, delay: time = 1 ms) {
+ node1 = new Node(heartbeat_period=heartbeat_period, id=1, fails_at_time = 10 s)
+ switch1 = new Switch(id=1, fails_at_time = 3 s)
+ switch3 = new Switch(id=3)
+
+ node2 = new Node(heartbeat_period=heartbeat_period, id=2, fails_at_time = 15 s)
+ switch2 = new Switch(id=2)
+ switch4 = new Switch(id=4)
+
+ node1.out -> switch1.in1, switch3.in1 after delay
+ switch1.out1, switch3.out1 -> node1.in after delay
+
+ switch1.out2 -> switch2.in2 after delay
+ switch2.out2 -> switch1.in2 after delay
+
+ switch2.out1, switch4.out1 -> node2.in after delay
+ node2.out -> switch2.in1, switch4.in1 after delay
+
+ switch3.out2 -> switch4.in2 after delay
+ switch4.out2 -> switch3.in2 after delay
+}
diff --git a/examples/C/src/leader-election/NRP_FD_Partitioning.lf b/examples/C/src/leader-election/NRP_FD_Partitioning.lf
new file mode 100644
index 00000000..3a046333
--- /dev/null
+++ b/examples/C/src/leader-election/NRP_FD_Partitioning.lf
@@ -0,0 +1,37 @@
+/**
+ * This version of NRP_FD partitions the network and shows that the protocol prevents the backup
+ * from becoming primary, thereby preventing two primaries.
+ *
+ * @author Edward A. Lee
+ * @author Marjan Sirjani
+ */
+// This version
+target C {
+ tracing: true,
+ timeout: 20 s
+}
+
+import Switch, Node from "NRP_FD.lf"
+
+federated reactor(heartbeat_period: time = 1 s, delay: time = 1 ms) {
+ node1 = new Node(heartbeat_period=heartbeat_period, id=1, fails_at_time = 15 s)
+ node2 = new Node(heartbeat_period=heartbeat_period, id=2, fails_at_time = 15 s)
+
+ switch1 = new Switch(id=1, fails_at_time = 3 s)
+ switch2 = new Switch(id=2)
+ switch3 = new Switch(id=3)
+ // Failure of switch4 will partition the network.
+ switch4 = new Switch(id=4, fails_at_time = 10 s)
+
+ node1.out -> switch1.in1, switch3.in1 after delay
+ switch1.out1, switch3.out1 -> node1.in after delay
+
+ switch1.out2 -> switch2.in2 after delay
+ switch2.out2 -> switch1.in2 after delay
+
+ switch2.out1, switch4.out1 -> node2.in after delay
+ node2.out -> switch2.in1, switch4.in1 after delay
+
+ switch3.out2 -> switch4.in2 after delay
+ switch4.out2 -> switch3.in2 after delay
+}
diff --git a/examples/C/src/leader-election/NRP_FD_PrimaryFails.lf b/examples/C/src/leader-election/NRP_FD_PrimaryFails.lf
new file mode 100644
index 00000000..c9ed2969
--- /dev/null
+++ b/examples/C/src/leader-election/NRP_FD_PrimaryFails.lf
@@ -0,0 +1,37 @@
+/**
+ * This version of NRP_FD simply has the primary (node1) failing after 5 seconds and the backup
+ * (node2) failing at at 15s. The backup detects simultaneous loss of the heartbeat on both networks
+ * and hence assumes that the primary has failed rather than there being a network failure. Switch 1
+ * remains the NRP.
+ *
+ * @author Edward A. Lee
+ * @author Marjan Sirjani
+ */
+target C {
+ tracing: true,
+ timeout: 20 s
+}
+
+import Switch, Node from "NRP_FD.lf"
+
+federated reactor(heartbeat_period: time = 1 s, delay: time = 1 ms) {
+ node1 = new Node(heartbeat_period=heartbeat_period, id=1, fails_at_time = 5 s)
+ node2 = new Node(heartbeat_period=heartbeat_period, id=2, fails_at_time = 15 s)
+
+ switch1 = new Switch(id=1)
+ switch2 = new Switch(id=2)
+ switch3 = new Switch(id=3)
+ switch4 = new Switch(id=4)
+
+ node1.out -> switch1.in1, switch3.in1 after delay
+ switch1.out1, switch3.out1 -> node1.in after delay
+
+ switch1.out2 -> switch2.in2 after delay
+ switch2.out2 -> switch1.in2 after delay
+
+ switch2.out1, switch4.out1 -> node2.in after delay
+ node2.out -> switch2.in1, switch4.in1 after delay
+
+ switch3.out2 -> switch4.in2 after delay
+ switch4.out2 -> switch3.in2 after delay
+}
diff --git a/examples/C/src/leader-election/README.md b/examples/C/src/leader-election/README.md
new file mode 100644
index 00000000..d2cce2c9
--- /dev/null
+++ b/examples/C/src/leader-election/README.md
@@ -0,0 +1,36 @@
+# Leader Election
+
+These federated programs implement redundant fault-tolerant systems where a primary node, if and when it fails, is replaced by a backup node. The HeartbeatBully example is described in this paper:
+
+> B. Johansson, M. Rågberger, A. V. Papadopoulos and T. Nolte, "Heartbeat Bully: Failure Detection and Redundancy Role Selection for Network-Centric Controller," IECON 2020 The 46th Annual Conference of the IEEE Industrial Electronics Society, Singapore, 2020, pp. 2126-2133, [DOI: 10.1109/IECON43393.2020.9254494](https://doi.org/10.1109/IECON43393.2020.9254494).
+
+The NRP examples extend the algorithm to reduce the likelihood of getting multiple primaries when the network becomes partitioned. The NRP protocol is described in this paper:
+
+> B. Johansson, M. Rågberger, A. V. Papadopoulos, and T. Nolte, "Consistency Before Availability: Network Reference Point based Failure Detection for Controller Redundancy," Emerging Technologies and Factory Automation (ETFA), 12-15 September 2023, [DOI:10.1109/ETFA54631.2023.10275664](https://doi.org/10.1109/ETFA54631.2023.10275664)
+
+The key idea in the NRP protocol is that when a backup fails to detect the heartbeats of a primary node, it becomes primary only if it has access to Network Reference Point (NRP), which is a point in the network. This way, if the network becomes partitioned, only a backup that is on the side of the partition that still has access to the NRP can become a primary. If a primary loses access to the NRP, then it relinquishes its primary role because it is now on the wrong side of a network partition. A backup on the right side of the partition will take over. The "FD" in the names of the programs stands for "fault detection."
+
+## Prerequisite
+
+To run these programs, you are required to first [install the RTI](https://www.lf-lang.org/docs/handbook/distributed-execution?target=c#installation-of-the-rti) (the Run-Time Infrastructure), which handles the coordination.
+
+## Examples
+
+
+
+ |
+ HeartbeatBully.lf : Basic leader electrion protocol called "heartbeat bully". |
+
+
+ |
+ NRP_FD.lf : Extension using a network reference point (NRP) to help prevent multiple primaries. This version has switch1 failing at 3s, node1 failing at 10s, and node2 failing at 15s. |
+
+
+ |
+ NRP_FD_PrimaryFails.lf : This version has the primary (node1) failing after 5 seconds and the backup (node2) failing at at 15s. The backup detects simultaneous loss of the heartbeat on both networks and hence assumes that the primary has failed rather than there being a network failure. Switch 1 remains the NRP. |
+
+
+ |
+ NRP_FD_Partitioning.lf : This version partitions the network and shows that the protocol prevents the backup from becoming primary, thereby preventing two primaries. |
+
+
diff --git a/examples/C/src/leader-election/img/HeartbeatBully.png b/examples/C/src/leader-election/img/HeartbeatBully.png
new file mode 100644
index 00000000..54ed8551
Binary files /dev/null and b/examples/C/src/leader-election/img/HeartbeatBully.png differ
diff --git a/examples/C/src/leader-election/img/NRP_FD.png b/examples/C/src/leader-election/img/NRP_FD.png
new file mode 100644
index 00000000..026cb609
Binary files /dev/null and b/examples/C/src/leader-election/img/NRP_FD.png differ
diff --git a/examples/C/src/leader-election/img/NRP_FD_Partitioning.png b/examples/C/src/leader-election/img/NRP_FD_Partitioning.png
new file mode 100644
index 00000000..7604b7fb
Binary files /dev/null and b/examples/C/src/leader-election/img/NRP_FD_Partitioning.png differ
diff --git a/examples/C/src/leader-election/img/NRP_FD_PrimaryFails.png b/examples/C/src/leader-election/img/NRP_FD_PrimaryFails.png
new file mode 100644
index 00000000..8cff396e
Binary files /dev/null and b/examples/C/src/leader-election/img/NRP_FD_PrimaryFails.png differ
diff --git a/examples/C/src/modal_models/FurutaPendulum/FurutaPendulum.lf b/examples/C/src/modal_models/FurutaPendulum/FurutaPendulum.lf
index 1af7c3e3..fe7969c3 100644
--- a/examples/C/src/modal_models/FurutaPendulum/FurutaPendulum.lf
+++ b/examples/C/src/modal_models/FurutaPendulum/FurutaPendulum.lf
@@ -16,7 +16,7 @@
target C {
timeout: 3 secs,
fast: true,
- flags: "-lm",
+ cmake-include: "furuta.cmake",
build: "./build_run_plot.sh FurutaPendulum"
}
diff --git a/examples/C/src/modal_models/FurutaPendulum/FurutaPendulumDisturbance.lf b/examples/C/src/modal_models/FurutaPendulum/FurutaPendulumDisturbance.lf
index 873cb013..ad5ee330 100644
--- a/examples/C/src/modal_models/FurutaPendulum/FurutaPendulumDisturbance.lf
+++ b/examples/C/src/modal_models/FurutaPendulum/FurutaPendulumDisturbance.lf
@@ -8,7 +8,6 @@
target C {
timeout: 5 secs,
fast: true,
- flags: "-lm",
build: "./build_run_plot.sh FurutaPendulumDisturbance"
}
diff --git a/examples/C/src/modal_models/FurutaPendulum/build_run_plot.sh b/examples/C/src/modal_models/FurutaPendulum/build_run_plot.sh
index 1e8dc089..2f9325e1 100755
--- a/examples/C/src/modal_models/FurutaPendulum/build_run_plot.sh
+++ b/examples/C/src/modal_models/FurutaPendulum/build_run_plot.sh
@@ -20,11 +20,11 @@ fi
# Build the generated code.
cd ${LF_SOURCE_GEN_DIRECTORY}
-cmake -DLF_REACTION_GRAPH_BREADTH=3 -DLF_THREADED=1 -DNUMBER_OF_WORKERS=0 -DSCHEDULER=NP -DMODAL_REACTORS=TRUE .
-cmake --build .
+cmake -Bbuild
+make -C build
# Move the executable to the bin directory.
-mv $1 ${LF_BIN_DIRECTORY}
+mv build/$1 ${LF_BIN_DIRECTORY}
# Move back to source directory to run program
cd ${LF_SOURCE_DIRECTORY}
diff --git a/examples/C/src/modal_models/FurutaPendulum/furuta.cmake b/examples/C/src/modal_models/FurutaPendulum/furuta.cmake
new file mode 100644
index 00000000..430831ec
--- /dev/null
+++ b/examples/C/src/modal_models/FurutaPendulum/furuta.cmake
@@ -0,0 +1 @@
+target_link_libraries(${LF_MAIN_TARGET} PRIVATE m)
\ No newline at end of file
diff --git a/examples/C/src/mqtt/MQTTDistributed.lf b/examples/C/src/mqtt/MQTTDistributed.lf
index 6152b317..bdfb90b4 100644
--- a/examples/C/src/mqtt/MQTTDistributed.lf
+++ b/examples/C/src/mqtt/MQTTDistributed.lf
@@ -12,8 +12,8 @@
* The code generator produces three programs, bin/MQTTDistributed_RTI, bin/MQTTDistributed_source,
* and bin/MQTTDistributed_destination, plus a script bin/MQTTDistributed that runs all three.
*
- * Since the source and destination are running in the same executable, there is no clock
- * synchronization error.
+ * If the source and destination are running in the same machine, there is no clock synchronization
+ * error.
*
* See README.md for prerequisites and further information.
*
@@ -21,7 +21,6 @@
* @author Edward A. Lee
*/
target C {
- cmake-include: ["include/paho-extension.cmake", "include/mosquitto-extension.cmake"],
timeout: 10 secs,
coordination: centralized
}
diff --git a/examples/C/src/mqtt/MQTTDistributedActivity.lf b/examples/C/src/mqtt/MQTTDistributedActivity.lf
index 7db8d95e..1678a9fe 100644
--- a/examples/C/src/mqtt/MQTTDistributedActivity.lf
+++ b/examples/C/src/mqtt/MQTTDistributedActivity.lf
@@ -11,7 +11,6 @@
* @author Edward A. Lee
*/
target C {
- cmake-include: ["include/paho-extension.cmake", "include/mosquitto-extension.cmake"],
timeout: 10 secs,
coordination: centralized
}
diff --git a/examples/C/src/mqtt/MQTTLegacy.lf b/examples/C/src/mqtt/MQTTLegacy.lf
index 0065ae4f..ef384007 100644
--- a/examples/C/src/mqtt/MQTTLegacy.lf
+++ b/examples/C/src/mqtt/MQTTLegacy.lf
@@ -13,18 +13,13 @@
*
* This is a federated program, the publisher and subscriber run in separate programs. This would
* work pretty much the same way, however, as an unfederated program. To run as an unfederated
- * program, add to cmake-include the following file:
- *
- * "include/net_utils.cmake"
- *
- * and change the `federated` keyword to `main`.
+ * program, just change the `federated` keyword to `main`.
*
* See README.md for prerequisites and further information.
*
* @author Edward A. Lee
*/
target C {
- cmake-include: ["include/paho-extension.cmake", "include/mosquitto-extension.cmake"],
timeout: 1 min,
coordination: centralized
}
@@ -49,7 +44,7 @@ reactor Subscriber {
sub.message -> dsp.message
}
-federated reactor {
+main reactor {
source = new Publisher()
destination = new Subscriber()
}
diff --git a/examples/C/src/mqtt/MQTTLogical.lf b/examples/C/src/mqtt/MQTTLogical.lf
index 3a6afb56..fdffb4a9 100644
--- a/examples/C/src/mqtt/MQTTLogical.lf
+++ b/examples/C/src/mqtt/MQTTLogical.lf
@@ -13,10 +13,6 @@
* @author Edward A. Lee
*/
target C {
- cmake-include: [
- "include/paho-extension.cmake", // For #include "MQTTClient.h"
- // For encode_int64()
- "include/net_utils.cmake"],
timeout: 10 secs
}
diff --git a/examples/C/src/mqtt/MQTTPhysical.lf b/examples/C/src/mqtt/MQTTPhysical.lf
index 9094de08..70c881eb 100644
--- a/examples/C/src/mqtt/MQTTPhysical.lf
+++ b/examples/C/src/mqtt/MQTTPhysical.lf
@@ -12,10 +12,6 @@
* @author Edward A. Lee
*/
target C {
- cmake-include: [
- "include/paho-extension.cmake", // For #include "MQTTClient.h"
- // For encode_int64() and extract_int64()
- "include/net_utils.cmake"],
timeout: 10 secs
}
diff --git a/examples/C/src/mqtt/README.md b/examples/C/src/mqtt/README.md
index 6506f72c..8eb3f409 100644
--- a/examples/C/src/mqtt/README.md
+++ b/examples/C/src/mqtt/README.md
@@ -25,8 +25,23 @@ The following examples illustrate more advanced features, particularly the limit
## Prerequisites:
-To get this example to compile, you will need to install the [Eclipse Paho MQTT C client library](https://github.com/eclipse/paho.mqtt.c), which requires that you first install
-[openSSL](https://github.com/openssl/openssl.git) (see [https://www.openssl.org](https://www.openssl.org). To run the compiled code, you need an MQTT broker to be running. For example, the [Mosquitto Eclipse project](https://mosquitto.org/download/) provides one. On a Mac, you can use homebrew to install the Mosquitto broker:
+To get this example to compile, you will need to install
+
+1. [openSSL](https://github.com/openssl/openssl.git). See [https://www.openssl.org](https://www.openssl.org).
+2. The [Eclipse Paho MQTT C client library](https://github.com/eclipse/paho.mqtt.c). E.g., the following might work:
+
+```shell
+ git clone git@github.com:eclipse/paho.mqtt.c.git
+ mkdir /tmp/build.paho ; cd /tmp/build.paho
+ cmake -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=TRUE \
+ -DPAHO_BUILD_SAMPLES=TRUE ~/paho.mqtt.c
+ sudo make install
+ export DYLD_LIBRARY_PATH=/usr/local/lib:$DYLD_LIBRARY_PATH
+```
+
+The last line could be put in your `~/.bash_profile` file so that you don't have to type each time you run the LF program in a new shell.
+
+To run the compiled code, you need an MQTT broker to be running. For example, the [Mosquitto Eclipse project](https://mosquitto.org/download/) provides one. On a Mac, you can use homebrew to install the Mosquitto broker:
brew install mosquitto
@@ -44,6 +59,12 @@ To start the broker and test it, do this:
> mosquitto_pub -t 'test/topic' -m 'Hello World'
+If you want to start the broker always upon login, you can do this:
+
+```
+brew services start mosquitto
+```
+
## Implementation
The [`MQTTPublisher`](https://github.com/lf-lang/examples-lingua-franca/blob/main/C/src/MQTT/lib/MQTTPublisher.lf) and [`MQTTSubscriber`](https://github.com/lf-lang/examples-lingua-franca/blob/main/C/src/MQTT/lib/MQTTSubscriber.lf) reactor use the [Paho MQTT Client Library](https://github.com/eclipse/paho.mqtt.c) (see the [documentation](https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html)).
diff --git a/examples/C/src/mqtt/include/net_utils.cmake b/examples/C/src/mqtt/include/net_utils.cmake
index 448ea62e..52ca4fe6 100644
--- a/examples/C/src/mqtt/include/net_utils.cmake
+++ b/examples/C/src/mqtt/include/net_utils.cmake
@@ -1 +1 @@
-target_sources(${LF_MAIN_TARGET} PRIVATE "core/federated/net_util.c")
+target_sources(${LF_MAIN_TARGET} PRIVATE "core/federated/network/net_util.c")
diff --git a/examples/C/src/mqtt/lib/MQTTPublisher.lf b/examples/C/src/mqtt/lib/MQTTPublisher.lf
index bc49b6cd..9d3ab882 100644
--- a/examples/C/src/mqtt/lib/MQTTPublisher.lf
+++ b/examples/C/src/mqtt/lib/MQTTPublisher.lf
@@ -25,18 +25,22 @@
* @author Ravi Akella
* @author Edward A. Lee
*/
-target C
+target C {
+ cmake-include: [
+ "../include/paho-extension.cmake",
+ // For encode_int64()
+ "../include/net_utils.cmake"]
+}
preamble {=
#ifndef MQTT_PUBLISHER_H
#define MQTT_PUBLISHER_H
#include "platform.h" // Defines lf_critical_section_enter(), etc.
- #include "tag.h" // Defines lf_time_logical()
#include // Defines memcpy
#include "MQTTClient.h"
- #include "core/federated/net_util.h"
+ #include "core/federated/network/net_util.h"
// Struct type used to keep track of messages in flight between reactions.
typedef struct inflight_t {
diff --git a/examples/C/src/mqtt/lib/MQTTSubscriber.lf b/examples/C/src/mqtt/lib/MQTTSubscriber.lf
index e5ae4133..b55d79ce 100644
--- a/examples/C/src/mqtt/lib/MQTTSubscriber.lf
+++ b/examples/C/src/mqtt/lib/MQTTSubscriber.lf
@@ -36,18 +36,22 @@
* @author Ravi Akella
* @author Edward A. Lee
*/
-target C
+target C {
+ cmake-include: [
+ "../include/paho-extension.cmake",
+ // For extract_int64()
+ "../include/net_utils.cmake"]
+}
preamble {=
#ifndef MQTT_SUBSCRIBER_H
#define MQTT_SUBSCRIBER_H
#include "platform.h" // Defines lf_critical_section_enter(), etc.
- #include "tag.h" // Defines lf_time_logical()
#include // Defines memcmp()
#include "MQTTClient.h"
- #include "core/federated/net_util.h"
+ #include "core/federated/network/net_util.h"
// Fix the QoS to indicate that the message will be delivered reliably exactly once.
#define QOS 2
diff --git a/examples/C/src/shared-memory/README.md b/examples/C/src/shared-memory/README.md
new file mode 100644
index 00000000..5b5a959e
--- /dev/null
+++ b/examples/C/src/shared-memory/README.md
@@ -0,0 +1,15 @@
+# Shared Memory
+
+The POSIX Realtime Extension includes a mechanism for processes on a single machine to share memory. A writer opens a "file" using `shm_open` and then uses `mmap` to map a sequence of memory addresses to the contents of this in-memory file. The `mmap` function returns a pointer to this memory, which the writer can then use to store data.
+
+A reader needs only the file name to open the file using `shm_open`, which it can then also map to memory locations using `mmap`
+
+This example shows how you can safely use this mechanism to exchange large chunks of data between LF federates without serializing, streaming, and then deserializing the data. The Sender reactor creates a file name using the current logical time (to ensure uniqueness, assuming no use of microsteps). It populates the shared memory with data and then sends the filename to the Reader. The Reader will only receive the file name after the Sender has finished writing to it, so precedence constraints are satisfied.
+
+
+
+
+
+ | SharedMemory.lf: An illustration of how to use shared memory to exchange large chunks of data between federates. |
+
+
\ No newline at end of file
diff --git a/examples/C/src/shared-memory/SharedMemory.lf b/examples/C/src/shared-memory/SharedMemory.lf
new file mode 100644
index 00000000..0c186ca3
--- /dev/null
+++ b/examples/C/src/shared-memory/SharedMemory.lf
@@ -0,0 +1,78 @@
+/**
+ * The POSIX Realtime Extension includes a mechanism for processes on a single machine to share
+ * memory. A writer opens a "file" using `shm_open` and then uses `mmap` to map a sequence of memory
+ * addresses to the contents of this in-memory file. The `mmap` function returns a pointer to this
+ * memory, which the writer can then use to store data.
+ *
+ * A reader needs only the file name to open the file using `shm_open`, which it can then also map
+ * to memory locations using `mmap`.
+ *
+ * This example shows how you can safely use this mechanism to exchange large chunks of data between
+ * LF federates without serializing, streaming, and then deserializing the data. The Sender reactor
+ * creates a file name using the current logical time (to ensure uniqueness, assuming no use of
+ * microsteps). It populates the shared memory with data and then sends the filename to the Reader.
+ * The Reader will only receive the file name after the Sender has finished writing to it, so
+ * precedence constraints are satisfied.
+ *
+ * @author Edward A. Lee
+ */
+target C {
+ timeout: 0 s
+}
+
+preamble {=
+ #include
+ #include
+ #include
+ #include
+ #define SIZE 4096
+=}
+
+reactor Sender {
+ // Do not use string data type because the string filename is dynamically allocated.
+ output out: char*
+
+ reaction(startup) -> out {=
+ tag_t now = lf_tag();
+ char *name;
+ // Create a file name based on current time.
+ if (asprintf(&name, "Sender_" PRINTF_TIME "_%d", now.time, now.microstep) < 0) {
+ lf_print_error_and_exit("Memory allocation error.");
+ }
+ lf_print("**** Writing to shared memory with filename: %s", name);
+ int fd = shm_open(name, O_CREAT | O_RDWR, 0666);
+ ftruncate(fd, SIZE); // Limit the size.
+ char* ptr = (char*)mmap(0, SIZE, PROT_WRITE, MAP_SHARED, fd, 0);
+ const char* message = "Hello World!";
+
+ // Write to the shared memory file.
+ char* offset = ptr;
+ while (offset < ptr + SIZE - strlen(message)) {
+ sprintf(offset, "%s", message);
+ offset += strlen(message);
+ }
+ // Send out the file name only, not the data it contains.
+ lf_set_array(out, name, strlen(name) + 1);
+ =}
+}
+
+reactor Reader {
+ input in: char*
+
+ reaction(in) {=
+ lf_print("**** Reading shared memory file %s", in->value);
+ int fd = shm_open(in->value, O_RDONLY, 0666);
+
+ // Memory map the shared memory object.
+ char* ptr = (char*)mmap(0, SIZE, PROT_READ, MAP_SHARED, fd, 0);
+
+ // Read the shared memory data.
+ lf_print("%s", ptr);
+ =}
+}
+
+federated reactor {
+ s = new Sender()
+ r = new Reader()
+ s.out -> r.in
+}
diff --git a/examples/C/src/shared-memory/img/SharedMemory.png b/examples/C/src/shared-memory/img/SharedMemory.png
new file mode 100644
index 00000000..b607ca49
Binary files /dev/null and b/examples/C/src/shared-memory/img/SharedMemory.png differ