From 506f2d59f8469b3367e5cd19adbc074b7c0c5198 Mon Sep 17 00:00:00 2001 From: liketic Date: Sun, 13 Jan 2019 11:47:31 +0800 Subject: [PATCH] Migrate Streamable to Writeable for WatchStatus --- .../actions/ack/AckWatchResponse.java | 2 +- .../activate/ActivateWatchResponse.java | 2 +- .../actions/get/GetWatchResponse.java | 2 +- .../xpack/core/watcher/watch/WatchStatus.java | 51 +++++++++---------- .../xpack/watcher/watch/WatchStatusTests.java | 2 +- 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/ack/AckWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/ack/AckWatchResponse.java index 188c49963151f..6d12bb9d5f811 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/ack/AckWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/ack/AckWatchResponse.java @@ -38,7 +38,7 @@ public WatchStatus getStatus() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - status = in.readBoolean() ? WatchStatus.read(in) : null; + status = in.readBoolean() ? new WatchStatus(in) : null; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/activate/ActivateWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/activate/ActivateWatchResponse.java index 0c92fc046722a..bf43bbbe3c54d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/activate/ActivateWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/activate/ActivateWatchResponse.java @@ -38,7 +38,7 @@ public WatchStatus getStatus() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - status = in.readBoolean() ? WatchStatus.read(in) : null; + status = in.readBoolean() ? new WatchStatus(in) : null; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java index d92ae1dcc4626..1ae29c3db065a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java @@ -77,7 +77,7 @@ public void readFrom(StreamInput in) throws IOException { id = in.readString(); found = in.readBoolean(); if (found) { - status = WatchStatus.read(in); + status = new WatchStatus(in); source = XContentSource.readFrom(in); version = in.readZLong(); } else { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchStatus.java index 93e713bb8844e..9026c399cff89 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchStatus.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -36,7 +37,7 @@ import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeOptionalDate; import static org.joda.time.DateTimeZone.UTC; -public class WatchStatus implements ToXContentObject, Streamable { +public class WatchStatus implements ToXContentObject, Streamable, Writeable { public static final String INCLUDE_STATE = "include_state"; @@ -49,8 +50,26 @@ public class WatchStatus implements ToXContentObject, Streamable { @Nullable private Map headers; private Map actions; - // for serialization - private WatchStatus() { + public WatchStatus(StreamInput in) throws IOException { + version = in.readLong(); + lastChecked = readOptionalDate(in, UTC); + lastMetCondition = readOptionalDate(in, UTC); + int count = in.readInt(); + Map actions = new HashMap<>(count); + for (int i = 0; i < count; i++) { + actions.put(in.readString(), ActionStatus.readFrom(in)); + } + this.actions = unmodifiableMap(actions); + state = new State(in.readBoolean(), readDate(in, UTC)); + boolean executionStateExists = in.readBoolean(); + if (executionStateExists) { + executionState = ExecutionState.resolve(in.readString()); + } + if (in.readBoolean()) { + headers = in.readMap(StreamInput::readString, StreamInput::readString); + } else { + headers = Collections.emptyMap(); + } } public WatchStatus(DateTime now, Map actions) { @@ -218,31 +237,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { - version = in.readLong(); - lastChecked = readOptionalDate(in, UTC); - lastMetCondition = readOptionalDate(in, UTC); - int count = in.readInt(); - Map actions = new HashMap<>(count); - for (int i = 0; i < count; i++) { - actions.put(in.readString(), ActionStatus.readFrom(in)); - } - this.actions = unmodifiableMap(actions); - state = new State(in.readBoolean(), readDate(in, UTC)); - boolean executionStateExists = in.readBoolean(); - if (executionStateExists) { - executionState = ExecutionState.resolve(in.readString()); - } - if (in.readBoolean()) { - headers = in.readMap(StreamInput::readString, StreamInput::readString); - } else { - headers = Collections.emptyMap(); - } - } - - public static WatchStatus read(StreamInput in) throws IOException { - WatchStatus status = new WatchStatus(); - status.readFrom(in); - return status; + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStatusTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStatusTests.java index 1a8263c0c33c7..d60a4537f5bc6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStatusTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStatusTests.java @@ -87,7 +87,7 @@ public void testHeadersSerialization() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); status.writeTo(out); BytesReference bytesReference = out.bytes(); - WatchStatus readStatus = WatchStatus.read(bytesReference.streamInput()); + WatchStatus readStatus = new WatchStatus(bytesReference.streamInput()); assertThat(readStatus, is(status)); assertThat(readStatus.getHeaders(), is(headers));