Add DnD protocol test framework and tests

Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/faa69cb9-991d-49f4-802b-263f4bb28ee9

Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-01 07:22:47 +00:00
committed by GitHub
parent 4f7855aede
commit 02aec3fa3e
5 changed files with 675 additions and 7 deletions

View File

@@ -170,6 +170,8 @@ Detailed list of changes
- A new option :opt:`palette_generate` to automatically generate the 256 color palette from the first 16 colors (:pull:`9426`)
- Add a testing framework for the :doc:`drag-and-drop protocol <dnd-protocol>` and tests covering the full drop flow, error handling, data integrity, MIME type negotiation, chunked transfer, and malformed-command handling
- For builtin key mappings automatically :ref:`fallback <mapping-fallback>` to matching the US-PC layout key when the pressed key has no matches and is a non-English character (:pull:`9671`)
- Allow drag and drop of windows to re-arrange them, move them to another

View File

@@ -10,6 +10,16 @@
#include "control-codes.h"
#include "iqsort.h"
// In test mode, this callable is invoked instead of schedule_write_to_child_if_possible.
// It receives (window_id: int, data: bytes) and its return value is ignored.
static PyObject *g_dnd_test_write_func = NULL;
void
dnd_set_test_write_func(PyObject *func) {
Py_XDECREF(g_dnd_test_write_func);
g_dnd_test_write_func = Py_XNewRef(func);
}
static void
drop_free_offered_mimes(Window *w) {
if (w->drop.offerred_mimes) {
@@ -100,6 +110,16 @@ string_arrays_cmp(const char **a, size_t an, const char **b, size_t bn) {
return 0;
}
static bool
test_write_chunk(id_type id, const char *buf, size_t sz) {
// In test mode, deliver the chunk to the registered Python callable.
// Returns true when the test interceptor consumed the data (no real write needed).
if (!g_dnd_test_write_func) return false;
RAII_PyObject(ret, PyObject_CallFunction(g_dnd_test_write_func, "Ky#", (unsigned long long)id, buf, (Py_ssize_t)sz));
if (!ret) PyErr_Print();
return true;
}
static size_t
send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t header_sz, const char *data, const size_t data_sz, bool as_base64) {
size_t offset = 0;
@@ -108,9 +128,11 @@ send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t
if (client_id) header_sz += snprintf(buf + header_sz, sizeof(buf) - header_sz, ":i=%u", (unsigned)client_id);
if (!data_sz) {
buf[header_sz++] = 0x1b; buf[header_sz++] = '\\';
bool found, too_much_data;
schedule_write_to_child_if_possible(id, buf, header_sz, &found, &too_much_data);
if (too_much_data) return 0;
if (!test_write_chunk(id, buf, header_sz)) {
bool found, too_much_data;
schedule_write_to_child_if_possible(id, buf, header_sz, &found, &too_much_data);
if (too_much_data) return 0;
}
return 1;
}
buf[header_sz++] = ':'; buf[header_sz++] = 'm'; buf[header_sz++] = '=';
@@ -130,10 +152,12 @@ send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t
p += chunk;
}
buf[p++] = 0x1b; buf[p++] = '\\';
bool found, too_much_data;
schedule_write_to_child_if_possible(id, buf, p, &found, &too_much_data);
if (too_much_data) break;
if (!found) return data_sz;
if (!test_write_chunk(id, buf, p)) {
bool found, too_much_data;
schedule_write_to_child_if_possible(id, buf, p, &found, &too_much_data);
if (too_much_data) break;
if (!found) return data_sz;
}
offset += chunk;
}
return offset;

View File

