Skip to content

Commit e1fc545

Browse files
committed
etcdserver: process the scenaro of the last WAL record being partially synced to disk
We need to return io.ErrUnexpectedEOF in the error chain, so that etcdserver can repair it automatically. Backport #15068 Signed-off-by: Benjamin Wang <[email protected]>
1 parent 9e3966f commit e1fc545

File tree

5 files changed

+91
-12
lines changed

5 files changed

+91
-12
lines changed

server/etcdserver/storage.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package etcdserver
1616

1717
import (
18+
"errors"
1819
"io"
1920

2021
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@@ -100,7 +101,7 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
100101
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
101102
w.Close()
102103
// we can only repair ErrUnexpectedEOF and we never repair twice.
103-
if repaired || err != io.ErrUnexpectedEOF {
104+
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
104105
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
105106
}
106107
if !wal.Repair(lg, waldir) {

server/wal/decoder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
8484
// The length of current WAL entry must be less than the remaining file size.
8585
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
8686
if recBytes > maxEntryLimit {
87-
return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
88-
recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
87+
return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
88+
io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
8989
}
9090

9191
data := make([]byte, recBytes+padBytes)

server/wal/repair.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package wal
1616

1717
import (
18+
"errors"
1819
"io"
1920
"os"
2021
"path/filepath"
@@ -44,8 +45,8 @@ func Repair(lg *zap.Logger, dirpath string) bool {
4445
for {
4546
lastOffset := decoder.lastOffset()
4647
err := decoder.decode(rec)
47-
switch err {
48-
case nil:
48+
switch {
49+
case err == nil:
4950
// update crc of the decoder when necessary
5051
switch rec.Type {
5152
case crcType:
@@ -59,11 +60,11 @@ func Repair(lg *zap.Logger, dirpath string) bool {
5960
}
6061
continue
6162

62-
case io.EOF:
63+
case errors.Is(err, io.EOF):
6364
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
6465
return true
6566

66-
case io.ErrUnexpectedEOF:
67+
case errors.Is(err, io.ErrUnexpectedEOF):
6768
bf, bferr := os.Create(f.Name() + ".broken")
6869
if bferr != nil {
6970
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))

server/wal/wal.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -500,13 +500,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
500500
// We do not have to read out all entries in read mode.
501501
// The last record maybe a partial written one, so
502502
// ErrunexpectedEOF might be returned.
503-
if err != io.EOF && err != io.ErrUnexpectedEOF {
503+
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
504504
state.Reset()
505505
return nil, state, nil, err
506506
}
507507
default:
508-
// We must read all of the entries if WAL is opened in write mode.
509-
if err != io.EOF {
508+
// We must read all the entries if WAL is opened in write mode.
509+
if !errors.Is(err, io.EOF) {
510510
state.Reset()
511511
return nil, state, nil, err
512512
}
@@ -598,7 +598,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
598598
}
599599
// We do not have to read out all the WAL entries
600600
// as the decoder is opened in read mode.
601-
if err != io.EOF && err != io.ErrUnexpectedEOF {
601+
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
602602
return nil, err
603603
}
604604

@@ -688,7 +688,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
688688

689689
// We do not have to read out all the WAL entries
690690
// as the decoder is opened in read mode.
691-
if err != io.EOF && err != io.ErrUnexpectedEOF {
691+
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
692692
return nil, err
693693
}
694694

server/wal/wal_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package wal
1717
import (
1818
"bytes"
1919
"fmt"
20+
"github.com/stretchr/testify/require"
2021
"io"
2122
"io/ioutil"
2223
"math"
@@ -1155,3 +1156,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
11551156
t.Fatal(err)
11561157
}
11571158
}
1159+
1160+
func TestLastRecordLengthExceedFileEnd(t *testing.T) {
1161+
/* The data below was generated by code something like below. The length
1162+
* of the last record was intentionally changed to 1000 in order to make
1163+
* sure it exceeds the end of the file.
1164+
*
1165+
* for i := 0; i < 3; i++ {
1166+
* es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}}
1167+
* if err = w.Save(raftpb.HardState{}, es); err != nil {
1168+
* t.Fatal(err)
1169+
* }
1170+
* }
1171+
* ......
1172+
* var sb strings.Builder
1173+
* for _, ch := range buf {
1174+
* sb.WriteString(fmt.Sprintf("\\x%02x", ch))
1175+
* }
1176+
*/
1177+
// Generate WAL file
1178+
t.Log("Generate a WAL file with the last record's length modified.")
1179+
data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" +
1180+
"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" +
1181+
"\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" +
1182+
"\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" +
1183+
"\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" +
1184+
"\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" +
1185+
"\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" +
1186+
"\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" +
1187+
"\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" +
1188+
"\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" +
1189+
"\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" +
1190+
"\x00\x00\x00")
1191+
1192+
buf := bytes.NewBuffer(data)
1193+
f, err := createFileWithData(t, buf)
1194+
fileName := f.Name()
1195+
require.NoError(t, err)
1196+
t.Logf("fileName: %v", fileName)
1197+
1198+
// Verify low-level decoder directly
1199+
t.Log("Verify all records can be parsed correctly.")
1200+
rec := &walpb.Record{}
1201+
decoder := newDecoder(fileutil.NewFileReader(f))
1202+
for {
1203+
if err = decoder.decode(rec); err != nil {
1204+
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
1205+
break
1206+
}
1207+
if rec.Type == entryType {
1208+
e := mustUnmarshalEntry(rec.Data)
1209+
t.Logf("Validating normal entry: %v", e)
1210+
recData := fmt.Sprintf("waldata%d", e.Index)
1211+
require.Equal(t, raftpb.EntryNormal, e.Type)
1212+
require.Equal(t, recData, string(e.Data))
1213+
}
1214+
rec = &walpb.Record{}
1215+
}
1216+
require.NoError(t, f.Close())
1217+
1218+
// Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain.
1219+
t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain")
1220+
newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal")
1221+
require.NoError(t, os.Rename(fileName, newFileName))
1222+
1223+
w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{
1224+
Index: 0,
1225+
Term: 0,
1226+
})
1227+
require.NoError(t, err)
1228+
defer w.Close()
1229+
1230+
_, _, _, err = w.ReadAll()
1231+
// Note: The wal file will be repaired automatically in production
1232+
// environment, but only once.
1233+
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
1234+
}

0 commit comments

Comments
 (0)