From 3e1d94625b27214b4c4bed5d4f63ad66737faa60 Mon Sep 17 00:00:00 2001
From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com>
Date: Mon, 10 Feb 2025 15:02:58 +0200
Subject: [PATCH] Remove sparkr and related loading code, partially re-open
 SparkInterpreterLauncherTest

---
 .../zeppelin/spark/ZeppelinRContext.java      | 69 -------------------
 .../src/test/resources/spark_server.R         | 23 -------
 .../interpreter/src/test/resources/spark_ui.R | 35 ----------
 .../integration/ZSessionIntegrationTest.java  | 17 -----
 .../integration/ZeppelinSparkClusterTest.java | 16 -----
 .../launcher/SparkInterpreterLauncher.java    | 28 --------
 .../SparkInterpreterLauncherTest.java         | 36 ++--------
 7 files changed, 7 insertions(+), 217 deletions(-)
 delete mode 100644 spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
 delete mode 100644 spark/interpreter/src/test/resources/spark_server.R
 delete mode 100644 spark/interpreter/src/test/resources/spark_ui.R

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
deleted file mode 100644
index 13427ce23..000000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-
-/**
- * Contains the Spark and Zeppelin Contexts made available to SparkR.
- */
-public class ZeppelinRContext {
-  private static SparkContext sparkContext;
-  private static Object sqlContext;
-  private static ZeppelinContext zeppelinContext;
-  private static Object sparkSession;
-  private static JavaSparkContext javaSparkContext;
-
-  public static void setSparkContext(SparkContext sparkContext) {
-    ZeppelinRContext.sparkContext = sparkContext;
-  }
-
-  public static void setZeppelinContext(ZeppelinContext zeppelinContext) {
-    ZeppelinRContext.zeppelinContext = zeppelinContext;
-  }
-
-  public static void setSqlContext(Object sqlContext) {
-    ZeppelinRContext.sqlContext = sqlContext;
-  }
-
-  public static void setSparkSession(Object sparkSession) {
-    ZeppelinRContext.sparkSession = sparkSession;
-  }
-
-  public static SparkContext getSparkContext() {
-    return sparkContext;
-  }
-
-  public static Object getSqlContext() {
-    return sqlContext;
-  }
-
-  public static ZeppelinContext getZeppelinContext() {
-    return zeppelinContext;
-  }
-
-  public static Object getSparkSession() {
-    return sparkSession;
-  }
-
-  public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
-
-  public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
-}
diff --git a/spark/interpreter/src/test/resources/spark_server.R b/spark/interpreter/src/test/resources/spark_server.R
deleted file mode 100644
index 071631dd7..000000000
--- a/spark/interpreter/src/test/resources/spark_server.R
+++ /dev/null
@@ -1,23 +0,0 @@
-# Define server logic to summarize and view selected dataset ----
-server <- function(input, output) {
-
-  # Return the requested dataset ----
-  datasetInput <- reactive({
-    switch(input$dataset,
-           "rock" = as.DataFrame(rock),
-           "pressure" = as.DataFrame(pressure),
-           "cars" = as.DataFrame(cars))
-  })
-
-  # Generate a summary of the dataset ----
-  output$summary <- renderPrint({
-    dataset <- datasetInput()
-    showDF(summary(dataset))
-  })
-
-  # Show the first "n" observations ----
-  output$view <- renderTable({
-    head(datasetInput(), n = input$obs)
-  })
-
-}
\ No newline at end of file
diff --git a/spark/interpreter/src/test/resources/spark_ui.R b/spark/interpreter/src/test/resources/spark_ui.R
deleted file mode 100644
index a81ad0c2b..000000000
--- a/spark/interpreter/src/test/resources/spark_ui.R
+++ /dev/null
@@ -1,35 +0,0 @@
-# Define UI for dataset viewer app ----
-ui <- fluidPage(
-
-# App title ----
-titlePanel(paste("Spark Version", sparkR.version(), sep=":")),
-
-# Sidebar layout with a input and output definitions ----
-sidebarLayout(
-
-# Sidebar panel for inputs ----
-sidebarPanel(
-
-# Input: Selector for choosing dataset ----
-selectInput(inputId = "dataset",
-label = "Choose a dataset:",
-choices = c("rock", "pressure", "cars")),
-
-# Input: Numeric entry for number of obs to view ----
-numericInput(inputId = "obs",
-label = "Number of observations to view:",
-value = 10)
-),
-
-# Main panel for displaying outputs ----
-mainPanel(
-
-# Output: Verbatim text for data summary ----
-verbatimTextOutput("summary"),
-
-# Output: HTML table with requested number of observations ----
-tableOutput("view")
-
-)
-)
-)
\ No newline at end of file
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index b229498a5..ecce97cdc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -193,14 +193,6 @@ public void testZSession_Spark() throws Exception {
           "+---+---+", result.getResults().get(0).getData().trim());
       assertTrue(result.getJobUrls().size() > 0);
 
