[svn r37278] move files from branch to trunk (and thus complete
the merge of the config branch into the trunk) --HG-- branch : trunk
This commit is contained in:
@@ -1,162 +1,153 @@
|
||||
import py, md5
|
||||
import py, os, stat, md5
|
||||
from Queue import Queue
|
||||
|
||||
|
||||
def rsync(gw, sourcedir, destdir, **options):
|
||||
for name in options:
|
||||
assert name in ('delete',)
|
||||
class RSync(object):
|
||||
""" This is an example usage of py.execnet - a sample RSync
|
||||
protocol, which can perform syncing 1-to-n.
|
||||
|
||||
channel = gw.remote_exec("""
|
||||
import os, stat, shutil, md5
|
||||
destdir, options = channel.receive()
|
||||
modifiedfiles = []
|
||||
Sample usage: you instantiate this class, eventually providing a
|
||||
callback when rsyncing is done, than add some targets
|
||||
(gateway + destdir) by running add_target and finally
|
||||
invoking send() which will send provided source tree remotely.
|
||||
|
||||
def remove(path):
|
||||
assert path.startswith(destdir)
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
# assume it's a dir
|
||||
shutil.rmtree(path)
|
||||
There is limited support for symlinks, which means that symlinks
|
||||
pointing to the sourcetree will be send "as is" while external
|
||||
symlinks will be just copied (regardless of existance of such
|
||||
a path on remote side)
|
||||
"""
|
||||
def __init__(self, callback=None, **options):
|
||||
for name in options:
|
||||
assert name in ('delete')
|
||||
self.options = options
|
||||
self.callback = callback
|
||||
self.channels = {}
|
||||
self.receivequeue = Queue()
|
||||
self.links = []
|
||||
|
||||
def receive_directory_structure(path, relcomponents):
|
||||
#print "receive directory structure", path
|
||||
try:
|
||||
st = os.lstat(path)
|
||||
except OSError:
|
||||
st = None
|
||||
msg = channel.receive()
|
||||
if isinstance(msg, list):
|
||||
if st and not stat.S_ISDIR(st.st_mode):
|
||||
os.unlink(path)
|
||||
st = None
|
||||
if not st:
|
||||
os.mkdir(path)
|
||||
entrynames = {}
|
||||
for entryname in msg:
|
||||
receive_directory_structure(os.path.join(path, entryname),
|
||||
relcomponents + [entryname])
|
||||
entrynames[entryname] = True
|
||||
if options.get('delete'):
|
||||
for othername in os.listdir(path):
|
||||
if othername not in entrynames:
|
||||
otherpath = os.path.join(path, othername)
|
||||
remove(otherpath)
|
||||
def filter(self, path):
|
||||
return True
|
||||
|
||||
def add_target(self, gateway, destdir, finishedcallback=None):
|
||||
""" Adds a target for to-be-send data
|
||||
"""
|
||||
def itemcallback(req):
|
||||
self.receivequeue.put((channel, req))
|
||||
channel = gateway.remote_exec(REMOTE_SOURCE)
|
||||
channel.setcallback(itemcallback, endmarker = None)
|
||||
channel.send((str(destdir), self.options))
|
||||
self.channels[channel] = finishedcallback
|
||||
|
||||
def send(self, sourcedir):
|
||||
""" Sends a sourcedir to previously prepared targets
|
||||
"""
|
||||
self.sourcedir = str(sourcedir)
|
||||
# normalize a trailing '/' away
|
||||
self.sourcedir = os.path.dirname(os.path.join(self.sourcedir, 'x'))
|
||||
# send directory structure and file timestamps/sizes
|
||||
self._send_directory_structure(self.sourcedir)
|
||||
|
||||
# paths and to_send are only used for doing
|
||||
# progress-related callbacks
|
||||
self.paths = {}
|
||||
self.to_send = {}
|
||||
|
||||
# send modified file to clients
|
||||
while self.channels:
|
||||
channel, req = self.receivequeue.get()
|
||||
if req is None:
|
||||
# end-of-channel
|
||||
if channel in self.channels:
|
||||
# too early! we must have got an error
|
||||
channel.waitclose()
|
||||
# or else we raise one
|
||||
raise IOError('connection unexpectedly closed: %s ' % (
|
||||
channel.gateway,))
|
||||
else:
|
||||
if st and stat.S_ISREG(st.st_mode):
|
||||
f = file(path, 'rb')
|
||||
command, data = req
|
||||
if command == "links":
|
||||
for link in self.links:
|
||||
channel.send(link)
|
||||
# completion marker, this host is done
|
||||
channel.send(42)
|
||||
elif command == "done":
|
||||
finishedcallback = self.channels.pop(channel)
|
||||
if finishedcallback:
|
||||
finishedcallback()
|
||||
elif command == "ack":
|
||||
if self.callback:
|
||||
self.callback("ack", self.paths[data], channel)
|
||||
elif command == "list_done":
|
||||
# sum up all to send
|
||||
if self.callback:
|
||||
s = sum([self.paths[i] for i in self.to_send[channel]])
|
||||
self.callback("list", s, channel)
|
||||
elif command == "send":
|
||||
modified_rel_path, checksum = data
|
||||
modifiedpath = os.path.join(self.sourcedir, *modified_rel_path)
|
||||
f = open(modifiedpath, 'rb')
|
||||
data = f.read()
|
||||
f.close()
|
||||
mychecksum = md5.md5(data).digest()
|
||||
else:
|
||||
if st:
|
||||
remove(path)
|
||||
mychecksum = None
|
||||
if mychecksum != msg:
|
||||
channel.send(relcomponents)
|
||||
modifiedfiles.append(path)
|
||||
receive_directory_structure(destdir, [])
|
||||
channel.send(None) # end marker
|
||||
for path in modifiedfiles:
|
||||
data = channel.receive()
|
||||
f = open(path, 'wb')
|
||||
f.write(data)
|
||||
f.close()
|
||||
""")
|
||||
|
||||
channel.send((str(destdir), options))
|
||||
|
||||
def send_directory_structure(path):
|
||||
if path.check(dir=1):
|
||||
subpaths = path.listdir()
|
||||
print "sending directory structure", path
|
||||
channel.send([p.basename for p in subpaths])
|
||||
# provide info to progress callback function
|
||||
modified_rel_path = "/".join(modified_rel_path)
|
||||
self.paths[modified_rel_path] = len(data)
|
||||
if channel not in self.to_send:
|
||||
self.to_send[channel] = []
|
||||
self.to_send[channel].append(modified_rel_path)
|
||||
|
||||
f.close()
|
||||
if checksum is not None and checksum == md5.md5(data).digest():
|
||||
data = None # not really modified
|
||||
else:
|
||||
# ! there is a reason for the interning:
|
||||
# sharing multiple copies of the file's data
|
||||
data = intern(data)
|
||||
print '%s <= %s' % (
|
||||
channel.gateway._getremoteaddress(),
|
||||
modified_rel_path)
|
||||
channel.send(data)
|
||||
del data
|
||||
else:
|
||||
assert "Unknown command %s" % command
|
||||
|
||||
def _broadcast(self, msg):
|
||||
for channel in self.channels:
|
||||
channel.send(msg)
|
||||
|
||||
def _send_link(self, basename, linkpoint):
|
||||
self.links.append(("link", basename, linkpoint))
|
||||
|
||||
def _send_directory_structure(self, path):
|
||||
st = os.lstat(path)
|
||||
if stat.S_ISREG(st.st_mode):
|
||||
# regular file: send a timestamp/size pair
|
||||
self._broadcast((st.st_mtime, st.st_size))
|
||||
elif stat.S_ISDIR(st.st_mode):
|
||||
# dir: send a list of entries
|
||||
names = []
|
||||
subpaths = []
|
||||
for name in os.listdir(path):
|
||||
p = os.path.join(path, name)
|
||||
if self.filter(p):
|
||||
names.append(name)
|
||||
subpaths.append(p)
|
||||
self._broadcast(names)
|
||||
for p in subpaths:
|
||||
send_directory_structure(p)
|
||||
elif path.check(file=1):
|
||||
data = path.read()
|
||||
checksum = md5.md5(data).digest()
|
||||
channel.send(checksum)
|
||||
self._send_directory_structure(p)
|
||||
elif stat.S_ISLNK(st.st_mode):
|
||||
linkpoint = os.readlink(path)
|
||||
basename = path[len(self.sourcedir) + 1:]
|
||||
if not linkpoint.startswith(os.sep):
|
||||
# relative link, just send it
|
||||
self._send_link(basename, linkpoint)
|
||||
elif linkpoint.startswith(self.sourcedir):
|
||||
self._send_link(basename, linkpoint[len(self.sourcedir) + 1:])
|
||||
else:
|
||||
self._send_link(basename, linkpoint)
|
||||
self._broadcast(None)
|
||||
else:
|
||||
raise ValueError, "cannot sync %r" % (path,)
|
||||
send_directory_structure(sourcedir)
|
||||
while True:
|
||||
modified_rel_path = channel.receive()
|
||||
if modified_rel_path is None:
|
||||
break
|
||||
modifiedpath = sourcedir.join(*modified_rel_path)
|
||||
data = modifiedpath.read()
|
||||
channel.send(data)
|
||||
channel.waitclose()
|
||||
|
||||
def copy(gw, source, dest):
|
||||
channel = gw.remote_exec("""
|
||||
import md5
|
||||
localfilename = channel.receive()
|
||||
try:
|
||||
f = file(localfilename, 'rb')
|
||||
existingdata = f.read()
|
||||
f.close()
|
||||
except (IOError, OSError):
|
||||
mycrc = None
|
||||
else:
|
||||
mycrc = md5.md5(existingdata).digest()
|
||||
remotecrc = channel.receive()
|
||||
if remotecrc == mycrc:
|
||||
channel.send(None)
|
||||
else:
|
||||
channel.send(localfilename)
|
||||
newdata = channel.receive()
|
||||
f = file(localfilename, 'wb')
|
||||
f.write(newdata)
|
||||
f.close()
|
||||
""")
|
||||
channel.send(str(dest))
|
||||
f = file(str(source), 'rb')
|
||||
localdata = f.read()
|
||||
f.close()
|
||||
channel.send(md5.md5(localdata).digest())
|
||||
status = channel.receive()
|
||||
if status is not None:
|
||||
assert status == str(dest) # for now
|
||||
channel.send(localdata)
|
||||
channel.waitclose()
|
||||
|
||||
|
||||
def setup_module(mod):
|
||||
mod.gw = py.execnet.PopenGateway()
|
||||
|
||||
def teardown_module(mod):
|
||||
mod.gw.exit()
|
||||
|
||||
|
||||
def test_filecopy():
|
||||
dir = py.test.ensuretemp('filecopy')
|
||||
source = dir.ensure('source')
|
||||
dest = dir.join('dest')
|
||||
source.write('hello world')
|
||||
copy(gw, source, dest)
|
||||
assert dest.check(file=1)
|
||||
assert dest.read() == 'hello world'
|
||||
source.write('something else')
|
||||
copy(gw, source, dest)
|
||||
assert dest.check(file=1)
|
||||
assert dest.read() == 'something else'
|
||||
|
||||
def test_dirsync():
|
||||
base = py.test.ensuretemp('dirsync')
|
||||
dest = base.join('dest')
|
||||
source = base.mkdir('source')
|
||||
|
||||
for s in ('content1', 'content2'):
|
||||
source.ensure('subdir', 'file1').write(s)
|
||||
rsync(gw, source, dest)
|
||||
assert dest.join('subdir').check(dir=1)
|
||||
assert dest.join('subdir', 'file1').check(file=1)
|
||||
assert dest.join('subdir', 'file1').read() == s
|
||||
|
||||
source.join('subdir').remove('file1')
|
||||
rsync(gw, source, dest)
|
||||
assert dest.join('subdir', 'file1').check(file=1)
|
||||
rsync(gw, source, dest, delete=True)
|
||||
assert not dest.join('subdir', 'file1').check()
|
||||
REMOTE_SOURCE = py.path.local(__file__).dirpath().\
|
||||
join('rsync_remote.py').open().read() + "\nf()"
|
||||
|
||||
|
||||
86
py/execnet/rsync_remote.py
Normal file
86
py/execnet/rsync_remote.py
Normal file
@@ -0,0 +1,86 @@
|
||||
|
||||
def f():
|
||||
import os, stat, shutil, md5
|
||||
destdir, options = channel.receive()
|
||||
modifiedfiles = []
|
||||
|
||||
def remove(path):
|
||||
assert path.startswith(destdir)
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
# assume it's a dir
|
||||
shutil.rmtree(path)
|
||||
|
||||
def receive_directory_structure(path, relcomponents):
|
||||
try:
|
||||
st = os.lstat(path)
|
||||
except OSError:
|
||||
st = None
|
||||
msg = channel.receive()
|
||||
if isinstance(msg, list):
|
||||
if st and not stat.S_ISDIR(st.st_mode):
|
||||
os.unlink(path)
|
||||
st = None
|
||||
if not st:
|
||||
os.makedirs(path)
|
||||
entrynames = {}
|
||||
for entryname in msg:
|
||||
receive_directory_structure(os.path.join(path, entryname),
|
||||
relcomponents + [entryname])
|
||||
entrynames[entryname] = True
|
||||
if options.get('delete'):
|
||||
for othername in os.listdir(path):
|
||||
if othername not in entrynames:
|
||||
otherpath = os.path.join(path, othername)
|
||||
remove(otherpath)
|
||||
elif msg is not None:
|
||||
checksum = None
|
||||
if st:
|
||||
if stat.S_ISREG(st.st_mode):
|
||||
msg_mtime, msg_size = msg
|
||||
if msg_size != st.st_size:
|
||||
pass
|
||||
elif msg_mtime != st.st_mtime:
|
||||
f = open(path, 'rb')
|
||||
checksum = md5.md5(f.read()).digest()
|
||||
f.close()
|
||||
else:
|
||||
return # already fine
|
||||
else:
|
||||
remove(path)
|
||||
channel.send(("send", (relcomponents, checksum)))
|
||||
modifiedfiles.append((path, msg))
|
||||
receive_directory_structure(destdir, [])
|
||||
|
||||
STRICT_CHECK = False # seems most useful this way for py.test
|
||||
channel.send(("list_done", None))
|
||||
|
||||
for path, (time, size) in modifiedfiles:
|
||||
data = channel.receive()
|
||||
channel.send(("ack", path[len(destdir) + 1:]))
|
||||
if data is not None:
|
||||
if STRICT_CHECK and len(data) != size:
|
||||
raise IOError('file modified during rsync: %r' % (path,))
|
||||
f = open(path, 'wb')
|
||||
f.write(data)
|
||||
f.close()
|
||||
os.utime(path, (time, time))
|
||||
del data
|
||||
channel.send(("links", None))
|
||||
|
||||
msg = channel.receive()
|
||||
while msg is not 42:
|
||||
# we get symlink
|
||||
_type, relpath, linkpoint = msg
|
||||
assert _type == "link"
|
||||
path = os.path.join(destdir, relpath)
|
||||
try:
|
||||
remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
os.symlink(os.path.join(destdir, linkpoint), path)
|
||||
msg = channel.receive()
|
||||
channel.send(("done", None))
|
||||
|
||||
80
py/execnet/testing/test_rsync.py
Normal file
80
py/execnet/testing/test_rsync.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import py
|
||||
from py.execnet import RSync
|
||||
|
||||
|
||||
def setup_module(mod):
|
||||
mod.gw = py.execnet.PopenGateway()
|
||||
mod.gw2 = py.execnet.PopenGateway()
|
||||
|
||||
def teardown_module(mod):
|
||||
mod.gw.exit()
|
||||
mod.gw2.exit()
|
||||
|
||||
|
||||
def test_dirsync():
|
||||
base = py.test.ensuretemp('dirsync')
|
||||
dest = base.join('dest')
|
||||
dest2 = base.join('dest2')
|
||||
source = base.mkdir('source')
|
||||
|
||||
for s in ('content1', 'content2-a-bit-longer'):
|
||||
source.ensure('subdir', 'file1').write(s)
|
||||
rsync = RSync()
|
||||
rsync.add_target(gw, dest)
|
||||
rsync.add_target(gw2, dest2)
|
||||
rsync.send(source)
|
||||
assert dest.join('subdir').check(dir=1)
|
||||
assert dest.join('subdir', 'file1').check(file=1)
|
||||
assert dest.join('subdir', 'file1').read() == s
|
||||
assert dest2.join('subdir').check(dir=1)
|
||||
assert dest2.join('subdir', 'file1').check(file=1)
|
||||
assert dest2.join('subdir', 'file1').read() == s
|
||||
|
||||
source.join('subdir').remove('file1')
|
||||
rsync = RSync()
|
||||
rsync.add_target(gw2, dest2)
|
||||
rsync.add_target(gw, dest)
|
||||
rsync.send(source)
|
||||
assert dest.join('subdir', 'file1').check(file=1)
|
||||
assert dest2.join('subdir', 'file1').check(file=1)
|
||||
rsync = RSync(delete=True)
|
||||
rsync.add_target(gw2, dest2)
|
||||
rsync.add_target(gw, dest)
|
||||
rsync.send(source)
|
||||
assert not dest.join('subdir', 'file1').check()
|
||||
assert not dest2.join('subdir', 'file1').check()
|
||||
|
||||
def test_symlink_rsync():
|
||||
if py.std.sys.platform == 'win32':
|
||||
py.test.skip("symlinks are unsupported on Windows.")
|
||||
base = py.test.ensuretemp('symlinkrsync')
|
||||
dest = base.join('dest')
|
||||
source = base.join('source')
|
||||
source.ensure("existant")
|
||||
source.join("rellink").mksymlinkto(source.join("existant"), absolute=0)
|
||||
source.join('abslink').mksymlinkto(source.join("existant"))
|
||||
|
||||
rsync = RSync()
|
||||
rsync.add_target(gw, dest)
|
||||
rsync.send(source)
|
||||
|
||||
assert dest.join('rellink').readlink() == dest.join("existant")
|
||||
assert dest.join('abslink').readlink() == dest.join("existant")
|
||||
|
||||
def test_callback():
|
||||
base = py.test.ensuretemp('callback')
|
||||
dest = base.join("dest")
|
||||
source = base.join("source")
|
||||
source.ensure("existant").write("a" * 100)
|
||||
source.ensure("existant2").write("a" * 10)
|
||||
total = {}
|
||||
def callback(cmd, lgt, channel):
|
||||
total[(cmd, lgt)] = True
|
||||
|
||||
rsync = RSync(callback=callback)
|
||||
#rsync = RSync()
|
||||
rsync.add_target(gw, dest)
|
||||
rsync.send(source)
|
||||
|
||||
assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True}
|
||||
|
||||
Reference in New Issue
Block a user