123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896 |
- # config.py
- # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
- #
- # This module is part of GitPython and is released under
- # the BSD License: http://www.opensource.org/licenses/bsd-license.php
- """Module containing module parser implementation able to properly read and write
- configuration files"""
- import sys
- import abc
- from functools import wraps
- import inspect
- from io import BufferedReader, IOBase
- import logging
- import os
- import re
- import fnmatch
- from git.compat import (
- defenc,
- force_text,
- is_win,
- )
- from git.util import LockFile
- import os.path as osp
- import configparser as cp
- # typing-------------------------------------------------------
- from typing import (
- Any,
- Callable,
- Generic,
- IO,
- List,
- Dict,
- Sequence,
- TYPE_CHECKING,
- Tuple,
- TypeVar,
- Union,
- cast,
- )
- from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T
- if TYPE_CHECKING:
- from git.repo.base import Repo
- from io import BytesIO
- T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser")
- T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool)
- if sys.version_info[:3] < (3, 7, 2):
- # typing.Ordereddict not added until py 3.7.2
- from collections import OrderedDict
- OrderedDict_OMD = OrderedDict
- else:
- from typing import OrderedDict
- OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc]
- # -------------------------------------------------------------
- __all__ = ("GitConfigParser", "SectionConstraint")
- log = logging.getLogger("git.config")
- log.addHandler(logging.NullHandler())
- # invariants
- # represents the configuration level of a configuration file
- CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository")
- # Section pattern to detect conditional includes.
- # https://git-scm.com/docs/git-config#_conditional_includes
- CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"")
- class MetaParserBuilder(abc.ABCMeta): # noqa: B024
- """Utility class wrapping base-class methods into decorators that assure read-only properties"""
- def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder":
- """
- Equip all base-class methods with a needs_values decorator, and all non-const methods
- with a set_dirty_and_flush_changes decorator in addition to that."""
- kmm = "_mutating_methods_"
- if kmm in clsdict:
- mutating_methods = clsdict[kmm]
- for base in bases:
- methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_"))
- for name, method in methods:
- if name in clsdict:
- continue
- method_with_values = needs_values(method)
- if name in mutating_methods:
- method_with_values = set_dirty_and_flush_changes(method_with_values)
- # END mutating methods handling
- clsdict[name] = method_with_values
- # END for each name/method pair
- # END for each base
- # END if mutating methods configuration is set
- new_type = super(MetaParserBuilder, cls).__new__(cls, name, bases, clsdict)
- return new_type
- def needs_values(func: Callable[..., _T]) -> Callable[..., _T]:
- """Returns method assuring we read values (on demand) before we try to access them"""
- @wraps(func)
- def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
- self.read()
- return func(self, *args, **kwargs)
- # END wrapper method
- return assure_data_present
- def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]:
- """Return method that checks whether given non constant function may be called.
- If so, the instance will be set dirty.
- Additionally, we flush the changes right to disk"""
- def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
- rval = non_const_func(self, *args, **kwargs)
- self._dirty = True
- self.write()
- return rval
- # END wrapper method
- flush_changes.__name__ = non_const_func.__name__
- return flush_changes
- class SectionConstraint(Generic[T_ConfigParser]):
- """Constrains a ConfigParser to only option commands which are constrained to
- always use the section we have been initialized with.
- It supports all ConfigParser methods that operate on an option.
- :note:
- If used as a context manager, will release the wrapped ConfigParser."""
- __slots__ = ("_config", "_section_name")
- _valid_attrs_ = (
- "get_value",
- "set_value",
- "get",
- "set",
- "getint",
- "getfloat",
- "getboolean",
- "has_option",
- "remove_section",
- "remove_option",
- "options",
- )
- def __init__(self, config: T_ConfigParser, section: str) -> None:
- self._config = config
- self._section_name = section
- def __del__(self) -> None:
- # Yes, for some reason, we have to call it explicitly for it to work in PY3 !
- # Apparently __del__ doesn't get call anymore if refcount becomes 0
- # Ridiculous ... .
- self._config.release()
- def __getattr__(self, attr: str) -> Any:
- if attr in self._valid_attrs_:
- return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)
- return super(SectionConstraint, self).__getattribute__(attr)
- def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any:
- """Call the configuration at the given method which must take a section name
- as first argument"""
- return getattr(self._config, method)(self._section_name, *args, **kwargs)
- @property
- def config(self) -> T_ConfigParser:
- """return: Configparser instance we constrain"""
- return self._config
- def release(self) -> None:
- """Equivalent to GitConfigParser.release(), which is called on our underlying parser instance"""
- return self._config.release()
- def __enter__(self) -> "SectionConstraint[T_ConfigParser]":
- self._config.__enter__()
- return self
- def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
- self._config.__exit__(exception_type, exception_value, traceback)
- class _OMD(OrderedDict_OMD):
- """Ordered multi-dict."""
- def __setitem__(self, key: str, value: _T) -> None:
- super(_OMD, self).__setitem__(key, [value])
- def add(self, key: str, value: Any) -> None:
- if key not in self:
- super(_OMD, self).__setitem__(key, [value])
- return None
- super(_OMD, self).__getitem__(key).append(value)
- def setall(self, key: str, values: List[_T]) -> None:
- super(_OMD, self).__setitem__(key, values)
- def __getitem__(self, key: str) -> Any:
- return super(_OMD, self).__getitem__(key)[-1]
- def getlast(self, key: str) -> Any:
- return super(_OMD, self).__getitem__(key)[-1]
- def setlast(self, key: str, value: Any) -> None:
- if key not in self:
- super(_OMD, self).__setitem__(key, [value])
- return
- prior = super(_OMD, self).__getitem__(key)
- prior[-1] = value
- def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]:
- return super(_OMD, self).get(key, [default])[-1]
- def getall(self, key: str) -> List[_T]:
- return super(_OMD, self).__getitem__(key)
- def items(self) -> List[Tuple[str, _T]]: # type: ignore[override]
- """List of (key, last value for key)."""
- return [(k, self[k]) for k in self]
- def items_all(self) -> List[Tuple[str, List[_T]]]:
- """List of (key, list of values for key)."""
- return [(k, self.getall(k)) for k in self]
- def get_config_path(config_level: Lit_config_levels) -> str:
- # we do not support an absolute path of the gitconfig on windows ,
- # use the global config instead
- if is_win and config_level == "system":
- config_level = "global"
- if config_level == "system":
- return "/etc/gitconfig"
- elif config_level == "user":
- config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config")
- return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config")))
- elif config_level == "global":
- return osp.normpath(osp.expanduser("~/.gitconfig"))
- elif config_level == "repository":
- raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")
- else:
- # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs
- assert_never(
- config_level, # type: ignore[unreachable]
- ValueError(f"Invalid configuration level: {config_level!r}"),
- )
- class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
- """Implements specifics required to read git style configuration files.
- This variation behaves much like the git.config command such that the configuration
- will be read on demand based on the filepath given during initialization.
- The changes will automatically be written once the instance goes out of scope, but
- can be triggered manually as well.
- The configuration file will be locked if you intend to change values preventing other
- instances to write concurrently.
- :note:
- The config is case-sensitive even when queried, hence section and option names
- must match perfectly.
- If used as a context manager, will release the locked file."""
- # { Configuration
- # The lock type determines the type of lock to use in new configuration readers.
- # They must be compatible to the LockFile interface.
- # A suitable alternative would be the BlockingLockFile
- t_lock = LockFile
- re_comment = re.compile(r"^\s*[#;]")
- # } END configuration
- optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)"
- OPTVALUEONLY = re.compile(optvalueonly_source)
- OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$")
- del optvalueonly_source
- # list of RawConfigParser methods able to change the instance
- _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
- def __init__(
- self,
- file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None,
- read_only: bool = True,
- merge_includes: bool = True,
- config_level: Union[Lit_config_levels, None] = None,
- repo: Union["Repo", None] = None,
- ) -> None:
- """Initialize a configuration reader to read the given file_or_files and to
- possibly allow changes to it by setting read_only False
- :param file_or_files:
- A single file path or file objects or multiple of these
- :param read_only:
- If True, the ConfigParser may only read the data , but not change it.
- If False, only a single file path or file object may be given. We will write back the changes
- when they happen, or when the ConfigParser is released. This will not happen if other
- configuration files have been included
- :param merge_includes: if True, we will read files mentioned in [include] sections and merge their
- contents into ours. This makes it impossible to write back an individual configuration file.
- Thus, if you want to modify a single configuration file, turn this off to leave the original
- dataset unaltered when reading it.
- :param repo: Reference to repository to use if [includeIf] sections are found in configuration files.
- """
- cp.RawConfigParser.__init__(self, dict_type=_OMD)
- self._dict: Callable[..., _OMD] # type: ignore # mypy/typeshed bug?
- self._defaults: _OMD
- self._sections: _OMD # type: ignore # mypy/typeshed bug?
- # Used in python 3, needs to stay in sync with sections for underlying implementation to work
- if not hasattr(self, "_proxies"):
- self._proxies = self._dict()
- if file_or_files is not None:
- self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files
- else:
- if config_level is None:
- if read_only:
- self._file_or_files = [
- get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository"
- ]
- else:
- raise ValueError("No configuration level or configuration files specified")
- else:
- self._file_or_files = [get_config_path(config_level)]
- self._read_only = read_only
- self._dirty = False
- self._is_initialized = False
- self._merge_includes = merge_includes
- self._repo = repo
- self._lock: Union["LockFile", None] = None
- self._acquire_lock()
- def _acquire_lock(self) -> None:
- if not self._read_only:
- if not self._lock:
- if isinstance(self._file_or_files, (str, os.PathLike)):
- file_or_files = self._file_or_files
- elif isinstance(self._file_or_files, (tuple, list, Sequence)):
- raise ValueError(
- "Write-ConfigParsers can operate on a single file only, multiple files have been passed"
- )
- else:
- file_or_files = self._file_or_files.name
- # END get filename from handle/stream
- # initialize lock base - we want to write
- self._lock = self.t_lock(file_or_files)
- # END lock check
- self._lock._obtain_lock()
- # END read-only check
- def __del__(self) -> None:
- """Write pending changes if required and release locks"""
- # NOTE: only consistent in PY2
- self.release()
- def __enter__(self) -> "GitConfigParser":
- self._acquire_lock()
- return self
- def __exit__(self, *args: Any) -> None:
- self.release()
- def release(self) -> None:
- """Flush changes and release the configuration write lock. This instance must not be used anymore afterwards.
- In Python 3, it's required to explicitly release locks and flush changes, as __del__ is not called
- deterministically anymore."""
- # checking for the lock here makes sure we do not raise during write()
- # in case an invalid parser was created who could not get a lock
- if self.read_only or (self._lock and not self._lock._has_lock()):
- return
- try:
- try:
- self.write()
- except IOError:
- log.error("Exception during destruction of GitConfigParser", exc_info=True)
- except ReferenceError:
- # This happens in PY3 ... and usually means that some state cannot be written
- # as the sections dict cannot be iterated
- # Usually when shutting down the interpreter, don'y know how to fix this
- pass
- finally:
- if self._lock is not None:
- self._lock._release_lock()
- def optionxform(self, optionstr: str) -> str:
- """Do not transform options in any way when writing"""
- return optionstr
- def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None:
- """A direct copy of the py2.4 version of the super class's _read method
- to assure it uses ordered dicts. Had to change one line to make it work.
- Future versions have this fixed, but in fact its quite embarrassing for the
- guys not to have done it right in the first place !
- Removed big comments to make it more compact.
- Made sure it ignores initial whitespace as git uses tabs"""
- cursect = None # None, or a dictionary
- optname = None
- lineno = 0
- is_multi_line = False
- e = None # None, or an exception
- def string_decode(v: str) -> str:
- if v[-1] == "\\":
- v = v[:-1]
- # end cut trailing escapes to prevent decode error
- return v.encode(defenc).decode("unicode_escape")
- # end
- # end
- while True:
- # we assume to read binary !
- line = fp.readline().decode(defenc)
- if not line:
- break
- lineno = lineno + 1
- # comment or blank line?
- if line.strip() == "" or self.re_comment.match(line):
- continue
- if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR":
- # no leading whitespace
- continue
- # is it a section header?
- mo = self.SECTCRE.match(line.strip())
- if not is_multi_line and mo:
- sectname: str = mo.group("header").strip()
- if sectname in self._sections:
- cursect = self._sections[sectname]
- elif sectname == cp.DEFAULTSECT:
- cursect = self._defaults
- else:
- cursect = self._dict((("__name__", sectname),))
- self._sections[sectname] = cursect
- self._proxies[sectname] = None
- # So sections can't start with a continuation line
- optname = None
- # no section header in the file?
- elif cursect is None:
- raise cp.MissingSectionHeaderError(fpname, lineno, line)
- # an option line?
- elif not is_multi_line:
- mo = self.OPTCRE.match(line)
- if mo:
- # We might just have handled the last line, which could contain a quotation we want to remove
- optname, vi, optval = mo.group("option", "vi", "value")
- if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'):
- pos = optval.find(";")
- if pos != -1 and optval[pos - 1].isspace():
- optval = optval[:pos]
- optval = optval.strip()
- if optval == '""':
- optval = ""
- # end handle empty string
- optname = self.optionxform(optname.rstrip())
- if len(optval) > 1 and optval[0] == '"' and optval[-1] != '"':
- is_multi_line = True
- optval = string_decode(optval[1:])
- # end handle multi-line
- # preserves multiple values for duplicate optnames
- cursect.add(optname, optval)
- else:
- # check if it's an option with no value - it's just ignored by git
- if not self.OPTVALUEONLY.match(line):
- if not e:
- e = cp.ParsingError(fpname)
- e.append(lineno, repr(line))
- continue
- else:
- line = line.rstrip()
- if line.endswith('"'):
- is_multi_line = False
- line = line[:-1]
- # end handle quotations
- optval = cursect.getlast(optname)
- cursect.setlast(optname, optval + string_decode(line))
- # END parse section or option
- # END while reading
- # if any parsing errors occurred, raise an exception
- if e:
- raise e
- def _has_includes(self) -> Union[bool, int]:
- return self._merge_includes and len(self._included_paths())
- def _included_paths(self) -> List[Tuple[str, str]]:
- """Return List all paths that must be included to configuration
- as Tuples of (option, value).
- """
- paths = []
- for section in self.sections():
- if section == "include":
- paths += self.items(section)
- match = CONDITIONAL_INCLUDE_REGEXP.search(section)
- if match is None or self._repo is None:
- continue
- keyword = match.group(1)
- value = match.group(2).strip()
- if keyword in ["gitdir", "gitdir/i"]:
- value = osp.expanduser(value)
- if not any(value.startswith(s) for s in ["./", "/"]):
- value = "**/" + value
- if value.endswith("/"):
- value += "**"
- # Ensure that glob is always case insensitive if required.
- if keyword.endswith("/i"):
- value = re.sub(
- r"[a-zA-Z]",
- lambda m: "[{}{}]".format(m.group().lower(), m.group().upper()),
- value,
- )
- if self._repo.git_dir:
- if fnmatch.fnmatchcase(str(self._repo.git_dir), value):
- paths += self.items(section)
- elif keyword == "onbranch":
- try:
- branch_name = self._repo.active_branch.name
- except TypeError:
- # Ignore section if active branch cannot be retrieved.
- continue
- if fnmatch.fnmatchcase(branch_name, value):
- paths += self.items(section)
- return paths
- def read(self) -> None: # type: ignore[override]
- """Reads the data stored in the files we have been initialized with. It will
- ignore files that cannot be read, possibly leaving an empty configuration
- :return: Nothing
- :raise IOError: if a file cannot be handled"""
- if self._is_initialized:
- return None
- self._is_initialized = True
- files_to_read: List[Union[PathLike, IO]] = [""]
- if isinstance(self._file_or_files, (str, os.PathLike)):
- # for str or Path, as str is a type of Sequence
- files_to_read = [self._file_or_files]
- elif not isinstance(self._file_or_files, (tuple, list, Sequence)):
- # could merge with above isinstance once runtime type known
- files_to_read = [self._file_or_files]
- else: # for lists or tuples
- files_to_read = list(self._file_or_files)
- # end assure we have a copy of the paths to handle
- seen = set(files_to_read)
- num_read_include_files = 0
- while files_to_read:
- file_path = files_to_read.pop(0)
- file_ok = False
- if hasattr(file_path, "seek"):
- # must be a file objectfile-object
- file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure
- self._read(file_path, file_path.name)
- else:
- # assume a path if it is not a file-object
- file_path = cast(PathLike, file_path)
- try:
- with open(file_path, "rb") as fp:
- file_ok = True
- self._read(fp, fp.name)
- except IOError:
- continue
- # Read includes and append those that we didn't handle yet
- # We expect all paths to be normalized and absolute (and will assure that is the case)
- if self._has_includes():
- for _, include_path in self._included_paths():
- if include_path.startswith("~"):
- include_path = osp.expanduser(include_path)
- if not osp.isabs(include_path):
- if not file_ok:
- continue
- # end ignore relative paths if we don't know the configuration file path
- file_path = cast(PathLike, file_path)
- assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work"
- include_path = osp.join(osp.dirname(file_path), include_path)
- # end make include path absolute
- include_path = osp.normpath(include_path)
- if include_path in seen or not os.access(include_path, os.R_OK):
- continue
- seen.add(include_path)
- # insert included file to the top to be considered first
- files_to_read.insert(0, include_path)
- num_read_include_files += 1
- # each include path in configuration file
- # end handle includes
- # END for each file object to read
- # If there was no file included, we can safely write back (potentially) the configuration file
- # without altering it's meaning
- if num_read_include_files == 0:
- self._merge_includes = False
- # end
- def _write(self, fp: IO) -> None:
- """Write an .ini-format representation of the configuration state in
- git compatible format"""
- def write_section(name: str, section_dict: _OMD) -> None:
- fp.write(("[%s]\n" % name).encode(defenc))
- values: Sequence[str] # runtime only gets str in tests, but should be whatever _OMD stores
- v: str
- for (key, values) in section_dict.items_all():
- if key == "__name__":
- continue
- for v in values:
- fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc))
- # END if key is not __name__
- # END section writing
- if self._defaults:
- write_section(cp.DEFAULTSECT, self._defaults)
- value: _OMD
- for name, value in self._sections.items():
- write_section(name, value)
- def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override]
- """:return: list((option, value), ...) pairs of all items in the given section"""
- return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != "__name__"]
- def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]:
- """:return: list((option, [values...]), ...) pairs of all items in the given section"""
- rv = _OMD(self._defaults)
- for k, vs in self._sections[section_name].items_all():
- if k == "__name__":
- continue
- if k in rv and rv.getall(k) == vs:
- continue
- for v in vs:
- rv.add(k, v)
- return rv.items_all()
- @needs_values
- def write(self) -> None:
- """Write changes to our file, if there are changes at all
- :raise IOError: if this is a read-only writer instance or if we could not obtain
- a file lock"""
- self._assure_writable("write")
- if not self._dirty:
- return None
- if isinstance(self._file_or_files, (list, tuple)):
- raise AssertionError(
- "Cannot write back if there is not exactly a single file to write to, have %i files"
- % len(self._file_or_files)
- )
- # end assert multiple files
- if self._has_includes():
- log.debug(
- "Skipping write-back of configuration file as include files were merged in."
- + "Set merge_includes=False to prevent this."
- )
- return None
- # end
- fp = self._file_or_files
- # we have a physical file on disk, so get a lock
- is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # can't use Pathlike until 3.5 dropped
- if is_file_lock and self._lock is not None: # else raise Error?
- self._lock._obtain_lock()
- if not hasattr(fp, "seek"):
- fp = cast(PathLike, fp)
- with open(fp, "wb") as fp_open:
- self._write(fp_open)
- else:
- fp = cast("BytesIO", fp)
- fp.seek(0)
- # make sure we do not overwrite into an existing file
- if hasattr(fp, "truncate"):
- fp.truncate()
- self._write(fp)
- def _assure_writable(self, method_name: str) -> None:
- if self.read_only:
- raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))
- def add_section(self, section: str) -> None:
- """Assures added options will stay in order"""
- return super(GitConfigParser, self).add_section(section)
- @property
- def read_only(self) -> bool:
- """:return: True if this instance may change the configuration file"""
- return self._read_only
- def get_value(
- self,
- section: str,
- option: str,
- default: Union[int, float, str, bool, None] = None,
- ) -> Union[int, float, str, bool]:
- # can default or return type include bool?
- """Get an option's value.
- If multiple values are specified for this option in the section, the
- last one specified is returned.
- :param default:
- If not None, the given default value will be returned in case
- the option did not exist
- :return: a properly typed value, either int, float or string
- :raise TypeError: in case the value could not be understood
- Otherwise the exceptions known to the ConfigParser will be raised."""
- try:
- valuestr = self.get(section, option)
- except Exception:
- if default is not None:
- return default
- raise
- return self._string_to_value(valuestr)
- def get_values(
- self,
- section: str,
- option: str,
- default: Union[int, float, str, bool, None] = None,
- ) -> List[Union[int, float, str, bool]]:
- """Get an option's values.
- If multiple values are specified for this option in the section, all are
- returned.
- :param default:
- If not None, a list containing the given default value will be
- returned in case the option did not exist
- :return: a list of properly typed values, either int, float or string
- :raise TypeError: in case the value could not be understood
- Otherwise the exceptions known to the ConfigParser will be raised."""
- try:
- lst = self._sections[section].getall(option)
- except Exception:
- if default is not None:
- return [default]
- raise
- return [self._string_to_value(valuestr) for valuestr in lst]
- def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]:
- types = (int, float)
- for numtype in types:
- try:
- val = numtype(valuestr)
- # truncated value ?
- if val != float(valuestr):
- continue
- return val
- except (ValueError, TypeError):
- continue
- # END for each numeric type
- # try boolean values as git uses them
- vl = valuestr.lower()
- if vl == "false":
- return False
- if vl == "true":
- return True
- if not isinstance(valuestr, str):
- raise TypeError(
- "Invalid value type: only int, long, float and str are allowed",
- valuestr,
- )
- return valuestr
- def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str:
- if isinstance(value, (int, float, bool)):
- return str(value)
- return force_text(value)
- @needs_values
- @set_dirty_and_flush_changes
- def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
- """Sets the given option in section to the given value.
- It will create the section if required, and will not throw as opposed to the default
- ConfigParser 'set' method.
- :param section: Name of the section in which the option resides or should reside
- :param option: Name of the options whose value to set
- :param value: Value to set the option to. It must be a string or convertible
- to a string
- :return: this instance"""
- if not self.has_section(section):
- self.add_section(section)
- self.set(section, option, self._value_to_string(value))
- return self
- @needs_values
- @set_dirty_and_flush_changes
- def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
- """Adds a value for the given option in section.
- It will create the section if required, and will not throw as opposed to the default
- ConfigParser 'set' method. The value becomes the new value of the option as returned
- by 'get_value', and appends to the list of values returned by 'get_values`'.
- :param section: Name of the section in which the option resides or should reside
- :param option: Name of the option
- :param value: Value to add to option. It must be a string or convertible
- to a string
- :return: this instance"""
- if not self.has_section(section):
- self.add_section(section)
- self._sections[section].add(option, self._value_to_string(value))
- return self
- def rename_section(self, section: str, new_name: str) -> "GitConfigParser":
- """rename the given section to new_name
- :raise ValueError: if section doesn't exit
- :raise ValueError: if a section with new_name does already exist
- :return: this instance
- """
- if not self.has_section(section):
- raise ValueError("Source section '%s' doesn't exist" % section)
- if self.has_section(new_name):
- raise ValueError("Destination section '%s' already exists" % new_name)
- super(GitConfigParser, self).add_section(new_name)
- new_section = self._sections[new_name]
- for k, vs in self.items_all(section):
- new_section.setall(k, vs)
- # end for each value to copy
- # This call writes back the changes, which is why we don't have the respective decorator
- self.remove_section(section)
- return self
|