mirror of
https://github.com/kovidgoyal/kitty.git
synced 2026-07-03 11:12:30 +08:00
Python wrapper to use image to RGBA Go code
This commit is contained in:
@@ -14,6 +14,7 @@ from functools import lru_cache
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
BinaryIO,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
@@ -33,6 +34,7 @@ from typing import (
|
||||
|
||||
from .constants import (
|
||||
appname,
|
||||
cache_dir,
|
||||
clear_handled_signals,
|
||||
config_dir,
|
||||
is_macos,
|
||||
@@ -1223,3 +1225,32 @@ def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = Fals
|
||||
|
||||
def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
|
||||
_timed_debug_print(sep.join(map(str, a)) + end)
|
||||
|
||||
|
||||
def cached_rgba_file_descriptor_for_image_path(path: str) -> Tuple[int, int, int]:
|
||||
from hashlib import sha256
|
||||
path = os.path.realpath(path, strict=True)
|
||||
src_info = os.stat(path)
|
||||
output_name = sha256(path.encode()).hexdigest() + '.rgba'
|
||||
output_path = os.path.join(cache_dir(), 'rgba', output_name)
|
||||
|
||||
def read_data(f: BinaryIO) -> Tuple[int, int, int]:
|
||||
header = f.read(8)
|
||||
import struct
|
||||
width, height = struct.unpack('<II', header)
|
||||
return width, height, os.dup(f.fileno())
|
||||
|
||||
with suppress(OSError), open(output_path, 'rb') as f:
|
||||
dest_info = os.stat(f.fileno())
|
||||
if dest_info.st_size == src_info.st_size and dest_info.st_mtime >= src_info.st_mtime:
|
||||
return read_data(f)
|
||||
|
||||
import subprocess
|
||||
cp = subprocess.run([kitten_exe(), '__render_image__', path], capture_output=True)
|
||||
if cp.returncode != 0:
|
||||
raise ValueError(f'Failed to convert path to RGBA data with error: {cp.stderr.decode("utf-8", "replace")}')
|
||||
ans = cp.stdout.decode().strip()
|
||||
if ans != output_path:
|
||||
raise ValueError(f'The two cache name algorithms dont agree for path: {path}\n{output_path} != {ans}')
|
||||
with open(ans, 'rb') as f:
|
||||
return read_data(f)
|
||||
|
||||
@@ -12,6 +12,7 @@ from dataclasses import dataclass
|
||||
from io import BytesIO
|
||||
|
||||
from kitty.fast_data_types import base64_decode, base64_encode, has_avx2, has_sse4_2, load_png_data, shm_unlink, shm_write, test_xor64
|
||||
from kitty.utils import cached_rgba_file_descriptor_for_image_path
|
||||
|
||||
from . import BaseTest, parse_bytes
|
||||
|
||||
@@ -1240,3 +1241,19 @@ class TestGraphics(BaseTest):
|
||||
s.reset()
|
||||
self.ae(g.image_count, 0)
|
||||
self.assertEqual(g.disk_cache.total_size, 0)
|
||||
|
||||
@unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests')
|
||||
def test_cached_rgba_conversion(self):
|
||||
w, h = 5, 3
|
||||
rgba_data = byte_block(w * h * 4)
|
||||
img = Image.frombytes('RGBA', (w, h), rgba_data)
|
||||
buf = BytesIO()
|
||||
img.save(buf, 'PNG')
|
||||
png_data = buf.getvalue()
|
||||
with tempfile.NamedTemporaryFile(suffix='.png') as png:
|
||||
png.write(png_data)
|
||||
png.flush()
|
||||
os.fsync(png.fileno())
|
||||
qw, qh, fd = cached_rgba_file_descriptor_for_image_path(png.name)
|
||||
os.close(fd)
|
||||
self.ae((qw, qh), (w, h))
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"image"
|
||||
@@ -93,8 +92,10 @@ func prune_cache(cdir string, max_entries int) error {
|
||||
}
|
||||
|
||||
func render_image(src_path, cdir string, max_cache_entries int) (output_path string, err error) {
|
||||
src_path, err = filepath.EvalSymlinks(src_path)
|
||||
if err != nil {
|
||||
if src_path, err = filepath.EvalSymlinks(src_path); err != nil {
|
||||
return
|
||||
}
|
||||
if src_path, err = filepath.Abs(src_path); err != nil {
|
||||
return
|
||||
}
|
||||
lock_file := filepath.Join(cdir, "lock")
|
||||
@@ -109,7 +110,7 @@ func render_image(src_path, cdir string, max_cache_entries int) (output_path str
|
||||
defer func() {
|
||||
utils.UnlockFile(lockf)
|
||||
}()
|
||||
output_path = filepath.Join(cdir, hex.EncodeToString(sha256.New().Sum([]byte(src_path)))) + ".rgba"
|
||||
output_path = filepath.Join(cdir, fmt.Sprintf("%x", sha256.Sum256([]byte(src_path)))) + ".rgba"
|
||||
needs_update := true
|
||||
input_info, err := os.Stat(src_path)
|
||||
if err != nil {
|
||||
@@ -154,16 +155,12 @@ func RenderEntryPoint(root *cli.Command) {
|
||||
if len(args) != 1 {
|
||||
return 1, fmt.Errorf("Usage: render input_image_path")
|
||||
}
|
||||
src_path, err := filepath.EvalSymlinks(args[0])
|
||||
if err != nil {
|
||||
return 1, err
|
||||
}
|
||||
cdir := utils.CacheDir()
|
||||
cdir = filepath.Join(cdir, "rgba")
|
||||
if err = os.MkdirAll(cdir, 0755); err != nil {
|
||||
return 1, err
|
||||
}
|
||||
if output_path, err := render_image(src_path, cdir, 32); err != nil {
|
||||
if output_path, err := render_image(args[0], cdir, 32); err != nil {
|
||||
return 1, err
|
||||
} else {
|
||||
fmt.Println(output_path)
|
||||
|
||||
Reference in New Issue
Block a user