Override max hierarchy depth. Parallelize indexing jobs. (#322)
* Override max hierarchy depth. Parallelize indexing jobs.

* Handle null values in modifiers workflow. Add distincts to ancestor-descendant.

* Update indexing job tests.

* Checkstyle fix.

* Update readme.

* Address PR comments.
marikomedlock authored Jan 18, 2023
1 parent 6e291a1 commit 2d62a8a
Showing 35 changed files with 754 additions and 230 deletions.
22 changes: 15 additions & 7 deletions docs/INDEXING.md
@@ -80,19 +80,19 @@ Potential downside is more permissions needed by indexing service account.)
Set the default application credentials to a service account key file that has read access to the source and read +
write access to the index data.
```
export GOOGLE_APPLICATION_CREDENTIALS=/credentials/indexing_sa.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/tanagra/credentials/indexing_sa.json
```

#### All Jobs
Do a dry run of all the indexing jobs. This provides a sanity check that the indexing job inputs, especially the SQL
query inputs, are valid. This step is not required, but it is highly recommended because it helps catch errors and bugs
sooner, without first running a lot of computation.
```
./gradlew indexer:index -Dexec.args="INDEX_ALL /config/output/omop.json DRY_RUN"
./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json DRY_RUN"
```
Now actually kick off all the indexing jobs.
```
./gradlew indexer:index -Dexec.args="INDEX_ALL /config/output/omop.json"
./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json"
```
This can take a long time to complete. If, for example, your computer goes to sleep or you need to kill the process, you
can re-run the same command. You need to check that there are no in-progress Dataflow jobs in the
@@ -106,17 +106,25 @@ kicking them off again.
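One way to check for the in-progress Dataflow jobs mentioned above from the command line is a `gcloud` query like the
one below. This is a sketch, assuming the `gcloud` CLI is installed and the jobs run in the default `us-central1` region.
```
# List Dataflow jobs that are still running in the region used by the indexer.
gcloud dataflow jobs list --region=us-central1 --status=active
```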
You can also kick off the indexing jobs for a single entity or entity group. This is helpful for testing and debugging.
To kick off all the indexing jobs for a particular entity:
```
./gradlew indexer:index -Dexec.args="INDEX_ENTITY /config/output/omop.json person DRY_RUN"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY /config/output/omop.json person"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json person DRY_RUN"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json person"
```
or entity group:
```
./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP /config/output/omop.json condition_occurrence_person DRY_RUN"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP /config/output/omop.json condition_occurrence_person"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence_person DRY_RUN"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence_person"
```
All the entities in a group should be indexed before the group. The `INDEX_ALL` command ensures this ordering, but keep
this in mind if you're running the jobs for each entity or entity group separately.
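For example, here is a sketch of indexing a single group manually in the correct order. It assumes the
`condition_occurrence_person` group contains the `condition_occurrence` and `person` entities; substitute the entities
in your own group.
```
# Index each entity in the group first...
./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence"
./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json person"
# ...then index the entity group itself.
./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence_person"
```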

#### Concurrency
By default, the indexing jobs run with as much concurrency as possible. You can force the indexer to run the jobs
serially by appending `SERIAL` to the command:
```
./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json DRY_RUN SERIAL"
./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json NOT_DRY_RUN SERIAL"
```

## OMOP Example
`cms_synpuf` is a [public dataset](https://console.cloud.google.com/marketplace/product/hhs/synpuf) that uses the
standard OMOP schema.
@@ -48,23 +48,12 @@ public abstract class BigQueryIndexingJob implements IndexingJob {

protected static final String DEFAULT_REGION = "us-central1";

// The maximum depth of ancestors present in a hierarchy. This may be larger
// than the actual max depth, but if it is smaller the resulting table will be incomplete.
// TODO: Allow overriding the default max hierarchy depth.
protected static final int DEFAULT_MAX_HIERARCHY_DEPTH = 64;

private final Entity entity;

protected BigQueryIndexingJob(Entity entity) {
this.entity = entity;
}

@Override
public boolean prerequisitesComplete() {
// TODO: Implement a required ordering so we can run jobs in parallel.
return true;
}

protected Entity getEntity() {
return entity;
}
Expand Down Expand Up @@ -232,6 +221,8 @@ protected BigQueryOptions buildDataflowPipelineOptions(BigQueryDataset outputBQD

/** Build a name for the Dataflow job that will be visible in the Cloud Console. */
private String getDataflowJobName() {
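// Include the underlay name in the Dataflow job name, normalized to lowercase letters, digits, and dashes.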
String underlayName = entity.getUnderlay().getName();
String normalizedUnderlayName = underlayName.toLowerCase().replaceAll("[^a-z0-9]", "-");
String jobDisplayName = getName();
String normalizedJobDisplayName =
jobDisplayName == null || jobDisplayName.length() == 0
@@ -243,7 +234,8 @@ private String getDataflowJobName() {

String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
return String.format(
"%s-%s-%s-%s", normalizedJobDisplayName, normalizedUserName, datePart, randomPart);
"%s-%s-%s-%s-%s",
normalizedUnderlayName, normalizedJobDisplayName, normalizedUserName, datePart, randomPart);
}

protected String getAppDefaultSAEmail() {
154 changes: 81 additions & 73 deletions indexer/src/main/java/bio/terra/tanagra/indexing/Indexer.java
@@ -13,6 +13,10 @@
import bio.terra.tanagra.indexing.job.WriteAncestorDescendantIdPairs;
import bio.terra.tanagra.indexing.job.WriteParentChildIdPairs;
import bio.terra.tanagra.indexing.job.WriteRelationshipIdPairs;
import bio.terra.tanagra.indexing.jobexecutor.JobRunner;
import bio.terra.tanagra.indexing.jobexecutor.ParallelRunner;
import bio.terra.tanagra.indexing.jobexecutor.SequencedJobSet;
import bio.terra.tanagra.indexing.jobexecutor.SerialRunner;
import bio.terra.tanagra.serialization.UFEntity;
import bio.terra.tanagra.serialization.UFEntityGroup;
import bio.terra.tanagra.serialization.UFUnderlay;
@@ -21,12 +25,10 @@
import bio.terra.tanagra.underlay.Underlay;
import bio.terra.tanagra.underlay.entitygroup.CriteriaOccurrence;
import bio.terra.tanagra.utils.FileIO;
import bio.terra.tanagra.utils.HttpUtils;
import bio.terra.tanagra.utils.JacksonMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -35,6 +37,23 @@
public final class Indexer {
private static final Logger LOGGER = LoggerFactory.getLogger(Indexer.class);

enum JobExecutor {
PARALLEL,
SERIAL;

public JobRunner getRunner(
List<SequencedJobSet> jobSets, boolean isDryRun, IndexingJob.RunType runType) {
switch (this) {
case SERIAL:
return new SerialRunner(jobSets, isDryRun, runType);
case PARALLEL:
return new ParallelRunner(jobSets, isDryRun, runType);
default:
throw new IllegalArgumentException("Unknown JobExecutor enum type: " + this);
}
}
}

private final Underlay underlay;
private UFUnderlay expandedUnderlay;
private List<UFEntity> expandedEntities;
@@ -103,91 +122,98 @@ public void writeSerializedUnderlay() throws IOException {
}
}

public void runJobsForAllEntities(boolean isDryRun) {
underlay.getEntities().keySet().stream()
.sorted()
.forEach(name -> runJobsForEntity(name, isDryRun));
public JobRunner runJobsForAllEntities(
JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType) {
LOGGER.info("INDEXING all entities");
List<SequencedJobSet> jobSets =
underlay.getEntities().values().stream()
.map(Indexer::getJobSetForEntity)
.collect(Collectors.toList());
return runJobs(jobExecutor, isDryRun, runType, jobSets);
}

public void runJobsForEntity(String name, boolean isDryRun) {
LOGGER.info("RUN entity: {}", name);
getJobsForEntity(underlay.getEntity(name))
.forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndRun(isDryRun), ij.getName()));
public JobRunner runJobsForSingleEntity(
JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType, String name) {
LOGGER.info("INDEXING entity: {}", name);
List<SequencedJobSet> jobSets = List.of(getJobSetForEntity(underlay.getEntity(name)));
return runJobs(jobExecutor, isDryRun, runType, jobSets);
}

public void runJobsForAllEntityGroups(boolean isDryRun) {
underlay.getEntityGroups().keySet().stream()
.sorted()
.forEach(name -> runJobsForEntityGroup(name, isDryRun));
public JobRunner runJobsForAllEntityGroups(
JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType) {
LOGGER.info("INDEXING all entity groups");
List<SequencedJobSet> jobSets =
underlay.getEntityGroups().values().stream()
.map(Indexer::getJobSetForEntityGroup)
.collect(Collectors.toList());
return runJobs(jobExecutor, isDryRun, runType, jobSets);
}

public void runJobsForEntityGroup(String name, boolean isDryRun) {
LOGGER.info("RUN entity group: {}", name);
getJobsForEntityGroup(underlay.getEntityGroup(name))
.forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndRun(isDryRun), ij.getName()));
public JobRunner runJobsForSingleEntityGroup(
JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType, String name) {
LOGGER.info("INDEXING entity group: {}", name);
List<SequencedJobSet> jobSets = List.of(getJobSetForEntityGroup(underlay.getEntityGroup(name)));
return runJobs(jobExecutor, isDryRun, runType, jobSets);
}

public void cleanAllEntities(boolean isDryRun) {
underlay.getEntities().keySet().stream().sorted().forEach(name -> cleanEntity(name, isDryRun));
private JobRunner runJobs(
JobExecutor jobExecutor,
boolean isDryRun,
IndexingJob.RunType runType,
List<SequencedJobSet> jobSets) {
JobRunner jobRunner = jobExecutor.getRunner(jobSets, isDryRun, runType);
jobRunner.runJobSets();
return jobRunner;
}

public void cleanEntity(String name, boolean isDryRun) {
LOGGER.info("CLEAN entity: {}", name);
getJobsForEntity(underlay.getEntity(name))
.forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndClean(isDryRun), ij.getName()));
}
@VisibleForTesting
public static SequencedJobSet getJobSetForEntity(Entity entity) {
SequencedJobSet jobSet = new SequencedJobSet(entity.getName());
jobSet.startNewStage();
jobSet.addJob(new CreateEntityTable(entity));

public void cleanAllEntityGroups(boolean isDryRun) {
underlay.getEntityGroups().keySet().stream()
.sorted()
.forEach(name -> cleanEntityGroup(name, isDryRun));
}
jobSet.startNewStage();
jobSet.addJob(new DenormalizeEntityInstances(entity));

public void cleanEntityGroup(String name, boolean isDryRun) {
LOGGER.info("CLEAN entity group: {}", name);
getJobsForEntityGroup(underlay.getEntityGroup(name))
.forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndClean(isDryRun), ij.getName()));
}
if (entity.getTextSearch().isEnabled() || entity.hasHierarchies()) {
jobSet.startNewStage();
}

@VisibleForTesting
public static List<IndexingJob> getJobsForEntity(Entity entity) {
List<IndexingJob> jobs = new ArrayList<>();
jobs.add(new CreateEntityTable(entity));
jobs.add(new DenormalizeEntityInstances(entity));
if (entity.getTextSearch().isEnabled()) {
jobs.add(new BuildTextSearchStrings(entity));
jobSet.addJob(new BuildTextSearchStrings(entity));
}
entity.getHierarchies().stream()
.forEach(
hierarchy -> {
jobs.add(new WriteParentChildIdPairs(entity, hierarchy.getName()));
jobs.add(new WriteAncestorDescendantIdPairs(entity, hierarchy.getName()));
jobs.add(new BuildNumChildrenAndPaths(entity, hierarchy.getName()));
jobSet.addJob(new WriteParentChildIdPairs(entity, hierarchy.getName()));
jobSet.addJob(new WriteAncestorDescendantIdPairs(entity, hierarchy.getName()));
jobSet.addJob(new BuildNumChildrenAndPaths(entity, hierarchy.getName()));
});
return jobs;
return jobSet;
}

@VisibleForTesting
public static List<IndexingJob> getJobsForEntityGroup(EntityGroup entityGroup) {
List<IndexingJob> jobs = new ArrayList<>();
public static SequencedJobSet getJobSetForEntityGroup(EntityGroup entityGroup) {
SequencedJobSet jobSet = new SequencedJobSet(entityGroup.getName());
jobSet.startNewStage();

// for each relationship, write the index relationship mapping
// For each relationship, write the index relationship mapping.
entityGroup.getRelationships().values().stream()
.forEach(
// TODO: If the source relationship mapping table = one of the entity tables, then just
// populate a new column on that entity table, instead of always writing a new table.
relationship -> jobs.add(new WriteRelationshipIdPairs(relationship)));
relationship -> jobSet.addJob(new WriteRelationshipIdPairs(relationship)));

if (EntityGroup.Type.CRITERIA_OCCURRENCE.equals(entityGroup.getType())) {
CriteriaOccurrence criteriaOccurrence = (CriteriaOccurrence) entityGroup;
// Compute the criteria rollup counts for both the criteria-primary and criteria-occurrence
// relationships.
jobs.add(
jobSet.addJob(
new ComputeRollupCounts(
criteriaOccurrence.getCriteriaEntity(),
criteriaOccurrence.getCriteriaPrimaryRelationship(),
null));
jobs.add(
jobSet.addJob(
new ComputeRollupCounts(
criteriaOccurrence.getCriteriaEntity(),
criteriaOccurrence.getOccurrenceCriteriaRelationship(),
@@ -199,12 +225,12 @@ public static List<IndexingJob> getJobsForEntityGroup(EntityGroup entityGroup) {
criteriaOccurrence.getCriteriaEntity().getHierarchies().stream()
.forEach(
hierarchy -> {
jobs.add(
jobSet.addJob(
new ComputeRollupCounts(
criteriaOccurrence.getCriteriaEntity(),
criteriaOccurrence.getCriteriaPrimaryRelationship(),
hierarchy));
jobs.add(
jobSet.addJob(
new ComputeRollupCounts(
criteriaOccurrence.getCriteriaEntity(),
criteriaOccurrence.getOccurrenceCriteriaRelationship(),
@@ -214,33 +240,15 @@ public static List<IndexingJob> getJobsForEntityGroup(EntityGroup entityGroup) {

// Compute display hints for the occurrence entity.
if (!criteriaOccurrence.getModifierAttributes().isEmpty()) {
jobs.add(
jobSet.addJob(
new ComputeDisplayHints(
criteriaOccurrence, criteriaOccurrence.getModifierAttributes()));
}
}

return jobs;
return jobSet;
}

public Underlay getUnderlay() {
return underlay;
}

/**
* Execute an indexing job. If an exception is thrown, make sure the error message and stack trace
* are logged.
*
* @param runJob function with no return value
* @param jobName name of the indexing job to include in log statements
*/
private void tolerateJobExceptions(
HttpUtils.RunnableWithCheckedException<Exception> runJob, String jobName) {
try {
runJob.run();
} catch (Exception ex) {
LOGGER.error("Error running indexing job: {}", jobName);
LOGGER.error("Exception thrown: {}", ex);
}
}
}