diff --git a/core/src/main/java/org/lflang/AttributeUtils.java b/core/src/main/java/org/lflang/AttributeUtils.java index 79447cb440..37cbcecd38 100644 --- a/core/src/main/java/org/lflang/AttributeUtils.java +++ b/core/src/main/java/org/lflang/AttributeUtils.java @@ -226,6 +226,11 @@ public static boolean isSparse(EObject node) { return findAttributeByName(node, "sparse") != null; } + /** Return true if the node has an {@code @transient} attribute. */ + public static boolean isTransient(Instantiation node) { + return findAttributeByName(node, "transient") != null; + } + /** Return true if the reactor is marked to be a federate. */ public static boolean isFederate(Reactor reactor) { return findAttributeByName(reactor, "_fed_config") != null; diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtension.java b/core/src/main/java/org/lflang/federated/extensions/CExtension.java index 9e91349c98..b873cf0922 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtension.java @@ -466,7 +466,7 @@ public String generatePortAbsentReactionBody( + receivingPortID + ", " + connection.getDstFederate().id - + ", (long long) lf_time_logical_elapsed());", + + ", lf_time_logical_elapsed());", "if (" + sendRef + " == NULL || !" + sendRef + "->is_present) {", "LF_PRINT_LOG(\"The output port is NULL or it is not present.\");", " lf_send_port_absent_to_federate(" @@ -555,15 +555,33 @@ protected String makePreamble( // that handles incoming network messages destined to the specified // port. This will only be used if there are federates. int numOfNetworkActions = federate.networkMessageActions.size(); + int numZDCNetworkActions = federate.zeroDelayCycleNetworkMessageActions.size(); code.pr( """ interval_t _lf_action_delay_table[%1$s]; lf_action_base_t* _lf_action_table[%1$s]; size_t _lf_action_table_size = %1$s; - lf_action_base_t* _lf_zero_delay_cycle_action_table[%2$s]; - size_t _lf_zero_delay_cycle_action_table_size = %2$s; """ - .formatted(numOfNetworkActions, federate.zeroDelayCycleNetworkMessageActions.size())); + .formatted(numOfNetworkActions)); + if (numZDCNetworkActions > 0) { + code.pr( + """ + lf_action_base_t* _lf_zero_delay_cycle_action_table[%1$s]; + size_t _lf_zero_delay_cycle_action_table_size = %1$s; + uint16_t _lf_zero_delay_cycle_upstream_ids[%1$s]; + bool _lf_zero_delay_cycle_upstream_disconnected[%1$s] = { false }; + """ + .formatted(numZDCNetworkActions)); + } else { + // Make sure these symbols are defined, even though only size will be used. + code.pr( + """ + lf_action_base_t** _lf_zero_delay_cycle_action_table = NULL; + size_t _lf_zero_delay_cycle_action_table_size = 0; + uint16_t* _lf_zero_delay_cycle_upstream_ids = NULL; + bool* _lf_zero_delay_cycle_upstream_disconnected = NULL; + """); + } int numOfNetworkReactions = federate.networkReceiverReactions.size(); code.pr( @@ -725,6 +743,8 @@ else if (globalSTP instanceof CodeExprImpl) } // Set global variable identifying the federate. code.pr("_lf_my_fed_id = " + federate.id + ";"); + // Set indicator variable that specifies whether the federate is transient or not. + code.pr("_fed.is_transient = " + federate.isTransient + ";"); // We keep separate record for incoming and outgoing p2p connections to allow incoming traffic // to be processed in a separate diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java b/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java index ad982431a2..a7eeda782a 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java @@ -53,7 +53,6 @@ public static String initializeTriggersForNetworkActions( CodeBuilder code = new CodeBuilder(); if (!federate.networkMessageActions.isEmpty()) { var actionTableCount = 0; - var zeroDelayActionTableCount = 0; for (int i = 0; i < federate.networkMessageActions.size(); ++i) { // Find the corresponding ActionInstance. Action action = federate.networkMessageActions.get(i); @@ -76,10 +75,17 @@ public static String initializeTriggersForNetworkActions( // Set the ID of the source federate. code.pr( trigger + ".source_id = " + federate.networkMessageSourceFederate.get(i).id + "; \\"); - if (federate.zeroDelayCycleNetworkMessageActions.contains(action)) { + int j = federate.zeroDelayCycleNetworkMessageActions.indexOf(action); + if (j >= 0) { + var upstream = federate.zeroDelayCycleNetworkUpstreamFeds.get(j); + code.pr("_lf_zero_delay_cycle_upstream_ids[" + j + "] = " + upstream.id + "; \\"); + if (upstream.isTransient) { + // Transient federates are assumed to be initially disconnected. + code.pr("_lf_zero_delay_cycle_upstream_disconnected[" + j + "] = true; \\"); + } code.pr( "_lf_zero_delay_cycle_action_table[" - + zeroDelayActionTableCount++ + + j + "] = (lf_action_base_t*)&" + trigger + "; \\"); @@ -151,7 +157,8 @@ public static String stpStructs(FederateInstance federate) { */ public static String createPortStatusFieldForInput(Input input) { StringBuilder builder = new StringBuilder(); - // If it is not a multiport, then we could re-use the port trigger, and nothing needs to be done + // If it is not a multiport, then we could re-use the port trigger, and nothing + // needs to be done if (ASTUtils.isMultiport(input)) { // If it is a multiport, then create an auxiliary list of port // triggers for each channel of @@ -236,12 +243,13 @@ static boolean clockSyncIsOn(FederateInstance federate, RtiConfig rtiConfig) { * *

Clock synchronization can be enabled using the clock-sync target property. * - * @see Documentation + * @see Documentation */ public static void initializeClockSynchronization( FederateInstance federate, RtiConfig rtiConfig, MessageReporter messageReporter) { - // Check if clock synchronization should be enabled for this federate in the first place + // Check if clock synchronization should be enabled for this federate in the + // first place if (clockSyncIsOn(federate, rtiConfig)) { messageReporter .nowhere() @@ -267,8 +275,8 @@ public static void initializeClockSynchronization( * *

Clock synchronization can be enabled using the clock-sync target property. * - * @see Documentation + * @see Documentation */ public static void addClockSyncCompileDefinitions(FederateInstance federate) { @@ -311,8 +319,15 @@ public static void generateCMakeInclude( "add_compile_definitions(LF_SOURCE_DIRECTORY=\"" + fileConfig.srcPath + "\")"); cmakeIncludeCode.pr( "add_compile_definitions(LF_PACKAGE_DIRECTORY=\"" + fileConfig.srcPkgPath + "\")"); + // After federates have been divided, their root package directory is different. cmakeIncludeCode.pr( - "add_compile_definitions(LF_SOURCE_GEN_DIRECTORY=\"" + fileConfig.getSrcGenPath() + "\")"); + "add_compile_definitions(LF_FED_PACKAGE_DIRECTORY=\"" + + fileConfig.srcPkgPath + + File.separator + + "fed-gen" + + File.separator + + fileConfig.name + + "\")"); cmakeIncludeCode.pr("add_compile_definitions(LF_FILE_SEPARATOR=\"" + File.separator + "\")"); try (var srcWriter = Files.newBufferedWriter(cmakeIncludePath)) { srcWriter.write(cmakeIncludeCode.getCode()); diff --git a/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java b/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java index 5973b04dfd..3355152b48 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java +++ b/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java @@ -287,9 +287,10 @@ private static void addNetworkReceiverReactor( connection.dstFederate.networkMessageSourceFederate.add(connection.srcFederate); connection.dstFederate.networkMessageActionDelays.add(connection.getDefinition().getDelay()); if (connection.srcFederate.isInZeroDelayCycle() - && connection.getDefinition().getDelay() == null) + && connection.getDefinition().getDelay() == null) { connection.dstFederate.zeroDelayCycleNetworkMessageActions.add(networkAction); - + connection.dstFederate.zeroDelayCycleNetworkUpstreamFeds.add(connection.srcFederate); + } // Get the largest STAA for any reaction triggered by the destination port. TimeValue maxSTAA = findMaxSTAA(connection, coordination); diff --git a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java index 5b7775719d..baa10e9488 100644 --- a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java +++ b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.lflang.AttributeUtils; import org.lflang.MessageReporter; import org.lflang.TimeValue; import org.lflang.ast.ASTUtils; @@ -96,6 +97,7 @@ public FederateInstance( this.bankWidth = bankWidth; this.messageReporter = messageReporter; this.targetConfig = targetConfig; + this.isTransient = AttributeUtils.isTransient(instantiation); // If the instantiation is in a bank, then we have to append // the bank index to the name. @@ -157,6 +159,9 @@ public Instantiation getInstantiation() { /** The integer ID of this federate. */ public int id; + /** Type of the federate: transient if true, and peristent if false . */ + public boolean isTransient = false; + /** * The name of this federate instance. This will be the instantiation name, possibly appended with * "__n", where n is the bank position of this instance if the instantiation is of a bank of @@ -188,6 +193,12 @@ public Instantiation getInstantiation() { */ public List zeroDelayCycleNetworkMessageActions = new ArrayList<>(); + /** + * List of upstream federates corresponding to actions in the zeroDelayCycleNetworkMessageActions + * list. + */ + public List zeroDelayCycleNetworkUpstreamFeds = new ArrayList<>(); + /** * A set of federates with which this federate has an inbound connection There will only be one * physical connection even if federate A has defined multiple physical connections to federate B. diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index 192dcef334..76514a4101 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -324,12 +324,20 @@ private String getRtiCommand(List federates, boolean isRemote) if (targetConfig.getOrDefault(TracingProperty.INSTANCE).isEnabled()) { commands.add(" -t \\"); } + // Identify the number of transient federates. + int transientFederatesNumber = 0; + for (FederateInstance federate : federates) { + if (federate.isTransient) { + transientFederatesNumber++; + } + } if (!targetConfig.getOrDefault(DNETProperty.INSTANCE)) { commands.add(" -d \\"); } commands.addAll( List.of( " -n " + federates.size() + " \\", + " -nt " + transientFederatesNumber + " \\", " -c " + targetConfig.getOrDefault(ClockSyncModeProperty.INSTANCE).toString() + " \\")); diff --git a/core/src/main/java/org/lflang/generator/c/CCompiler.java b/core/src/main/java/org/lflang/generator/c/CCompiler.java index 12de4474dc..5c1defff83 100644 --- a/core/src/main/java/org/lflang/generator/c/CCompiler.java +++ b/core/src/main/java/org/lflang/generator/c/CCompiler.java @@ -109,9 +109,9 @@ public boolean runCCompiler(GeneratorBase generator, LFGeneratorContext context) // avoid any error residue that can occur in CMake from // a previous build. // FIXME: This is slow and only needed if an error - // has previously occurred. Deleting the build directory - // if no prior errors have occurred can prolong the compilation - // substantially. See #1416 for discussion. + // has previously occurred. Deleting the build directory + // if no prior errors have occurred can prolong the compilation + // substantially. See #1416 for discussion. FileUtil.deleteDirectory(buildPath); // Make sure the build directory exists Files.createDirectories(buildPath); @@ -240,14 +240,15 @@ private static List cmakeOptions(TargetConfig targetConfig, FileConfig f + FileUtil.toUnixString(fileConfig.getOutPath().relativize(fileConfig.binPath)), "-DLF_FILE_SEPARATOR='" + quote + separator + quote + "'")); // Add #define for source file directory. - // Do not do this for federated programs because for those, the definition is put - // into the cmake file (and fileConfig.srcPath is the wrong directory anyway). + // Do not do this for federated programs because for those, the definition is + // put into the cmake file (and fileConfig.srcPath is the wrong directory + // anyway). if (!fileConfig.srcPath.toString().contains("fed-gen")) { // Do not convert to Unix path arguments.add("-DLF_SOURCE_DIRECTORY='" + quote + srcPath + quote + "'"); arguments.add("-DLF_PACKAGE_DIRECTORY='" + quote + rootPath + quote + "'"); - arguments.add("-DLF_SOURCE_GEN_DIRECTORY='" + quote + srcGenPath + quote + "'"); } + arguments.add("-DLF_SOURCE_GEN_DIRECTORY='" + quote + srcGenPath + quote + "'"); arguments.add(FileUtil.toUnixString(fileConfig.getSrcGenPath())); if (GeneratorUtils.isHostWindows()) { diff --git a/core/src/main/java/org/lflang/validation/AttributeSpec.java b/core/src/main/java/org/lflang/validation/AttributeSpec.java index a685e98ca7..cb13cca247 100644 --- a/core/src/main/java/org/lflang/validation/AttributeSpec.java +++ b/core/src/main/java/org/lflang/validation/AttributeSpec.java @@ -205,6 +205,8 @@ enum AttrParamType { new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.STRING, false)))); // @sparse ATTRIBUTE_SPECS_BY_NAME.put("sparse", new AttributeSpec(null)); + // @transient + ATTRIBUTE_SPECS_BY_NAME.put("transient", new AttributeSpec(null)); // @icon("value") ATTRIBUTE_SPECS_BY_NAME.put( "icon", diff --git a/core/src/main/java/org/lflang/validation/LFValidator.java b/core/src/main/java/org/lflang/validation/LFValidator.java index eb721cf032..10ad9c2c11 100644 --- a/core/src/main/java/org/lflang/validation/LFValidator.java +++ b/core/src/main/java/org/lflang/validation/LFValidator.java @@ -531,6 +531,19 @@ public void checkInstantiation(Instantiation instantiation) { error("Variable-width banks are not supported.", Literals.INSTANTIATION__WIDTH_SPEC); } } + + // If the Instantiation is annotated as '@transient', then: + // - The container has to be a federated reactor, + // - The coordination is centralized, + // - And the target is C. + if (AttributeUtils.isTransient(instantiation)) { + Reactor container = (Reactor) instantiation.eContainer(); + if (!container.isFederated()) { + error( + "Only federates can be transients: " + instantiation.getReactorClass().getName(), + Literals.INSTANTIATION__REACTOR_CLASS); + } + } } @Check(CheckType.FAST) diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c index 77ed6d15e4..7e2971ba3a 160000 --- a/core/src/main/resources/lib/c/reactor-c +++ b/core/src/main/resources/lib/c/reactor-c @@ -1 +1 @@ -Subproject commit 77ed6d15e4176103bdb2b410729c74533b1dbf73 +Subproject commit 7e2971ba3a2f1d4820efccd3108faef589237f84 diff --git a/core/src/main/resources/lib/cpp/reactor-cpp b/core/src/main/resources/lib/cpp/reactor-cpp index d009fdd85b..d255a3da57 160000 --- a/core/src/main/resources/lib/cpp/reactor-cpp +++ b/core/src/main/resources/lib/cpp/reactor-cpp @@ -1 +1 @@ -Subproject commit d009fdd85bf15beb992a3102a99ed91497f8a6fd +Subproject commit d255a3da57d38db2988bcab68024b0a77ccc8657 diff --git a/test/C/src/federated/transient/TransientDownstreamWithTimer.lf b/test/C/src/federated/transient/TransientDownstreamWithTimer.lf new file mode 100644 index 0000000000..0b9626a362 --- /dev/null +++ b/test/C/src/federated/transient/TransientDownstreamWithTimer.lf @@ -0,0 +1,158 @@ +/** + * This LF program tests if a transient federate corretly leaves then joins the federation. It also + * tests if the transient's downstream executes as expected, that is it receives correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port + * `in`, + * - the downstream of the transient federate has only one transient as upstream. + */ +target C { + timeout: 3 s +} + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + lf_print("**** Launch command: %s", mid_launch_cmd); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * output port out. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + =} +} + +/** + * Transient federate that forwards whatever it receives from `Up` to `Down`. It reacts twice to + * input port `in`, then stops. It will execute twice during the lifetime of the federation. The + * second launch is done by `TransientExec` at logical time 1 s. Each time `Middle` joins, it + * notifies `Down`. + */ +reactor Middle { + input in: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + tag_t t = lf_tag_start_effective(); + if(t.time < lf_time_start()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port and stop spontaneously after two reactions to in + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + + if (self->count == 2) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(in) {= + self->count_in_mid_reactions++; + =} + + reaction(shutdown) {= + // Check that the TAG has been successfully issued to Down + if (self->count_timer < 5) { + lf_print_error_and_exit("Down federate's timer reacted %d times, while it had to react more than %d times.", + self->count_timer, 5); + } + + // Check that `Middle` have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that `Middle` have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +} diff --git a/test/C/src/federated/transient/TransientDownstreamWithTwoUpstream.lf b/test/C/src/federated/transient/TransientDownstreamWithTwoUpstream.lf new file mode 100644 index 0000000000..745ee2727d --- /dev/null +++ b/test/C/src/federated/transient/TransientDownstreamWithTwoUpstream.lf @@ -0,0 +1,128 @@ +/** + * This LF program tests if a transient federate corretly leaves then joins the federation. It also + * tests if the transient's downstream executes as expected, that is it received correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port in, + * - the downstream of the transient federate has one persistent and one transient upstreams. + * + * In addition, the program tests if authentication works in case of a federation with transients, + * by adding `auth` target property. + */ +target C { + timeout: 3 s, + auth: true +} + +import Up from "TransientDownstreamWithTimer.lf" +import Middle from "TransientDownstreamWithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in_mid: int + input in_up: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + state count_in_up_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(in_mid) {= + self->count_in_mid_reactions++; + =} + + reaction(in_up) {= + self->count_in_up_reactions++; + =} + + reaction(shutdown) {= + // Check that the TAG have been successfully issued to Down + if (self->count_timer < 5) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react more than %d times.", + self->count_timer, + 5); + } + if (self->count_in_up_reactions < 7) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react more than %d times.", + self->count_in_up_reactions, + 7); + } + + // Check that Middle have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that Middle have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up1 = new Up() + up2 = new Up(period = 300 msec) + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up1.out -> mid.in + mid.join -> down.join + mid.out -> down.in_mid + up2.out -> down.in_up +} diff --git a/test/C/src/federated/transient/TransientHotSwap.lf b/test/C/src/federated/transient/TransientHotSwap.lf new file mode 100644 index 0000000000..b8270af8d7 --- /dev/null +++ b/test/C/src/federated/transient/TransientHotSwap.lf @@ -0,0 +1,97 @@ +/** + * This LF program is a variant of TransientDownstreamWithTimer that tests the Hot Swap mechanism. + * For this, it tests if the transient's downstream executes as expected and if `mid` is stopped and + * the second instance joins as expected. In this test: + * - the transient federate DOES NOT spontaneously leave the federation. + * - the downstream of the transient federate has only one transient as upstream. + * - A persistent federate `TransientExec` launches `mid` after 1s to activate the hot mechanism + * swap. + */ +target C { + timeout: 3 s, + auth: true +} + +import Up from "TransientDownstreamWithTimer.lf" +import Down from "TransientDownstreamWithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Transient federate that forwards whatever it receives from `Up` to `Down`. It reacts twice to + * input port `in`, then stops. It will execute twice during the lifetime of the federation. The + * second launch is done by `TransientExec` at logical time 1 s. Each time `Middle` joins, it + * notifies `Down`. + */ +reactor Middle { + input in: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + tag_t t = lf_tag_start_effective(); + if(t.time < lf_time_start()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +} diff --git a/test/C/src/federated/transient/TransientStatePersistence.lf b/test/C/src/federated/transient/TransientStatePersistence.lf new file mode 100644 index 0000000000..bb5c0eff68 --- /dev/null +++ b/test/C/src/federated/transient/TransientStatePersistence.lf @@ -0,0 +1,194 @@ +/** + * This LF program showcases and tests the persistance of the internal state of a transient federate + * across executions. Using the hot swap mechanism, the transient federate `Middle` leaves and then + * joins. Whenever the state (of type `federate_state_t`) changes, it notifies `Persistence`. + * `Middle` notifies `Persistence` also when it joins. When `Middle` joins the second time or after, + * it receives the saved state and sets it. In this, the order of the reactions is important. + */ +target C { + timeout: 2900 ms +} + +preamble {= + #include + #include + // The internal federate state to be persistent across executions + typedef struct federate_state_t { + char state_char; + int state_count; + } federate_state_t; +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate %s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +reactor Persistence { + state middle_state: federate_state_t = {'A', 0} + state middle_first_join: bool = true + + input in_from_middle: federate_state_t + input in_middle_join: bool + output out_to_middle: federate_state_t + + // Only send the previous state if it not the first time Middle joins + reaction(in_middle_join) -> out_to_middle {= + if (!self->middle_first_join) { + lf_set(out_to_middle, self->middle_state); + lf_print("Notifying Mid of the latest state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + } + self->middle_first_join = false; + =} + + reaction(in_from_middle) {= + self->middle_state.state_char = in_from_middle->value.state_char; + self->middle_state.state_count = in_from_middle->value.state_count; + lf_print("Latest received state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * out output port. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + lf_print("Up timer sent %d", self->count); + =} +} + +/** + * Transient federate that forwards whatever it receives from Up to down. It reacts twice to in + * input ports, then stops. It will execute twice during the lifetime of the federation. The second + * launch is done by TransientExec at logical time 1 s. Each time Middle joins, it notifies Down. + */ +reactor Middle { + input in: int + output out: int + output join: bool + state middle_state: federate_state_t = {'A', 0} + + output out_to_persistence: federate_state_t // State Persistence + input in_from_persistence: federate_state_t + + // Middle notifies its downstream that he joined + reaction(startup) -> join {= + lf_set(join, true); + =} + + reaction(in_from_persistence) {= + self->middle_state = in_from_persistence->value; + lf_print("Received the latest state of: {%c,%d} at " PRINTF_TIME ".", + self->middle_state.state_char, + self->middle_state.state_count, + lf_time_logical_elapsed()); + =} + + // When an input is received, the internal state is updated, and then sent to + // Persistance. + reaction(in) -> out, out_to_persistence {= + self->middle_state.state_char++; + self->middle_state.state_count += 2; + lf_set(out, self->middle_state.state_count); + lf_set(out_to_persistence, self->middle_state); + lf_print("Mid state is: {count='%c', count=%d}", + self->middle_state.state_char, + self->middle_state.state_count); + + if (self->middle_state.state_count == 4) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in: int + input join: bool + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + lf_print("Down timer count %d", self->count_timer); + =} + + reaction(join) {= + self->count_join++; + lf_print("Down count join %d", self->count_join); + =} + + reaction(in) {= + self->count_in_mid_reactions++; + lf_print("Down in %d", self->count_in_mid_reactions); + =} + + reaction(shutdown) {= + if(self->count_join == 2 && self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Mid Joined twice, but the state did not persist \ + across executions! state_count is %d, while is should be > then %d.", + self->count_in_mid_reactions, + 4); + } + =} +} + +federated reactor { + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + persistence = new Persistence() + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.join -> persistence.in_middle_join + mid.out -> down.in + persistence.out_to_middle -> mid.in_from_persistence + mid.out_to_persistence -> persistence.in_from_middle +}