Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,50 @@

package org.apache.flink.test.util;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

import static org.junit.Assert.fail;

/** Test utilities. */
public class TestUtils {

public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name)
throws Exception {
// Execute the job and wait for the job result synchronously. The method throws exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the description is currently wrong. How about we rephrase it:

    /**
     * Execute the job and wait for the job result synchronously.
     *
     * @throws Exception If executing the environment throws an exception which does not have {@link
     *     SuccessException} as a cause.
     */

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @dawidwys for catching this. The updated comment is much better.

// iff one of the following conditions happens:
// 1) The job finishes successfully without exception
// 2) The job finishes with an exception that contains SuccessException.
public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
JobClient jobClient = null;
try {
return see.execute(name);
} catch (ProgramInvocationException | JobExecutionException root) {
Throwable cause = root.getCause();

// search for nested SuccessExceptions
int depth = 0;
while (!(cause instanceof SuccessException)) {
if (cause == null || depth++ == 20) {
root.printStackTrace();
fail("Test failed: " + root.getMessage());
} else {
cause = cause.getCause();
StreamGraph graph = see.getStreamGraph(name);
jobClient = see.executeAsync(graph);
jobClient.getJobExecutionResult().get();
} catch (Throwable root) {
if (jobClient != null) {
try {
jobClient.cancel().get();
} catch (Exception e) {
// Exception could be thrown if the job has already finished.
// Ignore the exception.
}
}
}

return null;
Throwable t = root;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could reuse ExceptionUtils:

            Optional<SuccessException> successAsCause =
                    ExceptionUtils.findThrowable(root, SuccessException.class);

            if (!successAsCause.isPresent()) {
                root.printStackTrace();
                fail("Test failed: " + root.getMessage());
            }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

while (t != null && !(t instanceof SuccessException)) {
t = t.getCause();
}

if (t == null) {
root.printStackTrace();
fail("Test failed: " + root.getMessage());
}
}
}

public static void submitJobAndWaitForResult(
Expand Down