rsync.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. # -*- coding: utf-8 -*-
  2. """
  3. 1:N rsync implementation on top of execnet.
  4. (c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
  5. """
  6. import os
  7. import stat
  8. from hashlib import md5
  9. try:
  10. from queue import Queue
  11. except ImportError:
  12. from Queue import Queue
  13. import execnet.rsync_remote
  14. class RSync(object):
  15. """This class allows to send a directory structure (recursively)
  16. to one or multiple remote filesystems.
  17. There is limited support for symlinks, which means that symlinks
  18. pointing to the sourcetree will be send "as is" while external
  19. symlinks will be just copied (regardless of existance of such
  20. a path on remote side).
  21. """
  22. def __init__(self, sourcedir, callback=None, verbose=True):
  23. self._sourcedir = str(sourcedir)
  24. self._verbose = verbose
  25. assert callback is None or hasattr(callback, "__call__")
  26. self._callback = callback
  27. self._channels = {}
  28. self._receivequeue = Queue()
  29. self._links = []
  30. def filter(self, path):
  31. return True
  32. def _end_of_channel(self, channel):
  33. if channel in self._channels:
  34. # too early! we must have got an error
  35. channel.waitclose()
  36. # or else we raise one
  37. raise IOError("connection unexpectedly closed: {} ".format(channel.gateway))
  38. def _process_link(self, channel):
  39. for link in self._links:
  40. channel.send(link)
  41. # completion marker, this host is done
  42. channel.send(42)
  43. def _done(self, channel):
  44. """Call all callbacks"""
  45. finishedcallback = self._channels.pop(channel)
  46. if finishedcallback:
  47. finishedcallback()
  48. channel.waitclose()
  49. def _list_done(self, channel):
  50. # sum up all to send
  51. if self._callback:
  52. s = sum([self._paths[i] for i in self._to_send[channel]])
  53. self._callback("list", s, channel)
  54. def _send_item(self, channel, data):
  55. """Send one item"""
  56. modified_rel_path, checksum = data
  57. modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
  58. try:
  59. f = open(modifiedpath, "rb")
  60. data = f.read()
  61. except IOError:
  62. data = None
  63. # provide info to progress callback function
  64. modified_rel_path = "/".join(modified_rel_path)
  65. if data is not None:
  66. self._paths[modified_rel_path] = len(data)
  67. else:
  68. self._paths[modified_rel_path] = 0
  69. if channel not in self._to_send:
  70. self._to_send[channel] = []
  71. self._to_send[channel].append(modified_rel_path)
  72. # print "sending", modified_rel_path, data and len(data) or 0, checksum
  73. if data is not None:
  74. f.close()
  75. if checksum is not None and checksum == md5(data).digest():
  76. data = None # not really modified
  77. else:
  78. self._report_send_file(channel.gateway, modified_rel_path)
  79. channel.send(data)
  80. def _report_send_file(self, gateway, modified_rel_path):
  81. if self._verbose:
  82. print("{} <= {}".format(gateway, modified_rel_path))
  83. def send(self, raises=True):
  84. """Sends a sourcedir to all added targets. Flag indicates
  85. whether to raise an error or return in case of lack of
  86. targets
  87. """
  88. if not self._channels:
  89. if raises:
  90. raise IOError(
  91. "no targets available, maybe you " "are trying call send() twice?"
  92. )
  93. return
  94. # normalize a trailing '/' away
  95. self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, "x"))
  96. # send directory structure and file timestamps/sizes
  97. self._send_directory_structure(self._sourcedir)
  98. # paths and to_send are only used for doing
  99. # progress-related callbacks
  100. self._paths = {}
  101. self._to_send = {}
  102. # send modified file to clients
  103. while self._channels:
  104. channel, req = self._receivequeue.get()
  105. if req is None:
  106. self._end_of_channel(channel)
  107. else:
  108. command, data = req
  109. if command == "links":
  110. self._process_link(channel)
  111. elif command == "done":
  112. self._done(channel)
  113. elif command == "ack":
  114. if self._callback:
  115. self._callback("ack", self._paths[data], channel)
  116. elif command == "list_done":
  117. self._list_done(channel)
  118. elif command == "send":
  119. self._send_item(channel, data)
  120. del data
  121. else:
  122. assert "Unknown command %s" % command
  123. def add_target(self, gateway, destdir, finishedcallback=None, **options):
  124. """Adds a remote target specified via a gateway
  125. and a remote destination directory.
  126. """
  127. for name in options:
  128. assert name in ("delete",)
  129. def itemcallback(req):
  130. self._receivequeue.put((channel, req))
  131. channel = gateway.remote_exec(execnet.rsync_remote)
  132. channel.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
  133. channel.setcallback(itemcallback, endmarker=None)
  134. channel.send((str(destdir), options))
  135. self._channels[channel] = finishedcallback
  136. def _broadcast(self, msg):
  137. for channel in self._channels:
  138. channel.send(msg)
  139. def _send_link(self, linktype, basename, linkpoint):
  140. self._links.append((linktype, basename, linkpoint))
  141. def _send_directory(self, path):
  142. # dir: send a list of entries
  143. names = []
  144. subpaths = []
  145. for name in os.listdir(path):
  146. p = os.path.join(path, name)
  147. if self.filter(p):
  148. names.append(name)
  149. subpaths.append(p)
  150. mode = os.lstat(path).st_mode
  151. self._broadcast([mode] + names)
  152. for p in subpaths:
  153. self._send_directory_structure(p)
  154. def _send_link_structure(self, path):
  155. linkpoint = os.readlink(path)
  156. basename = path[len(self._sourcedir) + 1 :]
  157. if linkpoint.startswith(self._sourcedir):
  158. self._send_link("linkbase", basename, linkpoint[len(self._sourcedir) + 1 :])
  159. else:
  160. # relative or absolute link, just send it
  161. self._send_link("link", basename, linkpoint)
  162. self._broadcast(None)
  163. def _send_directory_structure(self, path):
  164. try:
  165. st = os.lstat(path)
  166. except OSError:
  167. self._broadcast((None, 0, 0))
  168. return
  169. if stat.S_ISREG(st.st_mode):
  170. # regular file: send a mode/timestamp/size pair
  171. self._broadcast((st.st_mode, st.st_mtime, st.st_size))
  172. elif stat.S_ISDIR(st.st_mode):
  173. self._send_directory(path)
  174. elif stat.S_ISLNK(st.st_mode):
  175. self._send_link_structure(path)
  176. else:
  177. raise ValueError("cannot sync {!r}".format(path))