Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kitty graphics protocol support #8

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions imgcat/imgcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import contextlib
import io
import sys
import argparse
import os
import struct
import subprocess
import sys
from urllib.request import urlopen


Expand Down Expand Up @@ -176,6 +177,21 @@ def get_tty_size():
return int(rows), int(columns)


def get_backend():
'''Determine a proper backend from environment variables.'''

term = os.getenv('TERM_PROGRAM', None)
if term == 'tmux' or term is None:
term = os.getenv('TERM', '')

if term.__contains__('kitty') or term.__contains__('ghostty'):
from . import kitty
return kitty
else:
from . import iterm2
return iterm2


def imgcat(data: Any, filename=None,
width=None, height=None, preserve_aspect_ratio=True,
pixels_per_line=24,
Expand Down Expand Up @@ -217,28 +233,48 @@ def imgcat(data: Any, filename=None,
# image height unavailable, fallback?
height = 10

from . import iterm2
iterm2._write_image(buf, fp,
filename=filename, width=width, height=height,
preserve_aspect_ratio=preserve_aspect_ratio)
backend = get_backend()
if 'kitty' in backend.__name__:
# TODO handle other parameters
backend._write_image(buf, fp, height=height)
else:
backend._write_image(buf, fp,
filename=filename, width=width, height=height,
preserve_aspect_ratio=preserve_aspect_ratio)


def clear():
"""Clear any remaining graphics if possible."""
pass


class ClearAction(argparse.Action):
def __init__(self, option_strings, dest, default=False, **kwargs):
super(ClearAction, self).__init__(
option_strings, nargs=0, dest=dest, default=False, **kwargs)

def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, True)
get_backend().clear()


def main():
import argparse
try:
from imgcat import __version__
except ImportError:
__version__ = 'N/A'

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('input', nargs='*', type=str,
help='Path to the images.')
help='Path to the image files.')
parser.add_argument('--height', default=None, type=int,
help='The number of rows (in terminal) for displaying images.')
parser.add_argument('--width', default=None, type=int,
help='The number of columns (in terminal) for displaying images.')
parser.add_argument('-v', '--version', action='version',
version='python-imgcat %s' % __version__)
parser.add_argument('-c', '--clear', action=ClearAction,
help='Clear all existing graphics (only effective in kitty)')
args = parser.parse_args()

kwargs = dict()
Expand Down Expand Up @@ -271,7 +307,7 @@ def main():

imgcat(buf, filename=os.path.basename(fname), **kwargs)

if not args.input:
if not args.clear and not args.input:
parser.print_help()

return 0
Expand Down
11 changes: 11 additions & 0 deletions imgcat/iterm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
ST = b'\a' # \a = ^G (bell)


def clear():
"""Do nothing for iTerm2."""
pass


