Skip to content

Commit

Permalink
Merge pull request #2978 from xiang90/fix_backup
Browse files Browse the repository at this point in the history
*:fix point-in-time backup
  • Loading branch information
xiang90 committed Jun 15, 2015
2 parents b69d52e + f59da0e commit e20b487
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 42 deletions.
2 changes: 1 addition & 1 deletion etcdctl/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func handleBackup(c *cli.Context) {
}
}

w, err := wal.OpenNotInUse(srcWAL, walsnap)
w, err := wal.OpenForRead(srcWAL, walsnap)
if err != nil {
log.Fatal(err)
}
Expand Down
86 changes: 54 additions & 32 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
}

// OpenNotInUse only opens the wal files that are not in use.
// Other than that, it is similar to Open.
func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) {
// OpenForRead only opens the wal files for read.
// Write on a read only wal panics.
func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, false)
}

func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,45 +172,49 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
}
err = l.TryLock()
if err != nil {
if all {
if write {
return nil, err
} else {
plog.Warningf("opened all the files until %s, since it is still in use by an etcd server", name)
break
}
}
rcs = append(rcs, f)
ls = append(ls, l)
}
rc := MultiReadCloser(rcs...)

// open the lastest wal file for appending
seq, _, err := parseWalName(names[len(names)-1])
if err != nil {
rc.Close()
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
if err != nil {
rc.Close()
return nil, err
}

// create a WAL ready for reading
w := &WAL{
dir: dirpath,
start: snap,
decoder: newDecoder(rc),
locks: ls,
}

if write {
// open the lastest wal file for appending
seq, _, err := parseWalName(names[len(names)-1])
if err != nil {
rc.Close()
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])

f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
if err != nil {
rc.Close()
return nil, err
}

f: f,
seq: seq,
locks: ls,
w.f = f
w.seq = seq
}

return w, nil
}

// ReadAll reads out all records of the current WAL.
// ReadAll reads out records of the current WAL.
// If opened in write mode, it must read out all records until EOF. Or an error
// will be returned.
// If opened in read mode, it will try to read all records if possible.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If loaded snap doesn't match with the expected one, it will return
// all the records and error ErrSnapshotMismatch.
Expand Down Expand Up @@ -265,10 +269,24 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
}
}
if err != io.EOF {
state.Reset()
return nil, state, nil, err

switch w.f {
case nil:
// We do not have to read out all entries in read mode.
// The last record maybe a partial written one, so
// ErrunexpectedEOF might be returned.
if err != io.EOF && err != io.ErrUnexpectedEOF {
state.Reset()
return nil, state, nil, err
}
default:
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
state.Reset()
return nil, state, nil, err
}
}

err = nil
if !match {
err = ErrSnapshotNotFound
Expand All @@ -279,10 +297,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
w.start = walpb.Snapshot{}

w.metadata = metadata
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.f, w.decoder.lastCRC())
w.decoder = nil
lastIndexSaved.Set(float64(w.enti))

if w.f != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.f, w.decoder.lastCRC())
w.decoder = nil
lastIndexSaved.Set(float64(w.enti))
}

return metadata, state, ents, err
}

Expand Down
17 changes: 8 additions & 9 deletions wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,11 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
w.Close()
}

// TestOpenNotInUse tests that OpenNotInUse can load all files that are
// not in use at that point.
// TestOpenForRead tests that OpenForRead can load all files.
// The tests creates WAL directory, and cut out multiple WAL files. Then
// it releases the lock of part of data, and excepts that OpenNotInUse
// can read out all unlocked data.
func TestOpenNotInUse(t *testing.T) {
// it releases the lock of part of data, and excepts that OpenForRead
// can read out all files even if some are locked for write.
func TestOpenForRead(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
Expand All @@ -435,8 +434,8 @@ func TestOpenNotInUse(t *testing.T) {
unlockIndex := uint64(5)
w.ReleaseLockTo(unlockIndex)

// 1,2,3 are avaliable.
w2, err := OpenNotInUse(p, walpb.Snapshot{})
// All are avaliable for read
w2, err := OpenForRead(p, walpb.Snapshot{})
defer w2.Close()
if err != nil {
t.Fatal(err)
Expand All @@ -445,8 +444,8 @@ func TestOpenNotInUse(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := ents[len(ents)-1].Index; g != unlockIndex-2 {
t.Errorf("last index read = %d, want %d", g, unlockIndex-2)
if g := ents[len(ents)-1].Index; g != 9 {
t.Errorf("last index read = %d, want %d", g, 9)
}
}

Expand Down

0 comments on commit e20b487

Please sign in to comment.