Commit

Remove sparkr and related loading code, partially re-open SparkInterpreterLauncherTest

StrongestNumber9 committed Feb 10, 2025
1 parent 055c870 commit 3e1d946
Showing 7 changed files with 7 additions and 217 deletions.

This file was deleted.

23 changes: 0 additions & 23 deletions spark/interpreter/src/test/resources/spark_server.R

This file was deleted.

35 changes: 0 additions & 35 deletions spark/interpreter/src/test/resources/spark_ui.R

This file was deleted.

@@ -193,14 +193,6 @@ public void testZSession_Spark() throws Exception {
"+---+---+", result.getResults().get(0).getData().trim());
assertTrue(result.getJobUrls().size() > 0);

// sparkr
result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
assertEquals(Status.FINISHED, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
assertEquals(2, result.getJobUrls().size());

// spark sql
result = session.execute("sql", "select * from df");
assertEquals(Status.FINISHED, result.getStatus());
@@ -264,15 +256,6 @@ public void testZSession_Spark_Submit() throws Exception {
"+---+---+", result.getResults().get(0).getData().trim());
assertTrue(result.getJobUrls().size() > 0);

// sparkr
result = session.submit("r", "df <- as.DataFrame(faithful)\nhead(df)");
result = session.waitUntilFinished(result.getStatementId());
assertEquals(Status.FINISHED, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
assertEquals(2, result.getJobUrls().size());

// spark sql
result = session.submit("sql", "select * from df");
result = session.waitUntilFinished(result.getStatementId());
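
For context, the tests above exercise two ZSession entry points visible in this diff: execute() runs a statement synchronously, while submit() returns immediately and waitUntilFinished() blocks on the returned statement id. A minimal sketch of the asynchronous flow, assuming an already-created and connected session (its setup is outside this diff):

// Asynchronous ZSession flow, as exercised by testZSession_Spark_Submit above.
// 'session' is assumed to be an already-started ZSession instance.
ExecuteResult result = session.submit("sql", "select * from df");
result = session.waitUntilFinished(result.getStatementId());
assertEquals(Status.FINISHED, result.getStatus());
assertEquals(1, result.getResults().size());
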
@@ -295,22 +295,6 @@ public void sparkSQLTest() throws IOException {
assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData());
}

@Ignore(value="Tests Spark R which we do not support")
@Test
public void sparkRTest() throws IOException {
Note note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
Paragraph p = note.addNewParagraph(anonymous);

p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
"df <- createDataFrame(localDF)\n" +
"count(df)"
);

note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
}

@Test
public void pySparkTest() throws IOException {
// create new note
@@ -89,7 +89,6 @@ public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext conte
}

setupPropertiesForPySpark(sparkProperties);
setupPropertiesForSparkR(sparkProperties);

String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
if (StringUtils.isNotBlank(condaEnvName)) {
@@ -343,33 +342,6 @@ private void mergeSparkProperty(Properties sparkProperties, String propertyName,
}
}

private void setupPropertiesForSparkR(Properties sparkProperties) {
if (isYarnMode()) {
String sparkHome = getEnv("SPARK_HOME");
File sparkRBasePath = null;
if (sparkHome == null) {
if (!getSparkMaster().startsWith("local")) {
throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
" for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
" interpreter setting");
}
String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
sparkRBasePath = new File(zeppelinHome,
"interpreter" + File.separator + "spark" + File.separator + "R");
} else {
sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
}

File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
sparkRPath.getAbsolutePath() + "#sparkr");
} else {
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
}
}
}

/**
* Returns cached Spark Master value if it's present, or calculate it
*
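
A note on the mechanism being removed: spark.yarn.dist.archives is a standard Spark-on-YARN property that ships an archive to every container and unpacks it under the alias following the # sign, so the deleted branch effectively produced a setting like

spark.yarn.dist.archives=/usr/lib/spark/R/lib/sparkr.zip#sparkr

(the path here is illustrative only). The mergeSparkProperty helper appears only in the hunk header above; a minimal sketch of the comma-separated merge it presumably performs, assumed rather than copied from the source:

// Hypothetical sketch of mergeSparkProperty: append propertyValue to an
// existing comma-separated Spark property, or set it outright if absent.
// Uses java.util.Properties; the real implementation is not shown in this diff.
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
                                String propertyValue) {
  String existing = sparkProperties.getProperty(propertyName);
  if (existing == null || existing.trim().isEmpty()) {
    sparkProperties.setProperty(propertyName, propertyValue);
  } else {
    sparkProperties.setProperty(propertyName, existing + "," + propertyValue);
  }
}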
