Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void readFrom(StreamInput in) throws IOException {
id = in.readString();
found = in.readBoolean();
if (found) {
status = WatchStatus.read(in);
status = new WatchStatus(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
seqNo = in.readZLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -36,7 +37,7 @@
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeDate;
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeOptionalDate;

public class WatchStatus implements ToXContentObject, Streamable {
public class WatchStatus implements ToXContentObject, Streamable, Writeable {

public static final String INCLUDE_STATE = "include_state";

Expand All @@ -49,8 +50,26 @@ public class WatchStatus implements ToXContentObject, Streamable {
@Nullable private Map<String, String> headers;
private Map<String, ActionStatus> actions;

// for serialization
private WatchStatus() {
public WatchStatus(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public WatchStatus(ZonedDateTime now, Map<String, ActionStatus> actions) {
Expand Down Expand Up @@ -222,31 +241,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public static WatchStatus read(StreamInput in) throws IOException {
WatchStatus status = new WatchStatus();
status.readFrom(in);
return status;
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testHeadersSerialization() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
BytesReference bytesReference = out.bytes();
WatchStatus readStatus = WatchStatus.read(bytesReference.streamInput());
WatchStatus readStatus = new WatchStatus(bytesReference.streamInput());
assertThat(readStatus, is(status));
assertThat(readStatus.getHeaders(), is(headers));

Expand Down