diff --git a/imgcat/imgcat.py b/imgcat/imgcat.py index e192667..d683dc6 100644 --- a/imgcat/imgcat.py +++ b/imgcat/imgcat.py @@ -6,10 +6,11 @@ import contextlib import io +import sys +import argparse import os import struct import subprocess -import sys from urllib.request import urlopen @@ -176,6 +177,21 @@ def get_tty_size(): return int(rows), int(columns) +def get_backend(): + '''Determine a proper backend from environment variables.''' + + term = os.getenv('TERM_PROGRAM', None) + if term == 'tmux' or term is None: + term = os.getenv('TERM', '') + + if term.__contains__('kitty') or term.__contains__('ghostty'): + from . import kitty + return kitty + else: + from . import iterm2 + return iterm2 + + def imgcat(data: Any, filename=None, width=None, height=None, preserve_aspect_ratio=True, pixels_per_line=24, @@ -217,14 +233,32 @@ def imgcat(data: Any, filename=None, # image height unavailable, fallback? height = 10 - from . import iterm2 - iterm2._write_image(buf, fp, - filename=filename, width=width, height=height, - preserve_aspect_ratio=preserve_aspect_ratio) + backend = get_backend() + if 'kitty' in backend.__name__: + # TODO handle other parameters + backend._write_image(buf, fp, height=height) + else: + backend._write_image(buf, fp, + filename=filename, width=width, height=height, + preserve_aspect_ratio=preserve_aspect_ratio) + + +def clear(): + """Clear any remaining graphics if possible.""" + pass + + +class ClearAction(argparse.Action): + def __init__(self, option_strings, dest, default=False, **kwargs): + super(ClearAction, self).__init__( + option_strings, nargs=0, dest=dest, default=False, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, True) + get_backend().clear() def main(): - import argparse try: from imgcat import __version__ except ImportError: @@ -232,13 +266,15 @@ def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('input', nargs='*', type=str, - help='Path to the images.') + help='Path to the image files.') parser.add_argument('--height', default=None, type=int, help='The number of rows (in terminal) for displaying images.') parser.add_argument('--width', default=None, type=int, help='The number of columns (in terminal) for displaying images.') parser.add_argument('-v', '--version', action='version', version='python-imgcat %s' % __version__) + parser.add_argument('-c', '--clear', action=ClearAction, + help='Clear all existing graphics (only effective in kitty)') args = parser.parse_args() kwargs = dict() @@ -271,7 +307,7 @@ def main(): imgcat(buf, filename=os.path.basename(fname), **kwargs) - if not args.input: + if not args.clear and not args.input: parser.print_help() return 0 diff --git a/imgcat/iterm2.py b/imgcat/iterm2.py index 917edf4..0de06dd 100644 --- a/imgcat/iterm2.py +++ b/imgcat/iterm2.py @@ -13,6 +13,11 @@ ST = b'\a' # \a = ^G (bell) +def clear(): + """Do nothing for iTerm2.""" + pass + + def _write_image(buf, fp, filename, width, height, preserve_aspect_ratio): # need to detect tmux @@ -61,3 +66,9 @@ def _write_image(buf, fp, # flush is needed so that the cursor control sequence can take effect fp.flush() + + +__all__ = ( + 'clear', + '_write_image', +) diff --git a/imgcat/kitty.py b/imgcat/kitty.py new file mode 100644 index 0000000..5ae7e46 --- /dev/null +++ b/imgcat/kitty.py @@ -0,0 +1,127 @@ +""" +Kitty backend for imgcat. +""" + +import sys +import os +import base64 +from collections import OrderedDict +from base64 import standard_b64encode + + +ESC = b'\033' + +TMUX_WRAP_ST = b'\033Ptmux;' +TMUX_WRAP_ED = b'\033\\' + + +def serialize_gr_command(cmd, payload=None): + ans = [] + w = ans.append + + is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX'] + + if is_tmux: + w(TMUX_WRAP_ST + b'\033') #! + + # kitty graphics sequence start + # https://sw.kovidgoyal.net/kitty/graphics-protocol/#the-graphics-escape-code + # All graphics escape codes are of the form: + # _G;\ + # [ a ][ b ][ c ][ d ] + w(b'\033_G') # [a] + + # [b] control data + cmd = ','.join('{}={}'.format(k, v) for k, v in cmd.items()) + w(cmd.encode('ascii')) + + if payload: # [c] + w(b';') + w(payload) + + if is_tmux: + w(b'\033') # escape \033 + + # [d] kitty graphics sequence end + w(b'\033\\') + + if is_tmux: + w(TMUX_WRAP_ED) #! + + return b''.join(ans) + + +def clear(): + """Send the sesquence for clearing all graphics.""" + is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX'] + seq = [] + w = seq.append + + if is_tmux: + w(b'\033Ptmux;\033') + + w(b'\033_Ga=d,d=A') + if is_tmux: + w(b'\033') + w(b'\033\\') + + if is_tmux: + w(b'\033\\') + + sys.stdout.buffer.write(b''.join(seq)) + + +def _write_image(buf, fp, height): + # https://sw.kovidgoyal.net/kitty/graphics-protocol.html + # print some blank lines + is_tmux = 'TMUX' in os.environ and 'tmux' in os.environ['TMUX'] + if is_tmux: + CSI = b'\033[' + fp.write(b'\n' * height) + fp.write(CSI + b'?25l') + fp.write(CSI + str(height).encode() + b"F") # PEP-461 + fp.flush() + + # in the kitty graphics protocol. + # https://sw.kovidgoyal.net/kitty/graphics-protocol/#control-data-reference + # e.g., _Gf=100,a=T; + cmd = OrderedDict([ + ('a', 'T'), # a=T tells display the image on the screen + ('f', 100), # f=100 means PNG data + ('r', height), + ('C', int(is_tmux)), # if tmux, do not move cursor + ('X', 10), + ('Y', 10), + ]) + write_chunked(cmd, buf) + + # move back the cursor + if is_tmux: + fp.write(CSI + str(height).encode() + b"E") + fp.write(CSI + b'?25h') + fp.flush() + + fp.write(b'\n') + fp.flush() + + +def write_chunked(cmd, data): + data = standard_b64encode(data) + while data: + chunk, data = data[:4096], data[4096:] + m = 1 if data else 0 + cmd['m'] = m + sys.stdout.buffer.write(serialize_gr_command(cmd, chunk)) + sys.stdout.flush() + cmd.clear() + + +if __name__ == '__main__': + with open(sys.argv[-1], 'rb') as f: + _write_image(fp=sys.stdout.buffer, buf=f.read(), height=10) + + +__all__ = ( + 'clear', + '_write_image', +) diff --git a/imgcat/test_imgcat.py b/imgcat/test_imgcat.py index 2ec1b5e..d030c1f 100644 --- a/imgcat/test_imgcat.py +++ b/imgcat/test_imgcat.py @@ -1,13 +1,15 @@ import codecs -import contextlib -import functools -import hashlib -import io -import os +import unittest +import numpy as np import sys +import os +import io +import hashlib +import functools +import re +import contextlib import matplotlib -import numpy as np import pytest if (not os.environ.get('DISPLAY', '') or \ @@ -24,20 +26,30 @@ def _importable(package_name: str) -> bool: @pytest.fixture -def mock_env(monkeypatch, env_profile): +def mock_env(monkeypatch, term_profile, tmux_profile): """Mock environment variables (especially, TMUX)""" - if env_profile == 'plain': + if tmux_profile == 'plain': monkeypatch.delenv("TMUX", raising=False) - elif env_profile == 'tmux': + elif tmux_profile == 'tmux': monkeypatch.setenv("TMUX", "mock-tmux-session") else: - raise ValueError("Unknown profile: " + str(env_profile)) + raise ValueError("Unknown tmux_profile: " + str(tmux_profile)) + if term_profile == 'iterm2': + pass # default + elif term_profile == 'kitty': + monkeypatch.setenv("TERM", 'xterm-kitty') + else: + raise ValueError("Unknown term_profile: " + str(term_profile)) -def parametrize_env(callable, env_profiles=['plain', 'tmux']): +def parametrize_env(callable, + tmux_profiles=['plain', 'tmux'], + term_profiles=['iterm2', 'kitty'], + ): @pytest.mark.usefixtures('mock_env') - @pytest.mark.parametrize('env_profile', env_profiles) + @pytest.mark.parametrize('term_profile', term_profiles) + @pytest.mark.parametrize('tmux_profile', tmux_profiles) @functools.wraps(callable) def _wrapped(*args, **kwargs): return callable(*args, **kwargs) @@ -85,6 +97,20 @@ def _validate_iterm2(self, buf, sha1=None): if sha1: assert hashlib.sha1(buf).hexdigest().startswith(sha1), ("SHA1 mismatch") + def _validate_kitty(self, buf): + """Check if graphics sequence is correct.""" + assert isinstance(buf, bytes) + + # https://sw.kovidgoyal.net/kitty/graphics-protocol/#remote-client + # f=100 indicates PNG data + # m=0 means the last chunk, m=1 means other chunk will follow + assert re.match(b'^\x1b_Ga=T,f=100,m=(0|1);', buf) + assert buf.endswith(b'\033\\') + + # TODO: test control sequences that come in multiple chunks. + # in such cases, only the last chunk have m=0 and the rest have m=1. + + @contextlib.contextmanager def capture_and_validate(self, **kwargs): with self._redirect_stdout(reprint=True) as f: @@ -95,15 +121,17 @@ def capture_and_validate(self, **kwargs): is_tmux = os.getenv('TMUX') if is_tmux: captured_bytes = self.tmux_unwrap_passthrough(captured_bytes) - self._validate_iterm2(captured_bytes, **kwargs) + + if 'kitty' in os.getenv('TERM', ''): + self._validate_kitty(captured_bytes, **kwargs) + else: + self._validate_iterm2(captured_bytes, **kwargs) @staticmethod def tmux_unwrap_passthrough(b: bytes) -> bytes: '''Strip out all tmux pass-through sequence and other cursor-movement control sequences that come either in the beginning or in the end.''' assert isinstance(b, bytes) - #assert b.startswith(b'\033Ptmux;') - #assert b.endswith(b'\033\\') try: st = b.index(b'\033Ptmux;') ed = b.rindex(b'\033\\') @@ -119,8 +147,6 @@ def tmux_unwrap_passthrough(b: bytes) -> bytes: @parametrize_env def test_numpy(self): - # TODO: The test fails if tmux is enabled - # uint8, grayscale a = np.ones([32, 32], dtype=np.uint8) * 128 with self.capture_and_validate(): @@ -249,7 +275,8 @@ def test_args_filename(self): imgcat(gray, filename='foo.png') imgcat(gray, filename='unicode_한글.png') - @parametrize_env + # Only available in iTerm2 (not kitty) + @functools.partial(parametrize_env, term_profiles=['iterm2']) def test_args_another(self): b = io.BytesIO()