Skip to content

Commit a00ce0a

Browse files
nandakumar131RogPodge
authored andcommitted
HDDS-1810. SCM command to Activate and Deactivate pipelines. (apache#1224)
1 parent 22633ce commit a00ce0a

File tree

20 files changed

+395
-9
lines changed

20 files changed

+395
-9
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ public List<Pipeline> listPipelines() throws IOException {
226226
return storageContainerLocationClient.listPipelines();
227227
}
228228

229+
@Override
230+
public void activatePipeline(HddsProtos.PipelineID pipelineID)
231+
throws IOException {
232+
storageContainerLocationClient.activatePipeline(pipelineID);
233+
}
234+
235+
@Override
236+
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
237+
throws IOException {
238+
storageContainerLocationClient.deactivatePipeline(pipelineID);
239+
}
240+
229241
@Override
230242
public void closePipeline(HddsProtos.PipelineID pipelineID)
231243
throws IOException {

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,22 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
180180
*/
181181
List<Pipeline> listPipelines() throws IOException;
182182

183+
/**
184+
* Activates the pipeline given a pipeline ID.
185+
*
186+
* @param pipelineID PipelineID to activate.
187+
* @throws IOException In case of exception while activating the pipeline
188+
*/
189+
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
190+
191+
/**
192+
* Deactivates the pipeline given a pipeline ID.
193+
*
194+
* @param pipelineID PipelineID to deactivate.
195+
* @throws IOException In case of exception while deactivating the pipeline
196+
*/
197+
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
198+
183199
/**
184200
* Closes the pipeline given a pipeline ID.
185201
*

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,14 +354,15 @@ public Pipeline build() {
354354
* Possible Pipeline states in SCM.
355355
*/
356356
public enum PipelineState {
357-
ALLOCATED, OPEN, CLOSED;
357+
ALLOCATED, OPEN, DORMANT, CLOSED;
358358

359359
public static PipelineState fromProtobuf(HddsProtos.PipelineState state)
360360
throws UnknownPipelineStateException {
361361
Preconditions.checkNotNull(state, "Pipeline state is null");
362362
switch (state) {
363363
case PIPELINE_ALLOCATED: return ALLOCATED;
364364
case PIPELINE_OPEN: return OPEN;
365+
case PIPELINE_DORMANT: return DORMANT;
365366
case PIPELINE_CLOSED: return CLOSED;
366367
default:
367368
throw new UnknownPipelineStateException(
@@ -375,6 +376,7 @@ public static HddsProtos.PipelineState getProtobuf(PipelineState state)
375376
switch (state) {
376377
case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED;
377378
case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN;
379+
case DORMANT: return HddsProtos.PipelineState.PIPELINE_DORMANT;
378380
case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED;
379381
default:
380382
throw new UnknownPipelineStateException(

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,22 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
146146
*/
147147
List<Pipeline> listPipelines() throws IOException;
148148

149+
/**
150+
* Activates a dormant pipeline.
151+
*
152+
* @param pipelineID ID of the pipeline to activate.
153+
* @throws IOException in case of any Exception
154+
*/
155+
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
156+
157+
/**
158+
* Deactivates an active pipeline.
159+
*
160+
* @param pipelineID ID of the pipeline to deactivate.
161+
* @throws IOException in case of any Exception
162+
*/
163+
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
164+
149165
/**
150166
* Closes a pipeline given the pipelineID.
151167
*

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.protobuf.RpcController;
2121
import com.google.protobuf.ServiceException;
2222
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
24+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
2325
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
2426
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
2527
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@@ -339,6 +341,36 @@ public List<Pipeline> listPipelines() throws IOException {
339341
}
340342
}
341343

344+
@Override
345+
public void activatePipeline(HddsProtos.PipelineID pipelineID)
346+
throws IOException {
347+
try {
348+
ActivatePipelineRequestProto request =
349+
ActivatePipelineRequestProto.newBuilder()
350+
.setTraceID(TracingUtil.exportCurrentSpan())
351+
.setPipelineID(pipelineID)
352+
.build();
353+
rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
354+
} catch (ServiceException e) {
355+
throw ProtobufHelper.getRemoteException(e);
356+
}
357+
}
358+
359+
@Override
360+
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
361+
throws IOException {
362+
try {
363+
DeactivatePipelineRequestProto request =
364+
DeactivatePipelineRequestProto.newBuilder()
365+
.setTraceID(TracingUtil.exportCurrentSpan())
366+
.setPipelineID(pipelineID)
367+
.build();
368+
rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
369+
} catch (ServiceException e) {
370+
throw ProtobufHelper.getRemoteException(e);
371+
}
372+
}
373+
342374
@Override
343375
public void closePipeline(HddsProtos.PipelineID pipelineID)
344376
throws IOException {

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public enum SCMAction implements AuditAction {
3333
LIST_CONTAINER,
3434
LIST_PIPELINE,
3535
CLOSE_PIPELINE,
36+
ACTIVATE_PIPELINE,
37+
DEACTIVATE_PIPELINE,
3638
DELETE_CONTAINER,
3739
IN_SAFE_MODE,
3840
FORCE_EXIT_SAFE_MODE,

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@
4848
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
4949
import org.apache.hadoop.hdds.protocol.proto
5050
.StorageContainerLocationProtocolProtos;
51+
import org.apache.hadoop.hdds.protocol.proto
52+
.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
53+
import org.apache.hadoop.hdds.protocol.proto
54+
.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
55+
import org.apache.hadoop.hdds.protocol.proto
56+
.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
57+
import org.apache.hadoop.hdds.protocol.proto
58+
.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
5159
import org.apache.hadoop.hdds.protocol.proto
5260
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
5361
import org.apache.hadoop.hdds.protocol.proto
@@ -257,6 +265,32 @@ public ListPipelineResponseProto listPipelines(
257265
}
258266
}
259267

268+
@Override
269+
public ActivatePipelineResponseProto activatePipeline(
270+
RpcController controller, ActivatePipelineRequestProto request)
271+
throws ServiceException {
272+
try (Scope ignored = TracingUtil
273+
.importAndCreateScope("activatePipeline", request.getTraceID())) {
274+
impl.activatePipeline(request.getPipelineID());
275+
return ActivatePipelineResponseProto.newBuilder().build();
276+
} catch (IOException e) {
277+
throw new ServiceException(e);
278+
}
279+
}
280+
281+
@Override
282+
public DeactivatePipelineResponseProto deactivatePipeline(
283+
RpcController controller, DeactivatePipelineRequestProto request)
284+
throws ServiceException {
285+
try (Scope ignored = TracingUtil
286+
.importAndCreateScope("deactivatePipeline", request.getTraceID())) {
287+
impl.deactivatePipeline(request.getPipelineID());
288+
return DeactivatePipelineResponseProto.newBuilder().build();
289+
} catch (IOException e) {
290+
throw new ServiceException(e);
291+
}
292+
}
293+
260294
@Override
261295
public ClosePipelineResponseProto closePipeline(
262296
RpcController controller, ClosePipelineRequestProto request)

hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,22 @@ message ListPipelineResponseProto {
167167
repeated Pipeline pipelines = 1;
168168
}
169169

170+
message ActivatePipelineRequestProto {
171+
required PipelineID pipelineID = 1;
172+
optional string traceID = 2;
173+
}
174+
175+
message ActivatePipelineResponseProto {
176+
}
177+
178+
message DeactivatePipelineRequestProto {
179+
required PipelineID pipelineID = 1;
180+
optional string traceID = 2;
181+
}
182+
183+
message DeactivatePipelineResponseProto {
184+
}
185+
170186
message ClosePipelineRequestProto {
171187
required PipelineID pipelineID = 1;
172188
optional string traceID = 2;
@@ -274,6 +290,12 @@ service StorageContainerLocationProtocolService {
274290
rpc listPipelines(ListPipelineRequestProto)
275291
returns (ListPipelineResponseProto);
276292

293+
rpc activatePipeline(ActivatePipelineRequestProto)
294+
returns (ActivatePipelineResponseProto);
295+
296+
rpc deactivatePipeline(DeactivatePipelineRequestProto)
297+
returns (DeactivatePipelineResponseProto);
298+
277299
/**
278300
* Closes a pipeline.
279301
*/

hadoop-hdds/common/src/main/proto/hdds.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ message PipelineID {
6262
enum PipelineState {
6363
PIPELINE_ALLOCATED = 1;
6464
PIPELINE_OPEN = 2;
65-
PIPELINE_CLOSED = 3;
65+
PIPELINE_DORMANT = 3;
66+
PIPELINE_CLOSED = 4;
6667
}
6768

6869
message Pipeline {

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,21 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
7777
void triggerPipelineCreation();
7878

7979
void incNumBlocksAllocatedMetric(PipelineID id);
80+
81+
/**
82+
* Activates a dormant pipeline.
83+
*
84+
* @param pipelineID ID of the pipeline to activate.
85+
* @throws IOException in case of any Exception
86+
*/
87+
void activatePipeline(PipelineID pipelineID) throws IOException;
88+
89+
/**
90+
* Deactivates an active pipeline.
91+
*
92+
* @param pipelineID ID of the pipeline to deactivate.
93+
* @throws IOException in case of any Exception
94+
*/
95+
void deactivatePipeline(PipelineID pipelineID) throws IOException;
96+
8097
}

0 commit comments

Comments
 (0)