-      // sparkr
-      result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
-      assertEquals(Status.FINISHED, result.getStatus());
-      assertEquals(1, result.getResults().size());
-      assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
-      assertEquals(2, result.getJobUrls().size());
-
       // spark sql
       result = session.execute("sql", "select * from df");
       assertEquals(Status.FINISHED, result.getStatus());
@@ -264,15 +256,6 @@ public void testZSession_Spark_Submit() throws Exception {
           "+---+---+", result.getResults().get(0).getData().trim());
       assertTrue(result.getJobUrls().size() > 0);
 
-      // sparkr
-      result = session.submit("r", "df <- as.DataFrame(faithful)\nhead(df)");
-      result = session.waitUntilFinished(result.getStatementId());
-      assertEquals(Status.FINISHED, result.getStatus());
-      assertEquals(1, result.getResults().size());
-      assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
-      assertEquals(2, result.getJobUrls().size());
-
       // spark sql
       result = session.submit("sql", "select * from df");
       result = session.waitUntilFinished(result.getStatementId());
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index ac46c89f9..fd1d828cd 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -295,22 +295,6 @@ public void sparkSQLTest() throws IOException {
     assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData());
   }
 
-  @Ignore(value="Tests Spark R which we do not support")
-  @Test
-  public void sparkRTest() throws IOException {
-    Note note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
-    Paragraph p = note.addNewParagraph(anonymous);
-
-    p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
-        "df <- createDataFrame(localDF)\n" +
-        "count(df)"
-    );
-
-    note.run(p.getId(), true);
-    assertEquals(Status.FINISHED, p.getStatus());
-    assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
-  }
-
   @Test
   public void pySparkTest() throws IOException {
     // create new note
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 356ae4ea9..0bb21b482 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -89,7 +89,6 @@ public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext conte
     }
 
     setupPropertiesForPySpark(sparkProperties);
-    setupPropertiesForSparkR(sparkProperties);
 
    String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
    if (StringUtils.isNotBlank(condaEnvName)) {
@@ -343,33 +342,6 @@ private void mergeSparkProperty(Properties sparkProperties, String propertyName,
     }
   }
 