@@ -17,3 +17,4 @@ void drop_set_status(Window *w, int operation, const char *payload, size_t paylo
size_t drop_update_mimes(Window *w, const char **allowed_mimes, size_t allowed_mimes_count);
void drop_dispatch_data(Window *w, const char *mime_type, const char *data, ssize_t sz);
void drop_finish(Window *w);
void dnd_set_test_write_func(PyObject *func);

View File

@@ -876,6 +876,162 @@ request_drop_status_update(OSWindow *osw) {
}
}
// DnD testing infrastructure {{{
static PyObject *
py_dnd_set_test_write_func(PyObject *self UNUSED, PyObject *func) {
// Pass None to clear the interceptor and restore normal operation.
dnd_set_test_write_func(func == Py_None ? NULL : func);
Py_RETURN_NONE;
}
static void
destroy_fake_window_contents(Window *w) {
// Free window resources without touching GPU objects (none allocated for fake windows).
drop_free_data(w);
free(w->pending_clicks.clicks); zero_at_ptr(&w->pending_clicks);
free(w->buffered_keys.key_data); zero_at_ptr(&w->buffered_keys);
Py_CLEAR(w->render_data.screen);
Py_CLEAR(w->title);
Py_CLEAR(w->title_bar_data.last_drawn_title_object_id);
free(w->title_bar_data.buf); w->title_bar_data.buf = NULL;
Py_CLEAR(w->url_target_bar_data.last_drawn_title_object_id);
free(w->url_target_bar_data.buf); w->url_target_bar_data.buf = NULL;
// render_data.vao_idx is -1 so release_gpu_resources_for_window is safe, but we skip it
// since we never allocated those resources.
}
static PyObject *
dnd_test_create_fake_window(PyObject *self UNUSED, PyObject *args UNUSED) {
// Create a minimal OS window + tab + window without any OpenGL/GPU resources.
// Returns (os_window_id, window_id).
ensure_space_for(&global_state, os_windows, OSWindow, global_state.num_os_windows + 1, capacity, 1, true);
OSWindow *osw = global_state.os_windows + global_state.num_os_windows++;
zero_at_ptr(osw);
osw->id = ++global_state.os_window_id_counter;
osw->tab_bar_render_data.vao_idx = -1;
osw->background_opacity.alpha = OPT(background_opacity);
osw->created_at = monotonic();
// osw->handle intentionally left NULL - no real GLFW window
ensure_space_for(osw, tabs, Tab, 1, capacity, 1, true);
Tab *tab = &osw->tabs[0];
zero_at_ptr(tab);
tab->id = ++global_state.tab_id_counter;
tab->border_rects.vao_idx = -1;
osw->num_tabs = 1;
osw->active_tab = 0;
ensure_space_for(tab, windows, Window, 1, capacity, 1, true);
Window *w = &tab->windows[0];
zero_at_ptr(w);
w->id = ++global_state.window_id_counter;
w->visible = true;
w->render_data.vao_idx = -1;
w->window_title_render_data.vao_idx = -1;
w->drop.wanted = true;
tab->num_windows = 1;
tab->active_window = 0;
global_state.mouse_hover_in_window = w->id;
return Py_BuildValue("KK", (unsigned long long)osw->id, (unsigned long long)w->id);
}
static PyObject *
dnd_test_cleanup_fake_window(PyObject *self UNUSED, PyObject *args) {
unsigned long long os_window_id;
if (!PyArg_ParseTuple(args, "K", &os_window_id)) return NULL;
for (size_t i = 0; i < global_state.num_os_windows; i++) {
if (global_state.os_windows[i].id == (id_type)os_window_id) {
OSWindow *osw = global_state.os_windows + i;
for (size_t t = 0; t < osw->num_tabs; t++) {
Tab *tab = osw->tabs + t;
for (size_t j = 0; j < tab->num_windows; j++) {
Window *win = tab->windows + j;
if (global_state.mouse_hover_in_window == win->id)
global_state.mouse_hover_in_window = 0;
destroy_fake_window_contents(win);
}
free(tab->border_rects.rect_buf); tab->border_rects.rect_buf = NULL;
free(tab->windows); tab->windows = NULL;
}
Py_CLEAR(osw->window_title);
Py_CLEAR(osw->tab_bar_render_data.screen);
free(osw->tabs); osw->tabs = NULL;
remove_i_from_array(global_state.os_windows, i, global_state.num_os_windows);
break;
}
}
Py_RETURN_NONE;
}
static PyObject *
dnd_test_set_mouse_pos(PyObject *self UNUSED, PyObject *args) {
unsigned long long window_id;
int cell_x, cell_y, pixel_x, pixel_y;
if (!PyArg_ParseTuple(args, "Kiiii", &window_id, &cell_x, &cell_y, &pixel_x, &pixel_y)) return NULL;
Window *w = window_for_window_id((id_type)window_id);
if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; }
w->mouse_pos.cell_x = (unsigned int)cell_x;
w->mouse_pos.cell_y = (unsigned int)cell_y;
w->mouse_pos.global_x = pixel_x;
w->mouse_pos.global_y = pixel_y;
Py_RETURN_NONE;
}
static PyObject *
dnd_test_fake_drop_event(PyObject *self UNUSED, PyObject *args) {
// Simulate a drop enter/move/drop event. mimes_seq must be a sequence of str, or
// None to simulate a leave event.
unsigned long long window_id;
int is_drop;
PyObject *mimes_seq;
if (!PyArg_ParseTuple(args, "KpO", &window_id, &is_drop, &mimes_seq)) return NULL;
Window *w = window_for_window_id((id_type)window_id);
if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; }
if (mimes_seq == Py_None) {
drop_left_child(w);
Py_RETURN_NONE;
}
RAII_PyObject(fast_seq, PySequence_Fast(mimes_seq, "mimes must be a sequence"));
if (!fast_seq) return NULL;
Py_ssize_t num_mimes = PySequence_Fast_GET_SIZE(fast_seq);
RAII_ALLOC(const char*, mimes, malloc(sizeof(const char*) * (num_mimes ? num_mimes : 1)));
if (!mimes) return PyErr_NoMemory();
for (Py_ssize_t i = 0; i < num_mimes; i++) {
mimes[i] = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(fast_seq, i));
if (!mimes[i]) return NULL;
}
drop_move_on_child(w, mimes, (size_t)num_mimes, is_drop ? true : false);
Py_RETURN_NONE;
}
static PyObject *
dnd_test_fake_drop_data(PyObject *self UNUSED, PyObject *args) {
// Simulate OS delivering drop data for the given MIME type.
// If error_code > 0, simulate an error (e.g. ENOENT=2, EIO=5, EPERM=1).
// Otherwise deliver data and the mandatory end-of-data signal.
unsigned long long window_id;
const char *mime;
RAII_PY_BUFFER(data);
int error_code = 0;
if (!PyArg_ParseTuple(args, "Ksy*|i", &window_id, &mime, &data, &error_code)) return NULL;
Window *w = window_for_window_id((id_type)window_id);
if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; }
if (error_code > 0) {
drop_dispatch_data(w, mime, NULL, -(ssize_t)error_code);
} else if (data.len > 0) {
drop_dispatch_data(w, mime, (const char*)data.buf, (ssize_t)data.len);
drop_dispatch_data(w, mime, NULL, 0); // mandatory end-of-data signal
} else {
// Empty data: just the end-of-data signal (sz=0 is the sentinel for "no more data").
drop_dispatch_data(w, mime, NULL, 0);
}
Py_RETURN_NONE;
}
// }}}
static void
application_close_requested_callback(int flags) {
if (flags) {
@@ -3065,6 +3221,12 @@ static PyMethodDef module_methods[] = {
{"glfw_get_monitor_workarea", (PyCFunction)get_monitor_workarea, METH_NOARGS, ""},
{"glfw_get_monitor_names", (PyCFunction)get_monitor_names, METH_NOARGS, ""},
{"glfw_primary_monitor_content_scale", (PyCFunction)primary_monitor_content_scale, METH_NOARGS, ""},
{"dnd_set_test_write_func", (PyCFunction)py_dnd_set_test_write_func, METH_O, ""},
METHODB(dnd_test_create_fake_window, METH_NOARGS),
METHODB(dnd_test_cleanup_fake_window, METH_VARARGS),
METHODB(dnd_test_set_mouse_pos, METH_VARARGS),
METHODB(dnd_test_fake_drop_event, METH_VARARGS),
METHODB(dnd_test_fake_drop_data, METH_VARARGS),
{NULL, NULL, 0, NULL} /* Sentinel */
};

479
kitty_tests/dnd.py Normal file
View File

@@ -0,0 +1,479 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2026, Kovid Goyal <kovid at kovidgoyal.net>
import errno
import re
from base64 import standard_b64decode, standard_b64encode
from contextlib import contextmanager
from kitty.fast_data_types import (
DND_CODE,
Screen,
dnd_set_test_write_func,
dnd_test_cleanup_fake_window,
dnd_test_create_fake_window,
dnd_test_fake_drop_data,
dnd_test_fake_drop_event,
dnd_test_set_mouse_pos,
)
from . import BaseTest, parse_bytes
# ---- helpers ----------------------------------------------------------------
def _osc(payload: str) -> bytes:
"""Wrap *payload* in an OSC escape sequence (OSC payload ST)."""
return f'\x1b]{payload}\x1b\\'.encode()
def client_register(mimes: str = '', client_id: int = 0) -> bytes:
"""Escape code a client sends to start accepting drops (t=a)."""
meta = f'{DND_CODE};t=a'
if client_id:
meta += f':i={client_id}'
return _osc(f'{meta};{mimes}')
def client_unregister(client_id: int = 0) -> bytes:
"""Escape code a client sends to stop accepting drops (t=A)."""
meta = f'{DND_CODE};t=A'
if client_id:
meta += f':i={client_id}'
return _osc(meta)
def client_accept(operation: int, mimes: str = '', client_id: int = 0) -> bytes:
"""Escape code a client sends to signal acceptance of the current drop (t=m:o=…)."""
meta = f'{DND_CODE};t=m:o={operation}'
if client_id:
meta += f':i={client_id}'
return _osc(f'{meta};{mimes}')
def client_request_data(mime: str = '', client_id: int = 0) -> bytes:
"""Escape code a client sends to request data (t=r) or finish the drop (t=r with no MIME)."""
meta = f'{DND_CODE};t=r'
if client_id:
meta += f':i={client_id}'
return _osc(f'{meta};{mime}')
# ---- escape-code decoder used by assertions ---------------------------------
_OSC_RE = re.compile(
rb'\x1b\]' + re.escape(str(DND_CODE).encode()) + rb';([^;\x1b]*?)(?:;([^\x1b]*))?\x1b\\',
)
def _decode_meta(raw: bytes) -> dict:
"""Parse the colon-separated metadata portion of a DnD escape code."""
ans: dict = {}
for kv in raw.split(b':'):
if b'=' in kv:
k, _, v = kv.partition(b'=')
ans[k.decode()] = v.decode()
elif kv:
ans[kv.decode()] = ''
return ans
def parse_escape_codes(data: bytes) -> list[dict]:
"""Decode all DnD escape codes present in *data*.
Each returned dict has keys:
* ``type`` the 't' value (single character string)
* ``meta`` full parsed metadata dict (from the first chunk)
* ``payload`` concatenated raw payload bytes from all chunks
* ``chunks`` list of individual raw chunk payloads (bytes)
Chunked sequences (m=1 m=0) are assembled into a single entry.
"""
results: list[dict] = []
pending: dict | None = None
for m in _OSC_RE.finditer(data):
meta_raw = m.group(1)
payload_raw: bytes = m.group(2) if m.group(2) is not None else b''
meta = _decode_meta(meta_raw)
more = meta.get('m', '0') == '1'
t = meta.get('t', 'a')
if pending is None:
pending = {'type': t, 'meta': meta, 'chunks': [], 'payload': b''}
pending['chunks'].append(payload_raw)
pending['payload'] += payload_raw
if not more:
results.append(pending)
pending = None
if pending is not None:
results.append(pending)
return results
def parse_escape_codes_b64(data: bytes) -> list[dict]:
"""Like *parse_escape_codes* but base64-decodes each chunk's payload."""
result = parse_escape_codes(data)
for entry in result:
decoded_chunks = []
full = b''
for chunk in entry['chunks']:
dec = standard_b64decode(chunk + b'==') if chunk else b''
decoded_chunks.append(dec)
full += dec
entry['chunks'] = decoded_chunks
entry['payload'] = full
return result
# ---- test context manager ---------------------------------------------------
class _WriteCapture:
"""Accumulates bytes delivered by the DnD write interceptor."""
def __init__(self) -> None:
self._buf: dict[int, bytearray] = {}
def __call__(self, window_id: int, data: bytes) -> None:
self._buf.setdefault(window_id, bytearray())
self._buf[window_id] += data
def consume(self, window_id: int) -> bytes:
"""Return and clear all buffered data for *window_id*."""
buf = self._buf.pop(window_id, bytearray())
return bytes(buf)
def peek(self, window_id: int) -> bytes:
return bytes(self._buf.get(window_id, bytearray()))
@contextmanager
def dnd_test_window():
"""Context manager that creates a fake window + write-capture harness.
Yields (os_window_id, window_id, screen, capture) where:
* ``os_window_id`` OS-level window ID
* ``window_id`` kitty window ID (pass to the fake-event helpers)
* ``screen`` Screen object whose window_id matches the fake window
* ``capture`` _WriteCapture accumulating bytes sent to the child
"""
from kitty.fast_data_types import get_options
from kitty.options.types import defaults
capture = _WriteCapture()
dnd_set_test_write_func(capture)
os_window_id, window_id = dnd_test_create_fake_window()
try:
screen = Screen(None, 24, 80, 0, 0, 0, window_id)
yield os_window_id, window_id, screen, capture
finally:
dnd_set_test_write_func(None)
dnd_test_cleanup_fake_window(os_window_id)
# ---- test class -------------------------------------------------------------
class TestDnDProtocol(BaseTest):
def _assert_no_output(self, capture: _WriteCapture, window_id: int) -> None:
self.ae(capture.peek(window_id), b'', 'unexpected output to child')
def _get_events(self, capture: _WriteCapture, window_id: int) -> list[dict]:
return parse_escape_codes(capture.consume(window_id))
def test_register_and_unregister(self) -> None:
"""Client can register and unregister for drops."""
with dnd_test_window() as (osw, wid, screen, cap):
# Client registers state is already wanted=True from fake-window creation,
# but calling the escape code should not break things.
parse_bytes(screen, client_register('text/plain text/uri-list'))
# No output expected at this point (no drop in progress).
self._assert_no_output(cap, wid)
# Client unregisters.
parse_bytes(screen, client_unregister())
self._assert_no_output(cap, wid)
def test_drop_move_sends_move_event(self) -> None:
"""A drop entering and moving over the window generates t=m events."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 5, 3, 100, 60)
dnd_test_fake_drop_event(wid, False, ['text/plain', 'text/uri-list'])
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
ev = events[0]
self.ae(ev['type'], 'm')
self.ae(ev['meta'].get('x'), '5')
self.ae(ev['meta'].get('y'), '3')
self.ae(ev['meta'].get('X'), '100')
self.ae(ev['meta'].get('Y'), '60')
# MIME list should be present in the payload
self.assertIn(b'text/plain', ev['payload'])
self.assertIn(b'text/uri-list', ev['payload'])
def test_drop_move_mime_always_sent(self) -> None:
"""The current implementation always includes the MIME list in move events."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
mimes = ['text/plain']
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, False, mimes)
cap.consume(wid) # discard first event
# Second move with same mimes list is still included.
dnd_test_set_mouse_pos(wid, 1, 0, 8, 0)
dnd_test_fake_drop_event(wid, False, mimes)
raw = cap.consume(wid)
events = parse_escape_codes(raw)
self.assertEqual(len(events), 1, raw)
self.ae(events[0]['type'], 'm')
self.assertIn(b'text/plain', events[0]['payload'])
def test_drop_leave_sends_leave_event(self) -> None:
"""Drop leaving sends t=m with x=-1,y=-1."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
cap.consume(wid)
dnd_test_fake_drop_event(wid, False, None) # None → leave
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
ev = events[0]
self.ae(ev['type'], 'm')
self.ae(ev['meta'].get('x'), '-1')
self.ae(ev['meta'].get('y'), '-1')
def test_client_accepts_drop(self) -> None:
"""Client sending t=m:o=1 is recorded and does not trigger extra output."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
cap.consume(wid)
# Client accepts with copy operation.
parse_bytes(screen, client_accept(1, 'text/plain'))
# No immediate output expected.
self._assert_no_output(cap, wid)
def test_full_drop_flow(self) -> None:
"""Complete happy-path: move → accept → drop → request → data → finish."""
payload_data = b'hello world'
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
# Move
dnd_test_set_mouse_pos(wid, 2, 3, 16, 24)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
cap.consume(wid)
# Client accepts
parse_bytes(screen, client_accept(1, 'text/plain'))
# OS drops
dnd_test_set_mouse_pos(wid, 2, 3, 16, 24)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'M')
self.assertIn(b'text/plain', events[0]['payload'])
# Client requests data
parse_bytes(screen, client_request_data('text/plain'))
# OS delivers data
dnd_test_fake_drop_data(wid, 'text/plain', payload_data)
raw = cap.consume(wid)
data_events = parse_escape_codes_b64(raw)
# Should have data chunks plus an empty terminator
self.assertTrue(len(data_events) >= 1, data_events)
combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r')
self.ae(combined, payload_data)
# Client finishes
parse_bytes(screen, client_request_data(''))
self._assert_no_output(cap, wid)
def test_request_unknown_mime(self) -> None:
"""Requesting a MIME type not in the offered set yields an error."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Client requests a MIME that was not offered.
parse_bytes(screen, client_request_data('image/png'))
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['payload'].strip(), b'ENOENT')
def test_data_error_propagation(self) -> None:
"""When data retrieval fails the client receives a t=R error code."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain'))
# Simulate I/O error (EIO = 5 on Linux)
dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EIO)
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['payload'].strip(), b'EIO')
def test_data_eperm_error(self) -> None:
"""EPERM error is correctly forwarded to the client."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EPERM)
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['payload'].strip(), b'EPERM')
def test_large_data_chunking(self) -> None:
"""Data larger than the chunk limit is sent in multiple base64 chunks."""
# Each chunk is ≤ 3072 bytes of raw data (base64-encoded to ≤ 4096 bytes).
chunk_limit = 3072
big_payload = b'X' * (chunk_limit * 3) # 3 chunks expected
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', big_payload)
raw = cap.consume(wid)
data_events = parse_escape_codes_b64(raw)
combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r')
self.ae(combined, big_payload)
# Verify that we got more than one escape code (chunking happened)
self.assertGreater(len(data_events), 1, 'expected multiple chunks')
def test_client_id_propagated(self) -> None:
"""The client_id (i=…) set during registration is echoed in all replies."""
client_id = 42
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain', client_id=client_id))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
raw = cap.consume(wid)
events = parse_escape_codes(raw)
self.assertEqual(len(events), 1, raw)
self.ae(events[0]['meta'].get('i'), str(client_id))
def test_multiple_mimes_priority(self) -> None:
"""The client can specify a preferred MIME ordering."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain text/uri-list'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
# OS offers both types.
dnd_test_fake_drop_event(wid, True, ['text/plain', 'text/uri-list'])
cap.consume(wid)
# Request text/uri-list first (different from registration order).
parse_bytes(screen, client_request_data('text/uri-list'))
dnd_test_fake_drop_data(wid, 'text/uri-list', b'file:///tmp/test\n')
raw = cap.consume(wid)
data_events = parse_escape_codes_b64(raw)
combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r')
self.ae(combined, b'file:///tmp/test\n')
def test_drop_without_register_no_output(self) -> None:
"""If the client has not registered, no escape codes are sent on drop."""
with dnd_test_window() as (osw, wid, screen, cap):
# Explicitly unregister (clears the wanted flag).
parse_bytes(screen, client_unregister())
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
# Fake window is created with wanted=True; after unregister it should be False.
# drop_move_on_child only sends if w->drop.wanted is true, which is handled
# by the caller (on_drop in glfw.c checks w->drop.wanted before calling).
# Here we call drop_left_child which checks w->drop.wanted.
dnd_test_fake_drop_event(wid, False, None)
self._assert_no_output(cap, wid)
def test_malformed_dnd_command_invalid_type(self) -> None:
"""A DnD command with an unknown type character is silently ignored."""
with dnd_test_window() as (osw, wid, screen, cap):
# 'z' is not a valid type; the parser should emit an error and return
# without calling any handler no crash, no output.
bad_cmd = _osc(f'{DND_CODE};t=z;')
parse_bytes(screen, bad_cmd)
self._assert_no_output(cap, wid)
def test_move_event_after_mime_change(self) -> None:
"""When offered MIME list changes, the new list is included in the move event."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
cap.consume(wid)
# Second move with a different MIME list list must be re-sent.
dnd_test_set_mouse_pos(wid, 1, 0, 8, 0)
dnd_test_fake_drop_event(wid, False, ['text/html', 'text/plain'])
raw = cap.consume(wid)
events = parse_escape_codes(raw)
self.assertEqual(len(events), 1, raw)
self.assertIn(b'text/html', events[0]['payload'])
def test_drop_event_has_uppercase_M(self) -> None:
"""A drop (not just a move) sends t=M (uppercase)."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'M')
def test_data_end_signal(self) -> None:
"""The end-of-data signal is an empty payload escape code."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', b'hello')
raw = cap.consume(wid)
events = parse_escape_codes(raw)
# Last event must be an empty (end-of-stream) t=r.
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events found')
last = r_events[-1]
self.ae(last['payload'], b'')
def test_empty_data(self) -> None:
"""Zero-byte payload is handled gracefully only end signal is sent."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', b'')
raw = cap.consume(wid)
events = parse_escape_codes(raw)
r_events = [e for e in events if e['type'] == 'r']
# Only the end signal should be present.
self.assertEqual(len(r_events), 1, raw)
self.ae(r_events[0]['payload'], b'')