Skip to content

Commit

Permalink
Merge pull request #6487 from Skanetrafiken/siri-azure-refactor
Browse files Browse the repository at this point in the history
Siri azure updater refactor
  • Loading branch information
habrahamsson-skanetrafiken authored Feb 28, 2025
2 parents 71f7369 + 946f006 commit d6c64e6
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 281 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@
import static org.mockito.Mockito.*;

import com.azure.core.util.ExpandableStringEnum;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -33,16 +32,19 @@
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import uk.org.siri.siri20.ServiceDelivery;

class AbstractAzureSiriUpdaterTest {
class SiriAzureUpdaterTest {

private SiriAzureUpdaterParameters mockConfig;
private AbstractAzureSiriUpdater updater;
private AbstractAzureSiriUpdater.CheckedRunnable task;
private SiriAzureUpdater updater;
private SiriAzureUpdater.CheckedRunnable task;

@BeforeEach
public void setUp() throws Exception {
mockConfig = mock(SiriAzureUpdaterParameters.class);
when(mockConfig.getType()).thenReturn("siri-azure-test-updater");
when(mockConfig.configRef()).thenReturn("testConfigRef");
when(mockConfig.getAuthenticationType()).thenReturn(AuthenticationType.SharedAccessKey);
when(mockConfig.getFullyQualifiedNamespace()).thenReturn("testNamespace");
Expand All @@ -58,22 +60,22 @@ public void setUp() throws Exception {
// Create a spy on AbstractAzureSiriUpdater with the mock configuration
updater =
spy(
new AbstractAzureSiriUpdater(mockConfig) {
@Override
protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) {}

@Override
protected void errorConsumer(ServiceBusErrorContext errorContext) {}

@Override
protected void initializeData(
String url,
Consumer<ServiceBusReceivedMessageContext> consumer
) throws URISyntaxException {}
}
new SiriAzureUpdater(
mockConfig,
new SiriAzureMessageHandler() {
@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {}

@Override
@Nullable
public Future<?> handleMessage(ServiceDelivery serviceDelivery, String messageId) {
return null;
}
}
)
);

task = mock(AbstractAzureSiriUpdater.CheckedRunnable.class);
task = mock(SiriAzureUpdater.CheckedRunnable.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,104 +1,63 @@
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import jakarta.xml.bind.JAXBException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.stream.XMLStreamException;
import org.apache.hc.core5.net.URIBuilder;
import javax.annotation.Nullable;
import org.opentripplanner.updater.spi.ResultLogger;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.opentripplanner.updater.trip.siri.SiriRealTimeTripUpdateAdapter;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.ServiceDelivery;

public class SiriAzureETUpdater extends AbstractAzureSiriUpdater {
public class SiriAzureETUpdater implements SiriAzureMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(SiriAzureSXUpdater.class);

private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0);

private final LocalDate fromDateTime;
private final SiriRealTimeTripUpdateAdapter adapter;

private final Consumer<UpdateResult> recordMetrics;
private final boolean fuzzyTripMatching;
private final String feedId;

private WriteToGraphCallback writeToGraphCallback;

public SiriAzureETUpdater(
SiriAzureETUpdaterParameters config,
SiriRealTimeTripUpdateAdapter adapter
) {
super(config);
this.fromDateTime = config.getFromDateTime();
this.adapter = adapter;
this.recordMetrics = TripUpdateMetrics.streaming(config);
this.fuzzyTripMatching = config.isFuzzyTripMatching();
this.feedId = Objects.requireNonNull(config.feedId(), "feedId must not be null");
}

@Override
protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) {
var message = messageContext.getMessage();
MESSAGE_COUNTER.incrementAndGet();

if (MESSAGE_COUNTER.get() % 100 == 0) {
LOG.debug("Total SIRI-ET messages received={}", MESSAGE_COUNTER.get());
}

try {
var updates = parseSiriEt(message.getBody().toString(), message.getMessageId());
if (!updates.isEmpty()) {
processMessage(updates);
}
} catch (JAXBException | XMLStreamException e) {
LOG.error(e.getLocalizedMessage(), e);
}
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.writeToGraphCallback = writeToGraphCallback;
}

@Override
protected void errorConsumer(ServiceBusErrorContext errorContext) {
defaultErrorConsumer(errorContext);
}

@Override
protected void initializeData(String url, Consumer<ServiceBusReceivedMessageContext> consumer)
throws URISyntaxException {
if (url == null) {
LOG.info("No history url set up for Siri Azure ET Updater");
return;
@Nullable
public Future<?> handleMessage(ServiceDelivery serviceDelivery, String messageId) {
var etDeliveries = serviceDelivery.getEstimatedTimetableDeliveries();
if (etDeliveries == null || etDeliveries.isEmpty()) {
LOG.info("Empty Siri ET message {}", messageId);
return null;
} else {
return processMessage(etDeliveries);
}

URI uri = new URIBuilder(url)
.addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
.build();

LOG.info("Fetching initial Siri ET data from {}, timeout is {} ms.", uri, timeout);
var siri = fetchInitialSiriData(uri);

if (siri.isEmpty()) {
LOG.info("Got empty ET response from history endpoint");
return;
}

// This is fine since runnables are scheduled after each other
processHistory(siri.get());
}

private Future<?> processMessage(List<EstimatedTimetableDeliveryStructure> updates) {
return super.saveResultOnGraph.execute(context -> {
return writeToGraphCallback.execute(context -> {
var result = adapter.applyEstimatedTimetable(
fuzzyTripMatching() ? context.siriFuzzyTripMatcher() : null,
fuzzyTripMatching ? context.siriFuzzyTripMatcher() : null,
context.entityResolver(feedId),
feedId,
UpdateIncrementality.DIFFERENTIAL,
Expand All @@ -108,41 +67,4 @@ private Future<?> processMessage(List<EstimatedTimetableDeliveryStructure> updat
recordMetrics.accept(result);
});
}

private void processHistory(ServiceDelivery siri) {
var updates = siri.getEstimatedTimetableDeliveries();

if (updates == null || updates.isEmpty()) {
LOG.info("Did not receive any ET messages from history endpoint");
return;
}

try {
long t1 = System.currentTimeMillis();
var f = processMessage(updates);
f.get();
LOG.info("Azure ET updater initialized in {} ms.", (System.currentTimeMillis() - t1));
} catch (ExecutionException | InterruptedException e) {
throw new SiriAzureInitializationException("Error applying history", e);
}
}

private List<EstimatedTimetableDeliveryStructure> parseSiriEt(String siriXmlMessage, String id)
throws JAXBException, XMLStreamException {
var siri = SiriXml.parseXml(siriXmlMessage);
if (
siri.getServiceDelivery() == null ||
siri.getServiceDelivery().getEstimatedTimetableDeliveries() == null ||
siri.getServiceDelivery().getEstimatedTimetableDeliveries().isEmpty()
) {
if (siri.getHeartbeatNotification() != null) {
LOG.debug("Received SIRI heartbeat message");
} else {
LOG.info("Empty Siri message {}: {}", id, siriXmlMessage);
}
return new ArrayList<>();
}

return siri.getServiceDelivery().getEstimatedTimetableDeliveries();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import org.apache.hc.core5.net.URIBuilder;
import org.opentripplanner.updater.trip.UrlUpdaterParameters;

public class SiriAzureETUpdaterParameters
Expand Down Expand Up @@ -31,4 +36,17 @@ public String url() {
return url;
}
}

@Override
public Optional<URI> buildDataInitializationUrl() throws URISyntaxException {
var url = getDataInitializationUrl();
if (url == null) {
return Optional.empty();
}
return Optional.of(
new URIBuilder(url)
.addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
.build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import uk.org.siri.siri20.ServiceDelivery;

public interface SiriAzureMessageHandler {
void setup(WriteToGraphCallback writeToGraphCallback);

/**
* Consume ServiceDelivery and update the otp data model within the graph writer thread.
*
* @return A future for the graph updating process. Null if the message can't be handled.
*/
@Nullable
Future<?> handleMessage(ServiceDelivery serviceDelivery, String messageId);
}
Loading

0 comments on commit d6c64e6

Please sign in to comment.