Skip to content

Commit 53e7c0e

Browse files
author
Andrew Or
committed
Initial client, server, and all the messages
This commit introduces type-safe schemas for all messages exchanged in the REST protocol. Each message is expected to contain an ACTION field that has only one possible value for each message type. Before the message is sent, we validate that all required fields are in fact present, and that the value of the action field is the correct type. The next step is to actually integrate this in standalone mode.
1 parent 14e3f11 commit 53e7c0e

11 files changed

+910
-2
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ object SparkSubmit {
3939
private val STANDALONE = 2
4040
private val MESOS = 4
4141
private val LOCAL = 8
42-
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
42+
private val REST = 16
43+
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | REST
4344

4445
// Deploy modes
4546
private val CLIENT = 1
@@ -97,7 +98,7 @@ object SparkSubmit {
9798
case m if m.startsWith("spark") => STANDALONE
9899
case m if m.startsWith("mesos") => MESOS
99100
case m if m.startsWith("local") => LOCAL
100-
case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
101+
case _ => printErrorAndExit("Master must start with yarn, spark, mesos, local, or rest"); -1
101102
}
102103

103104
// Set the deploy mode; default is client mode
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
/**
21+
* A field used in a DriverStatusRequestMessage.
22+
*/
23+
private[spark] abstract class DriverStatusRequestField extends StandaloneRestProtocolField
24+
private[spark] object DriverStatusRequestField extends StandaloneRestProtocolFieldCompanion {
25+
case object ACTION extends DriverStatusRequestField
26+
case object SPARK_VERSION extends DriverStatusRequestField
27+
case object MESSAGE extends DriverStatusRequestField
28+
case object MASTER extends DriverStatusRequestField
29+
case object DRIVER_ID extends DriverStatusRequestField
30+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID)
31+
override val optionalFields = Seq(MESSAGE)
32+
}
33+
34+
/**
35+
* A request sent to the standalone Master to query the status of a driver.
36+
*/
37+
private[spark] class DriverStatusRequestMessage extends StandaloneRestProtocolMessage(
38+
StandaloneRestProtocolAction.DRIVER_STATUS_REQUEST,
39+
DriverStatusRequestField.ACTION,
40+
DriverStatusRequestField.requiredFields)
41+
42+
private[spark] object DriverStatusRequestMessage extends StandaloneRestProtocolMessageCompanion {
43+
protected override def newMessage(): StandaloneRestProtocolMessage =
44+
new DriverStatusRequestMessage
45+
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
46+
DriverStatusRequestField.withName(field)
47+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
/**
21+
* A field used in a DriverStatusResponseMessage.
22+
*/
23+
private[spark] abstract class DriverStatusResponseField extends StandaloneRestProtocolField
24+
private[spark] object DriverStatusResponseField extends StandaloneRestProtocolFieldCompanion {
25+
case object ACTION extends DriverStatusResponseField
26+
case object SPARK_VERSION extends DriverStatusResponseField
27+
case object MESSAGE extends DriverStatusResponseField
28+
case object MASTER extends DriverStatusResponseField
29+
case object DRIVER_ID extends DriverStatusResponseField
30+
case object DRIVER_STATE extends SubmitDriverResponseField
31+
case object WORKER_ID extends SubmitDriverResponseField
32+
case object WORKER_HOST_PORT extends SubmitDriverResponseField
33+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE,
34+
MASTER, DRIVER_ID, DRIVER_STATE, WORKER_ID, WORKER_HOST_PORT)
35+
override val optionalFields = Seq.empty
36+
}
37+
38+
/**
39+
* A message sent from the standalone Master in response to a DriverStatusResponseMessage.
40+
*/
41+
private[spark] class DriverStatusResponseMessage extends StandaloneRestProtocolMessage(
42+
StandaloneRestProtocolAction.DRIVER_STATUS_RESPONSE,
43+
DriverStatusResponseField.ACTION,
44+
DriverStatusResponseField.requiredFields)
45+
46+
private[spark] object DriverStatusResponseMessage extends StandaloneRestProtocolMessageCompanion {
47+
protected override def newMessage(): StandaloneRestProtocolMessage =
48+
new DriverStatusResponseMessage
49+
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
50+
DriverStatusResponseField.withName(field)
51+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
/**
21+
* A field used in a ErrorMessage.
22+
*/
23+
private[spark] abstract class ErrorField extends StandaloneRestProtocolField
24+
private[spark] object ErrorField extends StandaloneRestProtocolFieldCompanion {
25+
case object ACTION extends ErrorField
26+
case object SPARK_VERSION extends ErrorField
27+
case object MESSAGE extends ErrorField
28+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE)
29+
override val optionalFields = Seq.empty
30+
}
31+
32+
/**
33+
* An error message exchanged in the standalone REST protocol.
34+
*/
35+
private[spark] class ErrorMessage extends StandaloneRestProtocolMessage(
36+
StandaloneRestProtocolAction.ERROR,
37+
ErrorField.ACTION,
38+
ErrorField.requiredFields)
39+
40+
private[spark] object ErrorMessage extends StandaloneRestProtocolMessageCompanion {
41+
protected override def newMessage(): StandaloneRestProtocolMessage = new ErrorMessage
42+
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
43+
ErrorField.withName(field)
44+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
/**
21+
* A field used in a KillDriverRequestMessage.
22+
*/
23+
private[spark] abstract class KillDriverRequestField extends StandaloneRestProtocolField
24+
private[spark] object KillDriverRequestField extends StandaloneRestProtocolFieldCompanion {
25+
case object ACTION extends KillDriverRequestField
26+
case object SPARK_VERSION extends KillDriverRequestField
27+
case object MESSAGE extends KillDriverRequestField
28+
case object MASTER extends KillDriverRequestField
29+
case object DRIVER_ID extends KillDriverRequestField
30+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID)
31+
override val optionalFields = Seq(MESSAGE)
32+
}
33+
34+
/**
35+
* A request sent to the standalone Master to kill a driver.
36+
*/
37+
private[spark] class KillDriverRequestMessage extends StandaloneRestProtocolMessage(
38+
StandaloneRestProtocolAction.KILL_DRIVER_REQUEST,
39+
KillDriverRequestField.ACTION,
40+
KillDriverRequestField.requiredFields)
41+
42+
private[spark] object KillDriverRequestMessage extends StandaloneRestProtocolMessageCompanion {
43+
protected override def newMessage(): StandaloneRestProtocolMessage =
44+
new KillDriverRequestMessage
45+
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
46+
KillDriverRequestField.withName(field)
47+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
/**
21+
* A field used in a KillDriverResponseMessage.
22+
*/
23+
private[spark] abstract class KillDriverResponseField extends StandaloneRestProtocolField
24+
private[spark] object KillDriverResponseField extends StandaloneRestProtocolFieldCompanion {
25+
case object ACTION extends KillDriverResponseField
26+
case object SPARK_VERSION extends KillDriverResponseField
27+
case object MESSAGE extends KillDriverResponseField
28+
case object MASTER extends KillDriverResponseField
29+
case object DRIVER_ID extends KillDriverResponseField
30+
case object DRIVER_STATE extends SubmitDriverResponseField
31+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE, MASTER, DRIVER_ID, DRIVER_STATE)
32+
override val optionalFields = Seq.empty
33+
}
34+
35+
/**
36+
* A message sent from the standalone Master in response to a KillDriverResponseMessage.
37+
*/
38+
private[spark] class KillDriverResponseMessage extends StandaloneRestProtocolMessage(
39+
StandaloneRestProtocolAction.KILL_DRIVER_RESPONSE,
40+
KillDriverResponseField.ACTION,
41+
KillDriverResponseField.requiredFields)
42+
43+
private[spark] object KillDriverResponseMessage extends StandaloneRestProtocolMessageCompanion {
44+
protected override def newMessage(): StandaloneRestProtocolMessage =
45+
new KillDriverResponseMessage
46+
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
47+
KillDriverResponseField.withName(field)
48+
}

0 commit comments

Comments
 (0)