-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunit_of_measure_xwalk.py
80 lines (64 loc) · 2.17 KB
/
unit_of_measure_xwalk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Version history
# Sept 2022, Stephanie Hong: initial version
# May 2023, Yvette Chen: use input filepath, database name and vocabulary dbname parameters
#
# Databricks notebook source
## Notebook outline:
##   1. create the Delta table
##   2. load the crosswalk rows into a dataframe
##   3. persist the dataframe to the table location
## Session-level Delta settings; they are issued first, before any table is
## created or read.
for delta_setting in (
    "set spark.databricks.delta.autoCompact.enabled = true",
    "SET spark.databricks.delta.formatCheck.enabled=false",
    "set spark.databricks.delta.autoOptimize.optimizeWrite.enabled=true",
    "set spark.databricks.delta.commitValidation.enabled=false",
):
    spark.sql(delta_setting)
#####################################################
## Target database, table name, and storage sub-path (relative to /mnt).
dbname = "crisp_apr2021"
tablename = "unit_of_measure_xwalk"
path = f"crisp-covid/{dbname}"
## DDL for the unit-of-measure crosswalk table. "if not exists" means an
## already-registered table is left as it is.
create_table_ddl = f"""CREATE TABLE if not exists {dbname}.{tablename}(
CDM_SOURCE string,
CDM_TBL string,
CDM_TBL_COLUMN_NAME string,
SRC_CODE string,
SRC_CD_DESCRIPTION string,
TARGET_CONCEPT_ID int,
TARGET_CONCEPT_NAME string,
TARGET_DOMAIN_ID string,
TARGET_VOCABULARY_ID string,
TARGET_CONCEPT_CLASS_ID string,
TARGET_STANDARD_CONCEPT string,
TARGET_CONCEPT_CODE string,
TARGET_TBL_COLUMN_NAME string
)
using delta
tblproperties(delta.autoOptimize.optimizeWrite=true)
location "/mnt/{path}/{tablename}"
"""
spark.sql(create_table_ddl)
# COMMAND ----------
# MAGIC %sql
# MAGIC select * from crisp_omop_3.unit_of_measure_xwalk
# MAGIC
# COMMAND ----------
## 2. Load every row of the source crosswalk table into a dataframe.
source_query = """
select * from crisp_omop_3.unit_of_measure_xwalk
"""
dfunitofmeasure = spark.sql(source_query)
# COMMAND ----------
# Register the crosswalk dataframe as a temporary view so that the SQL cells
# can query it by name.
table_name = "viewUnitOfMeasure"
spark.sql("SET spark.databricks.delta.formatCheck.enabled=false")
dfunitofmeasure.createOrReplaceTempView(table_name)
# COMMAND ----------
# MAGIC %sql
# MAGIC select * from viewUnitOfMeasure
# COMMAND ----------
## Persist the crosswalk data so it can be referenced later through the table.
spark.sql("SET spark.databricks.delta.formatCheck.enabled=false")
# Build the write target from the `path` and `tablename` settings defined in
# the create-table cell instead of repeating the literal path, so the data is
# always written to the same LOCATION the table was created with.
target_location = f"/mnt/{path}/{tablename}"
# Overwrite whatever is currently stored at the location. mergeSchema lets the
# write add dataframe columns that the existing Delta table does not have yet.
# (The former selectExpr("*") was a no-op projection and has been dropped.)
(
    dfunitofmeasure.write.format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save(target_location)
)
# COMMAND ----------
# MAGIC %sql
# MAGIC select * from crisp_apr2021.unit_of_measure_xwalk
# COMMAND ----------