Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DC2-DanielKrueger committed Feb 26, 2025
1 parent 946ec2f commit 94b82ef
Showing 1 changed file with 17 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,14 @@ public void start() {
consumers.add(consumer);
});

combining.sources().topicFilters().forEach(topicFilter -> {
internalSubscriptions.add(subscribeTopicFilter(combining,
topicFilter,
TOPIC_FILTER.equals(combining.sources().primaryReference().type()) &&
topicFilter.equals(combining.sources().primaryReference().id())));
});
combining.sources()
.topicFilters()
.forEach(topicFilter -> internalSubscriptions.add(subscribeTopicFilter(combining,
topicFilter,
TOPIC_FILTER.equals(combining.sources().primaryReference().type()) &&
topicFilter.equals(combining.sources().primaryReference().id()))));

internalSubscriptions.forEach(internalSubscription -> {
internalSubscription.queueConsumer().start();
});
internalSubscriptions.forEach(internalSubscription -> internalSubscription.queueConsumer().start());
}

public void stop() {
Expand All @@ -111,10 +109,8 @@ public void stop() {
});
}

public InternalSubscription subscribeTopicFilter(
final @NotNull DataCombining dataCombining,
final @NotNull String topicFilter,
final boolean isPrimary) {
public @NotNull InternalSubscription subscribeTopicFilter(
final @NotNull DataCombining dataCombining, final @NotNull String topicFilter, final boolean isPrimary) {
final String clientId = dataCombining.id() + "#";
final QoS qos = QoS.EXACTLY_ONCE;

Expand Down Expand Up @@ -145,19 +141,16 @@ public void triggerPublish(final @NotNull DataCombining dataCombining) {

final Map<String, Object> outgoing = new HashMap<>();

topicFilterResults.forEach((topicFilter, publish) -> {
outgoing.put(topicFilter, new String(publish.getPayload()));
});
topicFilterResults.forEach((topicFilter, publish) -> outgoing.put(topicFilter,
new String(publish.getPayload())));

tagsToDataPoints.forEach((tagName, dataPoints) -> {
dataPoints.forEach(dataPoint -> {
outgoing.put(dataPoint.getTagName(), dataPoint.getTagValue().toString());
});
});
tagsToDataPoints.forEach((tagName, dataPoints) -> dataPoints.forEach(dataPoint -> outgoing.put(dataPoint.getTagName(),
dataPoint.getTagValue().toString())));

try {
dataCombiningPublishService.publish(combining.destination(),
mapper.writeValueAsBytes(outgoing), dataCombining);
mapper.writeValueAsBytes(outgoing),
dataCombining);
} catch (final JsonProcessingException e) {
log.error("Can't produce JSON", e);
throw new RuntimeException(e);
Expand All @@ -171,9 +164,7 @@ public final class InternalTagConsumer implements TagConsumer {
private final @NotNull DataCombining dataCombining;

public InternalTagConsumer(
final @NotNull String tagName,
final @NotNull DataCombining dataCombining,
final boolean isPrimary) {
final @NotNull String tagName, final @NotNull DataCombining dataCombining, final boolean isPrimary) {
this.tagName = tagName;
this.dataCombining = dataCombining;
this.isPrimary = isPrimary;
Expand All @@ -185,7 +176,7 @@ public InternalTagConsumer(
}

@Override
public void accept(final List<DataPoint> dataPoints) {
public void accept(final @NotNull List<DataPoint> dataPoints) {
tagResults.put(tagName, dataPoints);
if (isPrimary) {
triggerPublish(dataCombining);
Expand Down

0 comments on commit 94b82ef

Please sign in to comment.