From 6dd4c628891af897befc89b757835aaa6a6733fe Mon Sep 17 00:00:00 2001 From: Brian Freeman Date: Thu, 30 Jan 2025 12:38:27 -0600 Subject: [PATCH] Adding retry if concurrent update exception during indexing build (#1152) * adding retry if concurrent update * moving comment down * Adding sleep on retry --- .../job/dataflow/WriteRollupCounts.java | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/job/dataflow/WriteRollupCounts.java b/indexer/src/main/java/bio/terra/tanagra/indexing/job/dataflow/WriteRollupCounts.java index 86cab1893..fdeda5b6c 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/job/dataflow/WriteRollupCounts.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/job/dataflow/WriteRollupCounts.java @@ -21,6 +21,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Table; import jakarta.annotation.Nullable; import java.util.ArrayList; @@ -159,9 +160,36 @@ public void run(boolean isDryRun) { LOGGER.info("Rollup counts source SQL is defined. Skipping Dataflow job."); } - // Dataflow jobs can only write new rows to BigQuery, so in this second step, copy over the - // count values to the corresponding column in the index entity main table. - copyFieldsToEntityTable(isDryRun); + final int maxRetries = 5; + var attempt = 0; + + while (attempt < maxRetries) { + try { + // Dataflow jobs can only write new rows to BigQuery, so in this second step, copy over the + // count values to the corresponding column in the index entity main table. + copyFieldsToEntityTable(isDryRun); + return; // Exit if successful + } catch (BigQueryException e) { + if (e.getMessage().contains("due to concurrent update")) { + attempt++; + LOGGER.info("Attempt {} failed: {}", attempt, e.getMessage()); + + if (attempt == maxRetries) { + LOGGER.info("Max retries reached. Giving up."); + throw e; // Rethrow after max attempts + } + + try { + Thread.sleep(1000); // Sleep for 1 second + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } + } else { + throw e; // Rethrow if it's a different exception + } + } + } } @Override