Finish the port of the rsync algorithm to C

This commit is contained in:
Kovid Goyal
2023-07-18 18:45:24 +05:30
parent caa9c27554
commit b5498090df
3 changed files with 229 additions and 10 deletions

View File

@@ -23,6 +23,7 @@ typedef bool(*reset_hash_t)(void*);
// Callback type stored in hasher_t.update: takes a state pointer plus an input
// buffer and its length, and returns a bool (xxh64_update returns true on XXH_OK).
typedef bool(*update_hash_t)(void*, const void *input, size_t length);
// Callback type stored in hasher_t.digest: takes a const state pointer and
// writes the digest into output.
typedef void(*digest_hash_t)(const void*, void *output);
// Callback type stored in hasher_t.digest64: takes a const state pointer and
// returns the digest as a 64-bit integer.
typedef uint64_t(*digest_hash64_t)(const void*);
// Callback type stored in hasher_t.oneshot64: takes a data pointer and a length
// (no state object) and returns a 64-bit hash of that data.
typedef uint64_t(*oneshot_hash64_t)(const void*, size_t);
typedef struct hasher_t {
size_t hash_size, block_size;
@@ -33,6 +34,7 @@ typedef struct hasher_t {
update_hash_t update;
digest_hash_t digest;
digest_hash64_t digest64;
oneshot_hash64_t oneshot64;
} hasher_t;
// hasher_t.delete implementation for XXH3: hands the state to XXH3_freeState().
static void
xxh64_delete(void* s) {
    XXH3_freeState(s);
}
@@ -40,6 +42,7 @@ static bool xxh64_reset(void* s) { return XXH3_64bits_reset(s) == XXH_OK; }
// hasher_t.new implementation for XXH3: creates a state and resets it.
// Returns NULL when XXH3_createState() returns NULL. The result of the reset
// is not inspected.
static void*
xxh64_create(void) {
    void *state = XXH3_createState();
    if (state == NULL) return NULL;
    xxh64_reset(state);
    return state;
}
// hasher_t.update implementation for XXH3: feeds length bytes of input into
// state s. Returns true only when XXH3_64bits_update() reports XXH_OK.
static bool
xxh64_update(void* s, const void *input, size_t length) {
    return XXH_OK == XXH3_64bits_update(s, input, length);
}
// hasher_t.digest64 implementation for XXH3: returns the value produced by
// XXH3_64bits_digest() for state s.
static uint64_t
xxh64_digest64(const void* s) {
    return XXH3_64bits_digest(s);
}
// hasher_t.oneshot64 implementation for XXH3: hashes len bytes starting at s
// in one call to XXH3_64bits(). Here s is the data to hash, not a hash state.
static uint64_t
xxh64_oneshot64(const void* s, size_t len) {
    return XXH3_64bits(s, len);
}
static void xxh64_digest(const void* s, void *output) {
XXH64_hash_t ans = XXH3_64bits_digest(s);
XXH64_canonical_t c;
@@ -51,7 +54,8 @@ static hasher_t
xxh64_hasher(void) {
hasher_t ans = {
.hash_size=sizeof(XXH64_hash_t), .block_size = 64,
.new=xxh64_create, .delete=xxh64_delete, .reset=xxh64_reset, .update=xxh64_update, .digest=xxh64_digest, .digest64=xxh64_digest64
.new=xxh64_create, .delete=xxh64_delete, .reset=xxh64_reset, .update=xxh64_update, .digest=xxh64_digest,
.digest64=xxh64_digest64, .oneshot64=xxh64_oneshot64
};
return ans;
}
@@ -360,10 +364,17 @@ apply_delta_data(Patcher *self, PyObject *args) {
Py_RETURN_NONE;
}
// Python method Patcher.finish_delta_data(): returns None when the Patcher's
// buffer is empty, otherwise raises RsyncError reporting how many bytes are
// still sitting in the buffer.
static PyObject*
finish_delta_data(Patcher *self, PyObject *args UNUSED) {
    if (self->buf.len == 0) { Py_RETURN_NONE; }
    PyErr_Format(RsyncError, "%zu bytes of unused delta data", self->buf.len);
    return NULL;
}
static PyMethodDef Patcher_methods[] = {
METHODB(sign_block, METH_VARARGS),
METHODB(signature_header, METH_O),
METHODB(apply_delta_data, METH_O),
METHODB(apply_delta_data, METH_VARARGS),
METHODB(finish_delta_data, METH_NOARGS),
{NULL} /* Sentinel */
};
@@ -399,6 +410,12 @@ typedef struct Differ {
bool signature_header_parsed;
buffer buf;
SignatureMap *signature_map;
PyObject *read, *write;
bool written, finished;
struct { size_t pos, sz; } window, data;
Operation pending_op; bool has_pending;
uint8_t checksum[32];
} Differ;
static int
@@ -457,7 +474,7 @@ parse_signature_header(Differ *self) {
static bool
add_collision(SignatureMap *sm, Signature s) {
if (sm->cap < sm->len + 1) {
size_t new_cap = MAX(sm->cap * 2, 8);
size_t new_cap = MAX(sm->cap * 2, 8u);
sm->weak_hash_collisions = realloc(sm->weak_hash_collisions, new_cap);
if (!sm->weak_hash_collisions) { PyErr_NoMemory(); return false; }
sm->cap = new_cap;
@@ -506,8 +523,212 @@ add_signature_data(Differ *self, PyObject *args) {
Py_RETURN_NONE;
}
// Python method Differ.finish_signature_data(): raises RsyncError if bytes are
// still buffered, otherwise resizes the buffer to eight blocks and empties it
// so it can be reused as the working buffer by the diffing functions.
//
// Returns None on success, or NULL with RsyncError / MemoryError set.
static PyObject*
finish_signature_data(Differ *self, PyObject *args UNUSED) {
    if (self->buf.len > 0) {
        // This is the Differ consuming a signature, so report signature data
        PyErr_Format(RsyncError, "%zu bytes of unused signature data", self->buf.len);
        return NULL;
    }
    const size_t new_cap = 8 * self->rsync.block_size;
    // Keep the old pointer and capacity until realloc() succeeds so that a
    // failed resize neither loses the existing allocation nor leaves cap
    // describing memory that was never obtained.
    void *resized = realloc(self->buf.data, new_cap);
    if (!resized) return PyErr_NoMemory();
    self->buf.data = resized;
    self->buf.cap = new_cap;
    self->buf.len = 0;
    Py_RETURN_NONE;
}
// Serialize op and hand it to the write callback (self->write).
//
// A header is written first: one type byte followed by type specific fields
// encoded with the le16b/le32b/le64b helpers. For OpData the payload bytes are
// then passed in a second call to the callback. Both calls receive read-only
// memoryviews over memory owned by this function or by op, so they are only
// valid for the duration of the callback.
//
// Returns true on success and sets self->written. Returns false with a Python
// exception set on failure.
static bool
send_op(Differ *self, Operation *op) {
    uint8_t metadata[32];
    size_t len = 0;  // stays 0 when op->type is not one of the known types
    metadata[0] = op->type;
    switch (op->type) {
        case OpBlock:
            le64b(metadata + 1, op->block_index);
            len = 9;
            break;
        case OpBlockRange:
            le64b(metadata + 1, op->block_index);
            le32b(metadata + 9, op->block_index_end - op->block_index);
            len = 13;
            break;
        case OpHash:
            // The hash is copied inline after a 3 byte prefix, so it must fit
            // in what remains of the fixed size header buffer.
            if (op->data.len > sizeof(metadata) - 3) {
                PyErr_Format(RsyncError, "hash of %zu bytes is too large to serialize", (size_t)op->data.len);
                return false;
            }
            le16b(metadata + 1, op->data.len);
            memcpy(metadata + 3, op->data.buf, op->data.len);
            len = 3 + op->data.len;
            break;
        case OpData:
            le32b(metadata + 1, op->data.len);
            len = 5;
            break;
    }
    if (len == 0) {
        PyErr_Format(RsyncError, "cannot serialize operation of unknown type: %d", (int)op->type);
        return false;
    }
    // A NULL memoryview must not reach PyObject_CallFunctionObjArgs(): NULL is
    // its argument list terminator, so the callback would be invoked with no
    // arguments while an exception is already pending.
    FREE_AFTER_FUNCTION PyObject *header = PyMemoryView_FromMemory((char*)metadata, (Py_ssize_t)len, PyBUF_READ);
    if (header == NULL) return false;
    FREE_AFTER_FUNCTION PyObject *header_result = PyObject_CallFunctionObjArgs(self->write, header, NULL);
    if (header_result == NULL) return false;
    if (op->type == OpData) {
        FREE_AFTER_FUNCTION PyObject *payload = PyMemoryView_FromMemory((char*)op->data.buf, (Py_ssize_t)op->data.len, PyBUF_READ);
        if (payload == NULL) return false;
        FREE_AFTER_FUNCTION PyObject *payload_result = PyObject_CallFunctionObjArgs(self->write, payload, NULL);
        if (payload_result == NULL) return false;
    }
    self->written = true;
    return true;
}
// Flush the queued operation, if there is one, through send_op(). The pending
// flag is cleared whether or not the send succeeded. Returns true when nothing
// was queued, otherwise the result of send_op().
static bool
send_pending(Differ *self) {
    if (!self->has_pending) return true;
    const bool sent = send_op(self, &self->pending_op);
    self->has_pending = false;
    return sent;
}
// Emit the bytes tracked by self->data (data.sz bytes of the buffer starting
// at data.pos) as an OpData operation. Any queued operation is flushed first.
// The data region is advanced past the emitted bytes and emptied before the
// operation is sent. Does nothing and returns true when data.sz is zero.
static bool
send_data(Differ *self) {
    if (self->data.sz == 0) return true;
    if (!send_pending(self)) return false;
    Operation literal = {.type=OpData};
    literal.data.buf = self->buf.data + self->data.pos;
    literal.data.len = self->data.sz;
    self->data.pos += self->data.sz;
    self->data.sz = 0;
    return send_op(self, &literal);
}
// Call the read callback (self->read) once with a writable memoryview over the
// unused tail of the buffer. The bytes it reports are fed to the checksummer
// and appended to the buffer; *nread receives the count (0 is treated by the
// caller as end of input).
//
// Returns false with a Python exception set if the callback fails, returns
// something that is not a valid size, claims to have written more bytes than
// the memoryview could hold, or if the checksummer rejects the data.
static bool
differ_read_once(Differ *self, size_t *nread) {
    const size_t space = self->buf.cap - self->buf.len;
    FREE_AFTER_FUNCTION PyObject *mv = PyMemoryView_FromMemory((char*)self->buf.data + self->buf.len, (Py_ssize_t)space, PyBUF_WRITE);
    if (!mv) return false;
    FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(self->read, mv, NULL);
    if (!ret) return false;
    if (!PyLong_Check(ret)) { PyErr_SetString(PyExc_TypeError, "read callback did not return an integer"); return false; }
    const size_t n = PyLong_AsSize_t(ret);
    // (size_t)-1 with an exception set means the integer was negative or too large
    if (n == (size_t)-1 && PyErr_Occurred()) return false;
    if (n > space) {
        // Trusting this value would move buf.len past buf.cap
        PyErr_SetString(PyExc_ValueError, "read callback returned more bytes than the buffer can hold");
        return false;
    }
    if (n > 0) {
        if (!self->rsync.checksummer.update(self->rsync.checksummer.state, self->buf.data + self->buf.len, n)) {
            PyErr_SetString(RsyncError, "failed to update the checksum with newly read data");
            return false;
        }
        self->buf.len += n;
    }
    *nread = n;
    return true;
}

// Make sure self->buf.data[idx] holds data, reading more input when needed.
//
// If idx lies beyond the buffer capacity, the data behind the window is sent
// off, the window and everything after it is moved to the start of the buffer
// and idx is translated accordingly. Input is then read until idx is covered
// or the read callback reports zero bytes.
//
// Returns true when idx is valid. Returns false either with a Python exception
// set (failure) or without one (input exhausted before idx was reached).
static bool
ensure_idx_valid(Differ *self, size_t idx) {
    if (idx < self->buf.len) return true;
    if (idx >= self->buf.cap) {
        const size_t distance_from_window_pos = idx - self->window.pos;
        // If the index does not fit even after moving the window to the start
        // of the buffer, compacting again can never help; fail instead of
        // recursing without end.
        if (distance_from_window_pos >= self->buf.cap) {
            PyErr_SetString(RsyncError, "requested index does not fit in the buffer");
            return false;
        }
        // need to wrap the buffer, so send off any data present behind the window
        if (!send_data(self)) return false;
        // copy the window and any data present after it to the start of the buffer
        const size_t amt_to_copy = self->buf.len - self->window.pos;
        memmove(self->buf.data, self->buf.data + self->window.pos, amt_to_copy);
        self->buf.len = amt_to_copy;
        self->window.pos = 0;
        self->data.pos = 0;
        return ensure_idx_valid(self, distance_from_window_pos);
    }
    // idx < cap here, so there is always free space while len <= idx. Keep
    // reading until idx is covered: a short read is not the end of the input,
    // only a read of zero bytes is.
    while (self->buf.len <= idx) {
        size_t n = 0;
        if (!differ_read_once(self, &n)) return false;
        if (n == 0) break;
    }
    return self->buf.len > idx;
}
// Look for strong hash q in a signature map entry: first in the entry's own
// signature, then in its weak_hash_collisions array in order. On a match the
// index of the matching signature is stored in *block_index and true is
// returned; otherwise *block_index is left untouched and false is returned.
static bool
find_strong_hash(SignatureMap *sm, uint64_t q, uint64_t *block_index) {
    if (q == sm->sig.strong_hash) {
        *block_index = sm->sig.index;
        return true;
    }
    size_t n = 0;
    while (n < sm->len && sm->weak_hash_collisions[n].strong_hash != q) n++;
    if (n >= sm->len) return false;
    *block_index = sm->weak_hash_collisions[n].index;
    return true;
}
// Queue an operation for output, merging runs of consecutive blocks.
//
// OpBlock: if the queued operation is a block (or block range) that ends
// immediately before op.block_index, it is extended into (or grown as) an
// OpBlockRange and nothing is written. Otherwise the queued operation is sent
// and op becomes the new queued operation.
// OpHash: the queued operation is flushed and the hash is sent immediately.
// Any other type is a programming error and raises RsyncError.
//
// Returns true on success, false with a Python exception set on failure.
static bool
enqueue(Differ *self, Operation op) {
    switch (op.type) {
        case OpBlock:
            if (self->has_pending) {
                switch (self->pending_op.type) {
                    case OpBlock:
                        if (self->pending_op.block_index + 1 == op.block_index) {
                            self->pending_op.type = OpBlockRange;
                            self->pending_op.block_index_end = op.block_index;
                            return true;
                        }
                        break;
                    case OpBlockRange:
                        // Compare against the END of the range. Comparing
                        // against its start would stop ranges growing past two
                        // blocks and would swallow a repeat of the second block.
                        if (self->pending_op.block_index_end + 1 == op.block_index) {
                            self->pending_op.block_index_end = op.block_index;
                            return true;
                        }
                        break;
                    case OpHash: case OpData: break;
                }
                if (!send_pending(self)) return false;
            }
            self->pending_op = op;
            self->has_pending = true;
            return true;
        case OpHash:
            if (!send_pending(self)) return false;
            return send_op(self, &op);
        case OpBlockRange: case OpData:
            PyErr_SetString(RsyncError, "enqueue() must never be called with anything other than OpHash and OpBlock");
            return false;
    }
    return false;
}
// Called when no more input can be read: emits everything still held in the
// buffer followed by the checksum.
//
// The data behind the window is sent first, then the bytes from the window
// position to the end of the buffer are sent as data, and finally the
// checksummer digest is enqueued as an OpHash. self->finished is set only when
// all of that succeeded. Returns false if any step fails.
static bool
finish_up(Differ *self) {
    if (!send_data(self)) return false;
    // whatever remains from the window onwards did not match and goes out as data
    self->data.pos = self->window.pos;
    self->data.sz = self->buf.len - self->window.pos;
    if (!send_data(self)) return false;
    self->rsync.checksummer.digest(self->rsync.checksummer.state, self->checksum);
    Operation checksum_op = {.type=OpHash};
    checksum_op.data.len = self->rsync.checksummer.hash_size;
    checksum_op.data.buf = self->checksum;
    if (enqueue(self, checksum_op)) {
        self->finished = true;
        return true;
    }
    return false;
}
// Advance the diff by one step.
//
// With an active window (window.sz > 0) the window slides forward one byte,
// the byte left behind is counted as pending data and the rolling checksum is
// updated. Without one, a fresh window of block_size bytes is started and its
// rolling checksum computed in full. If the input cannot supply the bytes
// needed, finish_up() is called instead.
//
// The weak hash is then looked up in the signature map; when an entry exists
// and its strong hash also matches, an OpBlock is enqueued and the window is
// consumed.
//
// Returns false only when a Python exception is set (or finish_up()/enqueue()
// fail).
static bool
read_next(Differ *self) {
    const bool sliding = self->window.sz > 0;
    // last buffer index that must hold data for this step
    const size_t needed_idx = sliding
        ? self->window.pos + self->window.sz
        : self->window.pos + self->rsync.block_size - 1;
    if (!ensure_idx_valid(self, needed_idx)) {
        if (PyErr_Occurred()) return false;
        return finish_up(self);
    }
    if (sliding) {
        self->window.pos++;
        self->data.sz++;
        rolling_checksum_add_one_byte(&self->rc, self->buf.data[self->window.pos], self->buf.data[self->window.pos + self->window.sz - 1]);
    } else {
        self->window.sz = self->rsync.block_size;
        rolling_checksum_full(&self->rc, self->buf.data + self->window.pos, self->window.sz);
    }

    int weak = self->rc.val;
    SignatureMap *entry = NULL;
    HASH_FIND_INT(self->signature_map, &weak, entry);
    if (entry == NULL) return true;

    // the strong hash is only computed once the weak hash has matched
    uint64_t block_index = 0;
    const uint64_t strong = self->rsync.hasher.oneshot64(self->buf.data + self->window.pos, self->window.sz);
    if (!find_strong_hash(entry, strong, &block_index)) return true;

    if (!enqueue(self, (Operation){.type=OpBlock, .block_index=block_index})) return false;
    self->window.pos += self->window.sz;
    self->data.pos = self->window.pos;
    self->window.sz = 0;
    return true;
}
// Python method Differ.next_op(read, write).
//
// Stores the two callbacks on self for the duration of the call and runs
// read_next() until an operation has been written, the diff is finished, or a
// step fails. Once finished, any queued operation is flushed. The callback
// pointers are cleared again before returning.
//
// Returns NULL if a Python exception is set, False when the diff is finished
// and True otherwise.
static PyObject*
next_op(Differ *self, PyObject *args) {
    if (!PyArg_ParseTuple(args, "OO", &self->read, &self->write)) return NULL;
    self->written = false;
    while (!(self->written || self->finished) && read_next(self)) {}
    if (self->finished && !PyErr_Occurred()) {
        // a failure here is picked up by the PyErr_Occurred() check below
        send_pending(self);
    }
    self->read = NULL;
    self->write = NULL;
    if (PyErr_Occurred()) return NULL;
    if (!self->finished) { Py_RETURN_TRUE; }
    Py_RETURN_FALSE;
}
// Method table for the Differ type. Each entry is produced by the METHODB
// macro (defined outside this view) from a function name and its calling
// convention flags; the array ends with a NULL sentinel entry.
static PyMethodDef Differ_methods[] = {
METHODB(add_signature_data, METH_VARARGS),
METHODB(finish_signature_data, METH_NOARGS),
METHODB(next_op, METH_VARARGS),
{NULL} /* Sentinel */
};

View File

@@ -27,3 +27,4 @@ class Patcher:
def signature_header(self, output: WriteBuffer) -> None: ...
def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> None: ...
def apply_delta_data(self, data: ReadOnlyBuffer, read: Callable[[int, WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> None: ...
def finish_delta_data(self) -> None: ...

View File

@@ -465,16 +465,13 @@ func (self *diff) send_data() error {
func (self *diff) pump_till_op_written() error {
self.written = false
for !self.finished && !self.written {
if err := self.read_at_least_one_operation(); err != nil {
if err := self.read_next(); err != nil {
return err
}
}
if self.finished {
if self.pending_op != nil {
if err := self.send_op(self.pending_op); err != nil {
return err
}
self.pending_op = nil
if err := self.send_pending(); err != nil {
return err
}
return io.EOF
}
@@ -531,7 +528,7 @@ func (self *diff) finish_up() (err error) {
}
// See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm
func (self *diff) read_at_least_one_operation() (err error) {
func (self *diff) read_next() (err error) {
if self.window.sz > 0 {
if ok, err := self.ensure_idx_valid(self.window.pos + self.window.sz); !ok {
if err != nil {