diff --git a/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java b/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterTest.java similarity index 95% rename from application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java rename to application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterTest.java index c0336bb6d7d..92b85856dfd 100644 --- a/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdaterTest.java +++ b/application/src/ext-test/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterTest.java @@ -7,7 +7,6 @@ import static org.mockito.Mockito.*; import com.azure.core.util.ExpandableStringEnum; -import com.azure.messaging.servicebus.ServiceBusErrorContext; import com.azure.messaging.servicebus.ServiceBusErrorSource; import com.azure.messaging.servicebus.ServiceBusException; import com.azure.messaging.servicebus.ServiceBusFailureReason; @@ -15,15 +14,15 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URISyntaxException; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -33,16 +32,19 @@ import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.opentripplanner.framework.io.OtpHttpClientException; +import org.opentripplanner.updater.spi.WriteToGraphCallback; +import uk.org.siri.siri20.ServiceDelivery; -class AbstractAzureSiriUpdaterTest { +class SiriAzureUpdaterTest { private SiriAzureUpdaterParameters mockConfig; - private AbstractAzureSiriUpdater updater; - private AbstractAzureSiriUpdater.CheckedRunnable task; + private SiriAzureUpdater updater; + private SiriAzureUpdater.CheckedRunnable task; @BeforeEach public void setUp() throws Exception { mockConfig = mock(SiriAzureUpdaterParameters.class); + when(mockConfig.getType()).thenReturn("siri-azure-test-updater"); when(mockConfig.configRef()).thenReturn("testConfigRef"); when(mockConfig.getAuthenticationType()).thenReturn(AuthenticationType.SharedAccessKey); when(mockConfig.getFullyQualifiedNamespace()).thenReturn("testNamespace"); @@ -58,22 +60,22 @@ public void setUp() throws Exception { // Create a spy on AbstractAzureSiriUpdater with the mock configuration updater = spy( - new AbstractAzureSiriUpdater(mockConfig) { - @Override - protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) {} - - @Override - protected void errorConsumer(ServiceBusErrorContext errorContext) {} - - @Override - protected void initializeData( - String url, - Consumer consumer - ) throws URISyntaxException {} - } + new SiriAzureUpdater( + mockConfig, + new SiriAzureMessageHandler() { + @Override + public void setup(WriteToGraphCallback writeToGraphCallback) {} + + @Override + @Nullable + public Future handleMessage(ServiceDelivery serviceDelivery, String messageId) { + return null; + } + } + ) ); - task = mock(AbstractAzureSiriUpdater.CheckedRunnable.class); + task = mock(SiriAzureUpdater.CheckedRunnable.class); } /** diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java index 32240e49aa0..d543eb3b293 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java @@ -1,104 +1,63 @@ package org.opentripplanner.ext.siri.updater.azure; -import com.azure.messaging.servicebus.ServiceBusErrorContext; -import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; -import jakarta.xml.bind.JAXBException; -import java.net.URI; -import java.net.URISyntaxException; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Objects; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import javax.xml.stream.XMLStreamException; -import org.apache.hc.core5.net.URIBuilder; +import javax.annotation.Nullable; import org.opentripplanner.updater.spi.ResultLogger; import org.opentripplanner.updater.spi.UpdateResult; +import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.updater.trip.UpdateIncrementality; import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics; import org.opentripplanner.updater.trip.siri.SiriRealTimeTripUpdateAdapter; -import org.rutebanken.siri20.util.SiriXml; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure; import uk.org.siri.siri20.ServiceDelivery; -public class SiriAzureETUpdater extends AbstractAzureSiriUpdater { +public class SiriAzureETUpdater implements SiriAzureMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(SiriAzureSXUpdater.class); - private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0); - - private final LocalDate fromDateTime; private final SiriRealTimeTripUpdateAdapter adapter; - private final Consumer recordMetrics; + private final boolean fuzzyTripMatching; + private final String feedId; + + private WriteToGraphCallback writeToGraphCallback; public SiriAzureETUpdater( SiriAzureETUpdaterParameters config, SiriRealTimeTripUpdateAdapter adapter ) { - super(config); - this.fromDateTime = config.getFromDateTime(); this.adapter = adapter; this.recordMetrics = TripUpdateMetrics.streaming(config); + this.fuzzyTripMatching = config.isFuzzyTripMatching(); + this.feedId = Objects.requireNonNull(config.feedId(), "feedId must not be null"); } @Override - protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) { - var message = messageContext.getMessage(); - MESSAGE_COUNTER.incrementAndGet(); - - if (MESSAGE_COUNTER.get() % 100 == 0) { - LOG.debug("Total SIRI-ET messages received={}", MESSAGE_COUNTER.get()); - } - - try { - var updates = parseSiriEt(message.getBody().toString(), message.getMessageId()); - if (!updates.isEmpty()) { - processMessage(updates); - } - } catch (JAXBException | XMLStreamException e) { - LOG.error(e.getLocalizedMessage(), e); - } + public void setup(WriteToGraphCallback writeToGraphCallback) { + this.writeToGraphCallback = writeToGraphCallback; } @Override - protected void errorConsumer(ServiceBusErrorContext errorContext) { - defaultErrorConsumer(errorContext); - } - - @Override - protected void initializeData(String url, Consumer consumer) - throws URISyntaxException { - if (url == null) { - LOG.info("No history url set up for Siri Azure ET Updater"); - return; + @Nullable + public Future handleMessage(ServiceDelivery serviceDelivery, String messageId) { + var etDeliveries = serviceDelivery.getEstimatedTimetableDeliveries(); + if (etDeliveries == null || etDeliveries.isEmpty()) { + LOG.info("Empty Siri ET message {}", messageId); + return null; + } else { + return processMessage(etDeliveries); } - - URI uri = new URIBuilder(url) - .addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) - .build(); - - LOG.info("Fetching initial Siri ET data from {}, timeout is {} ms.", uri, timeout); - var siri = fetchInitialSiriData(uri); - - if (siri.isEmpty()) { - LOG.info("Got empty ET response from history endpoint"); - return; - } - - // This is fine since runnables are scheduled after each other - processHistory(siri.get()); } private Future processMessage(List updates) { - return super.saveResultOnGraph.execute(context -> { + return writeToGraphCallback.execute(context -> { var result = adapter.applyEstimatedTimetable( - fuzzyTripMatching() ? context.siriFuzzyTripMatcher() : null, + fuzzyTripMatching ? context.siriFuzzyTripMatcher() : null, context.entityResolver(feedId), feedId, UpdateIncrementality.DIFFERENTIAL, @@ -108,41 +67,4 @@ private Future processMessage(List updat recordMetrics.accept(result); }); } - - private void processHistory(ServiceDelivery siri) { - var updates = siri.getEstimatedTimetableDeliveries(); - - if (updates == null || updates.isEmpty()) { - LOG.info("Did not receive any ET messages from history endpoint"); - return; - } - - try { - long t1 = System.currentTimeMillis(); - var f = processMessage(updates); - f.get(); - LOG.info("Azure ET updater initialized in {} ms.", (System.currentTimeMillis() - t1)); - } catch (ExecutionException | InterruptedException e) { - throw new SiriAzureInitializationException("Error applying history", e); - } - } - - private List parseSiriEt(String siriXmlMessage, String id) - throws JAXBException, XMLStreamException { - var siri = SiriXml.parseXml(siriXmlMessage); - if ( - siri.getServiceDelivery() == null || - siri.getServiceDelivery().getEstimatedTimetableDeliveries() == null || - siri.getServiceDelivery().getEstimatedTimetableDeliveries().isEmpty() - ) { - if (siri.getHeartbeatNotification() != null) { - LOG.debug("Received SIRI heartbeat message"); - } else { - LOG.info("Empty Siri message {}: {}", id, siriXmlMessage); - } - return new ArrayList<>(); - } - - return siri.getServiceDelivery().getEstimatedTimetableDeliveries(); - } } diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdaterParameters.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdaterParameters.java index e63615dd893..e178aef34b2 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdaterParameters.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdaterParameters.java @@ -1,7 +1,12 @@ package org.opentripplanner.ext.siri.updater.azure; import com.azure.core.amqp.implementation.ConnectionStringProperties; +import java.net.URI; +import java.net.URISyntaxException; import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Optional; +import org.apache.hc.core5.net.URIBuilder; import org.opentripplanner.updater.trip.UrlUpdaterParameters; public class SiriAzureETUpdaterParameters @@ -31,4 +36,17 @@ public String url() { return url; } } + + @Override + public Optional buildDataInitializationUrl() throws URISyntaxException { + var url = getDataInitializationUrl(); + if (url == null) { + return Optional.empty(); + } + return Optional.of( + new URIBuilder(url) + .addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) + .build() + ); + } } diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureMessageHandler.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureMessageHandler.java new file mode 100644 index 00000000000..0e129570eea --- /dev/null +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureMessageHandler.java @@ -0,0 +1,19 @@ +package org.opentripplanner.ext.siri.updater.azure; + +import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; +import java.util.concurrent.Future; +import javax.annotation.Nullable; +import org.opentripplanner.updater.spi.WriteToGraphCallback; +import uk.org.siri.siri20.ServiceDelivery; + +public interface SiriAzureMessageHandler { + void setup(WriteToGraphCallback writeToGraphCallback); + + /** + * Consume ServiceDelivery and update the otp data model within the graph writer thread. + * + * @return A future for the graph updating process. Null if the message can't be handled. + */ + @Nullable + Future handleMessage(ServiceDelivery serviceDelivery, String messageId); +} diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java index a7263f66908..6cec89ab346 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java @@ -1,142 +1,49 @@ package org.opentripplanner.ext.siri.updater.azure; -import com.azure.messaging.servicebus.ServiceBusErrorContext; -import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; -import jakarta.xml.bind.JAXBException; -import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import javax.xml.stream.XMLStreamException; -import org.apache.hc.core5.net.URIBuilder; +import javax.annotation.Nullable; import org.opentripplanner.routing.impl.TransitAlertServiceImpl; import org.opentripplanner.routing.services.TransitAlertService; import org.opentripplanner.transit.service.TimetableRepository; import org.opentripplanner.updater.alert.TransitAlertProvider; import org.opentripplanner.updater.alert.siri.SiriAlertsUpdateHandler; -import org.rutebanken.siri20.util.SiriXml; +import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.org.siri.siri20.ServiceDelivery; -public class SiriAzureSXUpdater extends AbstractAzureSiriUpdater implements TransitAlertProvider { +public class SiriAzureSXUpdater implements TransitAlertProvider, SiriAzureMessageHandler { private final Logger LOG = LoggerFactory.getLogger(getClass()); private final SiriAlertsUpdateHandler updateHandler; private final TransitAlertService transitAlertService; - private static final transient AtomicLong messageCounter = new AtomicLong(0); - private final LocalDate fromDateTime; - private final LocalDate toDateTime; + private WriteToGraphCallback saveResultOnGraph; public SiriAzureSXUpdater( SiriAzureSXUpdaterParameters config, TimetableRepository timetableRepository ) { - super(config); - this.fromDateTime = config.getFromDateTime(); - this.toDateTime = config.getToDateTime(); this.transitAlertService = new TransitAlertServiceImpl(timetableRepository); - this.updateHandler = new SiriAlertsUpdateHandler(feedId, transitAlertService, Duration.ZERO); + this.updateHandler = + new SiriAlertsUpdateHandler(config.feedId(), transitAlertService, Duration.ZERO); } @Override - protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) { - var message = messageContext.getMessage(); - - LOG.debug( - "Processing message. messageId={}, sequenceNumber={}, enqueued time={}", - message.getMessageId(), - message.getSequenceNumber(), - message.getEnqueuedTime() - ); - - messageCounter.incrementAndGet(); - - try { - var siriSx = parseSiriSx(message.getBody().toString(), message.getMessageId()); - if (siriSx.isEmpty()) { - return; - } - processMessage(siriSx.get()); - } catch (JAXBException | XMLStreamException e) { - LOG.error(e.getLocalizedMessage(), e); - } - } - - @Override - protected void errorConsumer(ServiceBusErrorContext errorContext) { - defaultErrorConsumer(errorContext); + public void setup(WriteToGraphCallback writeToGraphCallback) { + this.saveResultOnGraph = writeToGraphCallback; } @Override - protected void initializeData(String url, Consumer consumer) - throws URISyntaxException { - if (url == null) { - LOG.info("No history url set up for Siri Azure SX Updater"); - return; - } - - URI uri = new URIBuilder(url) - .addParameter("publishFromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) - .addParameter("publishToDateTime", toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) - .build(); - - LOG.info("Fetching initial Siri SX data from {}, timeout is {} ms.", uri, timeout); - var siri = fetchInitialSiriData(uri); - - if (siri.isEmpty()) { - LOG.info("Got empty SX response from history endpoint"); - return; - } - - // This is fine since runnables are scheduled after each other - processHistory(siri.get()); - } - - private Optional parseSiriSx(String xmlMessage, String id) - throws XMLStreamException, JAXBException { - var siri = SiriXml.parseXml(xmlMessage); - if ( - siri.getServiceDelivery() == null || - siri.getServiceDelivery().getSituationExchangeDeliveries() == null || - siri.getServiceDelivery().getSituationExchangeDeliveries().isEmpty() - ) { - if (siri.getHeartbeatNotification() != null) { - LOG.debug("Received SIRI heartbeat message"); - } else { - LOG.info("Empty Siri message for messageId {}", id); - } - return Optional.empty(); - } - return Optional.of(siri.getServiceDelivery()); - } - - private Future processMessage(ServiceDelivery siriSx) { - return super.saveResultOnGraph.execute(context -> updateHandler.update(siriSx, context)); - } - - private void processHistory(ServiceDelivery siri) { - var sx = siri.getSituationExchangeDeliveries(); - - if (sx == null || sx.isEmpty()) { - LOG.info("Did not receive any SX messages from history endpoint"); - return; - } - - try { - var t1 = System.currentTimeMillis(); - var f = processMessage(siri); - f.get(); - LOG.info("Azure SX updater initialized in {} ms.", (System.currentTimeMillis() - t1)); - } catch (ExecutionException | InterruptedException e) { - throw new SiriAzureInitializationException("Error applying SX history", e); + @Nullable + public Future handleMessage(ServiceDelivery serviceDelivery, String messageId) { + var sxDeliveries = serviceDelivery.getSituationExchangeDeliveries(); + if (sxDeliveries == null || sxDeliveries.isEmpty()) { + LOG.info("Empty Siri SX message {}", messageId); + return null; + } else { + return saveResultOnGraph.execute(context -> updateHandler.update(serviceDelivery, context)); } } diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdaterParameters.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdaterParameters.java index 2062f383309..ac5cc4ab08e 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdaterParameters.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdaterParameters.java @@ -1,6 +1,11 @@ package org.opentripplanner.ext.siri.updater.azure; +import java.net.URI; +import java.net.URISyntaxException; import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Optional; +import org.apache.hc.core5.net.URIBuilder; public class SiriAzureSXUpdaterParameters extends SiriAzureUpdaterParameters { @@ -26,4 +31,19 @@ public LocalDate getToDateTime() { public void setToDateTime(LocalDate toDateTime) { this.toDateTime = toDateTime; } + + @Override + public Optional buildDataInitializationUrl() throws URISyntaxException { + var url = getDataInitializationUrl(); + if (url == null) { + return Optional.empty(); + } + + return Optional.of( + new URIBuilder(url) + .addParameter("publishFromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) + .addParameter("publishToDateTime", toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) + .build() + ); + } } diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater.java similarity index 76% rename from application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java rename to application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater.java index bb9b23cc0f3..9e5ff9a8499 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdater.java @@ -12,6 +12,7 @@ import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.common.base.Preconditions; +import jakarta.xml.bind.JAXBException; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -20,22 +21,33 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; +import javax.xml.stream.XMLStreamException; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.opentripplanner.framework.application.ApplicationShutdownSupport; import org.opentripplanner.framework.io.OtpHttpClientException; import org.opentripplanner.framework.io.OtpHttpClientFactory; +import org.opentripplanner.transit.service.TimetableRepository; import org.opentripplanner.updater.spi.GraphUpdater; import org.opentripplanner.updater.spi.HttpHeaders; import org.opentripplanner.updater.spi.WriteToGraphCallback; +import org.opentripplanner.updater.trip.siri.SiriRealTimeTripUpdateAdapter; import org.rutebanken.siri20.util.SiriXml; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.org.siri.siri20.ServiceDelivery; +import uk.org.siri.siri20.Siri; -public abstract class AbstractAzureSiriUpdater implements GraphUpdater { +/** + * This is the main handler for siri messages over azure. It handles the generic code for communicating + * with the azure service bus and delegates to SiriAzureETUpdater and SiriAzureSXUpdater for ET and + * SX specific stuff. + */ +public class SiriAzureUpdater implements GraphUpdater { /** * custom functional interface that allows throwing checked exceptions, thereby @@ -67,49 +79,52 @@ interface CheckedRunnable { ); private final Logger LOG = LoggerFactory.getLogger(getClass()); + private final String updaterType; private final AuthenticationType authenticationType; private final String fullyQualifiedNamespace; private final String configRef; private final String serviceBusUrl; - private final boolean fuzzyTripMatching; - private final Consumer messageConsumer = this::messageConsumer; - private final Consumer errorConsumer = this::errorConsumer; private final String topicName; private final Duration autoDeleteOnIdle; private final int prefetchCount; - protected WriteToGraphCallback saveResultOnGraph; private ServiceBusProcessorClient eventProcessor; private ServiceBusAdministrationAsyncClient serviceBusAdmin; private boolean isPrimed = false; private String subscriptionName; - protected final String feedId; + private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0); + + private final SiriAzureMessageHandler messageHandler; /** - * The URL used to fetch all initial updates + * The URL used to fetch all initial updates, null means don't fetch initial data */ - private final String dataInitializationUrl; + @Nullable + private final URI dataInitializationUrl; + /** * The timeout used when fetching historical data */ - protected final int timeout; + private final int timeout; + + SiriAzureUpdater(SiriAzureUpdaterParameters config, SiriAzureMessageHandler messageHandler) { + this.messageHandler = Objects.requireNonNull(messageHandler); + + try { + this.dataInitializationUrl = config.buildDataInitializationUrl().orElse(null); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid history url", e); + } - public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config) { this.configRef = Objects.requireNonNull(config.configRef(), "configRef must not be null"); this.authenticationType = Objects.requireNonNull(config.getAuthenticationType(), "authenticationType must not be null"); this.topicName = Objects.requireNonNull(config.getTopicName(), "topicName must not be null"); - this.dataInitializationUrl = - Objects.requireNonNull( - config.getDataInitializationUrl(), - "dataInitializationUrl must not be null" - ); + this.updaterType = Objects.requireNonNull(config.getType(), "type must not be null"); this.timeout = config.getTimeout(); - this.feedId = Objects.requireNonNull(config.feedId(), "feedId must not be null"); this.autoDeleteOnIdle = config.getAutoDeleteOnIdle(); this.prefetchCount = config.getPrefetchCount(); - this.fuzzyTripMatching = config.isFuzzyTripMatching(); if (authenticationType == AuthenticationType.FederatedIdentity) { this.fullyQualifiedNamespace = @@ -130,21 +145,25 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config) { } } - /** - * Consume Service Bus topic message and implement business logic. - * @param messageContext The Service Bus processor message context that holds a received message and additional methods to settle the message. - */ - protected abstract void messageConsumer(ServiceBusReceivedMessageContext messageContext); + public static SiriAzureUpdater createETUpdater( + SiriAzureETUpdaterParameters config, + SiriRealTimeTripUpdateAdapter adapter + ) { + var messageHandler = new SiriAzureETUpdater(config, adapter); + return new SiriAzureUpdater(config, messageHandler); + } - /** - * Consume error and decide how to manage it. - * @param errorContext Context for errors handled by the ServiceBusProcessorClient. - */ - protected abstract void errorConsumer(ServiceBusErrorContext errorContext); + public static SiriAzureUpdater createSXUpdater( + SiriAzureSXUpdaterParameters config, + TimetableRepository timetableRepository + ) { + var messageHandler = new SiriAzureSXUpdater(config, timetableRepository); + return new SiriAzureUpdater(config, messageHandler); + } @Override public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; + this.messageHandler.setup(writeToGraphCallback); } @Override @@ -159,7 +178,14 @@ public void run() { executeWithRetry(this::setupSubscription, "Setting up Service Bus subscription to topic"); executeWithRetry( - () -> initializeData(dataInitializationUrl, messageConsumer), + () -> { + var initialData = fetchInitialSiriData(); + if (initialData.isEmpty()) { + LOG.info("Got empty response from history endpoint"); + } else { + processInitialSiriData(initialData.get()); + } + }, "Initializing historical Siri data" ); @@ -197,7 +223,7 @@ public void run() { * @param millis number of milliseconds * @throws InterruptedException if sleep is interrupted */ - protected void sleep(int millis) throws InterruptedException { + void sleep(int millis) throws InterruptedException { Thread.sleep(millis); } @@ -208,7 +234,7 @@ protected void sleep(int millis) throws InterruptedException { * @param description A description of the task for logging purposes. * @throws InterruptedException If the thread is interrupted while waiting between retries. */ - protected void executeWithRetry(CheckedRunnable task, String description) throws Exception { + void executeWithRetry(CheckedRunnable task, String description) throws Exception { int sleepPeriod = 1000; // Start with 1-second delay int attemptCounter = 1; @@ -243,7 +269,7 @@ protected void executeWithRetry(CheckedRunnable task, String description) throws } } - protected boolean shouldRetry(Exception e) { + boolean shouldRetry(Exception e) { if (e instanceof ServiceBusException sbException) { ServiceBusFailureReason reason = sbException.getReason(); @@ -339,8 +365,8 @@ private void startEventProcessor() throws ServiceBusException { .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .disableAutoComplete() // Receive and delete does not need autocomplete .prefetchCount(prefetchCount) - .processError(errorConsumer) - .processMessage(messageConsumer) + .processError(this::errorConsumer) + .processMessage(this::handleMessage) .buildProcessorClient(); eventProcessor.start(); @@ -352,6 +378,32 @@ private void startEventProcessor() throws ServiceBusException { ); } + private void handleMessage(ServiceBusReceivedMessageContext messageContext) { + var message = messageContext.getMessage(); + MESSAGE_COUNTER.incrementAndGet(); + + if (MESSAGE_COUNTER.get() % 100 == 0) { + LOG.debug("Total SIRI-{} messages received={}", updaterType, MESSAGE_COUNTER.get()); + } + + try { + var siriXmlMessage = message.getBody().toString(); + var siri = SiriXml.parseXml(siriXmlMessage); + var serviceDelivery = siri.getServiceDelivery(); + if (serviceDelivery == null) { + if (siri.getHeartbeatNotification() != null) { + LOG.debug("Updater {} received SIRI heartbeat message", updaterType); + } else { + LOG.debug("Updater {} received SIRI message without ServiceDelivery", updaterType); + } + } else { + messageHandler.handleMessage(serviceDelivery, message.getMessageId()); + } + } catch (JAXBException | XMLStreamException e) { + LOG.error(e.getLocalizedMessage(), e); + } + } + @Override public boolean isPrimed() { return this.isPrimed; @@ -369,14 +421,23 @@ public String getConfigRef() { /** * Returns None for empty result */ - protected Optional fetchInitialSiriData(URI uri) { + private Optional fetchInitialSiriData() { + if (dataInitializationUrl == null) { + return Optional.empty(); + } var headers = HttpHeaders.of().acceptApplicationXML().build().asMap(); + LOG.info( + "Fetching initial Siri data from {}, timeout is {} ms.", + this.dataInitializationUrl, + timeout + ); + try (OtpHttpClientFactory otpHttpClientFactory = new OtpHttpClientFactory()) { var otpHttpClient = otpHttpClientFactory.create(LOG); var t1 = System.currentTimeMillis(); var siriOptional = otpHttpClient.executeAndMapOptional( - new HttpGet(uri), + new HttpGet(dataInitializationUrl), Duration.ofMillis(timeout), headers, SiriXml::parseXml @@ -388,25 +449,29 @@ protected Optional fetchInitialSiriData(URI uri) { LOG.info("Got status 204 'No Content'."); } - return siriOptional.map(siri -> siri.getServiceDelivery()); + return siriOptional.map(Siri::getServiceDelivery); } } - boolean fuzzyTripMatching() { - return fuzzyTripMatching; + public void processInitialSiriData(ServiceDelivery serviceDelivery) { + try { + long t1 = System.currentTimeMillis(); + var f = messageHandler.handleMessage(serviceDelivery, "history-message"); + if (f != null) { + f.get(); + } + LOG.info("{} updater initialized in {} ms.", updaterType, (System.currentTimeMillis() - t1)); + } catch (ExecutionException | InterruptedException e) { + throw new SiriAzureInitializationException("Error applying history", e); + } } - protected abstract void initializeData( - String url, - Consumer consumer - ) throws URISyntaxException; - /** * Make some sensible logging on error and if Service Bus is busy, sleep for some time before try again to get messages. * This code snippet is taken from Microsoft example .... * @param errorContext Context for errors handled by the ServiceBusProcessorClient. */ - protected void defaultErrorConsumer(ServiceBusErrorContext errorContext) { + private void errorConsumer(ServiceBusErrorContext errorContext) { LOG.error( "Error when receiving messages from namespace={}, Entity={}", errorContext.getFullyQualifiedNamespace(), @@ -422,7 +487,7 @@ protected void defaultErrorConsumer(ServiceBusErrorContext errorContext) { if ( reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || - reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND + reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND // should this be recoverable? ) { LOG.error( "An unrecoverable error occurred. Stopping processing with reason {} {}", diff --git a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java index 4b8406da896..d90d95c5d7c 100644 --- a/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java +++ b/application/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java @@ -1,11 +1,14 @@ package org.opentripplanner.ext.siri.updater.azure; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; +import java.util.Optional; public abstract class SiriAzureUpdaterParameters { private String configRef; - private String type; + private final String type; private AuthenticationType authenticationType; private String fullyQualifiedNamespace; private String serviceBusUrl; @@ -113,4 +116,10 @@ public int getPrefetchCount() { public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; } + + /** + * Create the url used for fetching initial data. Returns empty if there is no initial data url + * configured. + */ + public abstract Optional buildDataInitializationUrl() throws URISyntaxException; } diff --git a/application/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java b/application/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java index 09fd4a66c97..dad6a8aacb3 100644 --- a/application/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java +++ b/application/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java @@ -4,8 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdater; -import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdater; +import org.opentripplanner.ext.siri.updater.azure.SiriAzureUpdater; import org.opentripplanner.ext.vehiclerentalservicedirectory.VehicleRentalServiceDirectoryFetcher; import org.opentripplanner.ext.vehiclerentalservicedirectory.api.VehicleRentalServiceDirectoryFetcherParameters; import org.opentripplanner.framework.io.OtpHttpClientFactory; @@ -217,10 +216,10 @@ private List createUpdatersFromConfig() { } } for (var configItem : updatersParameters.getSiriAzureETUpdaterParameters()) { - updaters.add(new SiriAzureETUpdater(configItem, provideSiriAdapter())); + updaters.add(SiriAzureUpdater.createETUpdater(configItem, provideSiriAdapter())); } for (var configItem : updatersParameters.getSiriAzureSXUpdaterParameters()) { - updaters.add(new SiriAzureSXUpdater(configItem, timetableRepository)); + updaters.add(SiriAzureUpdater.createSXUpdater(configItem, timetableRepository)); } return updaters;