Skip to content

Commit

Permalink
35375 Adjust SIRI ServiceBus-client parameters to improve performance
Browse files Browse the repository at this point in the history
* Move to ServiceBusReceiveMode.RECEIVE_AND_DELETE instead of PEEK_LOCK.
  In this mode the message is deleted from the subscription as soon as
  it has been delivered. In case of an error this means that the message
  is lost but 1) the data is applied to the graph in another thread anyway
  and 2) there is nothing that indicates that a second attempt on the same
  message data would be more successful.
* Fetch more than one message at a time, greatly improving message throughput
  even with small values. Roundtrip latency of the message fetching can
  in some situations otherwise lead to the client falling behind.
  Add config parameter 'prefetchCount' with a default value of 10 messages.
* Lower the AutoDeleteOnIdle of subscriptions from one day to default one hour,
  still a conservative value. Despite having automatic removal of subscriptions on
  shutdown, lingering subscriptions may still happen, for example if the JVM dies.
  Add config parameter 'autoDeleteOnIdle' with a default value of one hour.
  • Loading branch information
Johan Torin committed Mar 11, 2024
1 parent 0532203 commit e924f50
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.io.CharStreams;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater {
private final Consumer<ServiceBusReceivedMessageContext> messageConsumer = this::messageConsumer;
private final Consumer<ServiceBusErrorContext> errorConsumer = this::errorConsumer;
private final String topicName;
private final Duration autoDeleteOnIdle;
private final int prefetchCount;

protected WriteToGraphCallback saveResultOnGraph;
private ServiceBusProcessorClient eventProcessor;
Expand Down Expand Up @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel
this.dataInitializationUrl = config.getDataInitializationUrl();
this.timeout = config.getTimeout();
this.feedId = config.feedId();
this.autoDeleteOnIdle = config.getAutoDeleteOnIdle();
this.prefetchCount = config.getPrefetchCount();
TransitService transitService = new DefaultTransitService(transitModel);
this.entityResolver = new EntityResolver(transitService, feedId);
this.fuzzyTripMatcher =
Expand Down Expand Up @@ -122,10 +127,11 @@ public void run() {
.buildAsyncClient();
}

// If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus
// Set options
var options = new CreateSubscriptionOptions();
options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS));
options.setAutoDeleteOnIdle(Duration.ofDays(1));
// Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger.
options.setAutoDeleteOnIdle(autoDeleteOnIdle);

// Make sure there is no old subscription on serviceBus
if (
Expand All @@ -150,15 +156,18 @@ public void run() {
.processor()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.prefetchCount(prefetchCount)
.processError(errorConsumer)
.processMessage(messageConsumer)
.buildProcessorClient();

eventProcessor.start();
LOG.info(
"Service Bus processor started for topic {} and subscription {}",
"Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.",
topicName,
subscriptionName
subscriptionName,
prefetchCount
);

ApplicationShutdownSupport.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opentripplanner.ext.siri.updater.azure;

import java.time.Duration;

public abstract class SiriAzureUpdaterParameters {

private String configRef;
Expand All @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters {
private int timeout;

private boolean fuzzyTripMatching;
private Duration autoDeleteOnIdle;
private int prefetchCount;

public SiriAzureUpdaterParameters(String type) {
this.type = type;
Expand Down Expand Up @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() {
public void setFuzzyTripMatching(boolean fuzzyTripMatching) {
this.fuzzyTripMatching = fuzzyTripMatching;
}

public Duration getAutoDeleteOnIdle() {
return autoDeleteOnIdle;
}

public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) {
this.autoDeleteOnIdle = autoDeleteOnIdle;
}

public int getPrefetchCount() {
return prefetchCount;
}

public void setPrefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2;
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5;

import java.time.Duration;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
Expand Down Expand Up @@ -41,6 +42,20 @@ public static void populateConfig(
.summary("The ID of the feed to apply the updates to.")
.asString(null)
);
parameters.setAutoDeleteOnIdle(
c
.of("autoDeleteOnIdle")
.since(V2_5)
.summary("The time after which an inactive subscription is removed.")
.asDuration(Duration.ofHours(1))
);
parameters.setPrefetchCount(
c
.of("prefetchCount")
.since(V2_5)
.summary("The number of messages to fetch from the subscription at a time.")
.asInt(10)
);
parameters.setFuzzyTripMatching(
c
.of("fuzzyTripMatching")
Expand Down

0 comments on commit e924f50

Please sign in to comment.