Skip to content

Commit df90e8b

Browse files
author
Andrew Or
committed
Use Jackson for JSON de/serialization
This involves a major refactor of all message representations. The main motivation for this change is to simplify the logic to enforce type safety, such that we no longer depend on the behavior of all the scala class magic we used to rely on. This commit also introduces a differentiation between request and response messages to provide further type safety. This would have introduced much additional complexity without the refactor.
1 parent d7a1f9f commit df90e8b

23 files changed

+910
-1071
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
class DriverStatusRequest extends SubmitRestProtocolRequest {
21+
protected override val action = SubmitRestProtocolAction.DRIVER_STATUS_REQUEST
22+
private val driverId = new SubmitRestProtocolField[String]
23+
24+
def getDriverId: String = driverId.toString
25+
def setDriverId(s: String): this.type = setField(driverId, s)
26+
27+
override def validate(): Unit = {
28+
super.validate()
29+
assertFieldIsSet(driverId, "driver_id")
30+
}
31+
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
class DriverStatusResponse extends SubmitRestProtocolResponse {
21+
protected override val action = SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE
22+
private val driverId = new SubmitRestProtocolField[String]
23+
private val success = new SubmitRestProtocolField[Boolean]
24+
private val driverState = new SubmitRestProtocolField[String]
25+
private val workerId = new SubmitRestProtocolField[String]
26+
private val workerHostPort = new SubmitRestProtocolField[String]
27+
28+
def getDriverId: String = driverId.toString
29+
def getSuccess: String = success.toString
30+
def getDriverState: String = driverState.toString
31+
def getWorkerId: String = workerId.toString
32+
def getWorkerHostPort: String = workerHostPort.toString
33+
34+
def setDriverId(s: String): this.type = setField(driverId, s)
35+
def setSuccess(s: String): this.type = setBooleanField(success, s)
36+
def setDriverState(s: String): this.type = setField(driverState, s)
37+
def setWorkerId(s: String): this.type = setField(workerId, s)
38+
def setWorkerHostPort(s: String): this.type = setField(workerHostPort, s)
39+
40+
override def validate(): Unit = {
41+
super.validate()
42+
assertFieldIsSet(driverId, "driver_id")
43+
assertFieldIsSet(success, "success")
44+
}
45+
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala

Lines changed: 0 additions & 52 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala

Lines changed: 0 additions & 44 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
class ErrorResponse extends SubmitRestProtocolResponse {
21+
protected override val action = SubmitRestProtocolAction.ERROR
22+
override def validate(): Unit = {
23+
super.validate()
24+
assertFieldIsSet(message, "message")
25+
}
26+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
class KillDriverRequest extends SubmitRestProtocolRequest {
21+
protected override val action = SubmitRestProtocolAction.KILL_DRIVER_REQUEST
22+
private val driverId = new SubmitRestProtocolField[String]
23+
24+
def getDriverId: String = driverId.toString
25+
def setDriverId(s: String): this.type = setField(driverId, s)
26+
27+
override def validate(): Unit = {
28+
super.validate()
29+
assertFieldIsSet(driverId, "driver_id")
30+
}
31+
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.rest
19+
20+
class KillDriverResponse extends SubmitRestProtocolResponse {
21+
protected override val action = SubmitRestProtocolAction.KILL_DRIVER_RESPONSE
22+
private val driverId = new SubmitRestProtocolField[String]
23+
private val success = new SubmitRestProtocolField[Boolean]
24+
25+
def getDriverId: String = driverId.toString
26+
def getSuccess: String = success.toString
27+
28+
def setDriverId(s: String): this.type = setField(driverId, s)
29+
def setSuccess(s: String): this.type = setBooleanField(success, s)
30+
31+
override def validate(): Unit = {
32+
super.validate()
33+
assertFieldIsSet(driverId, "driver_id")
34+
assertFieldIsSet(success, "success")
35+
}
36+
}

0 commit comments

Comments
 (0)