Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust SIRI ServiceBus-client parameters to improve performance #5741

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions docs/sandbox/siri/SiriAzureUpdater.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro
<!-- siri-azure-et-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down Expand Up @@ -107,21 +109,23 @@ Has to be present for authenticationMethod SharedAccessKey. This should be Prima
<!-- siri-azure-sx-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.io.CharStreams;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater {
private final Consumer<ServiceBusReceivedMessageContext> messageConsumer = this::messageConsumer;
private final Consumer<ServiceBusErrorContext> errorConsumer = this::errorConsumer;
private final String topicName;
private final Duration autoDeleteOnIdle;
private final int prefetchCount;

protected WriteToGraphCallback saveResultOnGraph;
private ServiceBusProcessorClient eventProcessor;
Expand Down Expand Up @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel
this.dataInitializationUrl = config.getDataInitializationUrl();
this.timeout = config.getTimeout();
this.feedId = config.feedId();
this.autoDeleteOnIdle = config.getAutoDeleteOnIdle();
this.prefetchCount = config.getPrefetchCount();
TransitService transitService = new DefaultTransitService(transitModel);
this.entityResolver = new EntityResolver(transitService, feedId);
this.fuzzyTripMatcher =
Expand Down Expand Up @@ -122,10 +127,11 @@ public void run() {
.buildAsyncClient();
}

// If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus
// Set options
var options = new CreateSubscriptionOptions();
options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS));
options.setAutoDeleteOnIdle(Duration.ofDays(1));
// Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger.
options.setAutoDeleteOnIdle(autoDeleteOnIdle);

// Make sure there is no old subscription on serviceBus
if (
Expand All @@ -150,15 +156,18 @@ public void run() {
.processor()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.prefetchCount(prefetchCount)
.processError(errorConsumer)
.processMessage(messageConsumer)
.buildProcessorClient();

eventProcessor.start();
LOG.info(
"Service Bus processor started for topic {} and subscription {}",
"Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.",
topicName,
subscriptionName
subscriptionName,
prefetchCount
);

ApplicationShutdownSupport.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opentripplanner.ext.siri.updater.azure;

import java.time.Duration;

public abstract class SiriAzureUpdaterParameters {

private String configRef;
Expand All @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters {
private int timeout;

private boolean fuzzyTripMatching;
private Duration autoDeleteOnIdle;
private int prefetchCount;

public SiriAzureUpdaterParameters(String type) {
this.type = type;
Expand Down Expand Up @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() {
public void setFuzzyTripMatching(boolean fuzzyTripMatching) {
this.fuzzyTripMatching = fuzzyTripMatching;
}

public Duration getAutoDeleteOnIdle() {
return autoDeleteOnIdle;
}

public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) {
this.autoDeleteOnIdle = autoDeleteOnIdle;
}

public int getPrefetchCount() {
return prefetchCount;
}

public void setPrefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2;
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5;

import java.time.Duration;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
Expand Down Expand Up @@ -41,6 +42,20 @@ public static void populateConfig(
.summary("The ID of the feed to apply the updates to.")
.asString(null)
);
parameters.setAutoDeleteOnIdle(
c
.of("autoDeleteOnIdle")
.since(V2_5)
.summary("The time after which an inactive subscription is removed.")
.asDuration(Duration.ofHours(1))
);
parameters.setPrefetchCount(
c
.of("prefetchCount")
.since(V2_5)
.summary("The number of messages to fetch from the subscription at a time.")
.asInt(10)
);
parameters.setFuzzyTripMatching(
c
.of("fuzzyTripMatching")
Expand Down
Loading