def _write_image(buf, fp,
filename, width, height, preserve_aspect_ratio):
# need to detect tmux
Expand Down Expand Up @@ -61,3 +66,9 @@ def _write_image(buf, fp,

# flush is needed so that the cursor control sequence can take effect
fp.flush()


__all__ = (
'clear',
'_write_image',
)
127 changes: 127 additions & 0 deletions imgcat/kitty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
Kitty backend for imgcat.
"""

import sys
import os
import base64
from collections import OrderedDict
from base64 import standard_b64encode


ESC = b'\033'

TMUX_WRAP_ST = b'\033Ptmux;'
TMUX_WRAP_ED = b'\033\\'


def serialize_gr_command(cmd, payload=None):
ans = []
w = ans.append

is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX']

if is_tmux:
w(TMUX_WRAP_ST + b'\033') #!

# kitty graphics sequence start
# https://sw.kovidgoyal.net/kitty/graphics-protocol/#the-graphics-escape-code
# All graphics escape codes are of the form:
# <ESC>_G<control data>;<payload><ESC>\
# [ a ][ b ][ c ][ d ]
w(b'\033_G') # [a]

# [b] control data
cmd = ','.join('{}={}'.format(k, v) for k, v in cmd.items())
w(cmd.encode('ascii'))

if payload: # [c]
w(b';')
w(payload)

if is_tmux:
w(b'\033') # escape \033

# [d] kitty graphics sequence end
w(b'\033\\')

if is_tmux:
w(TMUX_WRAP_ED) #!

return b''.join(ans)


def clear():
"""Send the sesquence for clearing all graphics."""
is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX']
seq = []
w = seq.append

if is_tmux:
w(b'\033Ptmux;\033')

w(b'\033_Ga=d,d=A')
if is_tmux:
w(b'\033')
w(b'\033\\')

if is_tmux:
w(b'\033\\')

sys.stdout.buffer.write(b''.join(seq))


def _write_image(buf, fp, height):
# https://sw.kovidgoyal.net/kitty/graphics-protocol.html
# print some blank lines
is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX']
if is_tmux:
CSI = b'\033['
fp.write(b'\n' * height)
fp.write(CSI + b'?25l')
fp.write(CSI + str(height).encode() + b"F") # PEP-461
fp.flush()

# <control data> in the kitty graphics protocol.
# https://sw.kovidgoyal.net/kitty/graphics-protocol/#control-data-reference
# e.g., _Gf=100,a=T;
cmd = OrderedDict([
('a', 'T'), # a=T tells display the image on the screen
('f', 100), # f=100 means PNG data
('r', height),
('C', int(is_tmux)), # if tmux, do not move cursor
('X', 10),
('Y', 10),
])
write_chunked(cmd, buf)

# move back the cursor
if is_tmux:
fp.write(CSI + str(height).encode() + b"E")
fp.write(CSI + b'?25h')
fp.flush()

fp.write(b'\n')
fp.flush()


def write_chunked(cmd, data):
data = standard_b64encode(data)
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
cmd['m'] = m
sys.stdout.buffer.write(serialize_gr_command(cmd, chunk))
sys.stdout.flush()
cmd.clear()


if __name__ == '__main__':
with open(sys.argv[-1], 'rb') as f:
_write_image(fp=sys.stdout.buffer, buf=f.read(), height=10)


__all__ = (
'clear',
'_write_image',
)
63 changes: 45 additions & 18 deletions imgcat/test_imgcat.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import codecs
import contextlib
import functools
import hashlib
import io
import os
import unittest
import numpy as np
import sys
import os
import io
import hashlib
import functools
import re
import contextlib

import matplotlib
import numpy as np
import pytest

if (not os.environ.get('DISPLAY', '') or \
Expand All @@ -24,20 +26,30 @@ def _importable(package_name: str) -> bool:


@pytest.fixture
def mock_env(monkeypatch, env_profile):
def mock_env(monkeypatch, term_profile, tmux_profile):
"""Mock environment variables (especially, TMUX)"""
if env_profile == 'plain':
if tmux_profile == 'plain':
monkeypatch.delenv("TMUX", raising=False)
elif env_profile == 'tmux':
elif tmux_profile == 'tmux':
monkeypatch.setenv("TMUX", "mock-tmux-session")
else:
raise ValueError("Unknown profile: " + str(env_profile))
raise ValueError("Unknown tmux_profile: " + str(tmux_profile))

if term_profile == 'iterm2':
pass # default
elif term_profile == 'kitty':
monkeypatch.setenv("TERM", 'xterm-kitty')
else:
raise ValueError("Unknown term_profile: " + str(term_profile))

def parametrize_env(callable, env_profiles=['plain', 'tmux']):

def parametrize_env(callable,
tmux_profiles=['plain', 'tmux'],
term_profiles=['iterm2', 'kitty'],
):
@pytest.mark.usefixtures('mock_env')
@pytest.mark.parametrize('env_profile', env_profiles)
@pytest.mark.parametrize('term_profile', term_profiles)
@pytest.mark.parametrize('tmux_profile', tmux_profiles)
@functools.wraps(callable)
def _wrapped(*args, **kwargs):
return callable(*args, **kwargs)
Expand Down Expand Up @@ -85,6 +97,20 @@ def _validate_iterm2(self, buf, sha1=None):
if sha1:
assert hashlib.sha1(buf).hexdigest().startswith(sha1), ("SHA1 mismatch")

def _validate_kitty(self, buf):
"""Check if graphics sequence is correct."""
assert isinstance(buf, bytes)

# https://sw.kovidgoyal.net/kitty/graphics-protocol/#remote-client
# f=100 indicates PNG data
# m=0 means the last chunk, m=1 means other chunk will follow
assert re.match(b'^\x1b_Ga=T,f=100,m=(0|1);', buf)
assert buf.endswith(b'\033\\')

# TODO: test control sequences that come in multiple chunks.
# in such cases, only the last chunk have m=0 and the rest have m=1.


@contextlib.contextmanager
def capture_and_validate(self, **kwargs):
with self._redirect_stdout(reprint=True) as f:
Expand All @@ -95,15 +121,17 @@ def capture_and_validate(self, **kwargs):
is_tmux = os.getenv('TMUX')
if is_tmux:
captured_bytes = self.tmux_unwrap_passthrough(captured_bytes)
self._validate_iterm2(captured_bytes, **kwargs)

if 'kitty' in os.getenv('TERM', ''):
self._validate_kitty(captured_bytes, **kwargs)
else:
self._validate_iterm2(captured_bytes, **kwargs)

@staticmethod
def tmux_unwrap_passthrough(b: bytes) -> bytes:
'''Strip out all tmux pass-through sequence and other cursor-movement
control sequences that come either in the beginning or in the end.'''
assert isinstance(b, bytes)
#assert b.startswith(b'\033Ptmux;')
#assert b.endswith(b'\033\\')
try:
st = b.index(b'\033Ptmux;')
ed = b.rindex(b'\033\\')
Expand All @@ -119,8 +147,6 @@ def tmux_unwrap_passthrough(b: bytes) -> bytes:

@parametrize_env
def test_numpy(self):
# TODO: The test fails if tmux is enabled

# uint8, grayscale
a = np.ones([32, 32], dtype=np.uint8) * 128
with self.capture_and_validate():
Expand Down Expand Up @@ -249,7 +275,8 @@ def test_args_filename(self):
imgcat(gray, filename='foo.png')
imgcat(gray, filename='unicode_한글.png')

@parametrize_env
# Only available in iTerm2 (not kitty)
@functools.partial(parametrize_env, term_profiles=['iterm2'])
def test_args_another(self):
b = io.BytesIO()

Expand Down
Loading