|
19 | 19 |
|
20 | 20 | import java.io.File; |
21 | 21 | import java.io.IOException; |
22 | | -import java.lang.reflect.Method; |
23 | | -import java.net.URL; |
24 | | -import java.net.URLClassLoader; |
25 | 22 | import java.util.ArrayList; |
26 | 23 | import java.util.HashMap; |
27 | 24 | import java.util.List; |
28 | 25 | import java.util.Map; |
29 | 26 | import java.util.Properties; |
30 | | -import java.util.concurrent.atomic.AtomicLong; |
31 | 27 |
|
32 | 28 | /** |
33 | 29 | * Launcher for Spark applications. |
|
43 | 39 | */ |
44 | 40 | public class SparkLauncher extends AbstractLauncher<SparkLauncher> { |
45 | 41 |
|
46 | | - private static final AtomicLong THREAD_ID = new AtomicLong(); |
47 | | - |
48 | 42 | protected boolean verbose; |
49 | 43 | protected String appName; |
50 | 44 | protected String master; |
@@ -139,78 +133,6 @@ public SparkLauncher setVerbose(boolean verbose) { |
139 | 133 | return this; |
140 | 134 | } |
141 | 135 |
|
142 | | - /** |
143 | | - * Starts a new thread that will run the Spark application. |
144 | | - * <p/> |
145 | | - * The application will run on a separate thread and use a separate, isolated class loader. |
146 | | - * No classes or resources from the current thread's class loader will be visible to the app. |
147 | | - * <p/> |
148 | | - * This mode does not support certain configuration parameters, like configuring the amount of |
149 | | - * driver memory or custom driver command line options. If such configuration is detected, an |
150 | | - * exception will be thrown. |
151 | | - * <p/> |
152 | | - * This is extremely experimental and should not be used in production environments. |
153 | | - * <p/> |
154 | | - * NOTE: SparkSubmit uses system properties to propagate some configuration values to the app; so if multiple apps
155 | | - * are run concurrently, they may affect each other's configurations.
156 | | - * <p/> |
157 | | - * NOTE: for users running JDK versions older than 8, this option can add a lot of overhead |
158 | | - * to the VM's perm gen. |
159 | | - * |
160 | | - * @param handler Optional handler for handling exceptions in the app thread.
161 | | - * @param daemon Whether to start a daemon thread. |
162 | | - * @return A thread that will run the application using SparkSubmit. The thread will
163 | | - * already be started. |
164 | | - */ |
165 | | - public Thread start(Thread.UncaughtExceptionHandler handler, boolean daemon) throws IOException { |
166 | | - // Do some sanity checking that incompatible driver options are not used, because they |
167 | | - // cannot be set in this mode. |
168 | | - Properties props = loadPropertiesFile(); |
169 | | - String extraClassPath = null; |
170 | | - if (isClientMode(props)) { |
171 | | - checkState( |
172 | | - find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props) == null, |
173 | | - "Cannot set driver VM options when running in-process."); |
174 | | - checkState( |
175 | | - find(DRIVER_EXTRA_LIBRARY_PATH, conf, props) == null, |
176 | | - "Cannot set native library path when running in-process."); |
177 | | - checkState( |
178 | | - find(DRIVER_MEMORY, conf, props) == null, |
179 | | - "Cannot set driver memory when running in-process."); |
180 | | - extraClassPath = find(DRIVER_EXTRA_CLASSPATH, conf, props); |
181 | | - } |
182 | | - |
183 | | - List<String> cp = buildClassPath(extraClassPath); |
184 | | - URL[] cpUrls = new URL[cp.size()]; |
185 | | - int idx = 0; |
186 | | - for (String entry : cp) { |
187 | | - cpUrls[idx++] = new File(entry).toURI().toURL(); |
188 | | - } |
189 | | - |
190 | | - URLClassLoader cl = new URLClassLoader(cpUrls, null); |
191 | | - |
192 | | - Thread appThread; |
193 | | - try { |
194 | | - Class<?> sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit"); |
195 | | - Method main = sparkSubmit.getDeclaredMethod("main", String[].class); |
196 | | - List<String> args = buildSparkSubmitArgs(); |
197 | | - appThread = new Thread(new SparkSubmitRunner(main, args)); |
198 | | - } catch (ClassNotFoundException cnfe) { |
199 | | - throw new IOException(cnfe); |
200 | | - } catch (NoSuchMethodException nsme) { |
201 | | - throw new IOException(nsme); |
202 | | - } |
203 | | - |
204 | | - appThread.setName("SparkLauncher-Submit-" + THREAD_ID.incrementAndGet()); |
205 | | - appThread.setContextClassLoader(cl); |
206 | | - if (handler != null) { |
207 | | - appThread.setUncaughtExceptionHandler(handler); |
208 | | - } |
209 | | - appThread.setDaemon(daemon); |
210 | | - appThread.start(); |
211 | | - return appThread; |
212 | | - } |
213 | | - |
214 | 136 | /** |
215 | 137 | * Launches a sub-process that will start the configured Spark application. |
216 | 138 | * |
@@ -340,27 +262,4 @@ private boolean isClientMode(Properties userProps) { |
340 | 262 | (deployMode == null && !userMaster.startsWith("yarn-")); |
341 | 263 | } |
342 | 264 |
|
343 | | - private static class SparkSubmitRunner implements Runnable { |
344 | | - |
345 | | - private final Method main; |
346 | | - private final Object args; |
347 | | - |
348 | | - SparkSubmitRunner(Method main, List<String> args) { |
349 | | - this.main = main; |
350 | | - this.args = args.toArray(new String[args.size()]); |
351 | | - } |
352 | | - |
353 | | - @Override |
354 | | - public void run() { |
355 | | - try { |
356 | | - main.invoke(null, args); |
357 | | - } catch (RuntimeException re) { |
358 | | - throw re; |
359 | | - } catch (Exception e) { |
360 | | - throw new RuntimeException(e); |
361 | | - } |
362 | | - } |
363 | | - |
364 | | - } |
365 | | - |
366 | 265 | } |
0 commit comments