123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- # Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
- import re
- import sys
- import os
- from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL
- from .winterm import enable_vt_processing, WinTerm, WinColor, WinStyle
- from .win32 import windll, winapi_test
# Module-wide WinTerm instance used for all win32 console calls. It only
# exists when the win32 layer exposed a usable `windll`; otherwise it is None.
winterm = WinTerm() if windll is not None else None
class StreamWrapper(object):
    '''
    Transparent proxy around a stream (such as stdout). Any attribute that
    is not defined on this class is looked up on the wrapped stream; the
    exception is 'write()', which is handed to our Converter instance.
    '''

    def __init__(self, wrapped, converter):
        # Name-mangled (double-underscore) attributes cannot clash with the
        # names of attributes on the wrapped stream object.
        self.__wrapped = wrapped
        self.__convertor = converter

    def __getattr__(self, name):
        # Only reached when normal lookup on the proxy itself fails.
        target = self.__wrapped
        return getattr(target, name)

    def __enter__(self, *args, **kwargs):
        # Special method lookup bypasses __getattr__/__getattribute__, see
        # https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit
        # so the context-manager methods have to be forwarded explicitly.
        enter = self.__wrapped.__enter__
        return enter(*args, **kwargs)

    def __exit__(self, *args, **kwargs):
        leave = self.__wrapped.__exit__
        return leave(*args, **kwargs)

    def __setstate__(self, state):
        # Adopt the given mapping as the instance dictionary.
        self.__dict__ = state

    def __getstate__(self):
        # The instance dictionary itself is the state.
        return vars(self)

    def write(self, text):
        # Writes go through the converter, never straight to the stream.
        converter = self.__convertor
        converter.write(text)

    def isatty(self):
        '''
        True if the wrapped stream reports itself as a tty. When the
        PYCHARM_HOSTED environment variable is present, the interpreter's
        original stdout/stderr are always reported as a tty. A stream
        without an 'isatty' attribute is reported as not a tty.
        '''
        stream = self.__wrapped
        if 'PYCHARM_HOSTED' in os.environ and stream is not None:
            if stream is sys.__stdout__ or stream is sys.__stderr__:
                return True
        try:
            probe = stream.isatty
        except AttributeError:
            return False
        return probe()

    @property
    def closed(self):
        '''
        The wrapped stream's 'closed' flag, or True when it cannot be read.
        '''
        stream = self.__wrapped
        try:
            state = stream.closed
        except (AttributeError, ValueError):
            # AttributeError: the stream doesn't support being closed.
            # ValueError: the stream has already been detached when atexit
            # runs.
            return True
        return state