-  private void setupPropertiesForSparkR(Properties sparkProperties) {
-    if (isYarnMode()) {
-      String sparkHome = getEnv("SPARK_HOME");
-      File sparkRBasePath = null;
-      if (sparkHome == null) {
-        if (!getSparkMaster().startsWith("local")) {
-          throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
-              " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
-              " interpreter setting");
-        }
-        String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
-        sparkRBasePath = new File(zeppelinHome,
-            "interpreter" + File.separator + "spark" + File.separator + "R");
-      } else {
-        sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
-      }
-
-      File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
-      if (sparkRPath.exists() && sparkRPath.isFile()) {
-        mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
-            sparkRPath.getAbsolutePath() + "#sparkr");
-      } else {
-        LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
-      }
-    }
-  }
-
   /**
    * Returns cached Spark Master value if it's present, or calculate it
   *
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index e8c26b029..5e8d71793 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -39,12 +39,10 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@Ignore(value="Depends on old spark 2.x version, requires sparkr that is not being supported")
 public class SparkInterpreterLauncherTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
 
-  private String sparkHome;
   private String zeppelinHome;
 
   @Before
@@ -53,7 +51,6 @@ public void setUp() {
       System.clearProperty(confVar.getVarName());
     }
 
-    // sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
         new File("..").getAbsolutePath());
 
@@ -66,7 +63,6 @@ public void testConnectTimeOut() throws IOException {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty(
         ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
     InterpreterOption option = new InterpreterOption();
@@ -83,12 +79,12 @@ public void testConnectTimeOut() throws IOException {
     assertTrue(interpreterProcess.isUserImpersonated());
   }
 
+  @Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
   @Test
   public void testLocalMode() throws IOException {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("ENV_1", "");
     properties.setProperty("property_1", "value_1");
     properties.setProperty("spark.master", "local[*]");
@@ -104,19 +100,18 @@ public void testLocalMode() throws IOException {
     assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
     assertEquals("--conf|spark.files=file_1" +
             "|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]",
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
+  @Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
   @Test
   public void testYarnClientMode_1() throws IOException {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("spark.master", "yarn-client");
     properties.setProperty("spark.files", "file_1");
@@ -131,23 +126,20 @@ public void testYarnClientMode_1() throws IOException {
     assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
 
     String sparkJars = "jar_1";
-    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
-        "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+    assertEquals("|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
         "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client",
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
+  @Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
   @Test
   public void testYarnClientMode_2() throws IOException {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("spark.master", "yarn");
     properties.setProperty("spark.submit.deployMode", "client");
@@ -163,13 +155,10 @@ public void testYarnClientMode_2() throws IOException {
     assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
 
     String sparkJars = "jar_1";
-    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
-        "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
+    assertEquals("|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
"|--conf|spark.submit.deployMode=client" + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); @@ -181,7 +170,6 @@ public void testYarnClusterMode_1() throws IOException { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("spark.master", "yarn-cluster"); properties.setProperty("spark.files", "file_1"); @@ -196,16 +184,13 @@ public void testYarnClusterMode_1() throws IOException { assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 3); - assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); String sparkJars = "jar_1," + zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; - String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; - assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip + - "|--conf|spark.yarn.maxAppAttempts=1" + + assertEquals("|--conf|spark.yarn.maxAppAttempts=1" + "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.yarn.isPython=true" + @@ -221,7 +206,6 @@ public void testYarnClusterMode_2() throws IOException { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("spark.master", "yarn"); properties.setProperty("spark.submit.deployMode", "cluster"); @@ -239,14 +223,12 @@ public void testYarnClusterMode_2() throws IOException { assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 3); - assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); String sparkJars = "jar_1," + zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; - String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; - assertEquals("--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip + + assertEquals("--proxy-user|user1" + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId" + "|--conf|spark.yarn.maxAppAttempts=1" + "|--conf|spark.master=yarn" + @@ -262,7 +244,6 @@ public void testYarnClusterMode_3() throws IOException { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); 
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("spark.master", "yarn");
     properties.setProperty("spark.submit.deployMode", "cluster");
@@ -280,17 +261,14 @@ public void testYarnClusterMode_3() throws IOException {
     assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
 
     String sparkJars = "jar_1," +
         zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," +
         zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
-    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     // escape special characters
     String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
     assertEquals("--proxy-user|user1" +
-        "|--conf|spark.yarn.dist.archives=" + sparkrZip +
         "|--conf|spark.yarn.isPython=true" +
         "|--conf|spark.app.name=intpGroupId" +
         "|--conf|spark.yarn.maxAppAttempts=1" +