Restores SparkInterpreterLauncherTest.java
StrongestNumber9 committed Feb 11, 2025
1 parent 3e1d946 commit 79cf2af
Showing 1 changed file with 29 additions and 7 deletions.
@@ -39,10 +39,12 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@Ignore(value="Depends on old spark 2.x version, requires sparkr that is not being supported")
public class SparkInterpreterLauncherTest {

private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);

private String sparkHome;
private String zeppelinHome;

@Before
@@ -51,6 +53,7 @@ public void setUp() {
System.clearProperty(confVar.getVarName());
}

// sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
new File("..").getAbsolutePath());

@@ -63,6 +66,7 @@ public void testConnectTimeOut() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty(
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
InterpreterOption option = new InterpreterOption();
@@ -79,12 +83,12 @@ public void testConnectTimeOut() throws IOException {
assertTrue(interpreterProcess.isUserImpersonated());
}

@Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
@Test
public void testLocalMode() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("ENV_1", "");
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "local[*]");
@@ -100,18 +104,19 @@ public void testLocalMode() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 2);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
assertEquals("--conf|spark.files=file_1" +
"|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}

@Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
@Test
public void testYarnClientMode_1() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "yarn-client");
properties.setProperty("spark.files", "file_1");
@@ -126,20 +131,23 @@ public void testYarnClientMode_1() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 2);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));

String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
assertEquals("|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
"|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
"|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}

@Ignore(value="Values provided in the ZEPPELIN_SPARK_CONF are not guaranteed to be in same order -> string matched assertion fails")
@Test
public void testYarnClientMode_2() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "yarn");
properties.setProperty("spark.submit.deployMode", "client");
@@ -155,10 +163,13 @@ public void testYarnClientMode_2() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 2);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));

String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
assertEquals("|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
"|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars +
"|--conf|spark.submit.deployMode=client" +
"|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
@@ -170,6 +181,7 @@ public void testYarnClusterMode_1() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "yarn-cluster");
properties.setProperty("spark.files", "file_1");
@@ -184,13 +196,16 @@ public void testYarnClusterMode_1() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 3);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));

assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
String sparkJars = "jar_1," +
zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," +
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
assertEquals("|--conf|spark.yarn.maxAppAttempts=1" +
assertEquals("--conf|spark.yarn.dist.archives=" + sparkrZip +
"|--conf|spark.yarn.maxAppAttempts=1" +
"|--conf|spark.files=" + sparkFiles +
"|--conf|spark.jars=" + sparkJars +
"|--conf|spark.yarn.isPython=true" +
@@ -206,6 +221,7 @@ public void testYarnClusterMode_2() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "yarn");
properties.setProperty("spark.submit.deployMode", "cluster");
@@ -223,12 +239,14 @@ public void testYarnClusterMode_2() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 3);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
String sparkJars = "jar_1," +
zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," +
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
assertEquals("--proxy-user|user1" +
assertEquals("--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip +
"|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId" +
"|--conf|spark.yarn.maxAppAttempts=1" +
"|--conf|spark.master=yarn" +
@@ -244,6 +262,7 @@ public void testYarnClusterMode_3() throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", sparkHome);
properties.setProperty("property_1", "value_1");
properties.setProperty("spark.master", "yarn");
properties.setProperty("spark.submit.deployMode", "cluster");
@@ -261,14 +280,17 @@ public void testYarnClusterMode_3() throws IOException {
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertTrue(interpreterProcess.getEnv().size() >= 3);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));

String sparkJars = "jar_1," +
zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," +
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
// escape special characters
String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
assertEquals("--proxy-user|user1" +
"|--conf|spark.yarn.dist.archives=" + sparkrZip +
"|--conf|spark.yarn.isPython=true" +
"|--conf|spark.app.name=intpGroupId" +
"|--conf|spark.yarn.maxAppAttempts=1" +
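Note on the @Ignore reasons above: SparkInterpreterLauncher does not emit the ZEPPELIN_SPARK_CONF entries in a guaranteed order, so the literal string assertions in these tests can fail even when the launcher builds the right configuration. Below is a minimal sketch of an order-insensitive comparison the tests could use instead; the helper class, its name, and the assumption that "|--conf|" separates the pairs are illustrative only, not code from this commit.

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of this commit): compares two ZEPPELIN_SPARK_CONF
// strings while ignoring the order of their "--conf|key=value" pairs.
final class SparkConfAssert {

  static void assertSameSparkConf(String expected, String actual) {
    assertEquals(toConfSet(expected), toConfSet(actual));
  }

  private static Set<String> toConfSet(String conf) {
    // Assumes the plain "--conf|k=v|--conf|k=v" shape used by most assertions
    // above; leading flags such as "--proxy-user|user1" would need extra
    // handling and are ignored in this sketch.
    String stripped = conf.startsWith("--conf|") ? conf.substring("--conf|".length()) : conf;
    return new HashSet<>(Arrays.asList(stripped.split("\\|--conf\\|")));
  }
}

With a helper along these lines, testLocalMode() could call assertSameSparkConf(expected, interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")), and the order-related @Ignore annotations could be revisited.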
