main.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. import io
  2. import logging
  3. import os
  4. import shutil
  5. import sys
  6. import tempfile
  7. from collections import OrderedDict
  8. from contextlib import contextmanager
  9. from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple,
  10. Union)
  11. from .parser import Binding, parse_stream
  12. from .variables import parse_variables
# Module-level logger, named after this module, shared by every helper below.
logger = logging.getLogger(__name__)

# Alias used in annotations for "a filesystem path object".
# `os.PathLike` is only referenced when the interpreter reports Python 3.6 or
# newer; otherwise the alias falls back to plain `str`.
# NOTE(review): this is kept as an `if`/`else` statement (not a conditional
# expression) presumably so static type checkers can evaluate the version
# test and treat `_PathLike` as a type alias — confirm before restructuring.
if sys.version_info >= (3, 6):
    _PathLike = os.PathLike
else:
    _PathLike = str
  18. def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
  19. for mapping in mappings:
  20. if mapping.error:
  21. logger.warning(
  22. "Python-dotenv could not parse statement starting at line %s",
  23. mapping.original.line,
  24. )
  25. yield mapping
  26. class DotEnv():
  27. def __init__(
  28. self,
  29. dotenv_path: Optional[Union[str, _PathLike]],
  30. stream: Optional[IO[str]] = None,
  31. verbose: bool = False,
  32. encoding: Union[None, str] = None,
  33. interpolate: bool = True,
  34. override: bool = True,
  35. ) -> None:
  36. self.dotenv_path = dotenv_path # type: Optional[Union[str, _PathLike]]
  37. self.stream = stream # type: Optional[IO[str]]
  38. self._dict = None # type: Optional[Dict[str, Optional[str]]]
  39. self.verbose = verbose # type: bool
  40. self.encoding = encoding # type: Union[None, str]
  41. self.interpolate = interpolate # type: bool
  42. self.override = override # type: bool
  43. @contextmanager
  44. def _get_stream(self) -> Iterator[IO[str]]:
  45. if self.dotenv_path and os.path.isfile(self.dotenv_path):
  46. with io.open(self.dotenv_path, encoding=self.encoding) as stream:
  47. yield stream
  48. elif self.stream is not None:
  49. yield self.stream
  50. else:
  51. if self.verbose:
  52. logger.info(
  53. "Python-dotenv could not find configuration file %s.",
  54. self.dotenv_path or '.env',
  55. )
  56. yield io.StringIO('')
  57. def dict(self) -> Dict[str, Optional[str]]:
  58. """Return dotenv as dict"""
  59. if self._dict:
  60. return self._dict
  61. raw_values = self.parse()
  62. if self.interpolate:
  63. self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
  64. else:
  65. self._dict = OrderedDict(raw_values)
  66. return self._dict
  67. def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
  68. with self._get_stream() as stream:
  69. for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
  70. if mapping.key is not None:
  71. yield mapping.key, mapping.value
  72. def set_as_environment_variables(self) -> bool:
  73. """
  74. Load the current dotenv as system environment variable.
  75. """
  76. for k, v in self.dict().items():
  77. if k in os.environ and not self.override:
  78. continue
  79. if v is not None:
  80. os.environ[k] = v
  81. return True
  82. def get(self, key: str) -> Optional[str]:
  83. """
  84. """
  85. data = self.dict()
  86. if key in data:
  87. return data[key]
  88. if self.verbose:
  89. logger.warning("Key %s not found in %s.", key, self.dotenv_path)
  90. return None
  91. def get_key(
  92. dotenv_path: Union[str, _PathLike],
  93. key_to_get: str,
  94. encoding: Optional[str] = "utf-8",
  95. ) -> Optional[str]:
  96. """
  97. Get the value of a given key from the given .env.
  98. Returns `None` if the key isn't found or doesn't have a value.
  99. """
  100. return DotEnv(dotenv_path, verbose=True, encoding=encoding).get(key_to_get)
  101. @contextmanager
  102. def rewrite(
  103. path: Union[str, _PathLike],
  104. encoding: Optional[str],
  105. ) -> Iterator[Tuple[IO[str], IO[str]]]:
  106. try:
  107. if not os.path.isfile(path):
  108. with io.open(path, "w+", encoding=encoding) as source:
  109. source.write("")
  110. with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding=encoding) as dest:
  111. with io.open(path, encoding=encoding) as source:
  112. yield (source, dest) # type: ignore
  113. except BaseException:
  114. if os.path.isfile(dest.name):
  115. os.unlink(dest.name)
  116. raise
  117. else:
  118. shutil.move(dest.name, path)
  119. def set_key(
  120. dotenv_path: Union[str, _PathLike],
  121. key_to_set: str,
  122. value_to_set: str,
  123. quote_mode: str = "always",
  124. export: bool = False,
  125. encoding: Optional[str] = "utf-8",
  126. ) -> Tuple[Optional[bool], str, str]:
  127. """
  128. Adds or Updates a key/value to the given .env
  129. If the .env path given doesn't exist, fails instead of risking creating
  130. an orphan .env somewhere in the filesystem
  131. """
  132. if quote_mode not in ("always", "auto", "never"):
  133. raise ValueError("Unknown quote_mode: {}".format(quote_mode))
  134. quote = (
  135. quote_mode == "always"
  136. or (quote_mode == "auto" and not value_to_set.isalnum())
  137. )
  138. if quote:
  139. value_out = "'{}'".format(value_to_set.replace("'", "\\'"))
  140. else:
  141. value_out = value_to_set
  142. if export:
  143. line_out = 'export {}={}\n'.format(key_to_set, value_out)
  144. else:
  145. line_out = "{}={}\n".format(key_to_set, value_out)
  146. with rewrite(dotenv_path, encoding=encoding) as (source, dest):
  147. replaced = False
  148. missing_newline = False
  149. for mapping in with_warn_for_invalid_lines(parse_stream(source)):
  150. if mapping.key == key_to_set:
  151. dest.write(line_out)
  152. replaced = True
  153. else:
  154. dest.write(mapping.original.string)
  155. missing_newline = not mapping.original.string.endswith("\n")
  156. if not replaced:
  157. if missing_newline:
  158. dest.write("\n")
  159. dest.write(line_out)
  160. return True, key_to_set, value_to_set
  161. def unset_key(
  162. dotenv_path: Union[str, _PathLike],
  163. key_to_unset: str,
  164. quote_mode: str = "always",
  165. encoding: Optional[str] = "utf-8",
  166. ) -> Tuple[Optional[bool], str]:
  167. """
  168. Removes a given key from the given .env
  169. If the .env path given doesn't exist, fails
  170. If the given key doesn't exist in the .env, fails
  171. """
  172. if not os.path.exists(dotenv_path):
  173. logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
  174. return None, key_to_unset
  175. removed = False
  176. with rewrite(dotenv_path, encoding=encoding) as (source, dest):
  177. for mapping in with_warn_for_invalid_lines(parse_stream(source)):
  178. if mapping.key == key_to_unset:
  179. removed = True
  180. else:
  181. dest.write(mapping.original.string)
  182. if not removed:
  183. logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
  184. return None, key_to_unset
  185. return removed, key_to_unset
  186. def resolve_variables(
  187. values: Iterable[Tuple[str, Optional[str]]],
  188. override: bool,
  189. ) -> Mapping[str, Optional[str]]:
  190. new_values = {} # type: Dict[str, Optional[str]]
  191. for (name, value) in values:
  192. if value is None:
  193. result = None
  194. else:
  195. atoms = parse_variables(value)
  196. env = {} # type: Dict[str, Optional[str]]
  197. if override:
  198. env.update(os.environ) # type: ignore
  199. env.update(new_values)
  200. else:
  201. env.update(new_values)
  202. env.update(os.environ) # type: ignore
  203. result = "".join(atom.resolve(env) for atom in atoms)
  204. new_values[name] = result
  205. return new_values
  206. def _walk_to_root(path: str) -> Iterator[str]:
  207. """
  208. Yield directories starting from the given directory up to the root
  209. """
  210. if not os.path.exists(path):
  211. raise IOError('Starting path not found')
  212. if os.path.isfile(path):
  213. path = os.path.dirname(path)
  214. last_dir = None
  215. current_dir = os.path.abspath(path)
  216. while last_dir != current_dir:
  217. yield current_dir
  218. parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
  219. last_dir, current_dir = current_dir, parent_dir
  220. def find_dotenv(
  221. filename: str = '.env',
  222. raise_error_if_not_found: bool = False,
  223. usecwd: bool = False,
  224. ) -> str:
  225. """
  226. Search in increasingly higher folders for the given file
  227. Returns path to the file if found, or an empty string otherwise
  228. """
  229. def _is_interactive():
  230. """ Decide whether this is running in a REPL or IPython notebook """
  231. main = __import__('__main__', None, None, fromlist=['__file__'])
  232. return not hasattr(main, '__file__')
  233. if usecwd or _is_interactive() or getattr(sys, 'frozen', False):
  234. # Should work without __file__, e.g. in REPL or IPython notebook.
  235. path = os.getcwd()
  236. else:
  237. # will work for .py files
  238. frame = sys._getframe()
  239. current_file = __file__
  240. while frame.f_code.co_filename == current_file:
  241. assert frame.f_back is not None
  242. frame = frame.f_back
  243. frame_filename = frame.f_code.co_filename
  244. path = os.path.dirname(os.path.abspath(frame_filename))
  245. for dirname in _walk_to_root(path):
  246. check_path = os.path.join(dirname, filename)
  247. if os.path.isfile(check_path):
  248. return check_path
  249. if raise_error_if_not_found:
  250. raise IOError('File not found')
  251. return ''
  252. def load_dotenv(
  253. dotenv_path: Union[str, _PathLike, None] = None,
  254. stream: Optional[IO[str]] = None,
  255. verbose: bool = False,
  256. override: bool = False,
  257. interpolate: bool = True,
  258. encoding: Optional[str] = "utf-8",
  259. ) -> bool:
  260. """Parse a .env file and then load all the variables found as environment variables.
  261. - *dotenv_path*: absolute or relative path to .env file.
  262. - *stream*: Text stream (such as `io.StringIO`) with .env content, used if
  263. `dotenv_path` is `None`.
  264. - *verbose*: whether to output a warning the .env file is missing. Defaults to
  265. `False`.
  266. - *override*: whether to override the system environment variables with the variables
  267. in `.env` file. Defaults to `False`.
  268. - *encoding*: encoding to be used to read the file.
  269. If both `dotenv_path` and `stream`, `find_dotenv()` is used to find the .env file.
  270. """
  271. if dotenv_path is None and stream is None:
  272. dotenv_path = find_dotenv()
  273. dotenv = DotEnv(
  274. dotenv_path=dotenv_path,
  275. stream=stream,
  276. verbose=verbose,
  277. interpolate=interpolate,
  278. override=override,
  279. encoding=encoding,
  280. )
  281. return dotenv.set_as_environment_variables()
  282. def dotenv_values(
  283. dotenv_path: Union[str, _PathLike, None] = None,
  284. stream: Optional[IO[str]] = None,
  285. verbose: bool = False,
  286. interpolate: bool = True,
  287. encoding: Optional[str] = "utf-8",
  288. ) -> Dict[str, Optional[str]]:
  289. """
  290. Parse a .env file and return its content as a dict.
  291. - *dotenv_path*: absolute or relative path to .env file.
  292. - *stream*: `StringIO` object with .env content, used if `dotenv_path` is `None`.
  293. - *verbose*: whether to output a warning the .env file is missing. Defaults to
  294. `False`.
  295. in `.env` file. Defaults to `False`.
  296. - *encoding*: encoding to be used to read the file.
  297. If both `dotenv_path` and `stream`, `find_dotenv()` is used to find the .env file.
  298. """
  299. if dotenv_path is None and stream is None:
  300. dotenv_path = find_dotenv()
  301. return DotEnv(
  302. dotenv_path=dotenv_path,
  303. stream=stream,
  304. verbose=verbose,
  305. interpolate=interpolate,
  306. override=True,
  307. encoding=encoding,
  308. ).dict()