Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow number of retries on failover #624

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion quickfixj-core/src/main/java/quickfix/Initiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,11 @@ public interface Initiator extends Connector {
* AbstractSocketInitiator.createDynamicSession is called
*/
String SETTING_DYNAMIC_SESSION = "DynamicSession";

/**
* Initiator setting for reconnect attempts. Only valid when
* session connection type is "initiator".
*
* @see quickfix.SessionFactory#SETTING_CONNECTION_TYPE
*/
String SETTING_RECONNECT_ATTEMPT = "ReconnectAttempt";
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ private void createInitiator(final Session session, final boolean continueInitOn
String proxyDomain = null;

int proxyPort = -1;
int retryCount = 1;

if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
Expand All @@ -173,14 +174,17 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
}
if (getSettings().isSetting(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT)){
retryCount = settings.getInt(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT);
}

ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService());
try {
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, connectTimeout, reconnectingIntervals,
scheduledExecutorService, settings, networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, retryCount);

initiators.add(ioSessionInitiator);
} catch (ConfigError e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses,
EventHandlingStrategy eventHandlingStrategy,
IoFilterChainBuilder userIoFilterChainBuilder, boolean sslEnabled, SSLConfig sslConfig,
String proxyType, String proxyVersion, String proxyHost, int proxyPort,
String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation) throws ConfigError {
String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation, int retryCount) throws ConfigError {
this.executor = executor;

final long connectTimeoutMillis = connectTimeout * 1000L;
Expand All @@ -83,7 +83,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses,
reconnectTask = new ConnectTask(sslEnabled, socketAddresses, localAddress,
userIoFilterChainBuilder, fixSession, connectTimeoutMillis, reconnectIntervalInMillis,
sessionSettings, networkingOptions, eventHandlingStrategy, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log);
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log, retryCount);
} catch (GeneralSecurityException e) {
throw new ConfigError(e);
}
Expand Down Expand Up @@ -111,6 +111,9 @@ private static class ConnectTask implements Runnable {
private long lastConnectTime;
private int nextSocketAddressIndex;
private int connectionFailureCount;
private int retryCount = 1;
private int retryAttempt = 0;
private boolean isFirstTime = true;
private ConnectFuture connectFuture;

private final String proxyType;
Expand All @@ -128,7 +131,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses,
SessionSettings sessionSettings, NetworkingOptions networkingOptions, EventHandlingStrategy eventHandlingStrategy, SSLConfig sslConfig,
String proxyType, String proxyVersion, String proxyHost,
int proxyPort, String proxyUser, String proxyPassword, String proxyDomain,
String proxyWorkstation, Logger log) throws ConfigError, GeneralSecurityException {
String proxyWorkstation, Logger log, int retryCount) throws ConfigError, GeneralSecurityException {
this.sslEnabled = sslEnabled;
this.socketAddresses = socketAddresses;
this.localAddress = localAddress;
Expand All @@ -150,6 +153,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses,
this.proxyPassword = proxyPassword;
this.proxyDomain = proxyDomain;
this.proxyWorkstation = proxyWorkstation;
this.retryCount = retryCount;

setupIoConnector();
}
Expand Down Expand Up @@ -224,7 +228,14 @@ public void run() {
private void connect() {
try {
lastReconnectAttemptTime = SystemTime.currentTimeMillis();
SocketAddress nextSocketAddress = getNextSocketAddress();
SocketAddress nextSocketAddress = socketAddresses[getCurrentSocketAddressIndex()];
if (retryCount == 1 || retryAttempt == retryCount || isFirstTime){
nextSocketAddress = getNextSocketAddress();
retryAttempt = 1;
isFirstTime = false;
} else {
++retryAttempt;
}
if (localAddress == null) {
connectFuture = ioConnector.connect(nextSocketAddress);
} else {
Expand Down