-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPolledSimulator.lf
62 lines (57 loc) · 2.42 KB
/
PolledSimulator.lf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* This program emulates a simulator that responds to queries for its current status. The simulator
* counts once per `period` and responds to queries with the current count. Queries arrive as MQTT
* messages on the `poll_topic` topic, and the responses are published via MQTT on the
* `response_topic` topic. No timestamp is sent with the response.
*
* The message sent is a JSON string containing a count and the logical time at which it was
* updated.
*
* See ../README.md for prerequisites and further information.
*
* @author Edward A. Lee
*
* @param broker The MQTT broker address.
* @param poll_topic The topic to subscribe to.
* @param response_topic The topic to publish to.
* @param period The period of the simulator updates.
*/
target C {
timeout: 15 s
}
import MQTTSubscriber from <mqtt-c/MQTTSubscriber.lf>
import MQTTPublisher from <mqtt-c/MQTTPublisher.lf>
main reactor(
broker: string = "tcp://localhost:1883",
poll_topic: string = "simulator-poll",
response_topic: string = "simulator-response",
period: time = 1 s) {
timer t(0, period)
state count: int = 0
state last_logical_time: interval_t = 0
pub = new MQTTPublisher(address=broker, topic=response_topic, include_timestamp=false)
sub = new MQTTSubscriber(address=broker, topic=poll_topic, use_physical_time=false)
reaction(t) {=
self->count++;
self->last_logical_time = lf_time_logical_elapsed();
lf_print("PolledSimulator: count=%d, logical_time=" PRINTF_TIME, self->count, self->last_logical_time);
=}
reaction(sub.message) -> pub.in {=
// With NULL, 0 arguments, snprintf tells us how many bytes are needed.
// Add one for the null terminator.
interval_t start_time = lf_time_physical_elapsed();
interval_t now = lf_time_logical_elapsed();
char* format = "{\"count\":%d, \"logical_time_at_source\":" PRINTF_TIME ", \"physical_time_at_source\":" PRINTF_TIME "}";
size_t length = snprintf(NULL, 0, format, self->count, now, start_time) + 1;
// Dynamically allocate memory for the output.
char* buffer = (char*)malloc(length);
// Populate the output string and increment the count.
snprintf(buffer, length, format, self->count, now, start_time);
lf_set_array(pub.in, buffer, length);
tag_t tag = lf_tag();
lf_print("PolledSimulator: At (elapsed) tag " PRINTF_TAG ", publish message:\n %s",
tag.time - lf_time_start(), tag.microstep,
pub.in->value
);
=}
}