move test files out of py lib proper
* separate all tests from plugins * simplify implicit inclusion of plugins under test * have test_initpkg perform direct checks instead of yielding tests * fix example tests for 3k --HG-- branch : trunk
This commit is contained in:
1
testing/path/__init__.py
Normal file
1
testing/path/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
#
|
||||
425
testing/path/common.py
Normal file
425
testing/path/common.py
Normal file
@@ -0,0 +1,425 @@
|
||||
import py
|
||||
import sys
|
||||
|
||||
class CommonFSTests(object):
|
||||
def test_constructor_equality(self, path1):
|
||||
p = path1.__class__(path1)
|
||||
assert p == path1
|
||||
|
||||
def test_eq_nonstring(self, path1):
|
||||
p1 = path1.join('sampledir')
|
||||
p2 = path1.join('sampledir')
|
||||
assert p1 == p2
|
||||
|
||||
def test_new_identical(self, path1):
|
||||
assert path1 == path1.new()
|
||||
|
||||
def test_join(self, path1):
|
||||
p = path1.join('sampledir')
|
||||
strp = str(p)
|
||||
assert strp.endswith('sampledir')
|
||||
assert strp.startswith(str(path1))
|
||||
|
||||
def test_join_normalized(self, path1):
|
||||
newpath = path1.join(path1.sep+'sampledir')
|
||||
strp = str(newpath)
|
||||
assert strp.endswith('sampledir')
|
||||
assert strp.startswith(str(path1))
|
||||
newpath = path1.join((path1.sep*2) + 'sampledir')
|
||||
strp = str(newpath)
|
||||
assert strp.endswith('sampledir')
|
||||
assert strp.startswith(str(path1))
|
||||
|
||||
def test_join_noargs(self, path1):
|
||||
newpath = path1.join()
|
||||
assert path1 == newpath
|
||||
|
||||
def test_add_something(self, path1):
|
||||
p = path1.join('sample')
|
||||
p = p + 'dir'
|
||||
assert p.check()
|
||||
|
||||
def test_parts(self, path1):
|
||||
newpath = path1.join('sampledir', 'otherfile')
|
||||
par = newpath.parts()[-3:]
|
||||
assert par == [path1, path1.join('sampledir'), newpath]
|
||||
|
||||
revpar = newpath.parts(reverse=True)[:3]
|
||||
assert revpar == [newpath, path1.join('sampledir'), path1]
|
||||
|
||||
def test_common(self, path1):
|
||||
other = path1.join('sampledir')
|
||||
x = other.common(path1)
|
||||
assert x == path1
|
||||
|
||||
#def test_parents_nonexisting_file(self, path1):
|
||||
# newpath = path1 / 'dirnoexist' / 'nonexisting file'
|
||||
# par = list(newpath.parents())
|
||||
# assert par[:2] == [path1 / 'dirnoexist', path1]
|
||||
|
||||
def test_basename_checks(self, path1):
|
||||
newpath = path1.join('sampledir')
|
||||
assert newpath.check(basename='sampledir')
|
||||
assert newpath.check(notbasename='xyz')
|
||||
assert newpath.basename == 'sampledir'
|
||||
|
||||
def test_basename(self, path1):
|
||||
newpath = path1.join('sampledir')
|
||||
assert newpath.check(basename='sampledir')
|
||||
assert newpath.basename, 'sampledir'
|
||||
|
||||
def test_dirpath(self, path1):
|
||||
newpath = path1.join('sampledir')
|
||||
assert newpath.dirpath() == path1
|
||||
|
||||
def test_dirpath_with_args(self, path1):
|
||||
newpath = path1.join('sampledir')
|
||||
assert newpath.dirpath('x') == path1.join('x')
|
||||
|
||||
def test_newbasename(self, path1):
|
||||
newpath = path1.join('samplefile')
|
||||
newbase = newpath.new(basename="samplefile2")
|
||||
assert newbase.basename == "samplefile2"
|
||||
assert newbase.dirpath() == newpath.dirpath()
|
||||
|
||||
def test_not_exists(self, path1):
|
||||
assert not path1.join('does_not_exist').check()
|
||||
assert path1.join('does_not_exist').check(exists=0)
|
||||
|
||||
def test_exists(self, path1):
|
||||
assert path1.join("samplefile").check()
|
||||
assert path1.join("samplefile").check(exists=1)
|
||||
|
||||
def test_dir(self, path1):
|
||||
#print repr(path1.join("sampledir"))
|
||||
assert path1.join("sampledir").check(dir=1)
|
||||
assert path1.join('samplefile').check(notdir=1)
|
||||
assert not path1.join("samplefile").check(dir=1)
|
||||
|
||||
def test_fnmatch_file(self, path1):
|
||||
assert path1.join("samplefile").check(fnmatch='s*e')
|
||||
assert path1.join("samplefile").check(notfnmatch='s*x')
|
||||
assert not path1.join("samplefile").check(fnmatch='s*x')
|
||||
|
||||
#def test_fnmatch_dir(self, path1):
|
||||
|
||||
# pattern = path1.sep.join(['s*file'])
|
||||
# sfile = path1.join("samplefile")
|
||||
# assert sfile.check(fnmatch=pattern)
|
||||
|
||||
def test_relto(self, path1):
|
||||
l=path1.join("sampledir", "otherfile")
|
||||
assert l.relto(path1) == l.sep.join(["sampledir", "otherfile"])
|
||||
assert l.check(relto=path1)
|
||||
assert path1.check(notrelto=l)
|
||||
assert not path1.check(relto=l)
|
||||
|
||||
def test_bestrelpath(self, path1):
|
||||
curdir = path1
|
||||
sep = curdir.sep
|
||||
s = curdir.bestrelpath(curdir.join("hello", "world"))
|
||||
assert s == "hello" + sep + "world"
|
||||
|
||||
s = curdir.bestrelpath(curdir.dirpath().join("sister"))
|
||||
assert s == ".." + sep + "sister"
|
||||
assert curdir.bestrelpath(curdir.dirpath()) == ".."
|
||||
|
||||
assert curdir.bestrelpath("hello") == "hello"
|
||||
|
||||
def test_relto_not_relative(self, path1):
|
||||
l1=path1.join("bcde")
|
||||
l2=path1.join("b")
|
||||
assert not l1.relto(l2)
|
||||
assert not l2.relto(l1)
|
||||
|
||||
def test_listdir(self, path1):
|
||||
l = path1.listdir()
|
||||
assert path1.join('sampledir') in l
|
||||
assert path1.join('samplefile') in l
|
||||
py.test.raises(py.error.ENOTDIR,
|
||||
"path1.join('samplefile').listdir()")
|
||||
|
||||
def test_listdir_fnmatchstring(self, path1):
|
||||
l = path1.listdir('s*dir')
|
||||
assert len(l)
|
||||
assert l[0], path1.join('sampledir')
|
||||
|
||||
def test_listdir_filter(self, path1):
|
||||
l = path1.listdir(lambda x: x.check(dir=1))
|
||||
assert path1.join('sampledir') in l
|
||||
assert not path1.join('samplefile') in l
|
||||
|
||||
def test_listdir_sorted(self, path1):
|
||||
l = path1.listdir(lambda x: x.check(basestarts="sample"), sort=True)
|
||||
assert path1.join('sampledir') == l[0]
|
||||
assert path1.join('samplefile') == l[1]
|
||||
assert path1.join('samplepickle') == l[2]
|
||||
|
||||
def test_visit_nofilter(self, path1):
|
||||
l = []
|
||||
for i in path1.visit():
|
||||
l.append(i.relto(path1))
|
||||
assert "sampledir" in l
|
||||
assert path1.sep.join(["sampledir", "otherfile"]) in l
|
||||
|
||||
def test_visit_norecurse(self, path1):
|
||||
l = []
|
||||
for i in path1.visit(None, lambda x: x.basename != "sampledir"):
|
||||
l.append(i.relto(path1))
|
||||
assert "sampledir" in l
|
||||
assert not path1.sep.join(["sampledir", "otherfile"]) in l
|
||||
|
||||
def test_visit_filterfunc_is_string(self, path1):
|
||||
l = []
|
||||
for i in path1.visit('*dir'):
|
||||
l.append(i.relto(path1))
|
||||
assert len(l), 2
|
||||
assert "sampledir" in l
|
||||
assert "otherdir" in l
|
||||
|
||||
def test_visit_ignore(self, path1):
|
||||
p = path1.join('nonexisting')
|
||||
assert list(p.visit(ignore=py.error.ENOENT)) == []
|
||||
|
||||
def test_visit_endswith(self, path1):
|
||||
l = []
|
||||
for i in path1.visit(lambda x: x.check(endswith="file")):
|
||||
l.append(i.relto(path1))
|
||||
assert path1.sep.join(["sampledir", "otherfile"]) in l
|
||||
assert "samplefile" in l
|
||||
|
||||
def test_endswith(self, path1):
|
||||
assert path1.check(notendswith='.py')
|
||||
x = path1.join('samplefile')
|
||||
assert x.check(endswith='file')
|
||||
|
||||
def test_cmp(self, path1):
|
||||
path1 = path1.join('samplefile')
|
||||
path2 = path1.join('samplefile2')
|
||||
assert (path1 < path2) == ('samplefile' < 'samplefile2')
|
||||
assert not (path1 < path1)
|
||||
|
||||
def test_simple_read(self, path1):
|
||||
x = path1.join('samplefile').read('r')
|
||||
assert x == 'samplefile\n'
|
||||
|
||||
def test_join_div_operator(self, path1):
|
||||
newpath = path1 / '/sampledir' / '/test//'
|
||||
newpath2 = path1.join('sampledir', 'test')
|
||||
assert newpath == newpath2
|
||||
|
||||
def test_ext(self, path1):
|
||||
newpath = path1.join('sampledir.ext')
|
||||
assert newpath.ext == '.ext'
|
||||
newpath = path1.join('sampledir')
|
||||
assert not newpath.ext
|
||||
|
||||
def test_purebasename(self, path1):
|
||||
newpath = path1.join('samplefile.py')
|
||||
assert newpath.purebasename == 'samplefile'
|
||||
|
||||
def test_multiple_parts(self, path1):
|
||||
newpath = path1.join('samplefile.py')
|
||||
dirname, purebasename, basename, ext = newpath._getbyspec(
|
||||
'dirname,purebasename,basename,ext')
|
||||
assert str(path1).endswith(dirname) # be careful with win32 'drive'
|
||||
assert purebasename == 'samplefile'
|
||||
assert basename == 'samplefile.py'
|
||||
assert ext == '.py'
|
||||
|
||||
def test_dotted_name_ext(self, path1):
|
||||
newpath = path1.join('a.b.c')
|
||||
ext = newpath.ext
|
||||
assert ext == '.c'
|
||||
assert newpath.ext == '.c'
|
||||
|
||||
def test_newext(self, path1):
|
||||
newpath = path1.join('samplefile.py')
|
||||
newext = newpath.new(ext='.txt')
|
||||
assert newext.basename == "samplefile.txt"
|
||||
assert newext.purebasename == "samplefile"
|
||||
|
||||
def test_readlines(self, path1):
|
||||
fn = path1.join('samplefile')
|
||||
contents = fn.readlines()
|
||||
assert contents == ['samplefile\n']
|
||||
|
||||
def test_readlines_nocr(self, path1):
|
||||
fn = path1.join('samplefile')
|
||||
contents = fn.readlines(cr=0)
|
||||
assert contents == ['samplefile', '']
|
||||
|
||||
def test_file(self, path1):
|
||||
assert path1.join('samplefile').check(file=1)
|
||||
|
||||
def test_not_file(self, path1):
|
||||
assert not path1.join("sampledir").check(file=1)
|
||||
assert path1.join("sampledir").check(file=0)
|
||||
|
||||
def test_non_existent(self, path1):
|
||||
assert path1.join("sampledir.nothere").check(dir=0)
|
||||
assert path1.join("sampledir.nothere").check(file=0)
|
||||
assert path1.join("sampledir.nothere").check(notfile=1)
|
||||
assert path1.join("sampledir.nothere").check(notdir=1)
|
||||
assert path1.join("sampledir.nothere").check(notexists=1)
|
||||
assert not path1.join("sampledir.nothere").check(notfile=0)
|
||||
|
||||
# pattern = path1.sep.join(['s*file'])
|
||||
# sfile = path1.join("samplefile")
|
||||
# assert sfile.check(fnmatch=pattern)
|
||||
|
||||
def test_size(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
assert url.size() > len("samplefile")
|
||||
|
||||
def test_mtime(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
assert url.mtime() > 0
|
||||
|
||||
def test_relto_wrong_type(self, path1):
|
||||
py.test.raises(TypeError, "path1.relto(42)")
|
||||
|
||||
def test_visit_filesonly(self, path1):
|
||||
l = []
|
||||
for i in path1.visit(lambda x: x.check(file=1)):
|
||||
l.append(i.relto(path1))
|
||||
assert not "sampledir" in l
|
||||
assert path1.sep.join(["sampledir", "otherfile"]) in l
|
||||
|
||||
def test_load(self, path1):
|
||||
p = path1.join('samplepickle')
|
||||
obj = p.load()
|
||||
assert type(obj) is dict
|
||||
assert obj.get('answer',None) == 42
|
||||
|
||||
def test_visit_nodotfiles(self, path1):
|
||||
l = []
|
||||
for i in path1.visit(lambda x: x.check(dotfile=0)):
|
||||
l.append(i.relto(path1))
|
||||
assert "sampledir" in l
|
||||
assert path1.sep.join(["sampledir", "otherfile"]) in l
|
||||
assert not ".dotfile" in l
|
||||
|
||||
def test_endswith(self, path1):
|
||||
def chk(p):
|
||||
return p.check(endswith="pickle")
|
||||
assert not chk(path1)
|
||||
assert not chk(path1.join('samplefile'))
|
||||
assert chk(path1.join('somepickle'))
|
||||
|
||||
def test_copy_file(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
initpy = otherdir.join('__init__.py')
|
||||
copied = otherdir.join('copied')
|
||||
initpy.copy(copied)
|
||||
try:
|
||||
assert copied.check()
|
||||
s1 = initpy.read()
|
||||
s2 = copied.read()
|
||||
assert s1 == s2
|
||||
finally:
|
||||
if copied.check():
|
||||
copied.remove()
|
||||
|
||||
def test_copy_dir(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
copied = path1.join('newdir')
|
||||
try:
|
||||
otherdir.copy(copied)
|
||||
assert copied.check(dir=1)
|
||||
assert copied.join('__init__.py').check(file=1)
|
||||
s1 = otherdir.join('__init__.py').read()
|
||||
s2 = copied.join('__init__.py').read()
|
||||
assert s1 == s2
|
||||
finally:
|
||||
if copied.check(dir=1):
|
||||
copied.remove(rec=1)
|
||||
|
||||
def test_remove_file(self, path1):
|
||||
d = path1.ensure('todeleted')
|
||||
assert d.check()
|
||||
d.remove()
|
||||
assert not d.check()
|
||||
|
||||
def test_remove_dir_recursive_by_default(self, path1):
|
||||
d = path1.ensure('to', 'be', 'deleted')
|
||||
assert d.check()
|
||||
p = path1.join('to')
|
||||
p.remove()
|
||||
assert not p.check()
|
||||
|
||||
def test_mkdir_and_remove(self, path1):
|
||||
tmpdir = path1
|
||||
py.test.raises(py.error.EEXIST, tmpdir.mkdir, 'sampledir')
|
||||
new = tmpdir.join('mktest1')
|
||||
new.mkdir()
|
||||
assert new.check(dir=1)
|
||||
new.remove()
|
||||
|
||||
new = tmpdir.mkdir('mktest')
|
||||
assert new.check(dir=1)
|
||||
new.remove()
|
||||
assert tmpdir.join('mktest') == new
|
||||
|
||||
def test_move_file(self, path1):
|
||||
p = path1.join('samplefile')
|
||||
newp = p.dirpath('moved_samplefile')
|
||||
p.move(newp)
|
||||
try:
|
||||
assert newp.check(file=1)
|
||||
assert not p.check()
|
||||
finally:
|
||||
dp = newp.dirpath()
|
||||
if hasattr(dp, 'revert'):
|
||||
dp.revert()
|
||||
else:
|
||||
newp.move(p)
|
||||
assert p.check()
|
||||
|
||||
def test_move_directory(self, path1):
|
||||
source = path1.join('sampledir')
|
||||
dest = path1.join('moveddir')
|
||||
source.move(dest)
|
||||
assert dest.check(dir=1)
|
||||
assert dest.join('otherfile').check(file=1)
|
||||
assert not source.join('sampledir').check()
|
||||
|
||||
def setuptestfs(path):
|
||||
if path.join('samplefile').check():
|
||||
return
|
||||
#print "setting up test fs for", repr(path)
|
||||
samplefile = path.ensure('samplefile')
|
||||
samplefile.write('samplefile\n')
|
||||
|
||||
execfile = path.ensure('execfile')
|
||||
execfile.write('x=42')
|
||||
|
||||
execfilepy = path.ensure('execfile.py')
|
||||
execfilepy.write('x=42')
|
||||
|
||||
d = {1:2, 'hello': 'world', 'answer': 42}
|
||||
path.ensure('samplepickle').dump(d)
|
||||
|
||||
sampledir = path.ensure('sampledir', dir=1)
|
||||
sampledir.ensure('otherfile')
|
||||
|
||||
otherdir = path.ensure('otherdir', dir=1)
|
||||
otherdir.ensure('__init__.py')
|
||||
|
||||
module_a = otherdir.ensure('a.py')
|
||||
if sys.version_info >= (2,6):
|
||||
module_a.write('from .b import stuff as result\n')
|
||||
else:
|
||||
module_a.write('from b import stuff as result\n')
|
||||
module_b = otherdir.ensure('b.py')
|
||||
module_b.write('stuff="got it"\n')
|
||||
module_c = otherdir.ensure('c.py')
|
||||
module_c.write('''import py;
|
||||
import otherdir.a
|
||||
value = otherdir.a.result
|
||||
''')
|
||||
module_d = otherdir.ensure('d.py')
|
||||
module_d.write('''import py;
|
||||
from otherdir import a
|
||||
value2 = a.result
|
||||
''')
|
||||
73
testing/path/conftest.py
Normal file
73
testing/path/conftest.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import py
|
||||
from py.__.path import svnwc as svncommon
|
||||
|
||||
svnbin = py.path.local.sysfind('svn')
|
||||
repodump = py.path.local(__file__).dirpath('repotest.dump')
|
||||
from py.builtin import print_
|
||||
|
||||
def pytest_funcarg__repowc1(request):
|
||||
if svnbin is None:
|
||||
py.test.skip("svn binary not found")
|
||||
|
||||
modname = request.module.__name__
|
||||
tmpdir = request.getfuncargvalue("tmpdir")
|
||||
repo, wc = request.cached_setup(
|
||||
setup=lambda: getrepowc(tmpdir, "repo-"+modname, "wc-" + modname),
|
||||
scope="module",
|
||||
)
|
||||
for x in ('test_remove', 'test_move', 'test_status_deleted'):
|
||||
if request.function.__name__.startswith(x):
|
||||
_savedrepowc = save_repowc(repo, wc)
|
||||
request.addfinalizer(lambda: restore_repowc(_savedrepowc))
|
||||
return repo, wc
|
||||
|
||||
def pytest_funcarg__repowc2(request):
|
||||
tmpdir = request.getfuncargvalue("tmpdir")
|
||||
name = request.function.__name__
|
||||
return getrepowc(tmpdir, "%s-repo-2" % name, "%s-wc-2" % name)
|
||||
|
||||
def getsvnbin():
|
||||
if svnbin is None:
|
||||
py.test.skip("svn binary not found")
|
||||
return svnbin
|
||||
|
||||
# make a wc directory out of a given root url
|
||||
# cache previously obtained wcs!
|
||||
#
|
||||
def getrepowc(tmpdir, reponame='basetestrepo', wcname='wc'):
|
||||
repo = tmpdir.mkdir(reponame)
|
||||
wcdir = tmpdir.mkdir(wcname)
|
||||
repo.ensure(dir=1)
|
||||
py.process.cmdexec('svnadmin create "%s"' %
|
||||
svncommon._escape_helper(repo))
|
||||
py.process.cmdexec('svnadmin load -q "%s" <"%s"' %
|
||||
(svncommon._escape_helper(repo), repodump))
|
||||
print_("created svn repository", repo)
|
||||
wcdir.ensure(dir=1)
|
||||
wc = py.path.svnwc(wcdir)
|
||||
if py.std.sys.platform == 'win32':
|
||||
repo = '/' + str(repo).replace('\\', '/')
|
||||
wc.checkout(url='file://%s' % repo)
|
||||
print_("checked out new repo into", wc)
|
||||
return ("file://%s" % repo, wc)
|
||||
|
||||
|
||||
def save_repowc(repo, wc):
|
||||
repo = py.path.local(repo[len("file://"):])
|
||||
assert repo.check()
|
||||
savedrepo = repo.dirpath(repo.basename+".1")
|
||||
savedwc = wc.dirpath(wc.basename+".1")
|
||||
repo.copy(savedrepo)
|
||||
wc.localpath.copy(savedwc.localpath)
|
||||
return savedrepo, savedwc
|
||||
|
||||
def restore_repowc(obj):
|
||||
savedrepo, savedwc = obj
|
||||
repo = savedrepo.new(basename=savedrepo.basename[:-2])
|
||||
assert repo.check()
|
||||
wc = savedwc.new(basename=savedwc.basename[:-2])
|
||||
assert wc.check()
|
||||
wc.localpath.remove()
|
||||
repo.remove()
|
||||
savedrepo.move(repo)
|
||||
savedwc.localpath.move(wc.localpath)
|
||||
228
testing/path/repotest.dump
Normal file
228
testing/path/repotest.dump
Normal file
@@ -0,0 +1,228 @@
|
||||
SVN-fs-dump-format-version: 2
|
||||
|
||||
UUID: 876a30f4-1eed-0310-aeb7-ae314d1e5934
|
||||
|
||||
Revision-number: 0
|
||||
Prop-content-length: 56
|
||||
Content-length: 56
|
||||
|
||||
K 8
|
||||
svn:date
|
||||
V 27
|
||||
2005-01-07T23:55:31.755989Z
|
||||
PROPS-END
|
||||
|
||||
Revision-number: 1
|
||||
Prop-content-length: 118
|
||||
Content-length: 118
|
||||
|
||||
K 7
|
||||
svn:log
|
||||
V 20
|
||||
testrepo setup rev 1
|
||||
K 10
|
||||
svn:author
|
||||
V 3
|
||||
hpk
|
||||
K 8
|
||||
svn:date
|
||||
V 27
|
||||
2005-01-07T23:55:37.815386Z
|
||||
PROPS-END
|
||||
|
||||
Node-path: execfile
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 4
|
||||
Text-content-md5: d4b5bc61e16310f08c5d11866eba0a22
|
||||
Content-length: 14
|
||||
|
||||
PROPS-END
|
||||
x=42
|
||||
|
||||
Node-path: otherdir
|
||||
Node-kind: dir
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Content-length: 10
|
||||
|
||||
PROPS-END
|
||||
|
||||
|
||||
Node-path: otherdir/__init__.py
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 0
|
||||
Text-content-md5: d41d8cd98f00b204e9800998ecf8427e
|
||||
Content-length: 10
|
||||
|
||||
PROPS-END
|
||||
|
||||
|
||||
Node-path: otherdir/a.py
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 30
|
||||
Text-content-md5: 247c7daeb2ee5dcab0aba7bd12bad665
|
||||
Content-length: 40
|
||||
|
||||
PROPS-END
|
||||
from b import stuff as result
|
||||
|
||||
|
||||
Node-path: otherdir/b.py
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 15
|
||||
Text-content-md5: c1b13503469a7711306d03a4b0721bc6
|
||||
Content-length: 25
|
||||
|
||||
PROPS-END
|
||||
stuff="got it"
|
||||
|
||||
|
||||
Node-path: otherdir/c.py
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 75
|
||||
Text-content-md5: 250cdb6b5df68536152c681f48297569
|
||||
Content-length: 85
|
||||
|
||||
PROPS-END
|
||||
import py; py.magic.autopath()
|
||||
import otherdir.a
|
||||
value = otherdir.a.result
|
||||
|
||||
|
||||
Node-path: otherdir/d.py
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 72
|
||||
Text-content-md5: 940c9c621e7b198e081459642c37f5a7
|
||||
Content-length: 82
|
||||
|
||||
PROPS-END
|
||||
import py; py.magic.autopath()
|
||||
from otherdir import a
|
||||
value2 = a.result
|
||||
|
||||
|
||||
Node-path: sampledir
|
||||
Node-kind: dir
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Content-length: 10
|
||||
|
||||
PROPS-END
|
||||
|
||||
|
||||
Node-path: sampledir/otherfile
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 0
|
||||
Text-content-md5: d41d8cd98f00b204e9800998ecf8427e
|
||||
Content-length: 10
|
||||
|
||||
PROPS-END
|
||||
|
||||
|
||||
Node-path: samplefile
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 40
|
||||
Text-content-length: 11
|
||||
Text-content-md5: 9225ac28b32156979ab6482b8bb5fb8c
|
||||
Content-length: 51
|
||||
|
||||
K 13
|
||||
svn:eol-style
|
||||
V 6
|
||||
native
|
||||
PROPS-END
|
||||
samplefile
|
||||
|
||||
|
||||
Node-path: samplepickle
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 56
|
||||
Text-content-md5: 719d85c1329a33134bb98f56b756c545
|
||||
Content-length: 66
|
||||
|
||||
PROPS-END
|
||||
(dp1
|
||||
S'answer'
|
||||
p2
|
||||
I42
|
||||
sI1
|
||||
I2
|
||||
sS'hello'
|
||||
p3
|
||||
S'world'
|
||||
p4
|
||||
s.
|
||||
|
||||
Revision-number: 2
|
||||
Prop-content-length: 108
|
||||
Content-length: 108
|
||||
|
||||
K 7
|
||||
svn:log
|
||||
V 10
|
||||
second rev
|
||||
K 10
|
||||
svn:author
|
||||
V 3
|
||||
hpk
|
||||
K 8
|
||||
svn:date
|
||||
V 27
|
||||
2005-01-07T23:55:39.223202Z
|
||||
PROPS-END
|
||||
|
||||
Node-path: anotherfile
|
||||
Node-kind: file
|
||||
Node-action: add
|
||||
Prop-content-length: 10
|
||||
Text-content-length: 5
|
||||
Text-content-md5: 5d41402abc4b2a76b9719d911017c592
|
||||
Content-length: 15
|
||||
|
||||
PROPS-END
|
||||
hello
|
||||
|
||||
Revision-number: 3
|
||||
Prop-content-length: 106
|
||||
Content-length: 106
|
||||
|
||||
K 7
|
||||
svn:log
|
||||
V 9
|
||||
third rev
|
||||
K 10
|
||||
svn:author
|
||||
V 3
|
||||
hpk
|
||||
K 8
|
||||
svn:date
|
||||
V 27
|
||||
2005-01-07T23:55:41.556642Z
|
||||
PROPS-END
|
||||
|
||||
Node-path: anotherfile
|
||||
Node-kind: file
|
||||
Node-action: change
|
||||
Text-content-length: 5
|
||||
Text-content-md5: 7d793037a0760186574b0282f2f435e7
|
||||
Content-length: 5
|
||||
|
||||
world
|
||||
|
||||
31
testing/path/svntestbase.py
Normal file
31
testing/path/svntestbase.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import sys
|
||||
import py
|
||||
from py.__.path import svnwc as svncommon
|
||||
from testing.path.common import CommonFSTests
|
||||
|
||||
class CommonSvnTests(CommonFSTests):
|
||||
|
||||
def test_propget(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
value = url.propget('svn:eol-style')
|
||||
assert value == 'native'
|
||||
|
||||
def test_proplist(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
res = url.proplist()
|
||||
assert res['svn:eol-style'] == 'native'
|
||||
|
||||
def test_info(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
res = url.info()
|
||||
assert res.size > len("samplefile") and res.created_rev >= 0
|
||||
|
||||
def test_log_simple(self, path1):
|
||||
url = path1.join("samplefile")
|
||||
logentries = url.log()
|
||||
for logentry in logentries:
|
||||
assert logentry.rev == 1
|
||||
assert hasattr(logentry, 'author')
|
||||
assert hasattr(logentry, 'date')
|
||||
|
||||
#cache.repositories.put(svnrepourl, 1200, 0)
|
||||
81
testing/path/test_cacheutil.py
Normal file
81
testing/path/test_cacheutil.py
Normal file
@@ -0,0 +1,81 @@
|
||||
import py
|
||||
from py.__.path import cacheutil
|
||||
|
||||
class BasicCacheAPITest:
|
||||
cache = None
|
||||
def test_getorbuild(self):
|
||||
val = self.cache.getorbuild(-42, lambda: 42)
|
||||
assert val == 42
|
||||
val = self.cache.getorbuild(-42, lambda: 23)
|
||||
assert val == 42
|
||||
|
||||
def test_cache_get_key_error(self):
|
||||
py.test.raises(KeyError, "self.cache._getentry(-23)")
|
||||
|
||||
def test_delentry_non_raising(self):
|
||||
val = self.cache.getorbuild(100, lambda: 100)
|
||||
self.cache.delentry(100)
|
||||
py.test.raises(KeyError, "self.cache._getentry(100)")
|
||||
|
||||
def test_delentry_raising(self):
|
||||
val = self.cache.getorbuild(100, lambda: 100)
|
||||
self.cache.delentry(100)
|
||||
py.test.raises(KeyError, "self.cache.delentry(100, raising=True)")
|
||||
|
||||
class TestBuildcostAccess(BasicCacheAPITest):
|
||||
cache = cacheutil.BuildcostAccessCache(maxentries=128)
|
||||
|
||||
def test_cache_works_somewhat_simple(self, monkeypatch):
|
||||
cache = cacheutil.BuildcostAccessCache()
|
||||
# the default gettime
|
||||
# BuildcostAccessCache.build can
|
||||
# result into time()-time() == 0 which makes the below
|
||||
# test fail randomly. Let's rather use incrementing
|
||||
# numbers instead.
|
||||
l = [0]
|
||||
def counter():
|
||||
l[0] = l[0] + 1
|
||||
return l[0]
|
||||
monkeypatch.setattr(cacheutil, 'gettime', counter)
|
||||
for x in range(cache.maxentries):
|
||||
y = cache.getorbuild(x, lambda: x)
|
||||
assert x == y
|
||||
for x in range(cache.maxentries):
|
||||
assert cache.getorbuild(x, None) == x
|
||||
halfentries = int(cache.maxentries / 2)
|
||||
for x in range(halfentries):
|
||||
assert cache.getorbuild(x, None) == x
|
||||
assert cache.getorbuild(x, None) == x
|
||||
# evict one entry
|
||||
val = cache.getorbuild(-1, lambda: 42)
|
||||
assert val == 42
|
||||
# check that recently used ones are still there
|
||||
# and are not build again
|
||||
for x in range(halfentries):
|
||||
assert cache.getorbuild(x, None) == x
|
||||
assert cache.getorbuild(-1, None) == 42
|
||||
|
||||
|
||||
class TestAging(BasicCacheAPITest):
|
||||
maxsecs = 0.10
|
||||
cache = cacheutil.AgingCache(maxentries=128, maxseconds=maxsecs)
|
||||
|
||||
def test_cache_eviction(self):
|
||||
self.cache.getorbuild(17, lambda: 17)
|
||||
endtime = py.std.time.time() + self.maxsecs * 10
|
||||
while py.std.time.time() < endtime:
|
||||
try:
|
||||
self.cache._getentry(17)
|
||||
except KeyError:
|
||||
break
|
||||
py.std.time.sleep(self.maxsecs*0.3)
|
||||
else:
|
||||
py.test.fail("waiting for cache eviction failed")
|
||||
|
||||
def test_prune_lowestweight():
|
||||
maxsecs = 0.05
|
||||
cache = cacheutil.AgingCache(maxentries=10, maxseconds=maxsecs)
|
||||
for x in range(cache.maxentries):
|
||||
cache.getorbuild(x, lambda: x)
|
||||
py.std.time.sleep(maxsecs*1.1)
|
||||
cache.getorbuild(cache.maxentries+1, lambda: 42)
|
||||
558
testing/path/test_local.py
Normal file
558
testing/path/test_local.py
Normal file
@@ -0,0 +1,558 @@
|
||||
import py
|
||||
import sys
|
||||
from py.path import local
|
||||
from testing.path import common
|
||||
|
||||
def pytest_funcarg__path1(request):
|
||||
def setup():
|
||||
path1 = request.config.mktemp("path1")
|
||||
common.setuptestfs(path1)
|
||||
return path1
|
||||
def teardown(path1):
|
||||
# post check
|
||||
assert path1.join("samplefile").check()
|
||||
return request.cached_setup(setup, teardown, scope="session")
|
||||
|
||||
def pytest_funcarg__tmpdir(request):
|
||||
basedir = request.config.getbasetemp()
|
||||
if request.cls:
|
||||
try:
|
||||
basedir = basedir.mkdir(request.cls.__name__)
|
||||
except py.error.EEXIST:
|
||||
pass
|
||||
for i in range(1000):
|
||||
name = request.function.__name__
|
||||
if i > 0:
|
||||
name += str(i)
|
||||
try:
|
||||
return basedir.mkdir(name)
|
||||
except py.error.EEXIST:
|
||||
continue
|
||||
raise ValueError("could not create tempdir")
|
||||
|
||||
class TestLocalPath(common.CommonFSTests):
|
||||
def test_join_normpath(self, tmpdir):
|
||||
assert tmpdir.join(".") == tmpdir
|
||||
p = tmpdir.join("../%s" % tmpdir.basename)
|
||||
assert p == tmpdir
|
||||
p = tmpdir.join("..//%s/" % tmpdir.basename)
|
||||
assert p == tmpdir
|
||||
|
||||
def test_gethash(self, tmpdir):
|
||||
md5 = py.builtin._tryimport('md5', 'hashlib').md5
|
||||
lib = py.builtin._tryimport('sha', 'hashlib')
|
||||
sha = getattr(lib, 'sha1', getattr(lib, 'sha', None))
|
||||
fn = tmpdir.join("testhashfile")
|
||||
data = 'hello'.encode('ascii')
|
||||
fn.write(data, mode="wb")
|
||||
assert fn.computehash("md5") == md5(data).hexdigest()
|
||||
assert fn.computehash("sha1") == sha(data).hexdigest()
|
||||
py.test.raises(ValueError, fn.computehash, "asdasd")
|
||||
|
||||
def test_remove_removes_readonly_file(self, tmpdir):
|
||||
readonly_file = tmpdir.join('readonly').ensure()
|
||||
readonly_file.chmod(0)
|
||||
readonly_file.remove()
|
||||
assert not readonly_file.check(exists=1)
|
||||
|
||||
def test_remove_removes_readonly_dir(self, tmpdir):
|
||||
readonly_dir = tmpdir.join('readonlydir').ensure(dir=1)
|
||||
readonly_dir.chmod(int("500", 8))
|
||||
readonly_dir.remove()
|
||||
assert not readonly_dir.check(exists=1)
|
||||
|
||||
def test_remove_removes_dir_and_readonly_file(self, tmpdir):
|
||||
readonly_dir = tmpdir.join('readonlydir').ensure(dir=1)
|
||||
readonly_file = readonly_dir.join('readonlyfile').ensure()
|
||||
readonly_file.chmod(0)
|
||||
readonly_dir.remove()
|
||||
assert not readonly_dir.check(exists=1)
|
||||
|
||||
def test_initialize_curdir(self):
|
||||
assert str(local()) == py.std.os.getcwd()
|
||||
|
||||
def test_initialize_reldir(self, path1):
|
||||
old = path1.chdir()
|
||||
try:
|
||||
p = local('samplefile')
|
||||
assert p.check()
|
||||
finally:
|
||||
old.chdir()
|
||||
|
||||
def test_eq_with_strings(self, path1):
|
||||
path1 = path1.join('sampledir')
|
||||
path2 = str(path1)
|
||||
assert path1 == path2
|
||||
assert path2 == path1
|
||||
path3 = path1.join('samplefile')
|
||||
assert path3 != path2
|
||||
assert path2 != path3
|
||||
|
||||
@py.test.mark.multi(bin=(False, True))
|
||||
def test_dump(self, tmpdir, bin):
|
||||
path = tmpdir.join("dumpfile%s" % int(bin))
|
||||
try:
|
||||
d = {'answer' : 42}
|
||||
path.dump(d, bin=bin)
|
||||
f = path.open('rb+')
|
||||
dnew = py.std.pickle.load(f)
|
||||
assert d == dnew
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
def test_setmtime(self):
|
||||
import tempfile
|
||||
import time
|
||||
try:
|
||||
fd, name = tempfile.mkstemp()
|
||||
py.std.os.close(fd)
|
||||
except AttributeError:
|
||||
name = tempfile.mktemp()
|
||||
open(name, 'w').close()
|
||||
try:
|
||||
mtime = int(time.time())-100
|
||||
path = local(name)
|
||||
assert path.mtime() != mtime
|
||||
path.setmtime(mtime)
|
||||
assert path.mtime() == mtime
|
||||
path.setmtime()
|
||||
assert path.mtime() != mtime
|
||||
finally:
|
||||
py.std.os.remove(name)
|
||||
|
||||
def test_normpath(self, path1):
|
||||
new1 = path1.join("/otherdir")
|
||||
new2 = path1.join("otherdir")
|
||||
assert str(new1) == str(new2)
|
||||
|
||||
def test_mkdtemp_creation(self):
|
||||
d = local.mkdtemp()
|
||||
try:
|
||||
assert d.check(dir=1)
|
||||
finally:
|
||||
d.remove(rec=1)
|
||||
|
||||
def test_tmproot(self):
|
||||
d = local.mkdtemp()
|
||||
tmproot = local.get_temproot()
|
||||
try:
|
||||
assert d.check(dir=1)
|
||||
assert d.dirpath() == tmproot
|
||||
finally:
|
||||
d.remove(rec=1)
|
||||
|
||||
def test_chdir(self, tmpdir):
|
||||
old = local()
|
||||
try:
|
||||
res = tmpdir.chdir()
|
||||
assert str(res) == str(old)
|
||||
assert py.std.os.getcwd() == str(tmpdir)
|
||||
finally:
|
||||
old.chdir()
|
||||
|
||||
def test_ensure_filepath_withdir(self, tmpdir):
|
||||
newfile = tmpdir.join('test1','test')
|
||||
newfile.ensure()
|
||||
assert newfile.check(file=1)
|
||||
newfile.write("42")
|
||||
newfile.ensure()
|
||||
s = newfile.read()
|
||||
assert s == "42"
|
||||
|
||||
def test_ensure_filepath_withoutdir(self, tmpdir):
|
||||
newfile = tmpdir.join('test1file')
|
||||
t = newfile.ensure()
|
||||
assert t == newfile
|
||||
assert newfile.check(file=1)
|
||||
|
||||
def test_ensure_dirpath(self, tmpdir):
|
||||
newfile = tmpdir.join('test1','testfile')
|
||||
t = newfile.ensure(dir=1)
|
||||
assert t == newfile
|
||||
assert newfile.check(dir=1)
|
||||
|
||||
def test_init_from_path(self, tmpdir):
|
||||
l = local()
|
||||
l2 = local(l)
|
||||
assert l2 is l
|
||||
|
||||
wc = py.path.svnwc('.')
|
||||
l3 = local(wc)
|
||||
assert l3 is not wc
|
||||
assert l3.strpath == wc.strpath
|
||||
assert not hasattr(l3, 'commit')
|
||||
|
||||
def test_long_filenames(self, tmpdir):
|
||||
if sys.platform == "win32":
|
||||
py.test.skip("win32: work around needed for path length limit")
|
||||
# see http://codespeak.net/pipermail/py-dev/2008q2/000922.html
|
||||
|
||||
# testing paths > 260 chars (which is Windows' limitation, but
|
||||
# depending on how the paths are used), but > 4096 (which is the
|
||||
# Linux' limitation) - the behaviour of paths with names > 4096 chars
|
||||
# is undetermined
|
||||
newfilename = '/test' * 60
|
||||
l = tmpdir.join(newfilename)
|
||||
l.ensure(file=True)
|
||||
l.write('foo')
|
||||
l2 = tmpdir.join(newfilename)
|
||||
assert l2.read() == 'foo'
|
||||
|
||||
class TestExecutionOnWindows:
|
||||
disabled = py.std.sys.platform != 'win32'
|
||||
|
||||
def test_sysfind(self):
|
||||
x = py.path.local.sysfind('cmd')
|
||||
assert x.check(file=1)
|
||||
assert py.path.local.sysfind('jaksdkasldqwe') is None
|
||||
|
||||
class TestExecution:
|
||||
disabled = py.std.sys.platform == 'win32'
|
||||
|
||||
def test_sysfind(self):
|
||||
x = py.path.local.sysfind('test')
|
||||
assert x.check(file=1)
|
||||
assert py.path.local.sysfind('jaksdkasldqwe') is None
|
||||
|
||||
def test_sysfind_no_permisson_ignored(self, monkeypatch, tmpdir):
|
||||
noperm = tmpdir.ensure('noperm', dir=True)
|
||||
monkeypatch.setenv("PATH", noperm, prepend=":")
|
||||
noperm.chmod(0)
|
||||
assert py.path.local.sysfind('jaksdkasldqwe') is None
|
||||
|
||||
def test_sysfind_absolute(self):
|
||||
x = py.path.local.sysfind('test')
|
||||
assert x.check(file=1)
|
||||
y = py.path.local.sysfind(str(x))
|
||||
assert y.check(file=1)
|
||||
assert y == x
|
||||
|
||||
def test_sysfind_multiple(self, tmpdir, monkeypatch):
|
||||
monkeypatch.setenv('PATH',
|
||||
"%s:%s" % (tmpdir.ensure('a'),
|
||||
tmpdir.join('b')),
|
||||
prepend=":")
|
||||
tmpdir.ensure('b', 'a')
|
||||
checker = lambda x: x.dirpath().basename == 'b'
|
||||
x = py.path.local.sysfind('a', checker=checker)
|
||||
assert x.basename == 'a'
|
||||
assert x.dirpath().basename == 'b'
|
||||
checker = lambda x: None
|
||||
assert py.path.local.sysfind('a', checker=checker) is None
|
||||
|
||||
def test_sysexec(self):
|
||||
x = py.path.local.sysfind('ls')
|
||||
out = x.sysexec('-a')
|
||||
for x in py.path.local().listdir():
|
||||
assert out.find(x.basename) != -1
|
||||
|
||||
def test_sysexec_failing(self):
|
||||
x = py.path.local.sysfind('false')
|
||||
py.test.raises(py.process.cmdexec.Error, """
|
||||
x.sysexec('aksjdkasjd')
|
||||
""")
|
||||
|
||||
def test_make_numbered_dir(self, tmpdir):
|
||||
tmpdir.ensure('base.not_an_int', dir=1)
|
||||
for i in range(10):
|
||||
numdir = local.make_numbered_dir(prefix='base.', rootdir=tmpdir,
|
||||
keep=2, lock_timeout=0)
|
||||
assert numdir.check()
|
||||
assert numdir.basename == 'base.%d' %i
|
||||
if i>=1:
|
||||
assert numdir.new(ext=str(i-1)).check()
|
||||
if i>=2:
|
||||
assert numdir.new(ext=str(i-2)).check()
|
||||
if i>=3:
|
||||
assert not numdir.new(ext=str(i-3)).check()
|
||||
|
||||
def test_locked_make_numbered_dir(self, tmpdir):
|
||||
for i in range(10):
|
||||
numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir,
|
||||
keep=2)
|
||||
assert numdir.check()
|
||||
assert numdir.basename == 'base2.%d' %i
|
||||
for j in range(i):
|
||||
assert numdir.new(ext=str(j)).check()
|
||||
|
||||
def test_error_preservation(self, path1):
|
||||
py.test.raises (EnvironmentError, path1.join('qwoeqiwe').mtime)
|
||||
py.test.raises (EnvironmentError, path1.join('qwoeqiwe').read)
|
||||
|
||||
#def test_parentdirmatch(self):
|
||||
# local.parentdirmatch('std', startmodule=__name__)
|
||||
#
|
||||
|
||||
|
||||
class TestImport:
|
||||
def test_pyimport(self, path1):
|
||||
obj = path1.join('execfile.py').pyimport()
|
||||
assert obj.x == 42
|
||||
assert obj.__name__ == 'execfile'
|
||||
|
||||
def test_pyimport_execfile_different_name(self, path1):
|
||||
obj = path1.join('execfile.py').pyimport(modname="0x.y.z")
|
||||
assert obj.x == 42
|
||||
assert obj.__name__ == '0x.y.z'
|
||||
|
||||
def test_pyimport_a(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
mod = otherdir.join('a.py').pyimport()
|
||||
assert mod.result == "got it"
|
||||
assert mod.__name__ == 'otherdir.a'
|
||||
|
||||
def test_pyimport_b(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
mod = otherdir.join('b.py').pyimport()
|
||||
assert mod.stuff == "got it"
|
||||
assert mod.__name__ == 'otherdir.b'
|
||||
|
||||
def test_pyimport_c(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
mod = otherdir.join('c.py').pyimport()
|
||||
assert mod.value == "got it"
|
||||
|
||||
def test_pyimport_d(self, path1):
|
||||
otherdir = path1.join('otherdir')
|
||||
mod = otherdir.join('d.py').pyimport()
|
||||
assert mod.value2 == "got it"
|
||||
|
||||
def test_pyimport_and_import(self, tmpdir):
|
||||
tmpdir.ensure('xxxpackage', '__init__.py')
|
||||
mod1path = tmpdir.ensure('xxxpackage', 'module1.py')
|
||||
mod1 = mod1path.pyimport()
|
||||
assert mod1.__name__ == 'xxxpackage.module1'
|
||||
from xxxpackage import module1
|
||||
assert module1 is mod1
|
||||
|
||||
def test_pypkgdir(tmpdir):
|
||||
pkg = tmpdir.ensure('pkg1', dir=1)
|
||||
pkg.ensure("__init__.py")
|
||||
pkg.ensure("subdir/__init__.py")
|
||||
assert pkg.pypkgpath() == pkg
|
||||
assert pkg.join('subdir', '__init__.py').pypkgpath() == pkg
|
||||
|
||||
def test_homedir():
|
||||
homedir = py.path.local._gethomedir()
|
||||
assert homedir.check(dir=1)
|
||||
|
||||
class TestWINLocalPath:
|
||||
#root = local(TestLocalPath.root)
|
||||
disabled = py.std.sys.platform != 'win32'
|
||||
|
||||
def test_owner_group_not_implemented(self):
|
||||
py.test.raises(NotImplementedError, "path1.stat().owner")
|
||||
py.test.raises(NotImplementedError, "path1.stat().group")
|
||||
|
||||
def test_chmod_simple_int(self):
|
||||
py.builtin.print_("path1 is", path1)
|
||||
mode = path1.stat().mode
|
||||
# Ensure that we actually change the mode to something different.
|
||||
path1.chmod(mode == 0 and 1 or 0)
|
||||
try:
|
||||
print(path1.stat().mode)
|
||||
print(mode)
|
||||
assert path1.stat().mode != mode
|
||||
finally:
|
||||
path1.chmod(mode)
|
||||
assert path1.stat().mode == mode
|
||||
|
||||
def test_path_comparison_lowercase_mixed(self):
|
||||
t1 = path1.join("a_path")
|
||||
t2 = path1.join("A_path")
|
||||
assert t1 == t1
|
||||
assert t1 == t2
|
||||
|
||||
def test_relto_with_mixed_case(self):
|
||||
t1 = path1.join("a_path", "fiLe")
|
||||
t2 = path1.join("A_path")
|
||||
assert t1.relto(t2) == "fiLe"
|
||||
|
||||
def test_allow_unix_style_paths(self):
|
||||
t1 = path1.join('a_path')
|
||||
assert t1 == str(path1) + '\\a_path'
|
||||
t1 = path1.join('a_path/')
|
||||
assert t1 == str(path1) + '\\a_path'
|
||||
t1 = path1.join('dir/a_path')
|
||||
assert t1 == str(path1) + '\\dir\\a_path'
|
||||
|
||||
def test_sysfind_in_currentdir(self):
|
||||
cmd = py.path.local.sysfind('cmd')
|
||||
root = cmd.new(dirname='', basename='') # c:\ in most installations
|
||||
old = root.chdir()
|
||||
try:
|
||||
x = py.path.local.sysfind(cmd.relto(root))
|
||||
assert x.check(file=1)
|
||||
finally:
|
||||
old.chdir()
|
||||
|
||||
class TestPOSIXLocalPath:
|
||||
disabled = py.std.sys.platform == 'win32'
|
||||
|
||||
def test_samefile(self, tmpdir):
|
||||
assert tmpdir.samefile(tmpdir)
|
||||
p = tmpdir.ensure("hello")
|
||||
assert p.samefile(p)
|
||||
|
||||
def test_hardlink(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
filepath = tmpdir.join('file')
|
||||
filepath.write("Hello")
|
||||
nlink = filepath.stat().nlink
|
||||
linkpath.mklinkto(filepath)
|
||||
assert filepath.stat().nlink == nlink + 1
|
||||
|
||||
def test_symlink_are_identical(self, tmpdir):
|
||||
filepath = tmpdir.join('file')
|
||||
filepath.write("Hello")
|
||||
linkpath = tmpdir.join('test')
|
||||
linkpath.mksymlinkto(filepath)
|
||||
assert linkpath.readlink() == str(filepath)
|
||||
|
||||
def test_symlink_isfile(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
filepath = tmpdir.join('file')
|
||||
filepath.write("")
|
||||
linkpath.mksymlinkto(filepath)
|
||||
assert linkpath.check(file=1)
|
||||
assert not linkpath.check(link=0, file=1)
|
||||
|
||||
def test_symlink_relative(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
filepath = tmpdir.join('file')
|
||||
filepath.write("Hello")
|
||||
linkpath.mksymlinkto(filepath, absolute=False)
|
||||
assert linkpath.readlink() == "file"
|
||||
assert filepath.read() == linkpath.read()
|
||||
|
||||
def test_symlink_not_existing(self, tmpdir):
|
||||
linkpath = tmpdir.join('testnotexisting')
|
||||
assert not linkpath.check(link=1)
|
||||
assert linkpath.check(link=0)
|
||||
|
||||
def test_relto_with_root(self, path1, tmpdir):
|
||||
y = path1.join('x').relto(py.path.local('/'))
|
||||
assert y[0] == str(path1)[1]
|
||||
|
||||
def test_visit_recursive_symlink(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
linkpath.mksymlinkto(tmpdir)
|
||||
visitor = tmpdir.visit(None, lambda x: x.check(link=0))
|
||||
assert list(visitor) == [linkpath]
|
||||
|
||||
def test_symlink_isdir(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
linkpath.mksymlinkto(tmpdir)
|
||||
assert linkpath.check(dir=1)
|
||||
assert not linkpath.check(link=0, dir=1)
|
||||
|
||||
def test_symlink_remove(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
linkpath.mksymlinkto(linkpath) # point to itself
|
||||
assert linkpath.check(link=1)
|
||||
linkpath.remove()
|
||||
assert not linkpath.check()
|
||||
|
||||
def test_realpath_file(self, tmpdir):
|
||||
linkpath = tmpdir.join('test')
|
||||
filepath = tmpdir.join('file')
|
||||
filepath.write("")
|
||||
linkpath.mksymlinkto(filepath)
|
||||
realpath = linkpath.realpath()
|
||||
assert realpath.basename == 'file'
|
||||
|
||||
def test_owner(self, path1, tmpdir):
|
||||
from pwd import getpwuid
|
||||
from grp import getgrgid
|
||||
stat = path1.stat()
|
||||
assert stat.path == path1
|
||||
|
||||
uid = stat.uid
|
||||
gid = stat.gid
|
||||
owner = getpwuid(uid)[0]
|
||||
group = getgrgid(gid)[0]
|
||||
|
||||
assert uid == stat.uid
|
||||
assert owner == stat.owner
|
||||
assert gid == stat.gid
|
||||
assert group == stat.group
|
||||
|
||||
def test_atime(self, tmpdir):
|
||||
import time
|
||||
path = tmpdir.ensure('samplefile')
|
||||
now = time.time()
|
||||
atime1 = path.atime()
|
||||
# we could wait here but timer resolution is very
|
||||
# system dependent
|
||||
path.read()
|
||||
atime2 = path.atime()
|
||||
duration = time.time() - now
|
||||
assert (atime2-atime1) <= duration
|
||||
|
||||
def test_commondir(self, path1):
|
||||
# XXX This is here in local until we find a way to implement this
|
||||
# using the subversion command line api.
|
||||
p1 = path1.join('something')
|
||||
p2 = path1.join('otherthing')
|
||||
assert p1.common(p2) == path1
|
||||
assert p2.common(p1) == path1
|
||||
|
||||
def test_commondir_nocommon(self, path1):
|
||||
# XXX This is here in local until we find a way to implement this
|
||||
# using the subversion command line api.
|
||||
p1 = path1.join('something')
|
||||
p2 = py.path.local(path1.sep+'blabla')
|
||||
assert p1.common(p2) == '/'
|
||||
|
||||
def test_join_to_root(self, path1):
|
||||
root = path1.parts()[0]
|
||||
assert len(str(root)) == 1
|
||||
assert str(root.join('a')) == '/a'
|
||||
|
||||
def test_join_root_to_root_with_no_abs(self, path1):
|
||||
nroot = path1.join('/')
|
||||
assert str(path1) == str(nroot)
|
||||
assert path1 == nroot
|
||||
|
||||
def test_chmod_simple_int(self, path1):
|
||||
mode = path1.stat().mode
|
||||
path1.chmod(int(mode/2))
|
||||
try:
|
||||
assert path1.stat().mode != mode
|
||||
finally:
|
||||
path1.chmod(mode)
|
||||
assert path1.stat().mode == mode
|
||||
|
||||
def test_chmod_rec_int(self, path1):
|
||||
# XXX fragile test
|
||||
recfilter = lambda x: x.check(dotfile=0, link=0)
|
||||
oldmodes = {}
|
||||
for x in path1.visit(rec=recfilter):
|
||||
oldmodes[x] = x.stat().mode
|
||||
path1.chmod(int("772", 8), rec=recfilter)
|
||||
try:
|
||||
for x in path1.visit(rec=recfilter):
|
||||
assert x.stat().mode & int("777", 8) == int("772", 8)
|
||||
finally:
|
||||
for x,y in oldmodes.items():
|
||||
x.chmod(y)
|
||||
|
||||
def test_chown_identity(self, path1):
|
||||
owner = path1.stat().owner
|
||||
group = path1.stat().group
|
||||
path1.chown(owner, group)
|
||||
|
||||
def test_chown_dangling_link(self, path1):
|
||||
owner = path1.stat().owner
|
||||
group = path1.stat().group
|
||||
x = path1.join('hello')
|
||||
x.mksymlinkto('qlwkejqwlek')
|
||||
try:
|
||||
path1.chown(owner, group, rec=1)
|
||||
finally:
|
||||
x.remove(rec=0)
|
||||
|
||||
def test_chown_identity_rec_mayfail(self, path1):
|
||||
owner = path1.stat().owner
|
||||
group = path1.stat().group
|
||||
path1.chown(owner, group)
|
||||
443
testing/path/test_svnauth.py
Normal file
443
testing/path/test_svnauth.py
Normal file
@@ -0,0 +1,443 @@
|
||||
import py
|
||||
from testing.path import svntestbase
|
||||
from py.path import SvnAuth
|
||||
import time
|
||||
import sys
|
||||
|
||||
def make_repo_auth(repo, userdata):
|
||||
""" write config to repo
|
||||
|
||||
user information in userdata is used for auth
|
||||
userdata has user names as keys, and a tuple (password, readwrite) as
|
||||
values, where 'readwrite' is either 'r' or 'rw'
|
||||
"""
|
||||
confdir = py.path.local(repo).join('conf')
|
||||
confdir.join('svnserve.conf').write('''\
|
||||
[general]
|
||||
anon-access = none
|
||||
password-db = passwd
|
||||
authz-db = authz
|
||||
realm = TestRepo
|
||||
''')
|
||||
authzdata = '[/]\n'
|
||||
passwddata = '[users]\n'
|
||||
for user in userdata:
|
||||
authzdata += '%s = %s\n' % (user, userdata[user][1])
|
||||
passwddata += '%s = %s\n' % (user, userdata[user][0])
|
||||
confdir.join('authz').write(authzdata)
|
||||
confdir.join('passwd').write(passwddata)
|
||||
|
||||
def serve_bg(repopath):
|
||||
pidfile = py.path.local(repopath).join('pid')
|
||||
port = 10000
|
||||
e = None
|
||||
while port < 10010:
|
||||
cmd = 'svnserve -d -T --listen-port=%d --pid-file=%s -r %s' % (
|
||||
port, pidfile, repopath)
|
||||
print(cmd)
|
||||
try:
|
||||
py.process.cmdexec(cmd)
|
||||
except py.process.cmdexec.Error:
|
||||
e = sys.exc_info()[1]
|
||||
else:
|
||||
# XXX we assume here that the pid file gets written somewhere, I
|
||||
# guess this should be relatively safe... (I hope, at least?)
|
||||
counter = pid = 0
|
||||
while counter < 10:
|
||||
counter += 1
|
||||
try:
|
||||
pid = pidfile.read()
|
||||
except py.error.ENOENT:
|
||||
pass
|
||||
if pid:
|
||||
break
|
||||
time.sleep(0.2)
|
||||
return port, int(pid)
|
||||
port += 1
|
||||
raise IOError('could not start svnserve: %s' % (e,))
|
||||
|
||||
class TestSvnAuth(object):
|
||||
def test_basic(self):
|
||||
auth = SvnAuth('foo', 'bar')
|
||||
assert auth.username == 'foo'
|
||||
assert auth.password == 'bar'
|
||||
assert str(auth)
|
||||
|
||||
def test_makecmdoptions_uname_pw_makestr(self):
|
||||
auth = SvnAuth('foo', 'bar')
|
||||
assert auth.makecmdoptions() == '--username="foo" --password="bar"'
|
||||
|
||||
def test_makecmdoptions_quote_escape(self):
|
||||
auth = SvnAuth('fo"o', '"ba\'r"')
|
||||
assert auth.makecmdoptions() == '--username="fo\\"o" --password="\\"ba\'r\\""'
|
||||
|
||||
def test_makecmdoptions_no_cache_auth(self):
|
||||
auth = SvnAuth('foo', 'bar', cache_auth=False)
|
||||
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
|
||||
'--no-auth-cache')
|
||||
|
||||
def test_makecmdoptions_no_interactive(self):
|
||||
auth = SvnAuth('foo', 'bar', interactive=False)
|
||||
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
|
||||
'--non-interactive')
|
||||
|
||||
def test_makecmdoptions_no_interactive_no_cache_auth(self):
|
||||
auth = SvnAuth('foo', 'bar', cache_auth=False,
|
||||
interactive=False)
|
||||
assert auth.makecmdoptions() == ('--username="foo" --password="bar" '
|
||||
'--no-auth-cache --non-interactive')
|
||||
|
||||
class svnwc_no_svn(py.path.svnwc):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.commands = []
|
||||
super(svnwc_no_svn, self).__init__(*args, **kwargs)
|
||||
|
||||
def _svn(self, *args):
|
||||
self.commands.append(args)
|
||||
|
||||
class TestSvnWCAuth(object):
|
||||
def setup_method(self, meth):
|
||||
self.auth = SvnAuth('user', 'pass', cache_auth=False)
|
||||
|
||||
def test_checkout(self):
|
||||
wc = svnwc_no_svn('foo', auth=self.auth)
|
||||
wc.checkout('url')
|
||||
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
|
||||
'--no-auth-cache')
|
||||
|
||||
def test_commit(self):
|
||||
wc = svnwc_no_svn('foo', auth=self.auth)
|
||||
wc.commit('msg')
|
||||
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
|
||||
'--no-auth-cache')
|
||||
|
||||
def test_checkout_no_cache_auth(self):
|
||||
wc = svnwc_no_svn('foo', auth=self.auth)
|
||||
wc.checkout('url')
|
||||
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
|
||||
'--no-auth-cache')
|
||||
|
||||
def test_checkout_auth_from_constructor(self):
|
||||
wc = svnwc_no_svn('foo', auth=self.auth)
|
||||
wc.checkout('url')
|
||||
assert wc.commands[0][-1] == ('--username="user" --password="pass" '
|
||||
'--no-auth-cache')
|
||||
|
||||
class svnurl_no_svn(py.path.svnurl):
|
||||
cmdexec_output = 'test'
|
||||
popen_output = 'test'
|
||||
def __init__(self, *args, **kwargs):
|
||||
py.path.svnurl.__init__(self, *args, **kwargs)
|
||||
self.commands = []
|
||||
|
||||
def _cmdexec(self, cmd):
|
||||
self.commands.append(cmd)
|
||||
return self.cmdexec_output
|
||||
|
||||
def _popen(self, cmd):
|
||||
self.commands.append(cmd)
|
||||
return self.popen_output
|
||||
|
||||
class TestSvnURLAuth(object):
|
||||
def setup_method(self, meth):
|
||||
self.auth = SvnAuth('foo', 'bar')
|
||||
|
||||
def test_init(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn')
|
||||
assert u.auth is None
|
||||
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
assert u.auth is self.auth
|
||||
|
||||
def test_new(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
|
||||
new = u.new(basename='bar')
|
||||
assert new.auth is self.auth
|
||||
assert new.url == 'http://foo.bar/svn/bar'
|
||||
|
||||
def test_join(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
new = u.join('foo')
|
||||
assert new.auth is self.auth
|
||||
assert new.url == 'http://foo.bar/svn/foo'
|
||||
|
||||
def test_listdir(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
u.cmdexec_output = '''\
|
||||
1717 johnny 1529 Nov 04 14:32 LICENSE.txt
|
||||
1716 johnny 5352 Nov 04 14:28 README.txt
|
||||
'''
|
||||
paths = u.listdir()
|
||||
assert paths[0].auth is self.auth
|
||||
assert paths[1].auth is self.auth
|
||||
assert paths[0].basename == 'LICENSE.txt'
|
||||
|
||||
def test_info(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/LICENSE.txt', auth=self.auth)
|
||||
def dirpath(self):
|
||||
return self
|
||||
u.cmdexec_output = '''\
|
||||
1717 johnny 1529 Nov 04 14:32 LICENSE.txt
|
||||
1716 johnny 5352 Nov 04 14:28 README.txt
|
||||
'''
|
||||
org_dp = u.__class__.dirpath
|
||||
u.__class__.dirpath = dirpath
|
||||
try:
|
||||
info = u.info()
|
||||
finally:
|
||||
u.dirpath = org_dp
|
||||
assert info.size == 1529
|
||||
|
||||
def test_open(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
foo = u.join('foo')
|
||||
foo.check = lambda *args, **kwargs: True
|
||||
ret = foo.open()
|
||||
assert ret == 'test'
|
||||
assert '--username="foo" --password="bar"' in foo.commands[0]
|
||||
|
||||
def test_dirpath(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
|
||||
parent = u.dirpath()
|
||||
assert parent.auth is self.auth
|
||||
|
||||
def test_mkdir(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/qweqwe', auth=self.auth)
|
||||
assert not u.commands
|
||||
u.mkdir(msg='created dir foo')
|
||||
assert u.commands
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
def test_copy(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
u2 = svnurl_no_svn('http://foo.bar/svn2')
|
||||
u.copy(u2, 'copied dir')
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
def test_rename(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
|
||||
u.rename('http://foo.bar/svn/bar', 'moved foo to bar')
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
def test_remove(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
|
||||
u.remove(msg='removing foo')
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
def test_export(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
target = py.path.local('/foo')
|
||||
u.export(target)
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
def test_log(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth)
|
||||
u.popen_output = py.io.TextIO('''\
|
||||
<?xml version="1.0"?>
|
||||
<log>
|
||||
<logentry revision="51381">
|
||||
<author>guido</author>
|
||||
<date>2008-02-11T12:12:18.476481Z</date>
|
||||
<msg>Creating branch to work on auth support for py.path.svn*.
|
||||
</msg>
|
||||
</logentry>
|
||||
</log>
|
||||
''')
|
||||
u.check = lambda *args, **kwargs: True
|
||||
ret = u.log(10, 20, verbose=True)
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
assert len(ret) == 1
|
||||
assert int(ret[0].rev) == 51381
|
||||
assert ret[0].author == 'guido'
|
||||
|
||||
def test_propget(self):
|
||||
u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth)
|
||||
u.propget('foo')
|
||||
assert '--username="foo" --password="bar"' in u.commands[0]
|
||||
|
||||
class pytest_funcarg__setup:
|
||||
def __init__(self, request):
|
||||
if not request.config.option.runslowtests:
|
||||
py.test.skip('use --runslowtests to run these tests')
|
||||
|
||||
tmpdir = request.getfuncargvalue("tmpdir")
|
||||
repodir = tmpdir.join("repo")
|
||||
py.process.cmdexec('svnadmin create %s' % repodir)
|
||||
if sys.platform == 'win32':
|
||||
repodir = '/' + str(repodir).replace('\\', '/')
|
||||
self.repo = py.path.svnurl("file://%s" % repodir)
|
||||
if py.std.sys.platform == 'win32':
|
||||
# remove trailing slash...
|
||||
repodir = repodir[1:]
|
||||
self.repopath = py.path.local(repodir)
|
||||
self.temppath = tmpdir.mkdir("temppath")
|
||||
self.auth = SvnAuth('johnny', 'foo', cache_auth=False,
|
||||
interactive=False)
|
||||
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
|
||||
self.port, self.pid = serve_bg(self.repopath.dirpath())
|
||||
# XXX caching is too global
|
||||
py.path.svnurl._lsnorevcache._dict.clear()
|
||||
request.addfinalizer(lambda: py.process.kill(self.pid))
|
||||
|
||||
class TestSvnWCAuthFunctional:
|
||||
def test_checkout_constructor_arg(self, setup):
|
||||
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
|
||||
wc.checkout(
|
||||
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
|
||||
assert wc.join('.svn').check()
|
||||
|
||||
def test_checkout_function_arg(self, setup):
|
||||
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
|
||||
wc.checkout(
|
||||
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
|
||||
assert wc.join('.svn').check()
|
||||
|
||||
def test_checkout_failing_non_interactive(self, setup):
|
||||
auth = SvnAuth('johnny', 'bar', cache_auth=False,
|
||||
interactive=False)
|
||||
wc = py.path.svnwc(setup.temppath, auth)
|
||||
py.test.raises(Exception,
|
||||
("wc.checkout('svn://localhost:%(port)s/%(repopath)s')" %
|
||||
setup.__dict__))
|
||||
|
||||
def test_log(self, setup):
|
||||
wc = py.path.svnwc(setup.temppath, setup.auth)
|
||||
wc.checkout(
|
||||
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
|
||||
foo = wc.ensure('foo.txt')
|
||||
wc.commit('added foo.txt')
|
||||
log = foo.log()
|
||||
assert len(log) == 1
|
||||
assert log[0].msg == 'added foo.txt'
|
||||
|
||||
def test_switch(self, setup):
|
||||
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
|
||||
svnurl = 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)
|
||||
wc.checkout(svnurl)
|
||||
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
|
||||
wc.commit('added foo dir with foo.txt file')
|
||||
wc.ensure('bar', dir=True)
|
||||
wc.commit('added bar dir')
|
||||
bar = wc.join('bar')
|
||||
bar.switch(svnurl + '/foo')
|
||||
assert bar.join('foo.txt')
|
||||
|
||||
def test_update(self, setup):
|
||||
wc1 = py.path.svnwc(setup.temppath.ensure('wc1', dir=True),
|
||||
auth=setup.auth)
|
||||
wc2 = py.path.svnwc(setup.temppath.ensure('wc2', dir=True),
|
||||
auth=setup.auth)
|
||||
wc1.checkout(
|
||||
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
|
||||
wc2.checkout(
|
||||
'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename))
|
||||
wc1.ensure('foo', dir=True)
|
||||
wc1.commit('added foo dir')
|
||||
wc2.update()
|
||||
assert wc2.join('foo').check()
|
||||
|
||||
auth = SvnAuth('unknown', 'unknown', interactive=False)
|
||||
wc2.auth = auth
|
||||
py.test.raises(Exception, 'wc2.update()')
|
||||
|
||||
def test_lock_unlock_status(self, setup):
|
||||
port = setup.port
|
||||
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
|
||||
wc.checkout(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
|
||||
wc.ensure('foo', file=True)
|
||||
wc.commit('added foo file')
|
||||
foo = wc.join('foo')
|
||||
foo.lock()
|
||||
status = foo.status()
|
||||
assert status.locked
|
||||
foo.unlock()
|
||||
status = foo.status()
|
||||
assert not status.locked
|
||||
|
||||
auth = SvnAuth('unknown', 'unknown', interactive=False)
|
||||
foo.auth = auth
|
||||
py.test.raises(Exception, 'foo.lock()')
|
||||
py.test.raises(Exception, 'foo.unlock()')
|
||||
|
||||
def test_diff(self, setup):
|
||||
port = setup.port
|
||||
wc = py.path.svnwc(setup.temppath, auth=setup.auth)
|
||||
wc.checkout(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename,))
|
||||
wc.ensure('foo', file=True)
|
||||
wc.commit('added foo file')
|
||||
wc.update()
|
||||
rev = int(wc.status().rev)
|
||||
foo = wc.join('foo')
|
||||
foo.write('bar')
|
||||
diff = foo.diff()
|
||||
assert '\n+bar\n' in diff
|
||||
foo.commit('added some content')
|
||||
diff = foo.diff()
|
||||
assert not diff
|
||||
diff = foo.diff(rev=rev)
|
||||
assert '\n+bar\n' in diff
|
||||
|
||||
auth = SvnAuth('unknown', 'unknown', interactive=False)
|
||||
foo.auth = auth
|
||||
py.test.raises(Exception, 'foo.diff(rev=rev)')
|
||||
|
||||
class TestSvnURLAuthFunctional:
|
||||
def test_listdir(self, setup):
|
||||
port = setup.port
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=setup.auth)
|
||||
u.ensure('foo')
|
||||
paths = u.listdir()
|
||||
assert len(paths) == 1
|
||||
assert paths[0].auth is setup.auth
|
||||
|
||||
auth = SvnAuth('foo', 'bar', interactive=False)
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=auth)
|
||||
py.test.raises(Exception, 'u.listdir()')
|
||||
|
||||
def test_copy(self, setup):
|
||||
port = setup.port
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=setup.auth)
|
||||
foo = u.mkdir('foo')
|
||||
assert foo.check()
|
||||
bar = u.join('bar')
|
||||
foo.copy(bar)
|
||||
assert bar.check()
|
||||
assert bar.auth is setup.auth
|
||||
|
||||
auth = SvnAuth('foo', 'bar', interactive=False)
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=auth)
|
||||
foo = u.join('foo')
|
||||
bar = u.join('bar')
|
||||
py.test.raises(Exception, 'foo.copy(bar)')
|
||||
|
||||
def test_write_read(self, setup):
|
||||
port = setup.port
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=setup.auth)
|
||||
foo = u.ensure('foo')
|
||||
fp = foo.open()
|
||||
try:
|
||||
data = fp.read()
|
||||
finally:
|
||||
fp.close()
|
||||
assert data == ''
|
||||
|
||||
auth = SvnAuth('foo', 'bar', interactive=False)
|
||||
u = py.path.svnurl(
|
||||
'svn://localhost:%s/%s' % (port, setup.repopath.basename),
|
||||
auth=auth)
|
||||
foo = u.join('foo')
|
||||
py.test.raises(Exception, 'foo.open()')
|
||||
|
||||
# XXX rinse, repeat... :|
|
||||
106
testing/path/test_svnurl.py
Normal file
106
testing/path/test_svnurl.py
Normal file
@@ -0,0 +1,106 @@
|
||||
import py
|
||||
from py.__.path.svnurl import InfoSvnCommand
|
||||
import datetime
|
||||
import time
|
||||
from testing.path.svntestbase import CommonSvnTests
|
||||
|
||||
def pytest_funcarg__path1(request):
|
||||
repo, wc = request.getfuncargvalue("repowc1")
|
||||
return py.path.svnurl(repo)
|
||||
|
||||
class TestSvnURLCommandPath(CommonSvnTests):
|
||||
@py.test.mark.xfail
|
||||
def test_load(self, path1):
|
||||
super(TestSvnURLCommandPath, self).test_load(path1)
|
||||
|
||||
def test_move_file(self, path1): # overrides base class
|
||||
p = path1.ensure('origfile')
|
||||
newp = p.dirpath('newfile')
|
||||
p.move(newp)
|
||||
assert newp.check(file=1)
|
||||
newp.remove()
|
||||
assert not p.check()
|
||||
|
||||
def test_move_dir(self, path1): # overrides base class
|
||||
p = path1.ensure('origdir', dir=1)
|
||||
newp = p.dirpath('newdir')
|
||||
p.move(newp)
|
||||
assert newp.check(dir=1)
|
||||
newp.remove()
|
||||
assert not p.check()
|
||||
|
||||
def test_svnurl_needs_arg(self, path1):
|
||||
py.test.raises(TypeError, "py.path.svnurl()")
|
||||
|
||||
def test_svnurl_does_not_accept_None_either(self, path1):
|
||||
py.test.raises(Exception, "py.path.svnurl(None)")
|
||||
|
||||
def test_svnurl_characters_simple(self, path1):
|
||||
py.path.svnurl("svn+ssh://hello/world")
|
||||
|
||||
def test_svnurl_characters_at_user(self, path1):
|
||||
py.path.svnurl("http://user@host.com/some/dir")
|
||||
|
||||
def test_svnurl_characters_at_path(self, path1):
|
||||
py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo@bar")')
|
||||
|
||||
def test_svnurl_characters_colon_port(self, path1):
|
||||
py.path.svnurl("http://host.com:8080/some/dir")
|
||||
|
||||
def test_svnurl_characters_tilde_end(self, path1):
|
||||
py.path.svnurl("http://host.com/some/file~")
|
||||
|
||||
def test_svnurl_characters_colon_path(self, path1):
|
||||
if py.std.sys.platform == 'win32':
|
||||
# colons are allowed on win32, because they're part of the drive
|
||||
# part of an absolute path... however, they shouldn't be allowed in
|
||||
# other parts, I think
|
||||
py.test.skip('XXX fixme win32')
|
||||
py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo:bar")')
|
||||
|
||||
def test_export(self, path1, tmpdir):
|
||||
tmpdir = tmpdir.join("empty")
|
||||
p = path1.export(tmpdir)
|
||||
assert p == tmpdir # XXX should return None
|
||||
n1 = [x.basename for x in tmpdir.listdir()]
|
||||
n2 = [x.basename for x in path1.listdir()]
|
||||
n1.sort()
|
||||
n2.sort()
|
||||
assert n1 == n2
|
||||
assert not p.join('.svn').check()
|
||||
rev = path1.mkdir("newdir")
|
||||
tmpdir.remove()
|
||||
assert not tmpdir.check()
|
||||
path1.new(rev=1).export(tmpdir)
|
||||
for p in tmpdir.listdir():
|
||||
assert p.basename in n2
|
||||
|
||||
class TestSvnInfoCommand:
|
||||
|
||||
def test_svn_1_2(self):
|
||||
line = " 2256 hpk 165 Nov 24 17:55 __init__.py"
|
||||
info = InfoSvnCommand(line)
|
||||
now = datetime.datetime.now()
|
||||
assert info.last_author == 'hpk'
|
||||
assert info.created_rev == 2256
|
||||
assert info.kind == 'file'
|
||||
# we don't check for the year (2006), because that depends
|
||||
# on the clock correctly being setup
|
||||
assert time.gmtime(info.mtime)[1:6] == (11, 24, 17, 55, 0)
|
||||
assert info.size == 165
|
||||
assert info.time == info.mtime * 1000000
|
||||
|
||||
def test_svn_1_3(self):
|
||||
line =" 4784 hpk 2 Jun 01 2004 __init__.py"
|
||||
info = InfoSvnCommand(line)
|
||||
assert info.last_author == 'hpk'
|
||||
assert info.kind == 'file'
|
||||
|
||||
def test_svn_1_3_b(self):
|
||||
line =" 74 autoadmi Oct 06 23:59 plonesolutions.com/"
|
||||
info = InfoSvnCommand(line)
|
||||
assert info.last_author == 'autoadmi'
|
||||
assert info.kind == 'dir'
|
||||
|
||||
def test_badchars():
|
||||
py.test.raises(ValueError, "py.path.svnurl('http://host/tmp/@@@:')")
|
||||
561
testing/path/test_svnwc.py
Normal file
561
testing/path/test_svnwc.py
Normal file
@@ -0,0 +1,561 @@
|
||||
import py
|
||||
import sys
|
||||
from py.__.path.svnwc import InfoSvnWCCommand, XMLWCStatus, parse_wcinfotime
|
||||
from py.__.path import svnwc as svncommon
|
||||
from testing.path.svntestbase import CommonSvnTests
|
||||
|
||||
if sys.platform == 'win32':
|
||||
def normpath(p):
|
||||
return p
|
||||
else:
|
||||
def normpath(p):
|
||||
p = py.test.importorskip("win32").GetShortPathName(p)
|
||||
return os.path.normpath(os.path.normcase(p))
|
||||
|
||||
def test_make_repo(path1, tmpdir):
|
||||
repo = tmpdir.join("repo")
|
||||
py.process.cmdexec('svnadmin create %s' % repo)
|
||||
if sys.platform == 'win32':
|
||||
repo = '/' + str(repo).replace('\\', '/')
|
||||
repo = py.path.svnurl("file://%s" % repo)
|
||||
wc = py.path.svnwc(tmpdir.join("wc"))
|
||||
wc.checkout(repo)
|
||||
assert wc.info().rev == 0
|
||||
assert len(wc.listdir()) == 0
|
||||
p = wc.join("a_file")
|
||||
p.write("test file")
|
||||
p.add()
|
||||
rev = wc.commit("some test")
|
||||
assert p.info().rev == 1
|
||||
assert rev == 1
|
||||
rev = wc.commit()
|
||||
assert rev is None
|
||||
|
||||
def pytest_funcarg__path1(request):
|
||||
repo, wc = request.getfuncargvalue("repowc1")
|
||||
return wc
|
||||
|
||||
class TestWCSvnCommandPath(CommonSvnTests):
|
||||
def test_move_file(self, path1): # overrides base class
|
||||
try:
|
||||
super(TestWCSvnCommandPath, self).test_move_file(path1)
|
||||
finally:
|
||||
path1.revert(rec=1)
|
||||
|
||||
def test_move_directory(self, path1): # overrides base class
|
||||
try:
|
||||
super(TestWCSvnCommandPath, self).test_move_directory(path1)
|
||||
finally:
|
||||
path1.revert(rec=1)
|
||||
|
||||
def test_status_attributes_simple(self, path1):
|
||||
def assert_nochange(p):
|
||||
s = p.status()
|
||||
assert not s.modified
|
||||
assert not s.prop_modified
|
||||
assert not s.added
|
||||
assert not s.deleted
|
||||
assert not s.replaced
|
||||
|
||||
dpath = path1.join('sampledir')
|
||||
assert_nochange(path1.join('sampledir'))
|
||||
assert_nochange(path1.join('samplefile'))
|
||||
|
||||
def test_status_added(self, path1):
|
||||
nf = path1.join('newfile')
|
||||
nf.write('hello')
|
||||
nf.add()
|
||||
try:
|
||||
s = nf.status()
|
||||
assert s.added
|
||||
assert not s.modified
|
||||
assert not s.prop_modified
|
||||
assert not s.replaced
|
||||
finally:
|
||||
nf.revert()
|
||||
|
||||
def test_status_change(self, path1):
|
||||
nf = path1.join('samplefile')
|
||||
try:
|
||||
nf.write(nf.read() + 'change')
|
||||
s = nf.status()
|
||||
assert not s.added
|
||||
assert s.modified
|
||||
assert not s.prop_modified
|
||||
assert not s.replaced
|
||||
finally:
|
||||
nf.revert()
|
||||
|
||||
def test_status_added_ondirectory(self, path1):
|
||||
sampledir = path1.join('sampledir')
|
||||
try:
|
||||
t2 = sampledir.mkdir('t2')
|
||||
t1 = t2.join('t1')
|
||||
t1.write('test')
|
||||
t1.add()
|
||||
s = sampledir.status(rec=1)
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert t1.basename in [item.basename for item in s.added]
|
||||
assert t2.basename in [item.basename for item in s.added]
|
||||
finally:
|
||||
t2.revert(rec=1)
|
||||
t2.localpath.remove(rec=1)
|
||||
|
||||
def test_status_unknown(self, path1):
|
||||
t1 = path1.join('un1')
|
||||
try:
|
||||
t1.write('test')
|
||||
s = path1.status()
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert t1.basename in [item.basename for item in s.unknown]
|
||||
finally:
|
||||
t1.localpath.remove()
|
||||
|
||||
def test_status_unchanged(self, path1):
|
||||
r = path1
|
||||
s = path1.status(rec=1)
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert r.join('samplefile').basename in [item.basename
|
||||
for item in s.unchanged]
|
||||
assert r.join('sampledir').basename in [item.basename
|
||||
for item in s.unchanged]
|
||||
assert r.join('sampledir/otherfile').basename in [item.basename
|
||||
for item in s.unchanged]
|
||||
|
||||
def test_status_update(self, path1):
|
||||
r = path1
|
||||
try:
|
||||
r.update(rev=1)
|
||||
s = r.status(updates=1, rec=1)
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert r.join('anotherfile').basename in [item.basename for
|
||||
item in s.update_available]
|
||||
#assert len(s.update_available) == 1
|
||||
finally:
|
||||
r.update()
|
||||
|
||||
def test_status_replaced(self, path1):
|
||||
p = path1.join("samplefile")
|
||||
p.remove()
|
||||
p.ensure(dir=0)
|
||||
p.add()
|
||||
try:
|
||||
s = path1.status()
|
||||
assert p.basename in [item.basename for item in s.replaced]
|
||||
finally:
|
||||
path1.revert(rec=1)
|
||||
|
||||
def test_status_ignored(self, path1):
|
||||
try:
|
||||
d = path1.join('sampledir')
|
||||
p = py.path.local(d).join('ignoredfile')
|
||||
p.ensure(file=True)
|
||||
s = d.status()
|
||||
assert [x.basename for x in s.unknown] == ['ignoredfile']
|
||||
assert [x.basename for x in s.ignored] == []
|
||||
d.propset('svn:ignore', 'ignoredfile')
|
||||
s = d.status()
|
||||
assert [x.basename for x in s.unknown] == []
|
||||
assert [x.basename for x in s.ignored] == ['ignoredfile']
|
||||
finally:
|
||||
path1.revert(rec=1)
|
||||
|
||||
def test_status_conflict(self, path1, tmpdir):
|
||||
wc = path1
|
||||
wccopy = py.path.svnwc(tmpdir.join("conflict_copy"))
|
||||
wccopy.checkout(wc.url)
|
||||
p = wc.ensure('conflictsamplefile', file=1)
|
||||
p.write('foo')
|
||||
wc.commit('added conflictsamplefile')
|
||||
wccopy.update()
|
||||
assert wccopy.join('conflictsamplefile').check()
|
||||
p.write('bar')
|
||||
wc.commit('wrote some data')
|
||||
wccopy.join('conflictsamplefile').write('baz')
|
||||
wccopy.update()
|
||||
s = wccopy.status()
|
||||
assert [x.basename for x in s.conflict] == ['conflictsamplefile']
|
||||
|
||||
def test_status_external(self, path1, repowc2):
|
||||
otherrepo, otherwc = repowc2
|
||||
d = path1.ensure('sampledir', dir=1)
|
||||
try:
|
||||
d.remove()
|
||||
d.add()
|
||||
d.update()
|
||||
d.propset('svn:externals', 'otherwc %s' % (otherwc.url,))
|
||||
d.update()
|
||||
s = d.status()
|
||||
assert [x.basename for x in s.external] == ['otherwc']
|
||||
assert 'otherwc' not in [x.basename for x in s.unchanged]
|
||||
s = d.status(rec=1)
|
||||
assert [x.basename for x in s.external] == ['otherwc']
|
||||
assert 'otherwc' in [x.basename for x in s.unchanged]
|
||||
finally:
|
||||
path1.revert(rec=1)
|
||||
|
||||
def test_status_deleted(self, path1):
|
||||
d = path1.ensure('sampledir', dir=1)
|
||||
d.remove()
|
||||
d.add()
|
||||
path1.commit()
|
||||
d.ensure('deletefile', dir=0)
|
||||
d.commit()
|
||||
s = d.status()
|
||||
assert 'deletefile' in [x.basename for x in s.unchanged]
|
||||
assert not s.deleted
|
||||
p = d.join('deletefile')
|
||||
p.remove()
|
||||
s = d.status()
|
||||
assert 'deletefile' not in s.unchanged
|
||||
assert [x.basename for x in s.deleted] == ['deletefile']
|
||||
|
||||
def test_status_noauthor(self, path1):
|
||||
# testing for XML without author - this used to raise an exception
|
||||
xml = '''\
|
||||
<entry path="/tmp/pytest-23/wc">
|
||||
<wc-status item="normal" props="none" revision="0">
|
||||
<commit revision="0">
|
||||
<date>2008-08-19T16:50:53.400198Z</date>
|
||||
</commit>
|
||||
</wc-status>
|
||||
</entry>
|
||||
'''
|
||||
XMLWCStatus.fromstring(xml, path1)
|
||||
|
||||
def test_status_wrong_xml(self, path1):
|
||||
# testing for XML without author - this used to raise an exception
|
||||
xml = '<entry path="/home/jean/zope/venv/projectdb/parts/development-products/DataGridField">\n<wc-status item="incomplete" props="none" revision="784">\n</wc-status>\n</entry>'
|
||||
st = XMLWCStatus.fromstring(xml, path1)
|
||||
assert len(st.incomplete) == 1
|
||||
|
||||
def test_diff(self, path1):
|
||||
p = path1 / 'anotherfile'
|
||||
out = p.diff(rev=2)
|
||||
assert out.find('hello') != -1
|
||||
|
||||
def test_blame(self, path1):
|
||||
p = path1.join('samplepickle')
|
||||
lines = p.blame()
|
||||
assert sum([l[0] for l in lines]) == len(lines)
|
||||
for l1, l2 in zip(p.readlines(), [l[2] for l in lines]):
|
||||
assert l1 == l2
|
||||
assert [l[1] for l in lines] == ['hpk'] * len(lines)
|
||||
p = path1.join('samplefile')
|
||||
lines = p.blame()
|
||||
assert sum([l[0] for l in lines]) == len(lines)
|
||||
for l1, l2 in zip(p.readlines(), [l[2] for l in lines]):
|
||||
assert l1 == l2
|
||||
assert [l[1] for l in lines] == ['hpk'] * len(lines)
|
||||
|
||||
def test_join_abs(self, path1):
|
||||
s = str(path1.localpath)
|
||||
n = path1.join(s, abs=1)
|
||||
assert path1 == n
|
||||
|
||||
def test_join_abs2(self, path1):
|
||||
assert path1.join('samplefile', abs=1) == path1.join('samplefile')
|
||||
|
||||
def test_str_gives_localpath(self, path1):
|
||||
assert str(path1) == str(path1.localpath)
|
||||
|
||||
def test_versioned(self, path1):
|
||||
assert path1.check(versioned=1)
|
||||
# TODO: Why does my copy of svn think .svn is versioned?
|
||||
#assert path1.join('.svn').check(versioned=0)
|
||||
assert path1.join('samplefile').check(versioned=1)
|
||||
assert not path1.join('notexisting').check(versioned=1)
|
||||
notexisting = path1.join('hello').localpath
|
||||
try:
|
||||
notexisting.write("")
|
||||
assert path1.join('hello').check(versioned=0)
|
||||
finally:
|
||||
notexisting.remove()
|
||||
|
||||
def test_nonversioned_remove(self, path1):
|
||||
assert path1.check(versioned=1)
|
||||
somefile = path1.join('nonversioned/somefile')
|
||||
nonwc = py.path.local(somefile)
|
||||
nonwc.ensure()
|
||||
assert somefile.check()
|
||||
assert not somefile.check(versioned=True)
|
||||
somefile.remove() # this used to fail because it tried to 'svn rm'
|
||||
|
||||
def test_properties(self, path1):
|
||||
try:
|
||||
path1.propset('gaga', 'this')
|
||||
assert path1.propget('gaga') == 'this'
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert path1.basename in [item.basename for item in
|
||||
path1.status().prop_modified]
|
||||
assert 'gaga' in path1.proplist()
|
||||
assert path1.proplist()['gaga'] == 'this'
|
||||
|
||||
finally:
|
||||
path1.propdel('gaga')
|
||||
|
||||
def test_proplist_recursive(self, path1):
|
||||
s = path1.join('samplefile')
|
||||
s.propset('gugu', 'that')
|
||||
try:
|
||||
p = path1.proplist(rec=1)
|
||||
# Comparing just the file names, because paths are unpredictable
|
||||
# on Windows. (long vs. 8.3 paths)
|
||||
assert (path1 / 'samplefile').basename in [item.basename
|
||||
for item in p]
|
||||
finally:
|
||||
s.propdel('gugu')
|
||||
|
||||
def test_long_properties(self, path1):
|
||||
value = """
|
||||
vadm:posix : root root 0100755
|
||||
Properties on 'chroot/dns/var/bind/db.net.xots':
|
||||
"""
|
||||
try:
|
||||
path1.propset('gaga', value)
|
||||
backvalue = path1.propget('gaga')
|
||||
assert backvalue == value
|
||||
#assert len(backvalue.split('\n')) == 1
|
||||
finally:
|
||||
path1.propdel('gaga')
|
||||
|
||||
|
||||
def test_ensure(self, path1):
|
||||
newpath = path1.ensure('a', 'b', 'c')
|
||||
try:
|
||||
assert newpath.check(exists=1, versioned=1)
|
||||
newpath.write("hello")
|
||||
newpath.ensure()
|
||||
assert newpath.read() == "hello"
|
||||
finally:
|
||||
path1.join('a').remove(force=1)
|
||||
|
||||
def test_not_versioned(self, path1):
|
||||
p = path1.localpath.mkdir('whatever')
|
||||
f = path1.localpath.ensure('testcreatedfile')
|
||||
try:
|
||||
assert path1.join('whatever').check(versioned=0)
|
||||
assert path1.join('testcreatedfile').check(versioned=0)
|
||||
assert not path1.join('testcreatedfile').check(versioned=1)
|
||||
finally:
|
||||
p.remove(rec=1)
|
||||
f.remove()
|
||||
|
||||
def test_lock_unlock(self, path1):
|
||||
root = path1
|
||||
somefile = root.join('somefile')
|
||||
somefile.ensure(file=True)
|
||||
# not yet added to repo
|
||||
py.test.raises(py.process.cmdexec.Error, 'somefile.lock()')
|
||||
somefile.write('foo')
|
||||
somefile.commit('test')
|
||||
assert somefile.check(versioned=True)
|
||||
somefile.lock()
|
||||
try:
|
||||
locked = root.status().locked
|
||||
assert len(locked) == 1
|
||||
assert normpath(str(locked[0])) == normpath(str(somefile))
|
||||
#assert somefile.locked()
|
||||
py.test.raises(Exception, 'somefile.lock()')
|
||||
finally:
|
||||
somefile.unlock()
|
||||
#assert not somefile.locked()
|
||||
locked = root.status().locked
|
||||
assert locked == []
|
||||
py.test.raises(Exception, 'somefile,unlock()')
|
||||
somefile.remove()
|
||||
|
||||
def test_commit_nonrecursive(self, path1):
|
||||
somedir = path1.join('sampledir')
|
||||
somedir.mkdir("subsubdir")
|
||||
somedir.propset('foo', 'bar')
|
||||
status = somedir.status()
|
||||
assert len(status.prop_modified) == 1
|
||||
assert len(status.added) == 1
|
||||
|
||||
somedir.commit('non-recursive commit', rec=0)
|
||||
status = somedir.status()
|
||||
assert len(status.prop_modified) == 0
|
||||
assert len(status.added) == 1
|
||||
|
||||
somedir.commit('recursive commit')
|
||||
status = somedir.status()
|
||||
assert len(status.prop_modified) == 0
|
||||
assert len(status.added) == 0
|
||||
|
||||
def test_commit_return_value(self, path1):
|
||||
testfile = path1.join('test.txt').ensure(file=True)
|
||||
testfile.write('test')
|
||||
rev = path1.commit('testing')
|
||||
assert type(rev) == int
|
||||
|
||||
anotherfile = path1.join('another.txt').ensure(file=True)
|
||||
anotherfile.write('test')
|
||||
rev2 = path1.commit('testing more')
|
||||
assert type(rev2) == int
|
||||
assert rev2 == rev + 1
|
||||
|
||||
#def test_log(self, path1):
|
||||
# l = path1.log()
|
||||
# assert len(l) == 3 # might need to be upped if more tests are added
|
||||
|
||||
class XTestWCSvnCommandPathSpecial:
|
||||
|
||||
rooturl = 'http://codespeak.net/svn/py.path/trunk/dist/py.path/test/data'
|
||||
#def test_update_none_rev(self, path1):
|
||||
# path = tmpdir.join('checkouttest')
|
||||
# wcpath = newpath(xsvnwc=str(path), url=path1url)
|
||||
# try:
|
||||
# wcpath.checkout(rev=2100)
|
||||
# wcpath.update()
|
||||
# assert wcpath.info().rev > 2100
|
||||
# finally:
|
||||
# wcpath.localpath.remove(rec=1)
|
||||
|
||||
def test_parse_wcinfotime():
|
||||
assert (parse_wcinfotime('2006-05-30 20:45:26 +0200 (Tue, 30 May 2006)') ==
|
||||
1149021926)
|
||||
assert (parse_wcinfotime('2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003)') ==
|
||||
1067287394)
|
||||
|
||||
class TestInfoSvnWCCommand:
|
||||
|
||||
def test_svn_1_2(self, path1):
|
||||
output = """
|
||||
Path: test_svnwc.py
|
||||
Name: test_svnwc.py
|
||||
URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py
|
||||
Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada
|
||||
Revision: 28137
|
||||
Node Kind: file
|
||||
Schedule: normal
|
||||
Last Changed Author: jan
|
||||
Last Changed Rev: 27939
|
||||
Last Changed Date: 2006-05-30 20:45:26 +0200 (Tue, 30 May 2006)
|
||||
Text Last Updated: 2006-06-01 00:42:53 +0200 (Thu, 01 Jun 2006)
|
||||
Properties Last Updated: 2006-05-23 11:54:59 +0200 (Tue, 23 May 2006)
|
||||
Checksum: 357e44880e5d80157cc5fbc3ce9822e3
|
||||
"""
|
||||
path = py.path.local(__file__).dirpath().chdir()
|
||||
try:
|
||||
info = InfoSvnWCCommand(output)
|
||||
finally:
|
||||
path.chdir()
|
||||
assert info.last_author == 'jan'
|
||||
assert info.kind == 'file'
|
||||
assert info.mtime == 1149021926.0
|
||||
assert info.url == 'http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py'
|
||||
assert info.time == 1149021926000000.0
|
||||
assert info.rev == 28137
|
||||
|
||||
|
||||
def test_svn_1_3(self, path1):
|
||||
output = """
|
||||
Path: test_svnwc.py
|
||||
Name: test_svnwc.py
|
||||
URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py
|
||||
Repository Root: http://codespeak.net/svn
|
||||
Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada
|
||||
Revision: 28124
|
||||
Node Kind: file
|
||||
Schedule: normal
|
||||
Last Changed Author: jan
|
||||
Last Changed Rev: 27939
|
||||
Last Changed Date: 2006-05-30 20:45:26 +0200 (Tue, 30 May 2006)
|
||||
Text Last Updated: 2006-06-02 23:46:11 +0200 (Fri, 02 Jun 2006)
|
||||
Properties Last Updated: 2006-06-02 23:45:28 +0200 (Fri, 02 Jun 2006)
|
||||
Checksum: 357e44880e5d80157cc5fbc3ce9822e3
|
||||
"""
|
||||
path = py.path.local(__file__).dirpath().chdir()
|
||||
try:
|
||||
info = InfoSvnWCCommand(output)
|
||||
finally:
|
||||
path.chdir()
|
||||
assert info.last_author == 'jan'
|
||||
assert info.kind == 'file'
|
||||
assert info.mtime == 1149021926.0
|
||||
assert info.url == 'http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py'
|
||||
assert info.rev == 28124
|
||||
assert info.time == 1149021926000000.0
|
||||
|
||||
|
||||
def test_characters_at():
|
||||
py.test.raises(ValueError, "py.path.svnwc('/tmp/@@@:')")
|
||||
|
||||
def test_characters_tilde():
|
||||
py.path.svnwc('/tmp/test~')
|
||||
|
||||
|
||||
class TestRepo:
|
||||
def test_trailing_slash_is_stripped(self, path1):
|
||||
# XXX we need to test more normalizing properties
|
||||
url = path1.join("/")
|
||||
assert path1 == url
|
||||
|
||||
#def test_different_revs_compare_unequal(self, path1):
|
||||
# newpath = path1.new(rev=1199)
|
||||
# assert newpath != path1
|
||||
|
||||
def test_exists_svn_root(self, path1):
|
||||
assert path1.check()
|
||||
|
||||
#def test_not_exists_rev(self, path1):
|
||||
# url = path1.__class__(path1url, rev=500)
|
||||
# assert url.check(exists=0)
|
||||
|
||||
#def test_nonexisting_listdir_rev(self, path1):
|
||||
# url = path1.__class__(path1url, rev=500)
|
||||
# raises(py.error.ENOENT, url.listdir)
|
||||
|
||||
#def test_newrev(self, path1):
|
||||
# url = path1.new(rev=None)
|
||||
# assert url.rev == None
|
||||
# assert url.strpath == path1.strpath
|
||||
# url = path1.new(rev=10)
|
||||
# assert url.rev == 10
|
||||
|
||||
#def test_info_rev(self, path1):
|
||||
# url = path1.__class__(path1url, rev=1155)
|
||||
# url = url.join("samplefile")
|
||||
# res = url.info()
|
||||
# assert res.size > len("samplefile") and res.created_rev == 1155
|
||||
|
||||
# the following tests are easier if we have a path class
|
||||
def test_repocache_simple(self, path1):
|
||||
repocache = svncommon.RepoCache()
|
||||
repocache.put(path1.strpath, 42)
|
||||
url, rev = repocache.get(path1.join('test').strpath)
|
||||
assert rev == 42
|
||||
assert url == path1.strpath
|
||||
|
||||
def test_repocache_notimeout(self, path1):
|
||||
repocache = svncommon.RepoCache()
|
||||
repocache.timeout = 0
|
||||
repocache.put(path1.strpath, path1.rev)
|
||||
url, rev = repocache.get(path1.strpath)
|
||||
assert rev == -1
|
||||
assert url == path1.strpath
|
||||
|
||||
def test_repocache_outdated(self, path1):
|
||||
repocache = svncommon.RepoCache()
|
||||
repocache.put(path1.strpath, 42, timestamp=0)
|
||||
url, rev = repocache.get(path1.join('test').strpath)
|
||||
assert rev == -1
|
||||
assert url == path1.strpath
|
||||
|
||||
def _test_getreporev(self):
|
||||
""" this test runs so slow it's usually disabled """
|
||||
old = svncommon.repositories.repos
|
||||
try:
|
||||
_repocache.clear()
|
||||
root = path1.new(rev=-1)
|
||||
url, rev = cache.repocache.get(root.strpath)
|
||||
assert rev>=0
|
||||
assert url == svnrepourl
|
||||
finally:
|
||||
repositories.repos = old
|
||||
Reference in New Issue
Block a user