diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index f96941c7370..3a600a755b1 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -10,6 +10,7 @@ import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient; import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder; import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.common.base.Preconditions; import com.google.common.io.CharStreams; import java.io.InputStreamReader; @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater { private final Consumer messageConsumer = this::messageConsumer; private final Consumer errorConsumer = this::errorConsumer; private final String topicName; + private final Duration autoDeleteOnIdle; + private final int prefetchCount; protected WriteToGraphCallback saveResultOnGraph; private ServiceBusProcessorClient eventProcessor; @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel this.dataInitializationUrl = config.getDataInitializationUrl(); this.timeout = config.getTimeout(); this.feedId = config.feedId(); + this.autoDeleteOnIdle = config.getAutoDeleteOnIdle(); + this.prefetchCount = config.getPrefetchCount(); TransitService transitService = new DefaultTransitService(transitModel); this.entityResolver = new EntityResolver(transitService, feedId); this.fuzzyTripMatcher = @@ -122,10 +127,11 @@ public void run() { .buildAsyncClient(); } - // If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus + // Set options var options = new CreateSubscriptionOptions(); options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS)); - options.setAutoDeleteOnIdle(Duration.ofDays(1)); + // Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger. + options.setAutoDeleteOnIdle(autoDeleteOnIdle); // Make sure there is no old subscription on serviceBus if ( @@ -150,15 +156,18 @@ public void run() { .processor() .topicName(topicName) .subscriptionName(subscriptionName) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .prefetchCount(prefetchCount) .processError(errorConsumer) .processMessage(messageConsumer) .buildProcessorClient(); eventProcessor.start(); LOG.info( - "Service Bus processor started for topic {} and subscription {}", + "Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.", topicName, - subscriptionName + subscriptionName, + prefetchCount ); ApplicationShutdownSupport.addShutdownHook( diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java index 0d207d27efe..4b8406da896 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java @@ -1,5 +1,7 @@ package org.opentripplanner.ext.siri.updater.azure; +import java.time.Duration; + public abstract class SiriAzureUpdaterParameters { private String configRef; @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters { private int timeout; private boolean fuzzyTripMatching; + private Duration autoDeleteOnIdle; + private int prefetchCount; public SiriAzureUpdaterParameters(String type) { this.type = type; @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() { public void setFuzzyTripMatching(boolean fuzzyTripMatching) { this.fuzzyTripMatching = fuzzyTripMatching; } + + public Duration getAutoDeleteOnIdle() { + return autoDeleteOnIdle; + } + + public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) { + this.autoDeleteOnIdle = autoDeleteOnIdle; + } + + public int getPrefetchCount() { + return prefetchCount; + } + + public void setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + } } diff --git a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java index 35da716337e..ddb6a967f92 100644 --- a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java +++ b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java @@ -4,6 +4,7 @@ import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2; import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5; +import java.time.Duration; import java.time.LocalDate; import java.time.Period; import java.time.ZoneId; @@ -41,6 +42,20 @@ public static void populateConfig( .summary("The ID of the feed to apply the updates to.") .asString(null) ); + parameters.setAutoDeleteOnIdle( + c + .of("autoDeleteOnIdle") + .since(V2_5) + .summary("The time after which an inactive subscription is removed.") + .asDuration(Duration.ofHours(1)) + ); + parameters.setPrefetchCount( + c + .of("prefetchCount") + .since(V2_5) + .summary("The number of messages to fetch from the subscription at a time.") + .asInt(10) + ); parameters.setFuzzyTripMatching( c .of("fuzzyTripMatching")