diff --git a/kittens/transfer/algorithm.c b/kittens/transfer/algorithm.c index ba352da39..ad3612057 100644 --- a/kittens/transfer/algorithm.c +++ b/kittens/transfer/algorithm.c @@ -23,6 +23,7 @@ typedef bool(*reset_hash_t)(void*); typedef bool(*update_hash_t)(void*, const void *input, size_t length); typedef void(*digest_hash_t)(const void*, void *output); typedef uint64_t(*digest_hash64_t)(const void*); +typedef uint64_t(*oneshot_hash64_t)(const void*, size_t); typedef struct hasher_t { size_t hash_size, block_size; @@ -33,6 +34,7 @@ typedef struct hasher_t { update_hash_t update; digest_hash_t digest; digest_hash64_t digest64; + oneshot_hash64_t oneshot64; } hasher_t; static void xxh64_delete(void* s) { XXH3_freeState(s); } @@ -40,6 +42,7 @@ static bool xxh64_reset(void* s) { return XXH3_64bits_reset(s) == XXH_OK; } static void* xxh64_create(void) { void *ans = XXH3_createState(); if (ans != NULL) xxh64_reset(ans); return ans; } static bool xxh64_update(void* s, const void *input, size_t length) { return XXH3_64bits_update(s, input, length) == XXH_OK; } static uint64_t xxh64_digest64(const void* s) { return XXH3_64bits_digest(s); } +static uint64_t xxh64_oneshot64(const void* s, size_t len) { return XXH3_64bits(s, len); } static void xxh64_digest(const void* s, void *output) { XXH64_hash_t ans = XXH3_64bits_digest(s); XXH64_canonical_t c; @@ -51,7 +54,8 @@ static hasher_t xxh64_hasher(void) { hasher_t ans = { .hash_size=sizeof(XXH64_hash_t), .block_size = 64, - .new=xxh64_create, .delete=xxh64_delete, .reset=xxh64_reset, .update=xxh64_update, .digest=xxh64_digest, .digest64=xxh64_digest64 + .new=xxh64_create, .delete=xxh64_delete, .reset=xxh64_reset, .update=xxh64_update, .digest=xxh64_digest, + .digest64=xxh64_digest64, .oneshot64=xxh64_oneshot64 }; return ans; } @@ -360,10 +364,17 @@ apply_delta_data(Patcher *self, PyObject *args) { Py_RETURN_NONE; } +static PyObject* +finish_delta_data(Patcher *self, PyObject *args UNUSED) { + if (self->buf.len > 0) { PyErr_Format(RsyncError, "%zu bytes of unused delta data", self->buf.len); return NULL; } + Py_RETURN_NONE; +} + static PyMethodDef Patcher_methods[] = { METHODB(sign_block, METH_VARARGS), METHODB(signature_header, METH_O), - METHODB(apply_delta_data, METH_O), + METHODB(apply_delta_data, METH_VARARGS), + METHODB(finish_delta_data, METH_NOARGS), {NULL} /* Sentinel */ }; @@ -399,6 +410,12 @@ typedef struct Differ { bool signature_header_parsed; buffer buf; SignatureMap *signature_map; + + PyObject *read, *write; + bool written, finished; + struct { size_t pos, sz; } window, data; + Operation pending_op; bool has_pending; + uint8_t checksum[32]; } Differ; static int @@ -457,7 +474,7 @@ parse_signature_header(Differ *self) { static bool add_collision(SignatureMap *sm, Signature s) { if (sm->cap < sm->len + 1) { - size_t new_cap = MAX(sm->cap * 2, 8); + size_t new_cap = MAX(sm->cap * 2, 8u); sm->weak_hash_collisions = realloc(sm->weak_hash_collisions, new_cap); if (!sm->weak_hash_collisions) { PyErr_NoMemory(); return false; } sm->cap = new_cap; @@ -506,8 +523,212 @@ add_signature_data(Differ *self, PyObject *args) { Py_RETURN_NONE; } +static PyObject* +finish_signature_data(Differ *self, PyObject *args UNUSED) { + if (self->buf.len > 0) { PyErr_Format(RsyncError, "%zu bytes of unused delta data", self->buf.len); return NULL; } + self->buf.len = 0; + self->buf.cap = 8 * self->rsync.block_size; + self->buf.data = realloc(self->buf.data, self->buf.cap); + if (!self->buf.data) return PyErr_NoMemory(); + Py_RETURN_NONE; +} + +static bool +send_op(Differ *self, Operation *op) { + uint8_t metadata[32]; + size_t len; + metadata[0] = op->type; + switch (op->type) { + case OpBlock: + le64b(metadata + 1, op->block_index); + len = 9; + break; + case OpBlockRange: + le64b(metadata + 1, op->block_index); + le32b(metadata + 9, op->block_index_end - op->block_index); + len = 13; + break; + case OpHash: + le16b(metadata + 1, op->data.len); + memcpy(metadata + 3, op->data.buf, op->data.len); + len = 3 + op->data.len; + break; + case OpData: + le32b(metadata + 1, op->data.len); + len = 5; + break; + } + FREE_AFTER_FUNCTION PyObject *mv = PyMemoryView_FromMemory((char*)metadata, len, PyBUF_READ); + FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(self->write, mv, NULL); + if (ret == NULL) return false; + if (op->type == OpData) { + FREE_AFTER_FUNCTION PyObject *mv = PyMemoryView_FromMemory((char*)op->data.buf, op->data.len, PyBUF_READ); + FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(self->write, mv, NULL); + if (ret == NULL) return false; + } + self->written = true; + return true; +} + +static bool +send_pending(Differ *self) { + bool ret = true; + if (self->has_pending) { + ret = send_op(self, &self->pending_op); + self->has_pending = false; + } + return ret; +} + +static bool +send_data(Differ *self) { + if (self->data.sz > 0) { + if (!send_pending(self)) return false; + Operation op = {.type=OpData}; + op.data.buf = self->buf.data + self->data.pos; + op.data.len = self->data.sz; + self->data.pos += self->data.sz; + self->data.sz = 0; + return send_op(self, &op); + } + return true; +} + +static bool +ensure_idx_valid(Differ *self, size_t idx) { + if (idx < self->buf.len) return true; + if (idx >= self->buf.cap) { + // need to wrap the buffer, so send off any data present behind the window + if (!send_data(self)) return false; + // copy the window and any data present after it to the start of the buffer + size_t distance_from_window_pos = idx - self->window.pos; + size_t amt_to_copy = self->buf.len - self->window.pos; + memmove(self->buf.data, self->buf.data + self->window.pos, amt_to_copy); + self->buf.len = amt_to_copy; + self->window.pos = 0; + self->data.pos = 0; + return ensure_idx_valid(self, distance_from_window_pos); + } + FREE_AFTER_FUNCTION PyObject *mv = PyMemoryView_FromMemory((char*)self->buf.data + self->buf.len, self->buf.cap - self->buf.len, PyBUF_WRITE); + if (!mv) return false; + FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(self->read, mv, NULL); + if (!ret) return false; + if (!PyLong_Check(ret)) { PyErr_SetString(PyExc_TypeError, "read callback did not return an integer"); return false; } + size_t n = PyLong_AsSize_t(ret); + self->rsync.checksummer.update(self->rsync.checksummer.state, self->buf.data + self->buf.len, n); + self->buf.len += n; + return self->buf.len > idx; +} + +static bool +find_strong_hash(SignatureMap *sm, uint64_t q, uint64_t *block_index) { + if (sm->sig.strong_hash == q) { *block_index = sm->sig.index; return true; } + for (size_t i = 0; i < sm->len; i++) { + if (sm->weak_hash_collisions[i].strong_hash == q) { *block_index = sm->weak_hash_collisions[i].index; return true; } + } + return false; +} + +static bool +enqueue(Differ *self, Operation op) { + switch (op.type) { + case OpBlock: + if (self->has_pending) { + switch (self->pending_op.type) { + case OpBlock: + if (self->pending_op.block_index+1 == op.block_index) { + self->pending_op.type = OpBlockRange; + self->pending_op.block_index_end = op.block_index; + return true; + } + break; + case OpBlockRange: + if (self->pending_op.block_index+1 == op.block_index) { + self->pending_op.block_index_end = op.block_index; + return true; + } + case OpHash: case OpData: break; + } + if (!send_pending(self)) return false; + } + self->pending_op = op; + self->has_pending = true; + return true; + case OpHash: + if (!send_pending(self)) return false; + return send_op(self, &op); + case OpBlockRange: case OpData: + PyErr_SetString(RsyncError, "enqueue() must never be called with anything other than OpHash and OpBlock"); + return false; + } + return false; +} + +static bool +finish_up(Differ *self) { + if (!send_data(self)) return false; + self->data.pos = self->window.pos; + self->data.sz = self->buf.len - self->window.pos; + if (!send_data(self)) return false; + self->rsync.checksummer.digest(self->rsync.checksummer.state, self->checksum); + Operation op = {.type=OpHash}; + op.data.buf = self->checksum; op.data.len = self->rsync.checksummer.hash_size; + if (!enqueue(self, op)) return false; + self->finished = true; + return true; +} + +static bool +read_next(Differ *self) { + if (self->window.sz > 0) { + if (!ensure_idx_valid(self, self->window.pos + self->window.sz)) { + if (PyErr_Occurred()) return false; + return finish_up(self); + } + self->window.pos++; + self->data.sz++; + rolling_checksum_add_one_byte(&self->rc, self->buf.data[self->window.pos], self->buf.data[self->window.pos + self->window.sz - 1]); + } else { + if (!ensure_idx_valid(self, self->window.pos + self->rsync.block_size - 1)) { + if (PyErr_Occurred()) return false; + return finish_up(self); + } + self->window.sz = self->rsync.block_size; + rolling_checksum_full(&self->rc, self->buf.data + self->window.pos, self->window.sz); + } + SignatureMap *sm; + int weak_hash = self->rc.val; + uint64_t block_index = 0; + HASH_FIND_INT(self->signature_map, &weak_hash, sm); + if (sm != NULL && find_strong_hash(sm, self->rsync.hasher.oneshot64(self->buf.data + self->window.pos, self->window.sz), &block_index)) { + if (!enqueue(self, (Operation){.type=OpBlock, .block_index=block_index})) return false; + self->window.pos += self->window.sz; + self->data.pos = self->window.pos; + self->window.sz = 0; + } + return true; +} + +static PyObject* +next_op(Differ *self, PyObject *args) { + if (!PyArg_ParseTuple(args, "OO", &self->read, &self->write)) return NULL; + self->written = false; + while (!self->written && !self->finished) { + if (!read_next(self)) break; + } + if (self->finished && !PyErr_Occurred()) { + send_pending(self); + } + self->read = NULL; self->write = NULL; + if (PyErr_Occurred()) return NULL; + if (self->finished) { Py_RETURN_FALSE; } + Py_RETURN_TRUE; +} + static PyMethodDef Differ_methods[] = { METHODB(add_signature_data, METH_VARARGS), + METHODB(finish_signature_data, METH_NOARGS), + METHODB(next_op, METH_VARARGS), {NULL} /* Sentinel */ }; diff --git a/kittens/transfer/rsync.pyi b/kittens/transfer/rsync.pyi index a5b943fa5..8873f7b5a 100644 --- a/kittens/transfer/rsync.pyi +++ b/kittens/transfer/rsync.pyi @@ -27,3 +27,4 @@ class Patcher: def signature_header(self, output: WriteBuffer) -> None: ... def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> None: ... def apply_delta_data(self, data: ReadOnlyBuffer, read: Callable[[int, WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> None: ... + def finish_delta_data(self) -> None: ... diff --git a/tools/rsync/algorithm.go b/tools/rsync/algorithm.go index 5f5cba2a8..4d309d50e 100644 --- a/tools/rsync/algorithm.go +++ b/tools/rsync/algorithm.go @@ -465,16 +465,13 @@ func (self *diff) send_data() error { func (self *diff) pump_till_op_written() error { self.written = false for !self.finished && !self.written { - if err := self.read_at_least_one_operation(); err != nil { + if err := self.read_next(); err != nil { return err } } if self.finished { - if self.pending_op != nil { - if err := self.send_op(self.pending_op); err != nil { - return err - } - self.pending_op = nil + if err := self.send_pending(); err != nil { + return err } return io.EOF } @@ -531,7 +528,7 @@ func (self *diff) finish_up() (err error) { } // See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm -func (self *diff) read_at_least_one_operation() (err error) { +func (self *diff) read_next() (err error) { if self.window.sz > 0 { if ok, err := self.ensure_idx_valid(self.window.pos + self.window.sz); !ok { if err != nil {