From 94b82efd83b649b73d00bed222653df455194157 Mon Sep 17 00:00:00 2001 From: Daniel Krueger Date: Wed, 26 Feb 2025 14:03:13 +0100 Subject: [PATCH] update tests --- .../runtime/DataCombiningRuntime.java | 43 ++++++++----------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/hivemq-edge/src/main/java/com/hivemq/combining/runtime/DataCombiningRuntime.java b/hivemq-edge/src/main/java/com/hivemq/combining/runtime/DataCombiningRuntime.java index 609213de0c..4707664f3d 100644 --- a/hivemq-edge/src/main/java/com/hivemq/combining/runtime/DataCombiningRuntime.java +++ b/hivemq-edge/src/main/java/com/hivemq/combining/runtime/DataCombiningRuntime.java @@ -89,16 +89,14 @@ public void start() { consumers.add(consumer); }); - combining.sources().topicFilters().forEach(topicFilter -> { - internalSubscriptions.add(subscribeTopicFilter(combining, - topicFilter, - TOPIC_FILTER.equals(combining.sources().primaryReference().type()) && - topicFilter.equals(combining.sources().primaryReference().id()))); - }); + combining.sources() + .topicFilters() + .forEach(topicFilter -> internalSubscriptions.add(subscribeTopicFilter(combining, + topicFilter, + TOPIC_FILTER.equals(combining.sources().primaryReference().type()) && + topicFilter.equals(combining.sources().primaryReference().id())))); - internalSubscriptions.forEach(internalSubscription -> { - internalSubscription.queueConsumer().start(); - }); + internalSubscriptions.forEach(internalSubscription -> internalSubscription.queueConsumer().start()); } public void stop() { @@ -111,10 +109,8 @@ public void stop() { }); } - public InternalSubscription subscribeTopicFilter( - final @NotNull DataCombining dataCombining, - final @NotNull String topicFilter, - final boolean isPrimary) { + public @NotNull InternalSubscription subscribeTopicFilter( + final @NotNull DataCombining dataCombining, final @NotNull String topicFilter, final boolean isPrimary) { final String clientId = dataCombining.id() + "#"; final QoS qos = QoS.EXACTLY_ONCE; @@ -145,19 +141,16 @@ public void triggerPublish(final @NotNull DataCombining dataCombining) { final Map outgoing = new HashMap<>(); - topicFilterResults.forEach((topicFilter, publish) -> { - outgoing.put(topicFilter, new String(publish.getPayload())); - }); + topicFilterResults.forEach((topicFilter, publish) -> outgoing.put(topicFilter, + new String(publish.getPayload()))); - tagsToDataPoints.forEach((tagName, dataPoints) -> { - dataPoints.forEach(dataPoint -> { - outgoing.put(dataPoint.getTagName(), dataPoint.getTagValue().toString()); - }); - }); + tagsToDataPoints.forEach((tagName, dataPoints) -> dataPoints.forEach(dataPoint -> outgoing.put(dataPoint.getTagName(), + dataPoint.getTagValue().toString()))); try { dataCombiningPublishService.publish(combining.destination(), - mapper.writeValueAsBytes(outgoing), dataCombining); + mapper.writeValueAsBytes(outgoing), + dataCombining); } catch (final JsonProcessingException e) { log.error("Can't produce JSON", e); throw new RuntimeException(e); @@ -171,9 +164,7 @@ public final class InternalTagConsumer implements TagConsumer { private final @NotNull DataCombining dataCombining; public InternalTagConsumer( - final @NotNull String tagName, - final @NotNull DataCombining dataCombining, - final boolean isPrimary) { + final @NotNull String tagName, final @NotNull DataCombining dataCombining, final boolean isPrimary) { this.tagName = tagName; this.dataCombining = dataCombining; this.isPrimary = isPrimary; @@ -185,7 +176,7 @@ public InternalTagConsumer( } @Override - public void accept(final List dataPoints) { + public void accept(final @NotNull List dataPoints) { tagResults.put(tagName, dataPoints); if (isPrimary) { triggerPublish(dataCombining);