Skip to content

Commit

Permalink
Adjust SIRI ServiceBus-client parameters to improve performance
Browse files Browse the repository at this point in the history
* Move to ServiceBusReceiveMode.RECEIVE_AND_DELETE instead of PEEK_LOCK.
  In this mode the message is deleted from the subscription as soon as
  it has been delivered. In case of an error this means that the message
  is lost but 1) the data is applied to the graph in another thread anyway
  and 2) there is nothing that indicates that a second attempt on the same
  message data would be more successful.
* Fetch more than one message at a time, greatly improving message throughput
  even with small values. Roundtrip latency of the message fetching can
  in some situations otherwise lead to the client falling behind.
  Add config parameter 'prefetchCount' with a default value of 10 messages.
* Lower the AutoDeleteOnIdle of subscriptions from one day to default one hour,
  still a conservative value. Despite having automatic removal of subscriptions on
  shutdown, lingering subscriptions may still happen, for example if the JVM dies.
  Add config parameter 'autoDeleteOnIdle' with a default value of one hour.
  • Loading branch information
Johan Torin committed Mar 11, 2024
1 parent 0532203 commit 163b0aa
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 33 deletions.
62 changes: 33 additions & 29 deletions docs/sandbox/siri/SiriAzureUpdater.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro
<!-- siri-azure-et-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down Expand Up @@ -107,21 +109,23 @@ Has to be present for authenticationMethod SharedAccessKey. This should be Prima
<!-- siri-azure-sx-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.io.CharStreams;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater {
private final Consumer<ServiceBusReceivedMessageContext> messageConsumer = this::messageConsumer;
private final Consumer<ServiceBusErrorContext> errorConsumer = this::errorConsumer;
private final String topicName;
private final Duration autoDeleteOnIdle;
private final int prefetchCount;

protected WriteToGraphCallback saveResultOnGraph;
private ServiceBusProcessorClient eventProcessor;
Expand Down Expand Up @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel
this.dataInitializationUrl = config.getDataInitializationUrl();
this.timeout = config.getTimeout();
this.feedId = config.feedId();
this.autoDeleteOnIdle = config.getAutoDeleteOnIdle();
this.prefetchCount = config.getPrefetchCount();
TransitService transitService = new DefaultTransitService(transitModel);
this.entityResolver = new EntityResolver(transitService, feedId);
this.fuzzyTripMatcher =
Expand Down Expand Up @@ -122,10 +127,11 @@ public void run() {
.buildAsyncClient();
}

// If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus
// Set options
var options = new CreateSubscriptionOptions();
options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS));
options.setAutoDeleteOnIdle(Duration.ofDays(1));
// Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger.
options.setAutoDeleteOnIdle(autoDeleteOnIdle);

// Make sure there is no old subscription on serviceBus
if (
Expand All @@ -150,15 +156,18 @@ public void run() {
.processor()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.prefetchCount(prefetchCount)
.processError(errorConsumer)
.processMessage(messageConsumer)
.buildProcessorClient();

eventProcessor.start();
LOG.info(
"Service Bus processor started for topic {} and subscription {}",
"Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.",
topicName,
subscriptionName
subscriptionName,
prefetchCount
);

ApplicationShutdownSupport.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opentripplanner.ext.siri.updater.azure;

import java.time.Duration;

public abstract class SiriAzureUpdaterParameters {

private String configRef;
Expand All @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters {
private int timeout;

private boolean fuzzyTripMatching;
private Duration autoDeleteOnIdle;
private int prefetchCount;

public SiriAzureUpdaterParameters(String type) {
this.type = type;
Expand Down Expand Up @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() {
public void setFuzzyTripMatching(boolean fuzzyTripMatching) {
this.fuzzyTripMatching = fuzzyTripMatching;
}

public Duration getAutoDeleteOnIdle() {
return autoDeleteOnIdle;
}

public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) {
this.autoDeleteOnIdle = autoDeleteOnIdle;
}

public int getPrefetchCount() {
return prefetchCount;
}

public void setPrefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2;
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5;

import java.time.Duration;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
Expand Down Expand Up @@ -41,6 +42,20 @@ public static void populateConfig(
.summary("The ID of the feed to apply the updates to.")
.asString(null)
);
parameters.setAutoDeleteOnIdle(
c
.of("autoDeleteOnIdle")
.since(V2_5)
.summary("The time after which an inactive subscription is removed.")
.asDuration(Duration.ofHours(1))
);
parameters.setPrefetchCount(
c
.of("prefetchCount")
.since(V2_5)
.summary("The number of messages to fetch from the subscription at a time.")
.asInt(10)
);
parameters.setFuzzyTripMatching(
c
.of("fuzzyTripMatching")
Expand Down

0 comments on commit 163b0aa

Please sign in to comment.