From 2d62a8a37016320d8e09280c7e3acfa761f979ef Mon Sep 17 00:00:00 2001 From: marikomedlock Date: Wed, 18 Jan 2023 09:27:35 -0500 Subject: [PATCH] Override max hierarchy depth. Parallelize indexing jobs. (#322) * Override max hierarchy depth. Parallelize indexing jobs. * Handle null values in modifiers workflow. Add distincts to ancestor-descendant. * Update indexing job tests. * Checkstyle fix. * Update readme. * Address PR comments. --- docs/INDEXING.md | 22 ++- .../tanagra/indexing/BigQueryIndexingJob.java | 16 +- .../bio/terra/tanagra/indexing/Indexer.java | 154 +++++++++--------- .../terra/tanagra/indexing/IndexingJob.java | 55 ++++--- .../java/bio/terra/tanagra/indexing/Main.java | 95 ++++++----- .../job/BuildNumChildrenAndPaths.java | 3 +- .../indexing/job/ComputeDisplayHints.java | 21 ++- .../job/WriteAncestorDescendantIdPairs.java | 10 +- .../tanagra/indexing/job/beam/GraphUtils.java | 9 +- .../indexing/jobexecutor/JobResult.java | 113 +++++++++++++ .../indexing/jobexecutor/JobRunner.java | 61 +++++++ .../indexing/jobexecutor/JobThread.java | 42 +++++ .../indexing/jobexecutor/ParallelRunner.java | 117 +++++++++++++ .../indexing/jobexecutor/SequencedJobSet.java | 40 +++++ .../indexing/jobexecutor/SerialRunner.java | 49 ++++++ .../command/IndexEntityGroupTest.java | 11 +- .../indexing/command/IndexEntityTest.java | 55 ++++--- .../expanded/entity/condition.json | 6 +- .../expanded/entity/ingredient.json | 6 +- .../expanded/entity/measurement.json | 6 +- .../expanded/entity/procedure.json | 6 +- .../original/entity/condition.json | 3 +- .../original/entity/ingredient.json | 3 +- .../original/entity/measurement.json | 3 +- .../original/entity/procedure.json | 3 +- .../cms_synpuf/expanded/entity/condition.json | 6 +- .../expanded/entity/ingredient.json | 6 +- .../cms_synpuf/expanded/entity/procedure.json | 6 +- .../cms_synpuf/original/entity/condition.json | 3 +- .../original/entity/ingredient.json | 3 +- .../cms_synpuf/original/entity/procedure.json | 3 +- .../sdd/original/sql/brand_textSearch.sql | 1 - .../serialization/UFHierarchyMapping.java | 13 ++ .../bio/terra/tanagra/underlay/Entity.java | 7 +- .../tanagra/underlay/HierarchyMapping.java | 27 ++- 35 files changed, 754 insertions(+), 230 deletions(-) create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobResult.java create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobRunner.java create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobThread.java create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/ParallelRunner.java create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SequencedJobSet.java create mode 100644 indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SerialRunner.java diff --git a/docs/INDEXING.md b/docs/INDEXING.md index 8dc45af51..19195d20b 100644 --- a/docs/INDEXING.md +++ b/docs/INDEXING.md @@ -80,7 +80,7 @@ Potential downside is more permissions needed by indexing service account.) Set the default application credentials to a service account key file that has read access to the source and read + write access to the index data. ``` -export GOOGLE_APPLICATION_CREDENTIALS=/credentials/indexing_sa.json +export GOOGLE_APPLICATION_CREDENTIALS=$HOME/tanagra/credentials/indexing_sa.json ``` #### All Jobs @@ -88,11 +88,11 @@ Do a dry run of all the indexing jobs. This provides a sanity check that the ind query inputs, are valid. 
This step is not required, but highly recommended to help catch errors/bugs sooner and without running a bunch of computation first. ``` -./gradlew indexer:index -Dexec.args="INDEX_ALL /config/output/omop.json DRY_RUN" +./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json DRY_RUN" ``` Now actually kick off all the indexing jobs. ``` -./gradlew indexer:index -Dexec.args="INDEX_ALL /config/output/omop.json" +./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json" ``` This can take a long time to complete. If e.g. your computer falls asleep or you need to kill the process on your computer, you can re-run the same command again. You need to check that there are no in-progress Dataflow jobs in the @@ -106,17 +106,25 @@ kicking them off again. You can also kickoff the indexing jobs for a single entity or entity group. This is helpful for testing and debugging. To kick off all the indexing jobs for a particular entity: ``` -./gradlew indexer:index -Dexec.args="INDEX_ENTITY /config/output/omop.json person DRY_RUN" -./gradlew indexer:index -Dexec.args="INDEX_ENTITY /config/output/omop.json person" +./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json person DRY_RUN" +./gradlew indexer:index -Dexec.args="INDEX_ENTITY $HOME/tanagra/service/src/main/resources/config/output/omop.json person" ``` or entity group: ``` -./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP /config/output/omop.json condition_occurrence_person DRY_RUN" -./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP /config/output/omop.json condition_occurrence_person" +./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence_person DRY_RUN" +./gradlew indexer:index -Dexec.args="INDEX_ENTITY_GROUP $HOME/tanagra/service/src/main/resources/config/output/omop.json condition_occurrence_person" ``` All the entities in a group should be indexed before the group. The `INDEX_ALL` command ensures this ordering, but keep this in mind if you're running the jobs for each entity or entity group separately. +#### Concurrency +By default, the indexing jobs are run concurrently as much as possible. You can force it to run jobs serially by +appending `SERIAL` to the command: +``` +./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json DRY_RUN SERIAL" +./gradlew indexer:index -Dexec.args="INDEX_ALL $HOME/tanagra/service/src/main/resources/config/output/omop.json NOT_DRY_RUN SERIAL" +``` + ## OMOP Example The `cms_synpuf` is a [public dataset](https://console.cloud.google.com/marketplace/product/hhs/synpuf) that uses the standard OMOP schema. diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/BigQueryIndexingJob.java b/indexer/src/main/java/bio/terra/tanagra/indexing/BigQueryIndexingJob.java index 32c3f552a..50bfa7826 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/BigQueryIndexingJob.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/BigQueryIndexingJob.java @@ -48,23 +48,12 @@ public abstract class BigQueryIndexingJob implements IndexingJob { protected static final String DEFAULT_REGION = "us-central1"; - // The maximum depth of ancestors present in a hierarchy. This may be larger - // than the actual max depth, but if it is smaller the resulting table will be incomplete. 
- // TODO: Allow overriding the default max hierarchy depth. - protected static final int DEFAULT_MAX_HIERARCHY_DEPTH = 64; - private final Entity entity; protected BigQueryIndexingJob(Entity entity) { this.entity = entity; } - @Override - public boolean prerequisitesComplete() { - // TODO: Implement a required ordering so we can run jobs in parallel. - return true; - } - protected Entity getEntity() { return entity; } @@ -232,6 +221,8 @@ protected BigQueryOptions buildDataflowPipelineOptions(BigQueryDataset outputBQD /** Build a name for the Dataflow job that will be visible in the Cloud Console. */ private String getDataflowJobName() { + String underlayName = entity.getUnderlay().getName(); + String normalizedUnderlayName = underlayName.toLowerCase().replaceAll("[^a-z0-9]", "-"); String jobDisplayName = getName(); String normalizedJobDisplayName = jobDisplayName == null || jobDisplayName.length() == 0 @@ -243,7 +234,8 @@ private String getDataflowJobName() { String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt()); return String.format( - "%s-%s-%s-%s", normalizedJobDisplayName, normalizedUserName, datePart, randomPart); + "%s-%s-%s-%s-%s", + normalizedUnderlayName, normalizedJobDisplayName, normalizedUserName, datePart, randomPart); } protected String getAppDefaultSAEmail() { diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/Indexer.java b/indexer/src/main/java/bio/terra/tanagra/indexing/Indexer.java index b705be4d0..b2ed06198 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/Indexer.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/Indexer.java @@ -13,6 +13,10 @@ import bio.terra.tanagra.indexing.job.WriteAncestorDescendantIdPairs; import bio.terra.tanagra.indexing.job.WriteParentChildIdPairs; import bio.terra.tanagra.indexing.job.WriteRelationshipIdPairs; +import bio.terra.tanagra.indexing.jobexecutor.JobRunner; +import bio.terra.tanagra.indexing.jobexecutor.ParallelRunner; +import bio.terra.tanagra.indexing.jobexecutor.SequencedJobSet; +import bio.terra.tanagra.indexing.jobexecutor.SerialRunner; import bio.terra.tanagra.serialization.UFEntity; import bio.terra.tanagra.serialization.UFEntityGroup; import bio.terra.tanagra.serialization.UFUnderlay; @@ -21,12 +25,10 @@ import bio.terra.tanagra.underlay.Underlay; import bio.terra.tanagra.underlay.entitygroup.CriteriaOccurrence; import bio.terra.tanagra.utils.FileIO; -import bio.terra.tanagra.utils.HttpUtils; import bio.terra.tanagra.utils.JacksonMapper; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -35,6 +37,23 @@ public final class Indexer { private static final Logger LOGGER = LoggerFactory.getLogger(Indexer.class); + enum JobExecutor { + PARALLEL, + SERIAL; + + public JobRunner getRunner( + List jobSets, boolean isDryRun, IndexingJob.RunType runType) { + switch (this) { + case SERIAL: + return new SerialRunner(jobSets, isDryRun, runType); + case PARALLEL: + return new ParallelRunner(jobSets, isDryRun, runType); + default: + throw new IllegalArgumentException("Unknown JobExecution enum type: " + this); + } + } + } + private final Underlay underlay; private UFUnderlay expandedUnderlay; private List expandedEntities; @@ -103,91 +122,98 @@ public void writeSerializedUnderlay() throws IOException { } } - public void runJobsForAllEntities(boolean isDryRun) { - underlay.getEntities().keySet().stream() - 
.sorted() - .forEach(name -> runJobsForEntity(name, isDryRun)); + public JobRunner runJobsForAllEntities( + JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType) { + LOGGER.info("INDEXING all entities"); + List jobSets = + underlay.getEntities().values().stream() + .map(Indexer::getJobSetForEntity) + .collect(Collectors.toList()); + return runJobs(jobExecutor, isDryRun, runType, jobSets); } - public void runJobsForEntity(String name, boolean isDryRun) { - LOGGER.info("RUN entity: {}", name); - getJobsForEntity(underlay.getEntity(name)) - .forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndRun(isDryRun), ij.getName())); + public JobRunner runJobsForSingleEntity( + JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType, String name) { + LOGGER.info("INDEXING entity: {}", name); + List jobSets = List.of(getJobSetForEntity(underlay.getEntity(name))); + return runJobs(jobExecutor, isDryRun, runType, jobSets); } - public void runJobsForAllEntityGroups(boolean isDryRun) { - underlay.getEntityGroups().keySet().stream() - .sorted() - .forEach(name -> runJobsForEntityGroup(name, isDryRun)); + public JobRunner runJobsForAllEntityGroups( + JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType) { + LOGGER.info("INDEXING all entity groups"); + List jobSets = + underlay.getEntityGroups().values().stream() + .map(Indexer::getJobSetForEntityGroup) + .collect(Collectors.toList()); + return runJobs(jobExecutor, isDryRun, runType, jobSets); } - public void runJobsForEntityGroup(String name, boolean isDryRun) { - LOGGER.info("RUN entity group: {}", name); - getJobsForEntityGroup(underlay.getEntityGroup(name)) - .forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndRun(isDryRun), ij.getName())); + public JobRunner runJobsForSingleEntityGroup( + JobExecutor jobExecutor, boolean isDryRun, IndexingJob.RunType runType, String name) { + LOGGER.info("INDEXING entity group: {}", name); + List jobSets = List.of(getJobSetForEntityGroup(underlay.getEntityGroup(name))); + return runJobs(jobExecutor, isDryRun, runType, jobSets); } - public void cleanAllEntities(boolean isDryRun) { - underlay.getEntities().keySet().stream().sorted().forEach(name -> cleanEntity(name, isDryRun)); + private JobRunner runJobs( + JobExecutor jobExecutor, + boolean isDryRun, + IndexingJob.RunType runType, + List jobSets) { + JobRunner jobRunner = jobExecutor.getRunner(jobSets, isDryRun, runType); + jobRunner.runJobSets(); + return jobRunner; } - public void cleanEntity(String name, boolean isDryRun) { - LOGGER.info("CLEAN entity: {}", name); - getJobsForEntity(underlay.getEntity(name)) - .forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndClean(isDryRun), ij.getName())); - } + @VisibleForTesting + public static SequencedJobSet getJobSetForEntity(Entity entity) { + SequencedJobSet jobSet = new SequencedJobSet(entity.getName()); + jobSet.startNewStage(); + jobSet.addJob(new CreateEntityTable(entity)); - public void cleanAllEntityGroups(boolean isDryRun) { - underlay.getEntityGroups().keySet().stream() - .sorted() - .forEach(name -> cleanEntityGroup(name, isDryRun)); - } + jobSet.startNewStage(); + jobSet.addJob(new DenormalizeEntityInstances(entity)); - public void cleanEntityGroup(String name, boolean isDryRun) { - LOGGER.info("CLEAN entity group: {}", name); - getJobsForEntityGroup(underlay.getEntityGroup(name)) - .forEach(ij -> tolerateJobExceptions(() -> ij.checkStatusAndClean(isDryRun), ij.getName())); - } + if (entity.getTextSearch().isEnabled() || 
entity.hasHierarchies()) { + jobSet.startNewStage(); + } - @VisibleForTesting - public static List getJobsForEntity(Entity entity) { - List jobs = new ArrayList<>(); - jobs.add(new CreateEntityTable(entity)); - jobs.add(new DenormalizeEntityInstances(entity)); if (entity.getTextSearch().isEnabled()) { - jobs.add(new BuildTextSearchStrings(entity)); + jobSet.addJob(new BuildTextSearchStrings(entity)); } entity.getHierarchies().stream() .forEach( hierarchy -> { - jobs.add(new WriteParentChildIdPairs(entity, hierarchy.getName())); - jobs.add(new WriteAncestorDescendantIdPairs(entity, hierarchy.getName())); - jobs.add(new BuildNumChildrenAndPaths(entity, hierarchy.getName())); + jobSet.addJob(new WriteParentChildIdPairs(entity, hierarchy.getName())); + jobSet.addJob(new WriteAncestorDescendantIdPairs(entity, hierarchy.getName())); + jobSet.addJob(new BuildNumChildrenAndPaths(entity, hierarchy.getName())); }); - return jobs; + return jobSet; } @VisibleForTesting - public static List getJobsForEntityGroup(EntityGroup entityGroup) { - List jobs = new ArrayList<>(); + public static SequencedJobSet getJobSetForEntityGroup(EntityGroup entityGroup) { + SequencedJobSet jobSet = new SequencedJobSet(entityGroup.getName()); + jobSet.startNewStage(); - // for each relationship, write the index relationship mapping + // For each relationship, write the index relationship mapping. entityGroup.getRelationships().values().stream() .forEach( // TODO: If the source relationship mapping table = one of the entity tables, then just // populate a new column on that entity table, instead of always writing a new table. - relationship -> jobs.add(new WriteRelationshipIdPairs(relationship))); + relationship -> jobSet.addJob(new WriteRelationshipIdPairs(relationship))); if (EntityGroup.Type.CRITERIA_OCCURRENCE.equals(entityGroup.getType())) { CriteriaOccurrence criteriaOccurrence = (CriteriaOccurrence) entityGroup; // Compute the criteria rollup counts for both the criteria-primary and criteria-occurrence // relationships. - jobs.add( + jobSet.addJob( new ComputeRollupCounts( criteriaOccurrence.getCriteriaEntity(), criteriaOccurrence.getCriteriaPrimaryRelationship(), null)); - jobs.add( + jobSet.addJob( new ComputeRollupCounts( criteriaOccurrence.getCriteriaEntity(), criteriaOccurrence.getOccurrenceCriteriaRelationship(), @@ -199,12 +225,12 @@ public static List getJobsForEntityGroup(EntityGroup entityGroup) { criteriaOccurrence.getCriteriaEntity().getHierarchies().stream() .forEach( hierarchy -> { - jobs.add( + jobSet.addJob( new ComputeRollupCounts( criteriaOccurrence.getCriteriaEntity(), criteriaOccurrence.getCriteriaPrimaryRelationship(), hierarchy)); - jobs.add( + jobSet.addJob( new ComputeRollupCounts( criteriaOccurrence.getCriteriaEntity(), criteriaOccurrence.getOccurrenceCriteriaRelationship(), @@ -214,33 +240,15 @@ public static List getJobsForEntityGroup(EntityGroup entityGroup) { // Compute display hints for the occurrence entity. if (!criteriaOccurrence.getModifierAttributes().isEmpty()) { - jobs.add( + jobSet.addJob( new ComputeDisplayHints( criteriaOccurrence, criteriaOccurrence.getModifierAttributes())); } } - - return jobs; + return jobSet; } public Underlay getUnderlay() { return underlay; } - - /** - * Execute an indexing job. If an exception is thrown, make sure the error message and stack trace - * are logged. 
- * - * @param runJob function with no return value - * @param jobName name of the indexing job to include in log statements - */ - private void tolerateJobExceptions( - HttpUtils.RunnableWithCheckedException runJob, String jobName) { - try { - runJob.run(); - } catch (Exception ex) { - LOGGER.error("Error running indexing job: {}", jobName); - LOGGER.error("Exception thrown: {}", ex); - } - } } diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/IndexingJob.java b/indexer/src/main/java/bio/terra/tanagra/indexing/IndexingJob.java index 1becc480d..fd41d9980 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/IndexingJob.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/IndexingJob.java @@ -12,6 +12,12 @@ enum JobStatus { COMPLETE } + enum RunType { + STATUS, + CLEAN, + RUN + } + String getName(); void run(boolean isDryRun); @@ -20,32 +26,39 @@ enum JobStatus { JobStatus checkStatus(); - boolean prerequisitesComplete(); - - default void checkStatusAndRun(boolean isDryRun) { - LOGGER.info("RUN Indexing job: {}", getName()); + default JobStatus execute(RunType runType, boolean isDryRun) { + LOGGER.info("Executing indexing job: {}, {}", runType, getName()); JobStatus status = checkStatus(); LOGGER.info("Job status: {}", status); - if (!prerequisitesComplete()) { - LOGGER.info("Skipping because prerequisites are not complete"); - return; - } else if (!JobStatus.NOT_STARTED.equals(status)) { - LOGGER.info("Skipping because job is either in progress or complete"); - return; + switch (runType) { + case RUN: + if (!JobStatus.NOT_STARTED.equals(status)) { + LOGGER.info("Skipping because job is either in progress or complete"); + return status; + } + run(isDryRun); + return checkStatus(); + case CLEAN: + if (JobStatus.IN_PROGRESS.equals(status)) { + LOGGER.info("Skipping because job is in progress"); + return status; + } + clean(isDryRun); + return checkStatus(); + case STATUS: + return status; + default: + throw new IllegalArgumentException("Unknown execution type: " + runType); } - run(isDryRun); } - default void checkStatusAndClean(boolean isDryRun) { - LOGGER.info("CLEAN Indexing job: {}", getName()); - JobStatus status = checkStatus(); - LOGGER.info("Job status: {}", status); - - if (JobStatus.IN_PROGRESS.equals(status)) { - LOGGER.info("Skipping because job is in progress"); - return; - } - clean(isDryRun); + /** Check if the job completed what it was supposed to. 
*/ + static boolean checkStatusAfterRunMatchesExpected( + RunType runType, boolean isDryRun, JobStatus status) { + return isDryRun + || RunType.STATUS.equals(runType) + || (RunType.RUN.equals(runType) && JobStatus.COMPLETE.equals(status)) + || (RunType.CLEAN.equals(runType) && JobStatus.NOT_STARTED.equals(status)); } } diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/Main.java b/indexer/src/main/java/bio/terra/tanagra/indexing/Main.java index 1a89da574..954562973 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/Main.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/Main.java @@ -1,9 +1,11 @@ package bio.terra.tanagra.indexing; +import static bio.terra.tanagra.indexing.Main.Command.INDEX_ALL; import static bio.terra.tanagra.indexing.Main.Command.INDEX_ENTITY; import static bio.terra.tanagra.indexing.Main.Command.INDEX_ENTITY_GROUP; import bio.terra.tanagra.exception.SystemException; +import bio.terra.tanagra.indexing.jobexecutor.JobRunner; import bio.terra.tanagra.utils.FileIO; import bio.terra.tanagra.utils.FileUtils; import java.nio.file.Path; @@ -21,20 +23,7 @@ enum Command { CLEAN_ALL } - /** - * Main entrypoint for running indexing. - * - *

- Expand the user-specified underlay config: - * - *

./gradlew api:index -Dexec.args="EXPAND_CONFIG underlay.json output_dir" - * - *

- Kick off the indexing jobs for a single entity: - * - *

[dry run] ./gradlew api:index -Dexec.args="INDEX_ENTITY output_dir/underlay.json person - * DRY_RUN" - * - *

[actual run] ./gradlew api:index -Dexec.args="INDEX_ENTITY output_dir/underlay.json person" - */ + /** Main entrypoint for running indexing. */ public static void main(String... args) throws Exception { // TODO: Consider using the picocli library for command parsing and packaging this as an actual // CLI. @@ -61,55 +50,65 @@ public static void main(String... args) throws Exception { break; case INDEX_ENTITY: case CLEAN_ENTITY: - boolean isRunEntity = INDEX_ENTITY.equals(cmd); + IndexingJob.RunType runTypeEntity = + INDEX_ENTITY.equals(cmd) ? IndexingJob.RunType.RUN : IndexingJob.RunType.CLEAN; String nameEntity = args[2]; + boolean isAllEntities = "*".equals(nameEntity); boolean isDryRunEntity = isDryRun(3, args); + Indexer.JobExecutor jobExecEntity = getJobExec(4, args); // Index/clean all the entities (*) or just one (entityName). - if ("*".equals(nameEntity)) { - if (isRunEntity) { - indexer.runJobsForAllEntities(isDryRunEntity); - } else { - indexer.cleanAllEntities(isDryRunEntity); - } + JobRunner entityJobRunner; + if (isAllEntities) { + entityJobRunner = + indexer.runJobsForAllEntities(jobExecEntity, isDryRunEntity, runTypeEntity); } else { - if (isRunEntity) { - indexer.runJobsForEntity(nameEntity, isDryRunEntity); - } else { - indexer.cleanEntity(nameEntity, isDryRunEntity); - } + entityJobRunner = + indexer.runJobsForSingleEntity( + jobExecEntity, isDryRunEntity, runTypeEntity, nameEntity); } + entityJobRunner.printJobResultSummary(); + entityJobRunner.throwIfAnyFailures(); break; case INDEX_ENTITY_GROUP: case CLEAN_ENTITY_GROUP: - boolean isRunEntityGroup = INDEX_ENTITY_GROUP.equals(cmd); + IndexingJob.RunType runTypeEntityGroup = + INDEX_ENTITY_GROUP.equals(cmd) ? IndexingJob.RunType.RUN : IndexingJob.RunType.CLEAN; String nameEntityGroup = args[2]; + boolean isAllEntityGroups = "*".equals(nameEntityGroup); boolean isDryRunEntityGroup = isDryRun(3, args); + Indexer.JobExecutor jobExecEntityGroup = getJobExec(4, args); // Index/clean all the entity groups (*) or just one (entityGroupName). - if ("*".equals(nameEntityGroup)) { - if (isRunEntityGroup) { - indexer.runJobsForAllEntityGroups(isDryRunEntityGroup); - } else { - indexer.cleanAllEntityGroups(isDryRunEntityGroup); - } + JobRunner entityGroupJobRunner; + if (isAllEntityGroups) { + entityGroupJobRunner = + indexer.runJobsForAllEntityGroups( + jobExecEntityGroup, isDryRunEntityGroup, runTypeEntityGroup); } else { - if (isRunEntityGroup) { - indexer.runJobsForEntityGroup(nameEntityGroup, isDryRunEntityGroup); - } else { - indexer.cleanEntityGroup(nameEntityGroup, isDryRunEntityGroup); - } + entityGroupJobRunner = + indexer.runJobsForSingleEntityGroup( + jobExecEntityGroup, isDryRunEntityGroup, runTypeEntityGroup, nameEntityGroup); } + entityGroupJobRunner.printJobResultSummary(); + entityGroupJobRunner.throwIfAnyFailures(); break; case INDEX_ALL: - boolean isDryRunIndexAll = isDryRun(2, args); - indexer.runJobsForAllEntities(isDryRunIndexAll); - indexer.runJobsForAllEntityGroups(isDryRunIndexAll); - break; case CLEAN_ALL: - boolean isDryRunCleanAll = isDryRun(2, args); - indexer.cleanAllEntities(isDryRunCleanAll); - indexer.cleanAllEntityGroups(isDryRunCleanAll); + IndexingJob.RunType runTypeAll = + INDEX_ALL.equals(cmd) ? IndexingJob.RunType.RUN : IndexingJob.RunType.CLEAN; + boolean isDryRunAll = isDryRun(2, args); + Indexer.JobExecutor jobExecAll = getJobExec(3, args); + + // Index/clean all the entities and entity groups. 
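+ // Run the entity jobs to completion before the entity group jobs: all the entities in a group
+ // must be indexed before the group itself.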
+ JobRunner entityJobRunnerAll = + indexer.runJobsForAllEntities(jobExecAll, isDryRunAll, runTypeAll); + JobRunner entityGroupJobRunnerAll = + indexer.runJobsForAllEntityGroups(jobExecAll, isDryRunAll, runTypeAll); + entityJobRunnerAll.printJobResultSummary(); + entityGroupJobRunnerAll.printJobResultSummary(); + entityJobRunnerAll.throwIfAnyFailures(); + entityGroupJobRunnerAll.throwIfAnyFailures(); break; default: throw new SystemException("Unknown command: " + cmd); @@ -119,4 +118,10 @@ public static void main(String... args) throws Exception { private static boolean isDryRun(int index, String... args) { return args.length > index && "DRY_RUN".equals(args[index]); } + + private static Indexer.JobExecutor getJobExec(int index, String... args) { + return args.length > index && "SERIAL".equals(args[index]) + ? Indexer.JobExecutor.SERIAL + : Indexer.JobExecutor.PARALLEL; + } } diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/job/BuildNumChildrenAndPaths.java b/indexer/src/main/java/bio/terra/tanagra/indexing/job/BuildNumChildrenAndPaths.java index 346a869c6..3bc3c3a72 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/job/BuildNumChildrenAndPaths.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/job/BuildNumChildrenAndPaths.java @@ -166,7 +166,8 @@ private void writeFieldsToTempTable(boolean isDryRun) { // compute a path to a root node for each node in the hierarchy PCollection> nodePathKVsPC = - PathUtils.computePaths(allNodesPC, childParentRelationshipsPC, DEFAULT_MAX_HIERARCHY_DEPTH); + PathUtils.computePaths( + allNodesPC, childParentRelationshipsPC, sourceHierarchyMapping.getMaxHierarchyDepth()); // count the number of children for each node in the hierarchy PCollection> nodeNumChildrenKVsPC = diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/job/ComputeDisplayHints.java b/indexer/src/main/java/bio/terra/tanagra/indexing/job/ComputeDisplayHints.java index 26a39839a..a733a1ae3 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/job/ComputeDisplayHints.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/job/ComputeDisplayHints.java @@ -197,15 +197,26 @@ private void numericRangeHint( occAllAttrs .apply( Filter.by( - occIdAndTableRow -> occIdAndTableRow.getValue().get(numValColName) != null)) + occIdAndTableRow -> + occIdAndTableRow.getValue().get(numValColName) != null + && !occIdAndTableRow + .getValue() + .get(numValColName) + .toString() + .isEmpty())) .apply( MapElements.into( TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles())) .via( - occIdAndTableRow -> - KV.of( - occIdAndTableRow.getKey(), - (Double) occIdAndTableRow.getValue().get(numValColName)))); + occIdAndTableRow -> { + Double doubleVal; + try { + doubleVal = (Double) occIdAndTableRow.getValue().get(numValColName); + } catch (ClassCastException ccEx) { + doubleVal = Double.MIN_VALUE; + } + return KV.of(occIdAndTableRow.getKey(), doubleVal); + })); // Build key-value pairs: [key] criteriaId, [value] attribute value. 
final TupleTag criIdTag = new TupleTag<>(); diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/job/WriteAncestorDescendantIdPairs.java b/indexer/src/main/java/bio/terra/tanagra/indexing/job/WriteAncestorDescendantIdPairs.java index 7a865dd17..49b3ed31b 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/job/WriteAncestorDescendantIdPairs.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/job/WriteAncestorDescendantIdPairs.java @@ -10,6 +10,7 @@ import bio.terra.tanagra.query.SQLExpression; import bio.terra.tanagra.query.TablePointer; import bio.terra.tanagra.underlay.Entity; +import bio.terra.tanagra.underlay.HierarchyMapping; import bio.terra.tanagra.underlay.Underlay; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; @@ -66,11 +67,10 @@ public String getName() { @Override public void run(boolean isDryRun) { + HierarchyMapping sourceHierarchyMapping = + getEntity().getHierarchy(hierarchyName).getMapping(Underlay.MappingType.SOURCE); SQLExpression selectChildParentIdPairs = - getEntity() - .getHierarchy(hierarchyName) - .getMapping(Underlay.MappingType.SOURCE) - .queryChildParentPairs(CHILD_COLUMN_NAME, PARENT_COLUMN_NAME); + sourceHierarchyMapping.queryChildParentPairs(CHILD_COLUMN_NAME, PARENT_COLUMN_NAME); String sql = selectChildParentIdPairs.renderSQL(); LOGGER.info("select all child-parent id pairs SQL: {}", sql); @@ -89,7 +89,7 @@ public void run(boolean isDryRun) { TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.longs())) .via(WriteAncestorDescendantIdPairs::relationshipRowToKV)); PCollection> flattenedRelationships = - GraphUtils.transitiveClosure(relationships, DEFAULT_MAX_HIERARCHY_DEPTH) + GraphUtils.transitiveClosure(relationships, sourceHierarchyMapping.getMaxHierarchyDepth()) .apply(Distinct.create()); // There may be duplicate descendants. 
flattenedRelationships .apply(ParDo.of(new KVToTableRow())) diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/job/beam/GraphUtils.java b/indexer/src/main/java/bio/terra/tanagra/indexing/job/beam/GraphUtils.java index 44f78991c..f369eb0b8 100644 --- a/indexer/src/main/java/bio/terra/tanagra/indexing/job/beam/GraphUtils.java +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/job/beam/GraphUtils.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.transforms.Distinct; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.KvSwap; @@ -70,10 +71,14 @@ public static PCollection> transitiveClosure( int n = 2; while (true) { PCollection> nExactPaths = - concatenate(exactPaths.get(n / 2), exactPaths.get(n / 2), "exactPaths N" + n); + concatenate( + exactPaths.get(n / 2).apply(Distinct.create()), + exactPaths.get(n / 2).apply(Distinct.create()), + "exactPaths N" + n) + .apply(Distinct.create()); exactPaths.put(n, nExactPaths); - PCollection> nMinus1AllPaths = allPaths.get(n - 1); + PCollection> nMinus1AllPaths = allPaths.get(n - 1).apply(Distinct.create()); PCollection> newAllPaths = PCollectionList.of(nMinus1AllPaths) diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobResult.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobResult.java new file mode 100644 index 000000000..325228845 --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobResult.java @@ -0,0 +1,113 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import static bio.terra.tanagra.indexing.jobexecutor.ParallelRunner.TERMINAL_ANSI_GREEN; +import static bio.terra.tanagra.indexing.jobexecutor.ParallelRunner.TERMINAL_ANSI_RED; +import static bio.terra.tanagra.indexing.jobexecutor.ParallelRunner.TERMINAL_ESCAPE_RESET; + +import bio.terra.tanagra.indexing.IndexingJob; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Output of a thread that runs a single indexing job. */ +public class JobResult { + private static final Logger LOGGER = LoggerFactory.getLogger(JobResult.class); + + private final String jobDescription; + private final String threadName; + + private IndexingJob.JobStatus jobStatus; + private boolean threadTerminatedOnTime; + private boolean jobStatusAsExpected; + private long elapsedTimeNS; + + private boolean exceptionWasThrown; + private String exceptionStackTrace; + private String exceptionMessage; + + public JobResult(String jobDescription, String threadName) { + this.jobDescription = jobDescription; + this.threadName = threadName; + + this.threadTerminatedOnTime = false; + this.jobStatusAsExpected = false; + this.exceptionWasThrown = false; + this.exceptionStackTrace = null; + this.exceptionMessage = null; + } + + @SuppressWarnings("PMD.SystemPrintln") + public void print() { + System.out.println( + String.format( + "%s %s", + jobDescription, + isFailure() + ? 
(TERMINAL_ANSI_RED + "FAILED" + TERMINAL_ESCAPE_RESET) + : (TERMINAL_ANSI_GREEN + "SUCCESS" + TERMINAL_ESCAPE_RESET))); + System.out.println(String.format(" thread: %s", threadName)); + System.out.println(String.format(" job status: %s", jobStatus)); + System.out.println(String.format(" job status as expected: %s", jobStatusAsExpected)); + System.out.println( + String.format( + " elapsed time (sec): %d", + TimeUnit.MINUTES.convert(elapsedTimeNS, TimeUnit.NANOSECONDS))); + System.out.println(String.format(" thread terminated on time: %s", threadTerminatedOnTime)); + System.out.println(String.format(" exception msg: %s", exceptionMessage)); + System.out.println(String.format(" exception stack trace: %s", exceptionStackTrace)); + } + + /** + * Store the exception message and stack trace for the job results. Don't store the full {@link + * Throwable} object, because that may not always be serializable. This class may be serialized to + * disk as part of writing out the job results, so it needs to be a POJO. + */ + public void saveExceptionThrown(Throwable exceptionThrown) { + exceptionWasThrown = true; + exceptionMessage = + StringUtils.isBlank(exceptionMessage) + ? exceptionThrown.getMessage() + : String.format("%s%n%s", exceptionMessage, exceptionThrown.getMessage()); + + StringWriter stackTraceStr = new StringWriter(); + exceptionThrown.printStackTrace(new PrintWriter(stackTraceStr)); + exceptionStackTrace = stackTraceStr.toString(); + + LOGGER.error("Job thread threw error", exceptionThrown); // print the stack trace to the console + } + + public boolean isFailure() { + return exceptionWasThrown || !threadTerminatedOnTime || !jobStatusAsExpected; + } + + public String getJobDescription() { + return jobDescription; + } + + public IndexingJob.JobStatus getJobStatus() { + return jobStatus; + } + + public void setJobStatus(IndexingJob.JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public void setThreadTerminatedOnTime(boolean threadTerminatedOnTime) { + this.threadTerminatedOnTime = threadTerminatedOnTime; + } + + public void setJobStatusAsExpected(boolean jobStatusAsExpected) { + this.jobStatusAsExpected = jobStatusAsExpected; + } + + public void setElapsedTimeNS(long elapsedTimeNS) { + this.elapsedTimeNS = elapsedTimeNS; + } + + public void setExceptionWasThrown(boolean exceptionWasThrown) { + this.exceptionWasThrown = exceptionWasThrown; + } +} diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobRunner.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobRunner.java new file mode 100644 index 000000000..7c0fbb05e --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobRunner.java @@ -0,0 +1,61 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import bio.terra.tanagra.exception.SystemException; +import bio.terra.tanagra.indexing.IndexingJob; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public abstract class JobRunner { + protected static final long MAX_TIME_PER_JOB_MIN = 60; + protected static final long MAX_TIME_PER_JOB_DRY_RUN_MIN = 5; + + public static final String TERMINAL_ESCAPE_RESET = "\u001B[0m"; + public static final String TERMINAL_ANSI_PURPLE = "\u001B[35m"; + public static final String TERMINAL_ANSI_GREEN = "\u001b[32m"; + public static final String TERMINAL_ANSI_RED = "\u001b[31m"; + + protected final List jobSets; + protected final boolean isDryRun; + protected final IndexingJob.RunType runType; + 
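+ // Result of each job, collected as the runners execute them; read by printJobResultSummary()
+ // and throwIfAnyFailures().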
protected final List jobResults; + + public JobRunner(List jobSets, boolean isDryRun, IndexingJob.RunType runType) { + this.jobSets = jobSets; + this.isDryRun = isDryRun; + this.runType = runType; + this.jobResults = new ArrayList<>(); + } + + /** Name for display only. */ + protected abstract String getName(); + + /** Run all job sets. */ + public abstract void runJobSets(); + + /** Run a single job set. */ + protected abstract void runSingleJobSet(SequencedJobSet sequencedJobSet) + throws InterruptedException, ExecutionException; + + /** Pretty print the job results to the terminal. */ + @SuppressWarnings("PMD.SystemPrintln") + public void printJobResultSummary() { + System.out.println(System.lineSeparator()); + System.out.println( + TERMINAL_ANSI_PURPLE + "Indexing job summary (" + getName() + ")" + TERMINAL_ESCAPE_RESET); + jobResults.stream() + .sorted(Comparator.comparing(JobResult::getJobDescription, String.CASE_INSENSITIVE_ORDER)) + .forEach(JobResult::print); + } + + public void throwIfAnyFailures() { + jobResults.stream() + .forEach( + jobResult -> { + if (jobResult.isFailure()) { + throw new SystemException("There were job failures"); + } + }); + } +} diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobThread.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobThread.java new file mode 100644 index 000000000..b1df69267 --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/JobThread.java @@ -0,0 +1,42 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import bio.terra.tanagra.indexing.IndexingJob; +import java.util.concurrent.Callable; + +/** Thread that runs a single indexing job and outputs an instance of the result class. */ +public class JobThread implements Callable { + private final IndexingJob indexingJob; + private final boolean isDryRun; + private final IndexingJob.RunType runType; + private final String jobDescription; + + public JobThread( + IndexingJob indexingJob, + boolean isDryRun, + IndexingJob.RunType runType, + String jobDescription) { + this.indexingJob = indexingJob; + this.isDryRun = isDryRun; + this.runType = runType; + this.jobDescription = jobDescription; + } + + @Override + public JobResult call() { + JobResult result = new JobResult(jobDescription, Thread.currentThread().getName()); + + long startTime = System.nanoTime(); + try { + IndexingJob.JobStatus status = indexingJob.execute(runType, isDryRun); + result.setJobStatus(status); + result.setJobStatusAsExpected( + IndexingJob.checkStatusAfterRunMatchesExpected(runType, isDryRun, status)); + result.setExceptionWasThrown(false); + } catch (Throwable ex) { + result.saveExceptionThrown(ex); + } + result.setElapsedTimeNS(System.nanoTime() - startTime); + + return result; + } +} diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/ParallelRunner.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/ParallelRunner.java new file mode 100644 index 000000000..f764dee56 --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/ParallelRunner.java @@ -0,0 +1,117 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import bio.terra.tanagra.exception.SystemException; +import bio.terra.tanagra.indexing.IndexingJob; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import 
java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class that runs multiple job sets in parallel. */ +public final class ParallelRunner extends JobRunner { + private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class); + private static final long MAX_TIME_FOR_SHUTDOWN_SEC = 60; + + public ParallelRunner( + List jobSets, boolean isDryRun, IndexingJob.RunType runType) { + super(jobSets, isDryRun, runType); + } + + @Override + protected String getName() { + return "PARALLEL"; + } + + /** Run all job sets in parallel. */ + @Override + public void runJobSets() { + // Create a thread pool to run the job set. + ThreadPoolExecutor threadPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(jobSets.size()); + LOGGER.info("Running job sets in parallel: {} sets", jobSets.size()); + + // Kick off each job set in a separate thread. + jobSets.forEach( + jobSet -> + threadPool.submit( + () -> { + try { + runSingleJobSet(jobSet); + } catch (InterruptedException | ExecutionException ex) { + throw new SystemException("Job set execution failed", ex); + } + })); + + try { + LOGGER.info("Waiting for job set to to finish"); + shutdownThreadPool(threadPool); + } catch (InterruptedException intEx) { + LOGGER.error("Error running jobs in parallel. Try running in serial."); + throw new SystemException("Error running jobs in parallel", intEx); + } + } + + /** Run a single job set. Run the stages serially, and the jobs within each stage in parallel. */ + @Override + protected void runSingleJobSet(SequencedJobSet sequencedJobSet) + throws InterruptedException, ExecutionException { + // Iterate through the job stages, running all jobs in each stage. + Iterator> jobStagesIterator = sequencedJobSet.iterator(); + while (jobStagesIterator.hasNext()) { + List jobsInStage = jobStagesIterator.next(); + + // Create a thread pool for each stage, one thread per job. + ThreadPoolExecutor threadPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(jobsInStage.size()); + LOGGER.info("New job stage: {} jobs", jobsInStage.size()); + + // Kick off one thread per job. + List> jobFutures = new ArrayList<>(); + for (IndexingJob job : jobsInStage) { + LOGGER.info("Kicking off thread for job set: {}", job.getName()); + Future jobFuture = + threadPool.submit(new JobThread(job, isDryRun, runType, job.getName())); + jobFutures.add(jobFuture); + } + + // Wait for all jobs in this stage to finish, before moving to the next stage. + LOGGER.info("Waiting for jobs in stage to finish"); + shutdownThreadPool(threadPool); + + // Compile the results. + for (Future jobFuture : jobFutures) { + JobResult jobResult = jobFuture.get(); + jobResult.setThreadTerminatedOnTime(true); + jobResults.add(jobResult); + } + + LOGGER.info("Stage complete"); + } + } + + /** + * Tell a thread pool to stop accepting new jobs, wait for the existing jobs to finish. If the + * jobs time out, then interrupt the threads and force them to terminate. + */ + private void shutdownThreadPool(ThreadPoolExecutor threadPool) throws InterruptedException { + // Wait for all threads to finish. + threadPool.shutdown(); + boolean terminatedByItself = + threadPool.awaitTermination( + isDryRun ? MAX_TIME_PER_JOB_DRY_RUN_MIN : MAX_TIME_PER_JOB_MIN, TimeUnit.MINUTES); + + // If the threads didn't finish in the expected time, then send them interrupts. 
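+ // shutdownNow() interrupts the worker threads; give them up to MAX_TIME_FOR_SHUTDOWN_SEC to
+ // exit before logging an error.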
+ if (!terminatedByItself) { + threadPool.shutdownNow(); + } + if (!threadPool.awaitTermination(MAX_TIME_FOR_SHUTDOWN_SEC, TimeUnit.SECONDS)) { + LOGGER.error("All threads in the pool failed to terminate"); + } + } +} diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SequencedJobSet.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SequencedJobSet.java new file mode 100644 index 000000000..9ec61cbb4 --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SequencedJobSet.java @@ -0,0 +1,40 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import bio.terra.tanagra.indexing.IndexingJob; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Container for a set of jobs and the sequence they must be run in. The stages must be run + * serially. The jobs within each stage can be run in parallel. + */ +public class SequencedJobSet { + private final List> stages; + private final String description; + + public SequencedJobSet(String description) { + this.stages = new ArrayList<>(); + this.description = description; + } + + public void startNewStage() { + stages.add(new ArrayList<>()); + } + + public void addJob(IndexingJob job) { + stages.get(stages.size() - 1).add(job); + } + + public Iterator> iterator() { + return stages.iterator(); + } + + public int getNumStages() { + return stages.size(); + } + + public String getDescription() { + return description; + } +} diff --git a/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SerialRunner.java b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SerialRunner.java new file mode 100644 index 000000000..c272529d3 --- /dev/null +++ b/indexer/src/main/java/bio/terra/tanagra/indexing/jobexecutor/SerialRunner.java @@ -0,0 +1,49 @@ +package bio.terra.tanagra.indexing.jobexecutor; + +import bio.terra.tanagra.exception.SystemException; +import bio.terra.tanagra.indexing.IndexingJob; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** Utility class that runs multiple job sets in serial. */ +public final class SerialRunner extends JobRunner { + public SerialRunner( + List jobSets, boolean isDryRun, IndexingJob.RunType runType) { + super(jobSets, isDryRun, runType); + } + + @Override + protected String getName() { + return "SERIAL"; + } + + /** Run all job sets serially. */ + @Override + public void runJobSets() { + jobSets.forEach( + jobSet -> { + try { + runSingleJobSet(jobSet); + } catch (InterruptedException | ExecutionException ex) { + throw new SystemException("Job set execution failed", ex); + } + }); + } + + /** Run a single job set. Run the stages serially, and the jobs within each stage serially. */ + @Override + protected void runSingleJobSet(SequencedJobSet sequencedJobSet) + throws InterruptedException, ExecutionException { + // Iterate through the job stages, running all jobs in each stage. 
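+ // Unlike the parallel runner, each job runs directly on the calling thread, one at a time.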
+ Iterator> jobStagesIterator = sequencedJobSet.iterator(); + while (jobStagesIterator.hasNext()) { + List jobsInStage = jobStagesIterator.next(); + for (IndexingJob job : jobsInStage) { + JobResult jobResult = new JobThread(job, isDryRun, runType, job.getName()).call(); + jobResult.setThreadTerminatedOnTime(true); + jobResults.add(jobResult); + } + } + } +} diff --git a/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityGroupTest.java b/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityGroupTest.java index 09cab1ebb..8c6a4ba5d 100644 --- a/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityGroupTest.java +++ b/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityGroupTest.java @@ -3,7 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import bio.terra.tanagra.indexing.Indexer; -import bio.terra.tanagra.indexing.IndexingJob; +import bio.terra.tanagra.indexing.jobexecutor.SequencedJobSet; import bio.terra.tanagra.underlay.DataPointer; import bio.terra.tanagra.underlay.Entity; import bio.terra.tanagra.underlay.EntityGroup; @@ -11,7 +11,6 @@ import bio.terra.tanagra.utils.FileIO; import java.io.IOException; import java.nio.file.Path; -import java.util.List; import java.util.Map; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -35,10 +34,10 @@ static void readDataPointers() throws IOException { void oneToMany() throws IOException { EntityGroup brandIngredient = EntityGroup.fromJSON("BrandIngredient.json", dataPointers, entities, primaryEntityName); - List jobs = Indexer.getJobsForEntityGroup(brandIngredient); + SequencedJobSet jobs = Indexer.getJobSetForEntityGroup(brandIngredient); // copy relationship id pairs - assertEquals(1, jobs.size()); + assertEquals(1, jobs.getNumStages()); } @Test @@ -46,11 +45,11 @@ void criteriaOccurrenceWithHierarchy() throws IOException { EntityGroup conditionPersonOccurrence = EntityGroup.fromJSON( "ConditionPersonOccurrence.json", dataPointers, entities, primaryEntityName); - List jobs = Indexer.getJobsForEntityGroup(conditionPersonOccurrence); + SequencedJobSet jobs = Indexer.getJobSetForEntityGroup(conditionPersonOccurrence); // copy relationship id pairs (x3 relationships) // compute rollup counts (x2 relationships) // compute rollup counts with hierarchy (x2 relationships) - assertEquals(7, jobs.size()); + assertEquals(7, jobs.iterator().next().size()); } } diff --git a/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityTest.java b/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityTest.java index 0c5527a62..6c13c704a 100644 --- a/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityTest.java +++ b/indexer/src/test/java/bio/terra/tanagra/indexing/command/IndexEntityTest.java @@ -12,12 +12,14 @@ import bio.terra.tanagra.indexing.job.DenormalizeEntityInstances; import bio.terra.tanagra.indexing.job.WriteAncestorDescendantIdPairs; import bio.terra.tanagra.indexing.job.WriteParentChildIdPairs; +import bio.terra.tanagra.indexing.jobexecutor.SequencedJobSet; import bio.terra.tanagra.underlay.DataPointer; import bio.terra.tanagra.underlay.Entity; import bio.terra.tanagra.underlay.Underlay; import bio.terra.tanagra.utils.FileIO; import java.io.IOException; import java.nio.file.Path; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -38,14 +40,15 @@ static void readDataPointers() throws IOException { @Test void person() throws IOException { Entity 
person = Entity.fromJSON("Person.json", dataPointers); - List jobs = Indexer.getJobsForEntity(person); + SequencedJobSet jobs = Indexer.getJobSetForEntity(person); - assertEquals(2, jobs.size(), "two indexing jobs generated"); - IndexingJob job = jobs.get(0); + assertEquals(2, jobs.getNumStages(), "two indexing job stages generated"); + Iterator> jobStageItr = jobs.iterator(); + IndexingJob job = jobStageItr.next().get(0); assertEquals( CreateEntityTable.class, job.getClass(), "CreateEntityTable indexing job generated"); - job = jobs.get(1); + job = jobStageItr.next().get(0); assertEquals( DenormalizeEntityInstances.class, job.getClass(), @@ -58,36 +61,33 @@ void person() throws IOException { @Test void condition() throws IOException { Entity condition = Entity.fromJSON("Condition.json", dataPointers); - List jobs = Indexer.getJobsForEntity(condition); + SequencedJobSet jobs = Indexer.getJobSetForEntity(condition); - assertEquals(6, jobs.size(), "six indexing jobs generated"); - - Optional createEntityTable = - jobs.stream().filter(job -> job.getClass().equals(CreateEntityTable.class)).findFirst(); - assertTrue(createEntityTable.isPresent(), "CreateEntityTable indexing job generated"); + assertEquals(3, jobs.getNumStages(), "three indexing job stages generated"); + Iterator> jobStageItr = jobs.iterator(); + IndexingJob job = jobStageItr.next().get(0); + assertEquals( + CreateEntityTable.class, job.getClass(), "CreateEntityTable indexing job generated"); - Optional denormalizeEntityInstances = - jobs.stream() - .filter(job -> job.getClass().equals(DenormalizeEntityInstances.class)) - .findFirst(); - assertTrue( - denormalizeEntityInstances.isPresent(), + job = jobStageItr.next().get(0); + assertEquals( + DenormalizeEntityInstances.class, + job.getClass(), "DenormalizeEntityInstances indexing job generated"); assertEquals( "broad-tanagra-dev:aou_synthetic_SR2019q4r4_indexes.condition", - ((BigQueryIndexingJob) denormalizeEntityInstances.get()) - .getEntityIndexTable() - .getPathForIndexing()); + ((BigQueryIndexingJob) job).getEntityIndexTable().getPathForIndexing()); + List jobStage = jobStageItr.next(); Optional buildTextSearchStrings = - jobs.stream() - .filter(job -> job.getClass().equals(BuildTextSearchStrings.class)) + jobStage.stream() + .filter(jobInStage -> jobInStage.getClass().equals(BuildTextSearchStrings.class)) .findFirst(); assertTrue(buildTextSearchStrings.isPresent(), "BuildTextSearchStrings indexing job generated"); Optional writeParentChildIdPairs = - jobs.stream() - .filter(job -> job.getClass().equals(WriteParentChildIdPairs.class)) + jobStage.stream() + .filter(jobInStage -> jobInStage.getClass().equals(WriteParentChildIdPairs.class)) .findFirst(); assertTrue( writeParentChildIdPairs.isPresent(), "WriteParentChildIdPairs indexing job generated"); @@ -98,8 +98,9 @@ void condition() throws IOException { .getPathForIndexing()); Optional writeAncestorDescendantIdPairs = - jobs.stream() - .filter(job -> job.getClass().equals(WriteAncestorDescendantIdPairs.class)) + jobStage.stream() + .filter( + jobInStage -> jobInStage.getClass().equals(WriteAncestorDescendantIdPairs.class)) .findFirst(); assertTrue( writeAncestorDescendantIdPairs.isPresent(), @@ -111,8 +112,8 @@ void condition() throws IOException { .getPathForIndexing()); Optional buildNumChildrenAndPaths = - jobs.stream() - .filter(job -> job.getClass().equals(BuildNumChildrenAndPaths.class)) + jobStage.stream() + .filter(jobInStage -> jobInStage.getClass().equals(BuildNumChildrenAndPaths.class)) .findFirst(); assertTrue( 
buildNumChildrenAndPaths.isPresent(), "BuildNumChildrenAndPaths indexing job generated"); diff --git a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/condition.json b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/condition.json index 7d1132c8e..57fe0e80e 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/condition.json +++ b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/condition.json @@ -201,7 +201,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 20 } } }, @@ -291,7 +292,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 20 } } } diff --git a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/ingredient.json b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/ingredient.json index 9fcd21dab..3f11bc8d4 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/ingredient.json +++ b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/ingredient.json @@ -189,7 +189,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 8 } } }, @@ -279,7 +280,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 8 } } } diff --git a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/measurement.json b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/measurement.json index b7753373f..5e230bc9e 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/measurement.json +++ b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/measurement.json @@ -269,7 +269,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 11 } } }, @@ -359,7 +360,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 11 } } } diff --git a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/procedure.json b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/procedure.json index a80ec8e81..f84a13948 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/procedure.json +++ b/service/src/main/resources/config/broad/aou_synthetic/expanded/entity/procedure.json @@ -217,7 +217,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 19 } } }, @@ -307,7 +308,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 19 } } } diff --git a/service/src/main/resources/config/broad/aou_synthetic/original/entity/condition.json b/service/src/main/resources/config/broad/aou_synthetic/original/entity/condition.json index 12780768b..be324f139 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/original/entity/condition.json +++ b/service/src/main/resources/config/broad/aou_synthetic/original/entity/condition.json @@ -45,7 +45,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 20 } } }, diff --git a/service/src/main/resources/config/broad/aou_synthetic/original/entity/ingredient.json b/service/src/main/resources/config/broad/aou_synthetic/original/entity/ingredient.json index f68cecb6e..3fec0026a 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/original/entity/ingredient.json +++ b/service/src/main/resources/config/broad/aou_synthetic/original/entity/ingredient.json @@ -52,7 +52,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 8 } } }, diff --git a/service/src/main/resources/config/broad/aou_synthetic/original/entity/measurement.json 
b/service/src/main/resources/config/broad/aou_synthetic/original/entity/measurement.json index a0acd29c7..43ca17fca 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/original/entity/measurement.json +++ b/service/src/main/resources/config/broad/aou_synthetic/original/entity/measurement.json @@ -103,7 +103,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 11 } } }, diff --git a/service/src/main/resources/config/broad/aou_synthetic/original/entity/procedure.json b/service/src/main/resources/config/broad/aou_synthetic/original/entity/procedure.json index 706afc538..6b60bb777 100644 --- a/service/src/main/resources/config/broad/aou_synthetic/original/entity/procedure.json +++ b/service/src/main/resources/config/broad/aou_synthetic/original/entity/procedure.json @@ -45,7 +45,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 19 } } }, diff --git a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/condition.json b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/condition.json index b79948370..0df28a0b3 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/condition.json +++ b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/condition.json @@ -165,7 +165,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 20 } } }, @@ -255,7 +256,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 20 } } } diff --git a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/ingredient.json b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/ingredient.json index 439e1c261..14914084e 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/ingredient.json +++ b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/ingredient.json @@ -177,7 +177,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 8 } } }, @@ -267,7 +268,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 8 } } } diff --git a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/procedure.json b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/procedure.json index 87de26237..9273ffb09 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/procedure.json +++ b/service/src/main/resources/config/broad/cms_synpuf/expanded/entity/procedure.json @@ -189,7 +189,8 @@ "column" : "concept_id" } } - } + }, + "maxHierarchyDepth" : 19 } } }, @@ -279,7 +280,8 @@ "column" : "id" } } - } + }, + "maxHierarchyDepth" : 19 } } } diff --git a/service/src/main/resources/config/broad/cms_synpuf/original/entity/condition.json b/service/src/main/resources/config/broad/cms_synpuf/original/entity/condition.json index 6ae8e62b7..fb68349c7 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/original/entity/condition.json +++ b/service/src/main/resources/config/broad/cms_synpuf/original/entity/condition.json @@ -45,7 +45,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 20 } } }, diff --git a/service/src/main/resources/config/broad/cms_synpuf/original/entity/ingredient.json b/service/src/main/resources/config/broad/cms_synpuf/original/entity/ingredient.json index 8046ad5fb..9c1f1acfc 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/original/entity/ingredient.json +++ b/service/src/main/resources/config/broad/cms_synpuf/original/entity/ingredient.json @@ -52,7 +52,8 @@ 
"fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 8 } } }, diff --git a/service/src/main/resources/config/broad/cms_synpuf/original/entity/procedure.json b/service/src/main/resources/config/broad/cms_synpuf/original/entity/procedure.json index fe3cd4a7a..6a48583c8 100644 --- a/service/src/main/resources/config/broad/cms_synpuf/original/entity/procedure.json +++ b/service/src/main/resources/config/broad/cms_synpuf/original/entity/procedure.json @@ -45,7 +45,8 @@ "fieldPointers": { "id": { "column": "concept_id" } } - } + }, + "maxHierarchyDepth": 19 } } }, diff --git a/service/src/main/resources/config/vumc/sdd/original/sql/brand_textSearch.sql b/service/src/main/resources/config/vumc/sdd/original/sql/brand_textSearch.sql index dab3034a8..52f444b8b 100644 --- a/service/src/main/resources/config/vumc/sdd/original/sql/brand_textSearch.sql +++ b/service/src/main/resources/config/vumc/sdd/original/sql/brand_textSearch.sql @@ -22,5 +22,4 @@ ON c.concept_id = textsearch.id WHERE c.domain_id = 'Drug' AND c.concept_class_id = 'Brand Name' -AND c.invalid_reason IS NULL AND c.vocabulary_id IN ('RxNorm', 'RxNorm Extension') \ No newline at end of file diff --git a/underlay/src/main/java/bio/terra/tanagra/serialization/UFHierarchyMapping.java b/underlay/src/main/java/bio/terra/tanagra/serialization/UFHierarchyMapping.java index b318a205e..85aa0b693 100644 --- a/underlay/src/main/java/bio/terra/tanagra/serialization/UFHierarchyMapping.java +++ b/underlay/src/main/java/bio/terra/tanagra/serialization/UFHierarchyMapping.java @@ -17,6 +17,7 @@ public class UFHierarchyMapping { private final UFAuxiliaryDataMapping rootNodesFilter; private final UFAuxiliaryDataMapping ancestorDescendant; private final UFAuxiliaryDataMapping pathNumChildren; + private final int maxHierarchyDepth; public UFHierarchyMapping(HierarchyMapping hierarchyMapping) { this.childParent = new UFAuxiliaryDataMapping(hierarchyMapping.getChildParent()); @@ -32,6 +33,7 @@ public UFHierarchyMapping(HierarchyMapping hierarchyMapping) { hierarchyMapping.hasPathNumChildren() ? new UFAuxiliaryDataMapping(hierarchyMapping.getPathNumChildren()) : null; + this.maxHierarchyDepth = hierarchyMapping.getMaxHierarchyDepth(); } private UFHierarchyMapping(Builder builder) { @@ -39,6 +41,7 @@ private UFHierarchyMapping(Builder builder) { this.rootNodesFilter = builder.rootNodesFilter; this.ancestorDescendant = builder.ancestorDescendant; this.pathNumChildren = builder.pathNumChildren; + this.maxHierarchyDepth = builder.maxHierarchyDepth; } @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "") @@ -47,6 +50,7 @@ public static class Builder { private UFAuxiliaryDataMapping rootNodesFilter; private UFAuxiliaryDataMapping ancestorDescendant; private UFAuxiliaryDataMapping pathNumChildren; + private int maxHierarchyDepth; public Builder childParent(UFAuxiliaryDataMapping childParent) { this.childParent = childParent; @@ -68,6 +72,11 @@ public Builder pathNumChildren(UFAuxiliaryDataMapping pathNumChildren) { return this; } + public Builder maxHierarchyDepth(int maxHierarchyDepth) { + this.maxHierarchyDepth = maxHierarchyDepth; + return this; + } + /** Call the private constructor. 
*/ public UFHierarchyMapping build() { return new UFHierarchyMapping(this); @@ -89,4 +98,8 @@ public UFAuxiliaryDataMapping getAncestorDescendant() { public UFAuxiliaryDataMapping getPathNumChildren() { return pathNumChildren; } + + public int getMaxHierarchyDepth() { + return maxHierarchyDepth; + } } diff --git a/underlay/src/main/java/bio/terra/tanagra/underlay/Entity.java b/underlay/src/main/java/bio/terra/tanagra/underlay/Entity.java index e08f5faa3..05f3abe4e 100644 --- a/underlay/src/main/java/bio/terra/tanagra/underlay/Entity.java +++ b/underlay/src/main/java/bio/terra/tanagra/underlay/Entity.java @@ -208,7 +208,8 @@ private static Map deserializeHierarchies( ? HierarchyMapping.defaultIndexMapping( serialized.getName(), sourceHierarchyMappingSerialized.getKey(), - idAttribute.getValue()) + idAttribute.getValue(), + sourceHierarchyMappingSerialized.getValue().getMaxHierarchyDepth()) : HierarchyMapping.fromSerialized( indexHierarchyMappingsSerialized.get( sourceHierarchyMappingSerialized.getKey()), @@ -294,6 +295,10 @@ public Relationship getRelationship(Entity relatedEntity) { .get(); } + public Underlay getUnderlay() { + return underlay; + } + public EntityMapping getMapping(Underlay.MappingType mappingType) { return Underlay.MappingType.SOURCE.equals(mappingType) ? sourceDataMapping : indexDataMapping; } diff --git a/underlay/src/main/java/bio/terra/tanagra/underlay/HierarchyMapping.java b/underlay/src/main/java/bio/terra/tanagra/underlay/HierarchyMapping.java index 90fd9afea..c7425b4f8 100644 --- a/underlay/src/main/java/bio/terra/tanagra/underlay/HierarchyMapping.java +++ b/underlay/src/main/java/bio/terra/tanagra/underlay/HierarchyMapping.java @@ -14,6 +14,10 @@ import java.util.stream.Collectors; public final class HierarchyMapping { + // The maximum depth of ancestors present in a hierarchy. This may be larger + // than the actual max depth, but if it is smaller the resulting table will be incomplete. 
+ private static final int DEFAULT_MAX_HIERARCHY_DEPTH = 64; + private static final String ID_FIELD_NAME = "id"; public static final String CHILD_FIELD_NAME = "child"; public static final String PARENT_FIELD_NAME = "parent"; @@ -37,6 +41,7 @@ public final class HierarchyMapping { private final AuxiliaryDataMapping rootNodesFilter; private final AuxiliaryDataMapping ancestorDescendant; private final AuxiliaryDataMapping pathNumChildren; + private final int maxHierarchyDepth; private final Underlay.MappingType mappingType; private Hierarchy hierarchy; @@ -45,11 +50,13 @@ private HierarchyMapping( AuxiliaryDataMapping rootNodesFilter, AuxiliaryDataMapping ancestorDescendant, AuxiliaryDataMapping pathNumChildren, + int maxHierarchyDepth, Underlay.MappingType mappingType) { this.childParent = childParent; this.rootNodesFilter = rootNodesFilter; this.ancestorDescendant = ancestorDescendant; this.pathNumChildren = pathNumChildren; + this.maxHierarchyDepth = maxHierarchyDepth; this.mappingType = mappingType; } @@ -83,11 +90,16 @@ public static HierarchyMapping fromSerialized( : AuxiliaryDataMapping.fromSerialized( serialized.getPathNumChildren(), dataPointer, PATH_NUM_CHILDREN_AUXILIARY_DATA); return new HierarchyMapping( - childParent, rootNodesFilter, ancestorDescendant, pathNumChildren, mappingType); + childParent, + rootNodesFilter, + ancestorDescendant, + pathNumChildren, + serialized.getMaxHierarchyDepth(), + mappingType); } public static HierarchyMapping defaultIndexMapping( - String entityName, String hierarchyName, FieldPointer entityIdField) { + String entityName, String hierarchyName, FieldPointer entityIdField, int maxHierarchyDepth) { TablePointer entityTable = entityIdField.getTablePointer(); DataPointer dataPointer = entityTable.getDataPointer(); String tablePrefix = entityName + "_" + hierarchyName + "_"; @@ -124,7 +136,12 @@ public static HierarchyMapping defaultIndexMapping( }))); return new HierarchyMapping( - childParent, null, ancestorDescendant, pathNumChildren, Underlay.MappingType.INDEX); + childParent, + null, + ancestorDescendant, + pathNumChildren, + maxHierarchyDepth, + Underlay.MappingType.INDEX); } public SQLExpression queryChildParentPairs(String childFieldAlias, String parentFieldAlias) { @@ -265,4 +282,8 @@ public boolean hasPathNumChildren() { public AuxiliaryDataMapping getPathNumChildren() { return pathNumChildren; } + + public int getMaxHierarchyDepth() { + return maxHierarchyDepth <= 0 ? DEFAULT_MAX_HIERARCHY_DEPTH : maxHierarchyDepth; + } }
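
For context on the new `maxHierarchyDepth` field: `getMaxHierarchyDepth()` above falls back to the built-in default of 64 whenever a hierarchy mapping does not supply a positive override in its entity config JSON. The standalone sketch below is not part of this patch; the class and method names are illustrative only, and the value 20 is taken from the condition hierarchy configs earlier in the diff.
```
/** Illustrative sketch of the maxHierarchyDepth override/fallback, mirroring getMaxHierarchyDepth(). */
public class MaxHierarchyDepthExample {
  // Same default as HierarchyMapping.DEFAULT_MAX_HIERARCHY_DEPTH in this patch.
  private static final int DEFAULT_MAX_HIERARCHY_DEPTH = 64;

  /** Returns the configured depth, or the default when the config leaves it unset (<= 0). */
  static int effectiveMaxHierarchyDepth(int configuredDepth) {
    return configuredDepth <= 0 ? DEFAULT_MAX_HIERARCHY_DEPTH : configuredDepth;
  }

  public static void main(String[] args) {
    System.out.println(effectiveMaxHierarchyDepth(0));  // 64: no "maxHierarchyDepth" in the JSON
    System.out.println(effectiveMaxHierarchyDepth(20)); // 20: e.g. the condition hierarchy configs
  }
}
```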