Fix a bug in upstream CreateDelta that could ignore trailing data if a matching hash is not found for it

This commit is contained in:
Kovid Goyal
2023-07-03 17:47:33 +05:30
parent 82721ef053
commit 79fd0b19cf
2 changed files with 34 additions and 20 deletions

View File

@@ -267,11 +267,10 @@ func (r *RSync) CreateDelta(source io.Reader, signature []BlockHash, ops Operati
// Send the last operation if there is one waiting.
defer func() {
if prevOp == nil {
return
if prevOp != nil {
err = ops(*prevOp)
prevOp = nil
}
err = ops(*prevOp)
prevOp = nil
}()
// Combine OpBlock into OpBlockRange. To do this store the previous
@@ -320,6 +319,14 @@ func (r *RSync) CreateDelta(source io.Reader, signature []BlockHash, ops Operati
return
}
send_data := func() error {
if err := enqueue(Operation{Type: OpData, Data: buffer[data.tail:data.head]}); err != nil {
return err
}
data.tail = data.head
return nil
}
for !lastRun {
// Determine if the buffer should be extended.
if sum.tail+r.BlockSize > validTo {
@@ -327,8 +334,7 @@ func (r *RSync) CreateDelta(source io.Reader, signature []BlockHash, ops Operati
if validTo+r.BlockSize > len(buffer) {
// Before wrapping the buffer, send any trailing data off.
if data.tail < data.head {
err = enqueue(Operation{Type: OpData, Data: buffer[data.tail:data.head]})
if err != nil {
if err = send_data(); err != nil {
return err
}
}
@@ -354,6 +360,11 @@ func (r *RSync) CreateDelta(source io.Reader, signature []BlockHash, ops Operati
data.head = validTo
}
if n == 0 {
if data.tail < data.head {
if err = send_data(); err != nil {
return err
}
}
break
}
}
@@ -382,11 +393,9 @@ func (r *RSync) CreateDelta(source io.Reader, signature []BlockHash, ops Operati
// must be flushed first), or the data chunk size has reached it's maximum size (for buffer
// allocation purposes) or to flush the end of the data.
if data.tail < data.head && (foundHash || data.head-data.tail >= r.MaxDataOp || lastRun) {
err = enqueue(Operation{Type: OpData, Data: buffer[data.tail:data.head]})
if err != nil {
if err = send_data(); err != nil {
return err
}
data.tail = data.head
}
if foundHash {

View File

@@ -54,17 +54,22 @@ func TestRsyncRoundtrip(t *testing.T) {
signature = append(signature, s)
return nil
})
delta_ops := make([]Operation, 0, 128)
p.rsync.CreateDelta(bytes.NewReader(src_data), signature, func(op Operation) error {
op.Data = slices.Clone(op.Data)
delta_ops = append(delta_ops, op)
return nil
})
outputbuf := bytes.Buffer{}
for _, op := range delta_ops {
p.rsync.ApplyDelta(&outputbuf, bytes.NewReader(src_data), op)
apply_delta := func(signature []BlockHash) []byte {
delta_ops := make([]Operation, 0, 1024)
p.rsync.CreateDelta(bytes.NewReader(src_data), signature, func(op Operation) error {
op.Data = slices.Clone(op.Data)
delta_ops = append(delta_ops, op)
return nil
})
outputbuf := bytes.Buffer{}
for _, op := range delta_ops {
p.rsync.ApplyDelta(&outputbuf, bytes.NewReader(src_data), op)
}
return outputbuf.Bytes()
}
test_equal(src_data, outputbuf.Bytes())
test_equal(src_data, apply_delta(nil))
test_equal(src_data, apply_delta(signature))
// Now try with serialization
p = NewPatcher(int64(len(src_data)))
@@ -80,7 +85,7 @@ func TestRsyncRoundtrip(t *testing.T) {
if err := d.CreateDelta(bytes.NewReader(src_data), func(b []byte) error { _, err := deltabuf.Write(b); return err }); err != nil {
t.Fatal(err)
}
outputbuf = bytes.Buffer{}
outputbuf := bytes.Buffer{}
p.StartDelta(&outputbuf, bytes.NewReader(src_data))
b := make([]byte, 30*1024)
for {