[svn r37850] Split methods a bit to smaller parts.
--HG-- branch : trunk
This commit is contained in:
parent
026c2fa0bc
commit
4ffda926ab
|
@ -38,6 +38,67 @@ class RSync(object):
|
||||||
channel.send((str(destdir), self._options))
|
channel.send((str(destdir), self._options))
|
||||||
self._channels[channel] = finishedcallback
|
self._channels[channel] = finishedcallback
|
||||||
|
|
||||||
|
def _end_of_channel(self, channel):
|
||||||
|
if channel in self._channels:
|
||||||
|
# too early! we must have got an error
|
||||||
|
channel.waitclose()
|
||||||
|
# or else we raise one
|
||||||
|
raise IOError('connection unexpectedly closed: %s ' % (
|
||||||
|
channel.gateway,))
|
||||||
|
|
||||||
|
def _process_link(self, channel):
|
||||||
|
for link in self._links:
|
||||||
|
channel.send(link)
|
||||||
|
# completion marker, this host is done
|
||||||
|
channel.send(42)
|
||||||
|
|
||||||
|
def _done(self, channel):
|
||||||
|
""" Call all callbacks
|
||||||
|
"""
|
||||||
|
finishedcallback = self._channels.pop(channel)
|
||||||
|
if finishedcallback:
|
||||||
|
finishedcallback()
|
||||||
|
|
||||||
|
def _list_done(self, channel):
|
||||||
|
# sum up all to send
|
||||||
|
if self._callback:
|
||||||
|
s = sum([self._paths[i] for i in self._to_send[channel]])
|
||||||
|
self._callback("list", s, channel)
|
||||||
|
|
||||||
|
def _send_item(self, channel, data):
|
||||||
|
""" Send one item
|
||||||
|
"""
|
||||||
|
modified_rel_path, checksum = data
|
||||||
|
modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
|
||||||
|
try:
|
||||||
|
f = open(modifiedpath, 'rb')
|
||||||
|
data = f.read()
|
||||||
|
except IOError:
|
||||||
|
data = None
|
||||||
|
|
||||||
|
# provide info to progress callback function
|
||||||
|
modified_rel_path = "/".join(modified_rel_path)
|
||||||
|
if data is not None:
|
||||||
|
self._paths[modified_rel_path] = len(data)
|
||||||
|
else:
|
||||||
|
self._paths[modified_rel_path] = 0
|
||||||
|
if channel not in self._to_send:
|
||||||
|
self._to_send[channel] = []
|
||||||
|
self._to_send[channel].append(modified_rel_path)
|
||||||
|
|
||||||
|
if data is not None:
|
||||||
|
f.close()
|
||||||
|
if checksum is not None and checksum == md5.md5(data).digest():
|
||||||
|
data = None # not really modified
|
||||||
|
else:
|
||||||
|
# ! there is a reason for the interning:
|
||||||
|
# sharing multiple copies of the file's data
|
||||||
|
data = intern(data)
|
||||||
|
print '%s <= %s' % (
|
||||||
|
channel.gateway.remoteaddress,
|
||||||
|
modified_rel_path)
|
||||||
|
channel.send(data)
|
||||||
|
|
||||||
def send(self, sourcedir):
|
def send(self, sourcedir):
|
||||||
""" Sends a sourcedir to all added targets.
|
""" Sends a sourcedir to all added targets.
|
||||||
"""
|
"""
|
||||||
|
@ -56,63 +117,20 @@ class RSync(object):
|
||||||
while self._channels:
|
while self._channels:
|
||||||
channel, req = self._receivequeue.get()
|
channel, req = self._receivequeue.get()
|
||||||
if req is None:
|
if req is None:
|
||||||
# end-of-channel
|
self._end_of_channel(channel)
|
||||||
if channel in self._channels:
|
|
||||||
# too early! we must have got an error
|
|
||||||
channel.waitclose()
|
|
||||||
# or else we raise one
|
|
||||||
raise IOError('connection unexpectedly closed: %s ' % (
|
|
||||||
channel.gateway,))
|
|
||||||
else:
|
else:
|
||||||
command, data = req
|
command, data = req
|
||||||
if command == "links":
|
if command == "links":
|
||||||
for link in self._links:
|
self._process_link(channel)
|
||||||
channel.send(link)
|
|
||||||
# completion marker, this host is done
|
|
||||||
channel.send(42)
|
|
||||||
elif command == "done":
|
elif command == "done":
|
||||||
finishedcallback = self._channels.pop(channel)
|
self._done(channel)
|
||||||
if finishedcallback:
|
|
||||||
finishedcallback()
|
|
||||||
elif command == "ack":
|
elif command == "ack":
|
||||||
if self._callback:
|
if self._callback:
|
||||||
self._callback("ack", self._paths[data], channel)
|
self._callback("ack", self._paths[data], channel)
|
||||||
elif command == "list_done":
|
elif command == "list_done":
|
||||||
# sum up all to send
|
self._list_done(channel)
|
||||||
if self._callback:
|
|
||||||
s = sum([self._paths[i] for i in self._to_send[channel]])
|
|
||||||
self._callback("list", s, channel)
|
|
||||||
elif command == "send":
|
elif command == "send":
|
||||||
modified_rel_path, checksum = data
|
self._send_item(channel, data)
|
||||||
modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
|
|
||||||
try:
|
|
||||||
f = open(modifiedpath, 'rb')
|
|
||||||
data = f.read()
|
|
||||||
except IOError:
|
|
||||||
data = None
|
|
||||||
|
|
||||||
# provide info to progress callback function
|
|
||||||
modified_rel_path = "/".join(modified_rel_path)
|
|
||||||
if data is not None:
|
|
||||||
self._paths[modified_rel_path] = len(data)
|
|
||||||
else:
|
|
||||||
self._paths[modified_rel_path] = 0
|
|
||||||
if channel not in self._to_send:
|
|
||||||
self._to_send[channel] = []
|
|
||||||
self._to_send[channel].append(modified_rel_path)
|
|
||||||
|
|
||||||
if data is not None:
|
|
||||||
f.close()
|
|
||||||
if checksum is not None and checksum == md5.md5(data).digest():
|
|
||||||
data = None # not really modified
|
|
||||||
else:
|
|
||||||
# ! there is a reason for the interning:
|
|
||||||
# sharing multiple copies of the file's data
|
|
||||||
data = intern(data)
|
|
||||||
print '%s <= %s' % (
|
|
||||||
channel.gateway.remoteaddress,
|
|
||||||
modified_rel_path)
|
|
||||||
channel.send(data)
|
|
||||||
del data
|
del data
|
||||||
else:
|
else:
|
||||||
assert "Unknown command %s" % command
|
assert "Unknown command %s" % command
|
||||||
|
@ -124,6 +142,32 @@ class RSync(object):
|
||||||
def _send_link(self, basename, linkpoint):
|
def _send_link(self, basename, linkpoint):
|
||||||
self._links.append(("link", basename, linkpoint))
|
self._links.append(("link", basename, linkpoint))
|
||||||
|
|
||||||
|
def _send_directory(self, path):
|
||||||
|
# dir: send a list of entries
|
||||||
|
names = []
|
||||||
|
subpaths = []
|
||||||
|
for name in os.listdir(path):
|
||||||
|
p = os.path.join(path, name)
|
||||||
|
if self.filter(p):
|
||||||
|
names.append(name)
|
||||||
|
subpaths.append(p)
|
||||||
|
self._broadcast(names)
|
||||||
|
for p in subpaths:
|
||||||
|
self._send_directory_structure(p)
|
||||||
|
|
||||||
|
def _send_link_structure(self, path):
|
||||||
|
linkpoint = os.readlink(path)
|
||||||
|
basename = path[len(self._sourcedir) + 1:]
|
||||||
|
if not linkpoint.startswith(os.sep):
|
||||||
|
# relative link, just send it
|
||||||
|
# XXX: do sth with ../ links
|
||||||
|
self._send_link(basename, linkpoint)
|
||||||
|
elif linkpoint.startswith(self._sourcedir):
|
||||||
|
self._send_link(basename, linkpoint[len(self._sourcedir) + 1:])
|
||||||
|
else:
|
||||||
|
self._send_link(basename, linkpoint)
|
||||||
|
self._broadcast(None)
|
||||||
|
|
||||||
def _send_directory_structure(self, path):
|
def _send_directory_structure(self, path):
|
||||||
try:
|
try:
|
||||||
st = os.lstat(path)
|
st = os.lstat(path)
|
||||||
|
@ -134,29 +178,9 @@ class RSync(object):
|
||||||
# regular file: send a timestamp/size pair
|
# regular file: send a timestamp/size pair
|
||||||
self._broadcast((st.st_mtime, st.st_size))
|
self._broadcast((st.st_mtime, st.st_size))
|
||||||
elif stat.S_ISDIR(st.st_mode):
|
elif stat.S_ISDIR(st.st_mode):
|
||||||
# dir: send a list of entries
|
self._send_directory(path)
|
||||||
names = []
|
|
||||||
subpaths = []
|
|
||||||
for name in os.listdir(path):
|
|
||||||
p = os.path.join(path, name)
|
|
||||||
if self.filter(p):
|
|
||||||
names.append(name)
|
|
||||||
subpaths.append(p)
|
|
||||||
self._broadcast(names)
|
|
||||||
for p in subpaths:
|
|
||||||
self._send_directory_structure(p)
|
|
||||||
elif stat.S_ISLNK(st.st_mode):
|
elif stat.S_ISLNK(st.st_mode):
|
||||||
linkpoint = os.readlink(path)
|
self._send_link_structure(path)
|
||||||
basename = path[len(self._sourcedir) + 1:]
|
|
||||||
if not linkpoint.startswith(os.sep):
|
|
||||||
# relative link, just send it
|
|
||||||
# XXX: do sth with ../ links
|
|
||||||
self._send_link(basename, linkpoint)
|
|
||||||
elif linkpoint.startswith(self._sourcedir):
|
|
||||||
self._send_link(basename, linkpoint[len(self._sourcedir) + 1:])
|
|
||||||
else:
|
|
||||||
self._send_link(basename, linkpoint)
|
|
||||||
self._broadcast(None)
|
|
||||||
else:
|
else:
|
||||||
raise ValueError, "cannot sync %r" % (path,)
|
raise ValueError, "cannot sync %r" % (path,)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue