util.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # util.py
  2. import warnings
  3. import types
  4. import collections
  5. import itertools
  6. from functools import lru_cache
  7. from typing import List, Union, Iterable
# A single backslash character (code point 92); presumably built with chr()
# so that no escaped backslash literal is needed here.
_bslash = chr(92)
  9. class __config_flags:
  10. """Internal class for defining compatibility and debugging flags"""
  11. _all_names: List[str] = []
  12. _fixed_names: List[str] = []
  13. _type_desc = "configuration"
  14. @classmethod
  15. def _set(cls, dname, value):
  16. if dname in cls._fixed_names:
  17. warnings.warn(
  18. "{}.{} {} is {} and cannot be overridden".format(
  19. cls.__name__,
  20. dname,
  21. cls._type_desc,
  22. str(getattr(cls, dname)).upper(),
  23. )
  24. )
  25. return
  26. if dname in cls._all_names:
  27. setattr(cls, dname, value)
  28. else:
  29. raise ValueError("no such {} {!r}".format(cls._type_desc, dname))
  30. enable = classmethod(lambda cls, name: cls._set(name, True))
  31. disable = classmethod(lambda cls, name: cls._set(name, False))
  32. @lru_cache(maxsize=128)
  33. def col(loc: int, strg: str) -> int:
  34. """
  35. Returns current column within a string, counting newlines as line separators.
  36. The first column is number 1.
  37. Note: the default parsing behavior is to expand tabs in the input string
  38. before starting the parsing process. See
  39. :class:`ParserElement.parseString` for more
  40. information on parsing strings containing ``<TAB>`` s, and suggested
  41. methods to maintain a consistent view of the parsed string, the parse
  42. location, and line and column positions within the parsed string.
  43. """
  44. s = strg
  45. return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
  46. @lru_cache(maxsize=128)
  47. def lineno(loc: int, strg: str) -> int:
  48. """Returns current line number within a string, counting newlines as line separators.
  49. The first line is number 1.
  50. Note - the default parsing behavior is to expand tabs in the input string
  51. before starting the parsing process. See :class:`ParserElement.parseString`
  52. for more information on parsing strings containing ``<TAB>`` s, and
  53. suggested methods to maintain a consistent view of the parsed string, the
  54. parse location, and line and column positions within the parsed string.
  55. """
  56. return strg.count("\n", 0, loc) + 1
  57. @lru_cache(maxsize=128)
  58. def line(loc: int, strg: str) -> str:
  59. """
  60. Returns the line of text containing loc within a string, counting newlines as line separators.
  61. """
  62. last_cr = strg.rfind("\n", 0, loc)
  63. next_cr = strg.find("\n", loc)
  64. return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
  65. class _UnboundedCache:
  66. def __init__(self):
  67. cache = {}
  68. cache_get = cache.get
  69. self.not_in_cache = not_in_cache = object()
  70. def get(_, key):
  71. return cache_get(key, not_in_cache)
  72. def set_(_, key, value):
  73. cache[key] = value
  74. def clear(_):
  75. cache.clear()
  76. self.size = None
  77. self.get = types.MethodType(get, self)
  78. self.set = types.MethodType(set_, self)
  79. self.clear = types.MethodType(clear, self)
  80. class _FifoCache:
  81. def __init__(self, size):
  82. self.not_in_cache = not_in_cache = object()
  83. cache = collections.OrderedDict()
  84. cache_get = cache.get
  85. def get(_, key):
  86. return cache_get(key, not_in_cache)
  87. def set_(_, key, value):
  88. cache[key] = value
  89. while len(cache) > size:
  90. cache.popitem(last=False)
  91. def clear(_):
  92. cache.clear()
  93. self.size = size
  94. self.get = types.MethodType(get, self)
  95. self.set = types.MethodType(set_, self)
  96. self.clear = types.MethodType(clear, self)
  97. class LRUMemo:
  98. """
  99. A memoizing mapping that retains `capacity` deleted items
  100. The memo tracks retained items by their access order; once `capacity` items
  101. are retained, the least recently used item is discarded.
  102. """
  103. def __init__(self, capacity):
  104. self._capacity = capacity
  105. self._active = {}
  106. self._memory = collections.OrderedDict()
  107. def __getitem__(self, key):
  108. try:
  109. return self._active[key]
  110. except KeyError:
  111. self._memory.move_to_end(key)
  112. return self._memory[key]
  113. def __setitem__(self, key, value):
  114. self._memory.pop(key, None)
  115. self._active[key] = value
  116. def __delitem__(self, key):
  117. try:
  118. value = self._active.pop(key)
  119. except KeyError:
  120. pass
  121. else:
  122. while len(self._memory) >= self._capacity:
  123. self._memory.popitem(last=False)
  124. self._memory[key] = value
  125. def clear(self):
  126. self._active.clear()
  127. self._memory.clear()
  128. class UnboundedMemo(dict):
  129. """
  130. A memoizing mapping that retains all deleted items
  131. """
  132. def __delitem__(self, key):
  133. pass
  134. def _escape_regex_range_chars(s: str) -> str:
  135. # escape these chars: ^-[]
  136. for c in r"\^-[]":
  137. s = s.replace(c, _bslash + c)
  138. s = s.replace("\n", r"\n")
  139. s = s.replace("\t", r"\t")
  140. return str(s)
  141. def _collapse_string_to_ranges(
  142. s: Union[str, Iterable[str]], re_escape: bool = True
  143. ) -> str:
  144. def is_consecutive(c):
  145. c_int = ord(c)
  146. is_consecutive.prev, prev = c_int, is_consecutive.prev
  147. if c_int - prev > 1:
  148. is_consecutive.value = next(is_consecutive.counter)
  149. return is_consecutive.value
  150. is_consecutive.prev = 0
  151. is_consecutive.counter = itertools.count()
  152. is_consecutive.value = -1
  153. def escape_re_range_char(c):
  154. return "\\" + c if c in r"\^-][" else c
  155. def no_escape_re_range_char(c):
  156. return c
  157. if not re_escape:
  158. escape_re_range_char = no_escape_re_range_char
  159. ret = []
  160. s = "".join(sorted(set(s)))
  161. if len(s) > 3:
  162. for _, chars in itertools.groupby(s, key=is_consecutive):
  163. first = last = next(chars)
  164. last = collections.deque(
  165. itertools.chain(iter([last]), chars), maxlen=1
  166. ).pop()
  167. if first == last:
  168. ret.append(escape_re_range_char(first))
  169. else:
  170. sep = "" if ord(last) == ord(first) + 1 else "-"
  171. ret.append(
  172. "{}{}{}".format(
  173. escape_re_range_char(first), sep, escape_re_range_char(last)
  174. )
  175. )
  176. else:
  177. ret = [escape_re_range_char(c) for c in s]
  178. return "".join(ret)
  179. def _flatten(ll: list) -> list:
  180. ret = []
  181. for i in ll:
  182. if isinstance(i, list):
  183. ret.extend(_flatten(i))
  184. else:
  185. ret.append(i)
  186. return ret