class AnsiToWin32(object):
    '''
    Implements a 'write()' method which, on Windows, will strip ANSI character
    sequences from the text, and if outputting to a tty, will convert them into
    win32 function calls.

    Attributes set by __init__:
        wrapped     -- the underlying stream that receives the plain text
        stream      -- a StreamWrapper proxy whose write() routes back here
        autoreset   -- reset styling after every write()
        strip       -- remove ANSI sequences from the written text
        convert     -- translate ANSI sequences into win32 calls
        win32_calls -- mapping of SGR codes to (function, *args) tuples
        on_stderr   -- True when the wrapped stream is sys.stderr
    '''
    ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?')   # Control Sequence Introducer
    ANSI_OSC_RE = re.compile('\001?\033\\]([^\a]*)(\a)\002?')             # Operating System Command

    def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
        '''
        wrapped   -- the stream to write to (normally sys.stdout or sys.stderr)
        convert   -- force win32 conversion on/off; None means auto-detect
        strip     -- force stripping of ANSI sequences on/off; None means
                     auto-detect
        autoreset -- if true, reset colors to defaults after every write()
        '''
        # The wrapped stream (normally sys.stdout or sys.stderr)
        self.wrapped = wrapped

        # should we reset colors to defaults after every .write()
        self.autoreset = autoreset

        # create the proxy wrapping our output stream
        self.stream = StreamWrapper(wrapped, self)

        on_windows = os.name == 'nt'
        # We test if the WinAPI works, because even if we are on Windows
        # we may be using a terminal that doesn't support the WinAPI
        # (e.g. Cygwin Terminal). In this case it's up to the terminal
        # to support the ANSI codes.
        conversion_supported = on_windows and winapi_test()
        try:
            fd = wrapped.fileno()
        except Exception:
            # streams without a usable file descriptor are passed on as -1
            fd = -1
        system_has_native_ansi = not on_windows or enable_vt_processing(fd)
        have_tty = not self.stream.closed and self.stream.isatty()
        need_conversion = conversion_supported and not system_has_native_ansi

        # should we strip ANSI sequences from our output?
        if strip is None:
            strip = need_conversion or not have_tty
        self.strip = strip

        # should we convert ANSI sequences into win32 calls?
        if convert is None:
            convert = need_conversion and have_tty
        self.convert = convert

        # dict of ansi codes to win32 functions and parameters
        self.win32_calls = self.get_win32_calls()

        # are we wrapping stderr?
        self.on_stderr = self.wrapped is sys.stderr

    def should_wrap(self):
        '''
        True if this class is actually needed. If false, then the output
        stream will not be affected, nor will win32 calls be issued, so
        wrapping stdout is not actually required. This will generally be
        False on non-Windows platforms, unless optional functionality like
        autoreset has been requested using kwargs to init()
        '''
        return self.convert or self.strip or self.autoreset

    def get_win32_calls(self):
        '''
        Return the mapping of SGR codes to (winterm function, *args) tuples,
        or an empty dict when conversion is off or there is no winterm.
        '''
        if self.convert and winterm:
            return {
                AnsiStyle.RESET_ALL: (winterm.reset_all, ),
                AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
                AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
                AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
                AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
                AnsiFore.RED: (winterm.fore, WinColor.RED),
                AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
                AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
                AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
                AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
                AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
                AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
                AnsiFore.RESET: (winterm.fore, ),
                AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True),
                AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True),
                AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True),
                AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True),
                AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True),
                AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True),
                AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True),
                AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True),
                AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
                AnsiBack.RED: (winterm.back, WinColor.RED),
                AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
                AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
                AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
                AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
                AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
                AnsiBack.WHITE: (winterm.back, WinColor.GREY),
                AnsiBack.RESET: (winterm.back, ),
                AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True),
                AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True),
                AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True),
                AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True),
                AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True),
                AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True),
                AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True),
                AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True),
            }
        return dict()

    def write(self, text):
        '''
        Write text to the wrapped stream, stripping/converting ANSI sequences
        when enabled, then reset styling if autoreset was requested.
        '''
        if self.strip or self.convert:
            self.write_and_convert(text)
        else:
            self.wrapped.write(text)
            self.wrapped.flush()
        if self.autoreset:
            self.reset_all()

    def reset_all(self):
        '''
        Reset styling: through win32 when converting, otherwise by writing
        the ANSI reset sequence (unless stripping, or the stream is closed).
        '''
        if self.convert:
            self.call_win32('m', (0,))
        elif not self.strip and not self.stream.closed:
            self.wrapped.write(Style.RESET_ALL)

    def write_and_convert(self, text):
        '''
        Write the given text to our wrapped stream, stripping any ANSI
        sequences from the text, and optionally converting them into win32
        calls.
        '''
        cursor = 0
        # OSC sequences are removed first; CSI sequences are then handled
        # in order, with the plain text between them written as-is.
        text = self.convert_osc(text)
        for match in self.ANSI_CSI_RE.finditer(text):
            start, end = match.span()
            self.write_plain_text(text, cursor, start)
            self.convert_ansi(*match.groups())
            cursor = end
        self.write_plain_text(text, cursor, len(text))

    def write_plain_text(self, text, start, end):
        '''
        Write text[start:end] to the wrapped stream and flush it; an empty
        slice writes nothing.
        '''
        if start < end:
            self.wrapped.write(text[start:end])
            self.wrapped.flush()

    def convert_ansi(self, paramstring, command):
        '''
        Issue the win32 call(s) for one CSI sequence, if converting.
        '''
        if self.convert:
            params = self.extract_params(command, paramstring)
            self.call_win32(command, params)

    def extract_params(self, command, paramstring):
        '''
        Parse the ';'-separated parameter string of a CSI sequence into a
        tuple of ints, filling in defaults for the commands that have them.
        '''
        fields = paramstring.split(';')
        if command in 'Hf':
            # cursor position: empty fields default to 1, and the tuple is
            # padded with 1s up to two entries
            params = tuple(int(p) if p else 1 for p in fields)
            params += (1,) * (2 - len(params))
        else:
            params = tuple(int(p) for p in fields if p)
            if not params:
                # defaults:
                if command in 'JKm':
                    params = (0,)
                elif command in 'ABCD':
                    params = (1,)
        return params

    def call_win32(self, command, params):
        '''
        Perform the win32 equivalent of the CSI sequence identified by
        command (its final letter) and params (a tuple of ints).

        SGR codes ('m') that are missing from win32_calls are ignored, as
        are unrecognised commands. Erase and cursor commands are ignored
        when there is no winterm to carry them out.
        '''
        if command == 'm':
            for param in params:
                if param in self.win32_calls:
                    func_args = self.win32_calls[param]
                    func, args = func_args[0], func_args[1:]
                    func(*args, on_stderr=self.on_stderr)
            return

        # Everything below talks to winterm directly, so without one there
        # is nothing to do (previously this raised AttributeError on None).
        if winterm is None:
            return

        if command in 'J':
            winterm.erase_screen(params[0], on_stderr=self.on_stderr)
        elif command in 'K':
            winterm.erase_line(params[0], on_stderr=self.on_stderr)
        elif command in 'Hf':     # cursor position - absolute
            winterm.set_cursor_position(params, on_stderr=self.on_stderr)
        elif command in 'ABCD':   # cursor position - relative
            n = params[0]
            # A - up, B - down, C - forward, D - back
            x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command]
            winterm.cursor_adjust(x, y, on_stderr=self.on_stderr)

    def convert_osc(self, text):
        '''
        Return text with every OSC sequence removed, acting on each one
        (see _apply_osc) as it is encountered.

        All sequences are removed in a single left-to-right pass. Removing
        them one by one using offsets taken from the original string
        corrupts the result as soon as the text holds more than one.
        '''
        def consume(match):
            self._apply_osc(*match.groups())
            return ''

        return self.ANSI_OSC_RE.sub(consume, text)

    def _apply_osc(self, paramstring, command):
        # Carry out a single OSC sequence. This is a win32 call, so like the
        # CSI handling it only happens when converting and a winterm exists.
        if not self.convert or winterm is None:
            return
        if command == BEL and paramstring.count(";") == 1:
            params = paramstring.split(";")
            # 0 - change title and icon (we will only change title)
            # 1 - change icon (we don't support this)
            # 2 - change title
            if params[0] in '02':
                winterm.set_title(params[1])

    def flush(self):
        '''Flush the wrapped stream.'''
        self.wrapped.flush()
|