123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- # util.py
- import warnings
- import types
- import collections
- import itertools
- from functools import lru_cache
- from typing import List, Union, Iterable
- _bslash = chr(92)
- class __config_flags:
- """Internal class for defining compatibility and debugging flags"""
- _all_names: List[str] = []
- _fixed_names: List[str] = []
- _type_desc = "configuration"
- @classmethod
- def _set(cls, dname, value):
- if dname in cls._fixed_names:
- warnings.warn(
- "{}.{} {} is {} and cannot be overridden".format(
- cls.__name__,
- dname,
- cls._type_desc,
- str(getattr(cls, dname)).upper(),
- )
- )
- return
- if dname in cls._all_names:
- setattr(cls, dname, value)
- else:
- raise ValueError("no such {} {!r}".format(cls._type_desc, dname))
- enable = classmethod(lambda cls, name: cls._set(name, True))
- disable = classmethod(lambda cls, name: cls._set(name, False))
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Return the column of position ``loc`` within ``strg``, treating newlines
    as line separators.  Columns are numbered starting at 1.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parseString` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    # A position directly after a newline (and inside the string) is the
    # first column of its line.
    follows_newline = 0 < loc < len(strg) and strg[loc - 1] == "\n"
    if follows_newline:
        return 1
    # rfind yields -1 when no newline precedes loc, giving loc + 1.
    preceding_newline = strg.rfind("\n", 0, loc)
    return loc - preceding_newline
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """Return the line number of position ``loc`` within ``strg``, treating
    newlines as line separators.  Lines are numbered starting at 1.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parseString`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    # Each newline found before loc pushes the position one line further down.
    newlines_before = strg.count("\n", 0, loc)
    return newlines_before + 1
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Return the text of the line that contains position ``loc`` in ``strg``,
    treating newlines as line separators.  The newline itself is not included.
    """
    # One past the newline before loc; 0 when there is none (rfind gives -1).
    start = strg.rfind("\n", 0, loc) + 1
    end = strg.find("\n", loc)
    if end < 0:
        # No newline at or after loc: the line runs to the end of the string.
        return strg[start:]
    return strg[start:end]
- class _UnboundedCache:
- def __init__(self):
- cache = {}
- cache_get = cache.get
- self.not_in_cache = not_in_cache = object()
- def get(_, key):
- return cache_get(key, not_in_cache)
- def set_(_, key, value):
- cache[key] = value
- def clear(_):
- cache.clear()
- self.size = None
- self.get = types.MethodType(get, self)
- self.set = types.MethodType(set_, self)
- self.clear = types.MethodType(clear, self)
- class _FifoCache:
- def __init__(self, size):
- self.not_in_cache = not_in_cache = object()
- cache = collections.OrderedDict()
- cache_get = cache.get
- def get(_, key):
- return cache_get(key, not_in_cache)
- def set_(_, key, value):
- cache[key] = value
- while len(cache) > size:
- cache.popitem(last=False)
- def clear(_):
- cache.clear()
- self.size = size
- self.get = types.MethodType(get, self)
- self.set = types.MethodType(set_, self)
- self.clear = types.MethodType(clear, self)
class LRUMemo:
    """
    A memoizing mapping that retains `capacity` deleted items

    The memo tracks retained items by their access order; once `capacity` items
    are retained, the least recently used item is discarded.  With a
    `capacity` of zero or less, deleted items are not retained at all.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        # Items that have been set and not yet deleted.
        self._active = {}
        # Deleted items, ordered from least to most recently used.
        self._memory = collections.OrderedDict()

    def __getitem__(self, key):
        """Return the value for ``key`` from the active or retained items.

        Reading a retained item marks it as the most recently used.

        Raises:
            KeyError: if ``key`` is neither active nor retained.
        """
        try:
            return self._active[key]
        except KeyError:
            # move_to_end raises KeyError itself when the key is unknown.
            self._memory.move_to_end(key)
            return self._memory[key]

    def __setitem__(self, key, value):
        """Store ``value`` as an active item, dropping any retained copy."""
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        """Move the active item ``key`` into the retained items.

        Deleting a key that is not active is silently ignored.
        """
        try:
            value = self._active.pop(key)
        except KeyError:
            return
        if self._capacity <= 0:
            # Nothing may be retained; evicting from the empty memory would
            # otherwise raise KeyError('dictionary is empty').
            return
        # Make room by discarding the least recently used retained items.
        while len(self._memory) >= self._capacity:
            self._memory.popitem(last=False)
        self._memory[key] = value

    def clear(self):
        """Forget all active and retained items."""
        self._active.clear()
        self._memory.clear()
class UnboundedMemo(dict):
    """
    A memoizing mapping that retains all deleted items
    """

    def __delitem__(self, key):
        """Ignore the deletion, so ``del memo[key]`` leaves the entry in place."""
- def _escape_regex_range_chars(s: str) -> str:
- # escape these chars: ^-[]
- for c in r"\^-[]":
- s = s.replace(c, _bslash + c)
- s = s.replace("\n", r"\n")
- s = s.replace("\t", r"\t")
- return str(s)
- def _collapse_string_to_ranges(
- s: Union[str, Iterable[str]], re_escape: bool = True
- ) -> str:
- def is_consecutive(c):
- c_int = ord(c)
- is_consecutive.prev, prev = c_int, is_consecutive.prev
- if c_int - prev > 1:
- is_consecutive.value = next(is_consecutive.counter)
- return is_consecutive.value
- is_consecutive.prev = 0
- is_consecutive.counter = itertools.count()
- is_consecutive.value = -1
- def escape_re_range_char(c):
- return "\\" + c if c in r"\^-][" else c
- def no_escape_re_range_char(c):
- return c
- if not re_escape:
- escape_re_range_char = no_escape_re_range_char
- ret = []
- s = "".join(sorted(set(s)))
- if len(s) > 3:
- for _, chars in itertools.groupby(s, key=is_consecutive):
- first = last = next(chars)
- last = collections.deque(
- itertools.chain(iter([last]), chars), maxlen=1
- ).pop()
- if first == last:
- ret.append(escape_re_range_char(first))
- else:
- sep = "" if ord(last) == ord(first) + 1 else "-"
- ret.append(
- "{}{}{}".format(
- escape_re_range_char(first), sep, escape_re_range_char(last)
- )
- )
- else:
- ret = [escape_re_range_char(c) for c in s]
- return "".join(ret)
- def _flatten(ll: list) -> list:
- ret = []
- for i in ll:
- if isinstance(i, list):
- ret.extend(_flatten(i))
- else:
- ret.append(i)
- return ret
|