diff --git a/.Rbuildignore b/.Rbuildignore index 53275cd7..5dc3c765 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -1,3 +1,5 @@ +^renv$ +^renv\.lock$ pom.xml extras docs diff --git a/NEWS.md b/NEWS.md index 0fd1669a..bd008137 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,15 @@ +FeatureExtraction 3.2.1 +======================= + +New Features: + +- Added ability to store aggregate results from `getDbDefaultCovariateData` in the database and added +ability to control all target tables with new `targetTables` list parameter + +Bugfixes: + +- Fixed tests and made sure storage of covariates with `getDbDefaultCovariateData` works and is consistent + FeatureExtraction 3.2.0 ======================= diff --git a/R/GetDefaultCovariates.R b/R/GetDefaultCovariates.R index 12ca90af..170aaecd 100644 --- a/R/GetDefaultCovariates.R +++ b/R/GetDefaultCovariates.R @@ -24,14 +24,25 @@ #' @param covariateSettings Either an object of type \code{covariateSettings} as created using one #' of the createCovariate functions, or a list of such objects. #' @param targetDatabaseSchema (Optional) The name of the database schema where the resulting covariates -#' should be stored. +#' should be stored. If not provided, results will be fetched to R. +#' @param targetTables (Optional) list of mappings for table names. +#' The names of the table where the resulting covariates will be if +#' \code{targetDatabaseSchema} is specified. The tables will be created in permanent +#' table in the \code{targetDatabaseSchema} or as temporary tables. Tables that can be +#' included in this list: covariates, covariateRef, analysisRef, covariatesContinuous, +#' timeRef #' @param targetCovariateTable (Optional) The name of the table where the resulting covariates will -#' be stored. If not provided, results will be fetched to R. The table can be -#' a permanent table in the \code{targetDatabaseSchema} or a temp table. If +#' be stored. If not provided, results will be fetched to R. 
The table can be +#' a permanent table in the \code{targetDatabaseSchema} or a temp table. If #' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' Superseded by \code{targetTables} #' @param targetCovariateRefTable (Optional) The name of the table where the covariate reference will be stored. +#' Superseded by \code{targetTables} #' @param targetAnalysisRefTable (Optional) The name of the table where the analysis reference will be stored. -#' +#' Superseded by \code{targetTables} +#' @param dropTableIfExists If targetDatabaseSchema is specified, drop any existing tables. Otherwise, results are merged +#' into existing table data. Overrides createTable. +#' @param createTable Run sql to create table? Code does not check if table exists. #' @template GetCovarParams #' #' @export @@ -43,10 +54,17 @@ getDbDefaultCovariateData <- function(connection, cdmVersion = "5", rowIdField = "subject_id", covariateSettings, - targetDatabaseSchema, - targetCovariateTable, - targetCovariateRefTable, - targetAnalysisRefTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTables = list( + covariates = targetCovariateTable, + covariateRef = targetCovariateRefTable, + analysisRef = targetAnalysisRefTable + ), + dropTableIfExists = FALSE, + createTable = TRUE, aggregated = FALSE) { if (!is(covariateSettings, "covariateSettings")) { stop("Covariate settings object not of type covariateSettings") @@ -54,10 +72,7 @@ getDbDefaultCovariateData <- function(connection, if (cdmVersion == "4") { stop("Common Data Model version 4 is not supported") } - if (!missing(targetCovariateTable) && !is.null(targetCovariateTable) && aggregated) { - stop("Writing aggregated results to database is currently not supported") - } - + settings <- .toJson(covariateSettings) rJava::J("org.ohdsi.featureExtraction.FeatureExtraction")$init(system.file("", package = "FeatureExtraction")) json <- 
rJava::J("org.ohdsi.featureExtraction.FeatureExtraction")$createSql(settings, aggregated, cohortTable, rowIdField, rJava::.jarray(as.character(cohortId)), cdmDatabaseSchema) @@ -74,123 +89,126 @@ getDbDefaultCovariateData <- function(connection, oracleTempSchema = oracleTempSchema) } } - + ParallelLogger::logInfo("Constructing features on server") - + sql <- SqlRender::translate(sql = todo$sqlConstruction, targetDialect = attr(connection, "dbms"), oracleTempSchema = oracleTempSchema) profile <- (!is.null(getOption("dbProfile")) && getOption("dbProfile") == TRUE) DatabaseConnector::executeSql(connection, sql, profile = profile) - - if (missing(targetCovariateTable) || is.null(targetCovariateTable)) { - ParallelLogger::logInfo("Fetching data from server") - start <- Sys.time() - # Binary or non-aggregated features + # Is the target schema missing or are all the specified tables temp + allTempTables <- all(substr(targetTables,1,1) == "#") + if ((missing(targetDatabaseSchema) | is.null(targetDatabaseSchema)) & !allTempTables) { + # Save to Andromeda covariateData <- Andromeda::andromeda() - if (!is.null(todo$sqlQueryFeatures)) { - sql <- SqlRender::translate(sql = todo$sqlQueryFeatures, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - - DatabaseConnector::querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "covariates", - snakeCaseToCamelCase = TRUE) - } - - # Continuous aggregated features - if (!is.null(todo$sqlQueryContinuousFeatures)) { - sql <- SqlRender::translate(sql = todo$sqlQueryContinuousFeatures, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - DatabaseConnector::querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "covariatesContinuous", - snakeCaseToCamelCase = TRUE) - } - - # Covariate reference - sql <- SqlRender::translate(sql = todo$sqlQueryFeatureRef, - targetDialect 
= attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - - DatabaseConnector::querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "covariateRef", - snakeCaseToCamelCase = TRUE) - - # Analysis reference - sql <- SqlRender::translate(sql = todo$sqlQueryAnalysisRef, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - DatabaseConnector::querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "analysisRef", - snakeCaseToCamelCase = TRUE) - - # Time reference - if (!is.null(todo$sqlQueryTimeRef)) { - sql <- SqlRender::translate(sql = todo$sqlQueryTimeRef, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - DatabaseConnector::querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "timeRef", + + queryFunction <- function(sql, tableName) { + DatabaseConnector::querySqlToAndromeda(connection = connection, + sql = sql, + andromeda = covariateData, + andromedaTableName = tableName, snakeCaseToCamelCase = TRUE) } - - - delta <- Sys.time() - start - ParallelLogger::logInfo("Fetching data took ", signif(delta, 3), " ", attr(delta, "units")) + ParallelLogger::logInfo("Fetching data from server") } else { - # Don't fetch to R , but create on server instead - ParallelLogger::logInfo("Writing data to table") - start <- Sys.time() - convertQuery <- function(sql, databaseSchema, table) { - if (missing(databaseSchema) || is.null(databaseSchema)) { - tableName <- table - } else { - tableName <- paste(databaseSchema, table, sep = ".") - } - return(sub("FROM", paste("INTO", tableName, "FROM"), sql)) - } - - # Covariates - if (!is.null(todo$sqlQueryFeatures)) { - sql <- convertQuery(todo$sqlQueryFeatures, targetDatabaseSchema, targetCovariateTable) - sql <- SqlRender::translate(sql = sql, - targetDialect = attr(connection, "dbms"), - oracleTempSchema 
= oracleTempSchema) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + if (dropTableIfExists) { + createTable <- TRUE } - - # Covariate reference - if (!missing(targetCovariateRefTable) && !is.null(targetCovariateRefTable)) { - sql <- convertQuery(todo$sqlQueryFeatureRef, targetDatabaseSchema, targetCovariateRefTable) - sql <- SqlRender::translate(sql = sql, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + # Save to DB + ParallelLogger::logInfo("Creating tables on server") + convertQuery <- function(sql, table) { + outerSql <- " + {@drop} ? { + IF OBJECT_ID('@table', 'U') IS NOT NULL + DROP TABLE @table; + } + {@create} ? { + SELECT * INTO @table FROM ( @sub_query ) sq; + } : { + INSERT INTO @table @sub_query; + } + " + SqlRender::render(outerSql, + sub_query = gsub(";", "", sql), + create = createTable, + drop = dropTableIfExists, + table = table) } - - # Analysis reference - if (!missing(targetAnalysisRefTable) && !is.null(targetAnalysisRefTable)) { - sql <- convertQuery(todo$sqlQueryAnalysisRef, targetDatabaseSchema, targetAnalysisRefTable) - sql <- SqlRender::translate(sql = sql, - targetDialect = attr(connection, "dbms"), - oracleTempSchema = oracleTempSchema) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + queryFunction <- function(sql, table) { + mappedTable <- targetTables[[table]] + if (is.null(mappedTable)) { + if (allTempTables) { + # Only bother storing specified temp tables + ParallelLogger::logInfo("Skipping", table, " other mapped tables are temp") + return(NULL) + } + mappedTable <- SqlRender::camelCaseToSnakeCase(table) + } + + if (substr(mappedTable, 1, 1) != "#") { + mappedTable <- paste0(targetDatabaseSchema, ".", mappedTable) + } + + if (createTable) { + ParallelLogger::logInfo("Creating table ", mappedTable, " 
for ", table) + } else { + ParallelLogger::logInfo("Appending ", table, " results to table ", mappedTable) + } + + sql <- convertQuery(sql, mappedTable) + DatabaseConnector::renderTranslateExecuteSql(connection, + sql, + tempEmulationSchema = oracleTempSchema, + progressBar = FALSE, + reportOverallTime = FALSE) } - delta <- Sys.time() - start - ParallelLogger::logInfo("Writing data took", signif(delta, 3), " ", attr(delta, "units")) - + } + + start <- Sys.time() + # Binary or non-aggregated features + if (!is.null(todo$sqlQueryFeatures)) { + sql <- SqlRender::translate(sql = todo$sqlQueryFeatures, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema) + queryFunction(sql, "covariates") + } + + # Continuous aggregated features + if (!is.null(todo$sqlQueryContinuousFeatures)) { + sql <- SqlRender::translate(sql = todo$sqlQueryContinuousFeatures, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema) + queryFunction(sql, "covariatesContinuous") + } + + # Covariate reference + sql <- SqlRender::translate(sql = todo$sqlQueryFeatureRef, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema) + + queryFunction(sql, "covariateRef") + + # Analysis reference + sql <- SqlRender::translate(sql = todo$sqlQueryAnalysisRef, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema) + queryFunction(sql, "analysisRef") + + # Time reference + if (!is.null(todo$sqlQueryTimeRef)) { + sql <- SqlRender::translate(sql = todo$sqlQueryTimeRef, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema) + queryFunction(sql, "timeRef") + } + + delta <- Sys.time() - start + ParallelLogger::logInfo("Fetching data took ", signif(delta, 3), " ", attr(delta, "units")) + # Drop temp tables sql <- SqlRender::translate(sql = todo$sqlCleanup, targetDialect = attr(connection, "dbms"), @@ -206,8 +224,8 @@ getDbDefaultCovariateData <- function(connection, 
DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } } - - if (missing(targetCovariateTable) || is.null(targetCovariateTable)) { + + if ((missing(targetDatabaseSchema) | is.null(targetDatabaseSchema)) & !allTempTables) { attr(covariateData, "metaData") <- list() if (is.null(covariateData$covariates) && is.null(covariateData$covariatesContinuous)) { warning("No data found, probably because no covariates were specified.") diff --git a/man/getDbDefaultCovariateData.Rd b/man/getDbDefaultCovariateData.Rd index 1628d2ca..a69f99a8 100644 --- a/man/getDbDefaultCovariateData.Rd +++ b/man/getDbDefaultCovariateData.Rd @@ -13,10 +13,14 @@ getDbDefaultCovariateData( cdmVersion = "5", rowIdField = "subject_id", covariateSettings, - targetDatabaseSchema, - targetCovariateTable, - targetCovariateRefTable, - targetAnalysisRefTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTables = list(covariates = targetCovariateTable, covariateRef = + targetCovariateRefTable, analysisRef = targetAnalysisRefTable), + dropTableIfExists = FALSE, + createTable = TRUE, aggregated = FALSE ) } @@ -51,16 +55,31 @@ is more than one period per person.} of the createCovariate functions, or a list of such objects.} \item{targetDatabaseSchema}{(Optional) The name of the database schema where the resulting covariates -should be stored.} +should be stored. If not provided, results will be fetched to R.} \item{targetCovariateTable}{(Optional) The name of the table where the resulting covariates will -be stored. If not provided, results will be fetched to R. The table can be -a permanent table in the \code{targetDatabaseSchema} or a temp table. If -it is a temp table, do not specify \code{targetDatabaseSchema}.} +be stored. If not provided, results will be fetched to R. The table can be +a permanent table in the \code{targetDatabaseSchema} or a temp table. 
If +it is a temp table, do not specify \code{targetDatabaseSchema}. +Superseded by \code{targetTables}} -\item{targetCovariateRefTable}{(Optional) The name of the table where the covariate reference will be stored.} +\item{targetCovariateRefTable}{(Optional) The name of the table where the covariate reference will be stored. +Superseded by \code{targetTables}} -\item{targetAnalysisRefTable}{(Optional) The name of the table where the analysis reference will be stored.} +\item{targetAnalysisRefTable}{(Optional) The name of the table where the analysis reference will be stored. +Superseded by \code{targetTables}} + +\item{targetTables}{(Optional) list of mappings for table names. +The names of the table where the resulting covariates will be if +\code{targetDatabaseSchema} is specified. The tables will be created in permanent +table in the \code{targetDatabaseSchema} or as temporary tables. Tables that can be +included in this list: covariates, covariateRef, analysisRef, covariatesContinuous, +timeRef} + +\item{dropTableIfExists}{If targetDatabaseSchema is specified, drop any existing tables. Otherwise, results are merged +into existing table data. Overrides createTable.} + +\item{createTable}{Run sql to create table? 
Code does not check if table exists.} \item{aggregated}{Should aggregate statistics be computed instead of covariates per cohort entry?} diff --git a/tests/testthat/test-GetDefaultCovariates.R b/tests/testthat/test-GetDefaultCovariates.R index f8e299b0..45508430 100644 --- a/tests/testthat/test-GetDefaultCovariates.R +++ b/tests/testthat/test-GetDefaultCovariates.R @@ -6,53 +6,116 @@ connectionDetails <- Eunomia::getEunomiaConnectionDetails() test_that("Test exit conditions", { connection <- DatabaseConnector::connect(connectionDetails) - + on.exit(DatabaseConnector::disconnect(connection)) # covariateSettings object type expect_error(getDbDefaultCovariateData(connection = connection, cdmDatabaseSchema = "main", covariateSettings = list(), targetDatabaseSchema = "main", - targetCovariateTable = "cov", - targetCovariateRefTable = "cov_ref", - targetAnalysisRefTable = "cov_analysis_ref")) + targetTables = list(covariates = "cov", + covariateRef = "cov_ref", + analysisRef = "cov_analysis_ref"))) # CDM 4 not supported expect_error(getDbDefaultCovariateData(connection = connection, cdmDatabaseSchema = "main", cdmVersion = "4", covariateSettings = createDefaultCovariateSettings(), targetDatabaseSchema = "main", - targetCovariateTable = "cov", - targetCovariateRefTable = "cov_ref", - targetAnalysisRefTable = "cov_analysis_ref")) - - # targetCovariateTable and aggregated not supported - expect_error(getDbDefaultCovariateData(connection = connection, - cdmDatabaseSchema = "main", - covariateSettings = createDefaultCovariateSettings(), - targetDatabaseSchema = "main", - targetCovariateTable = "cov", - targetCovariateRefTable = "cov_ref", - targetAnalysisRefTable = "cov_analysis_ref", - aggregated = TRUE)) - - on.exit(DatabaseConnector::disconnect(connection)) + targetTables = list(covariates = "cov", + covariateRef = "cov_ref", + analysisRef = "cov_analysis_ref"))) }) -# AGS - This test fails and is likely due to a bug when using SqlLite -# test_that("Test target table", { 
-# connection <- DatabaseConnector::connect(connectionDetails) -# Eunomia::createCohorts(connectionDetails) -# -# results <- getDbDefaultCovariateData(connection = connection, -# cdmDatabaseSchema = "main", -# cohortTable = "cohort", -# covariateSettings = createDefaultCovariateSettings(), -# targetDatabaseSchema = "main", -# targetCovariateTable = "ut_cov", -# targetCovariateRefTable = "ut_cov_ref", -# targetAnalysisRefTable = "ut_cov_analysis_ref") -# -# on.exit(DatabaseConnector::disconnect(connection)) -# }) +test_that("Test target table", { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + Eunomia::createCohorts(connectionDetails) + + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + targetDatabaseSchema = "main", + targetTables = list(covariates = "ut_cov", + covariateRef = "ut_cov_ref", + analysisRef = "ut_cov_analysis_ref")) + + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov")[1], 1) + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov_ref")[1], 1) + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov_analysis_ref")[1], 1) + + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + targetDatabaseSchema = "main", + aggregated = TRUE, + targetTables = list(covariates = "ut_cov_agg", + covariateRef = "ut_cov_ref_agg", + analysisRef = "ut_cov_analysis_ref_agg")) + + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov_agg")[1], 1) + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov_ref_agg")[1], 1) + 
expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM main.ut_cov_analysis_ref_agg")[1], 1) + + # Temp tables with old prototype + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + aggregated = TRUE, + targetCovariateTable = "#ut_cov_agg", + targetAnalysisRefTable = "#ut_cov_ref_agg", + targetCovariateRefTable = "#ut_cov_anal_ref_agg") + + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_agg")[1], 1) + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_ref_agg")[1], 1) + expect_gt(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_anal_ref_agg")[1], 1) + + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + targetCovariateTable = "#ut_cov", + targetAnalysisRefTable = "#ut_cov_ref", + targetCovariateRefTable = "#ut_cov_analysis_ref") + + covCt <- DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov")[1] + expect_gt(covCt, 1) + covRefCt <- DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_ref")[1] + expect_gt(covRefCt, 1) + anlRefCt <- DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_analysis_ref")[1] + expect_gt(anlRefCt, 1) + + # append results rather than deleting the tables + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + createTable = FALSE, + dropTableIfExists = FALSE, + targetCovariateTable = "#ut_cov", + targetAnalysisRefTable = "#ut_cov_ref", + targetCovariateRefTable = "#ut_cov_analysis_ref") + + 
expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov")[1], covCt * 2) + expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_ref")[1], covRefCt * 2) + expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_analysis_ref")[1], anlRefCt * 2) + + # Recreate tables (and check create override works) + results <- getDbDefaultCovariateData(connection = connection, + cdmDatabaseSchema = "main", + cohortTable = "cohort", + covariateSettings = createDefaultCovariateSettings(), + createTable = FALSE, + dropTableIfExists = TRUE, + targetCovariateTable = "#ut_cov", + targetAnalysisRefTable = "#ut_cov_ref", + targetCovariateRefTable = "#ut_cov_analysis_ref") + + expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov")[1], covCt) + expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_ref")[1], covRefCt) + expect_equal(DatabaseConnector::renderTranslateQuerySql(connection, "SELECT COUNT(*) FROM #ut_cov_analysis_ref")[1], anlRefCt) +}) unlink(connectionDetails$server()) \ No newline at end of file