From 5118821c10bda81f96d66123277b78775dc9b21f Mon Sep 17 00:00:00 2001 From: holger krekel Date: Thu, 20 Aug 2009 20:35:35 +0200 Subject: [PATCH] consolidate svn path implementations and tests into files named after the package namespaces. --HG-- branch : trunk --- py/__init__.py | 6 +- .../quoting.txt => notes-svn-quoting.txt} | 0 py/path/svn/__init__.py | 1 - py/path/svn/cache.py | 69 --- py/path/svn/svncommon.py | 368 ------------- py/path/svn/testing/__init__.py | 1 - py/path/svn/testing/test_test_repo.py | 25 - py/path/{svn/urlcommand.py => svnurl.py} | 43 +- py/path/{svn/wccommand.py => svnwc.py} | 493 +++++++++++++++++- py/path/{svn => }/testing/repotest.dump | 0 py/path/{svn => }/testing/svntestbase.py | 11 +- .../test_auth.py => testing/test_svnauth.py} | 0 .../test_svnurl.py} | 4 +- .../test_svnwc.py} | 36 +- 14 files changed, 515 insertions(+), 542 deletions(-) rename py/path/{svn/quoting.txt => notes-svn-quoting.txt} (100%) delete mode 100644 py/path/svn/__init__.py delete mode 100644 py/path/svn/cache.py delete mode 100644 py/path/svn/svncommon.py delete mode 100644 py/path/svn/testing/__init__.py delete mode 100644 py/path/svn/testing/test_test_repo.py rename py/path/{svn/urlcommand.py => svnurl.py} (89%) rename py/path/{svn/wccommand.py => svnwc.py} (64%) rename py/path/{svn => }/testing/repotest.dump (100%) rename py/path/{svn => }/testing/svntestbase.py (96%) rename py/path/{svn/testing/test_auth.py => testing/test_svnauth.py} (100%) rename py/path/{svn/testing/test_urlcommand.py => testing/test_svnurl.py} (97%) rename py/path/{svn/testing/test_wccommand.py => testing/test_svnwc.py} (95%) diff --git a/py/__init__.py b/py/__init__.py index 9276c296a..b8b84524b 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -104,10 +104,10 @@ initpkg(__name__, # path implementation 'path.__doc__' : ('./path/__init__.py', '__doc__'), - 'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'), - 'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'), + 'path.svnwc' : ('./path/svnwc.py', 'SvnWCCommandPath'), + 'path.svnurl' : ('./path/svnurl.py', 'SvnCommandPath'), 'path.local' : ('./path/local.py', 'LocalPath'), - 'path.SvnAuth' : ('./path/svn/svncommon.py', 'SvnAuth'), + 'path.SvnAuth' : ('./path/svnwc.py', 'SvnAuth'), # some nice slightly magic APIs 'magic.__doc__' : ('./magic/__init__.py', '__doc__'), diff --git a/py/path/svn/quoting.txt b/py/path/notes-svn-quoting.txt similarity index 100% rename from py/path/svn/quoting.txt rename to py/path/notes-svn-quoting.txt diff --git a/py/path/svn/__init__.py b/py/path/svn/__init__.py deleted file mode 100644 index 792d60054..000000000 --- a/py/path/svn/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/py/path/svn/cache.py b/py/path/svn/cache.py deleted file mode 100644 index 67ad7ba05..000000000 --- a/py/path/svn/cache.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -# generic cache mechanism for subversion-related structures -# XXX make mt-safe -""" - -import time - -proplist = {} -info = {} -entries = {} -prop = {} - -#----------------------------------------------------------- -# Caching latest repository revision and repo-paths -# (getting them is slow with the current implementations) -# -# XXX make mt-safe -#----------------------------------------------------------- - -class RepoEntry: - def __init__(self, url, rev, timestamp): - self.url = url - self.rev = rev - self.timestamp = timestamp - - def __str__(self): - return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp) - -class RepoCache: - """ The Repocache manages discovered repository paths - and their revisions. If inside a timeout the cache - will even return the revision of the root. - """ - timeout = 20 # seconds after which we forget that we know the last revision - - def __init__(self): - self.repos = [] - - def clear(self): - self.repos = [] - - def put(self, url, rev, timestamp=None): - if rev is None: - return - if timestamp is None: - timestamp = time.time() - - for entry in self.repos: - if url == entry.url: - entry.timestamp = timestamp - entry.rev = rev - #print "set repo", entry - break - else: - entry = RepoEntry(url, rev, timestamp) - self.repos.append(entry) - #print "appended repo", entry - - def get(self, url): - now = time.time() - for entry in self.repos: - if url.startswith(entry.url): - if now < entry.timestamp + self.timeout: - #print "returning immediate Etrny", entry - return entry.url, entry.rev - return entry.url, -1 - return url, -1 - -repositories = RepoCache() diff --git a/py/path/svn/svncommon.py b/py/path/svn/svncommon.py deleted file mode 100644 index 97d3c6740..000000000 --- a/py/path/svn/svncommon.py +++ /dev/null @@ -1,368 +0,0 @@ -""" -module with a base subversion path object. -""" -import os, sys, time, re, string -import py -from py.__.path import common - -ALLOWED_CHARS = "_ -/\\=$.~+" #add characters as necessary when tested -if sys.platform == "win32": - ALLOWED_CHARS += ":" -ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:' - -def _getsvnversion(ver=[]): - try: - return ver[0] - except IndexError: - v = py.process.cmdexec("svn -q --version") - v.strip() - v = '.'.join(v.split('.')[:2]) - ver.append(v) - return v - -def _escape_helper(text): - text = str(text) - if py.std.sys.platform != 'win32': - text = str(text).replace('$', '\\$') - return text - -def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS): - for c in str(text): - if c.isalnum(): - continue - if c in allowed_chars: - continue - return True - return False - -def checkbadchars(url): - # (hpk) not quite sure about the exact purpose, guido w.? - proto, uri = url.split("://", 1) - if proto != "file": - host, uripath = uri.split('/', 1) - # only check for bad chars in the non-protocol parts - if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \ - or _check_for_bad_chars(uripath, ALLOWED_CHARS)): - raise ValueError("bad char in %r" % (url, )) - - -#_______________________________________________________________ - -class SvnPathBase(common.PathBase): - """ Base implementation for SvnPath implementations. """ - sep = '/' - - def _geturl(self): - return self.strpath - url = property(_geturl, None, None, "url of this svn-path.") - - def __str__(self): - """ return a string representation (including rev-number) """ - return self.strpath - - def __hash__(self): - return hash(self.strpath) - - def new(self, **kw): - """ create a modified version of this path. A 'rev' argument - indicates a new revision. - the following keyword arguments modify various path parts: - - http://host.com/repo/path/file.ext - |-----------------------| dirname - |------| basename - |--| purebasename - |--| ext - """ - obj = object.__new__(self.__class__) - obj.rev = kw.get('rev', self.rev) - obj.auth = kw.get('auth', self.auth) - dirname, basename, purebasename, ext = self._getbyspec( - "dirname,basename,purebasename,ext") - if 'basename' in kw: - if 'purebasename' in kw or 'ext' in kw: - raise ValueError("invalid specification %r" % kw) - else: - pb = kw.setdefault('purebasename', purebasename) - ext = kw.setdefault('ext', ext) - if ext and not ext.startswith('.'): - ext = '.' + ext - kw['basename'] = pb + ext - - kw.setdefault('dirname', dirname) - kw.setdefault('sep', self.sep) - if kw['basename']: - obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw - else: - obj.strpath = "%(dirname)s" % kw - return obj - - def _getbyspec(self, spec): - """ get specified parts of the path. 'arg' is a string - with comma separated path parts. The parts are returned - in exactly the order of the specification. - - you may specify the following parts: - - http://host.com/repo/path/file.ext - |-----------------------| dirname - |------| basename - |--| purebasename - |--| ext - """ - res = [] - parts = self.strpath.split(self.sep) - for name in spec.split(','): - name = name.strip() - if name == 'dirname': - res.append(self.sep.join(parts[:-1])) - elif name == 'basename': - res.append(parts[-1]) - else: - basename = parts[-1] - i = basename.rfind('.') - if i == -1: - purebasename, ext = basename, '' - else: - purebasename, ext = basename[:i], basename[i:] - if name == 'purebasename': - res.append(purebasename) - elif name == 'ext': - res.append(ext) - else: - raise NameError, "Don't know part %r" % name - return res - - def __eq__(self, other): - """ return true if path and rev attributes each match """ - return (str(self) == str(other) and - (self.rev == other.rev or self.rev == other.rev)) - - def __ne__(self, other): - return not self == other - - def join(self, *args): - """ return a new Path (with the same revision) which is composed - of the self Path followed by 'args' path components. - """ - if not args: - return self - - args = tuple([arg.strip(self.sep) for arg in args]) - parts = (self.strpath, ) + args - newpath = self.__class__(self.sep.join(parts), self.rev, self.auth) - return newpath - - def propget(self, name): - """ return the content of the given property. """ - value = self._propget(name) - return value - - def proplist(self): - """ list all property names. """ - content = self._proplist() - return content - - def listdir(self, fil=None, sort=None): - """ list directory contents, possibly filter by the given fil func - and possibly sorted. - """ - if isinstance(fil, str): - fil = common.FNMatcher(fil) - nameinfo_seq = self._listdir_nameinfo() - if len(nameinfo_seq) == 1: - name, info = nameinfo_seq[0] - if name == self.basename and info.kind == 'file': - #if not self.check(dir=1): - raise py.error.ENOTDIR(self) - paths = self._make_path_tuple(nameinfo_seq) - - if fil or sort: - paths = filter(fil, paths) - paths = isinstance(paths, list) and paths or list(paths) - if callable(sort): - paths.sort(sort) - elif sort: - paths.sort() - return paths - - def info(self): - """ return an Info structure with svn-provided information. """ - parent = self.dirpath() - nameinfo_seq = parent._listdir_nameinfo() - bn = self.basename - for name, info in nameinfo_seq: - if name == bn: - return info - raise py.error.ENOENT(self) - - def size(self): - """ Return the size of the file content of the Path. """ - return self.info().size - - def mtime(self): - """ Return the last modification time of the file. """ - return self.info().mtime - - # shared help methods - - def _escape(self, cmd): - return _escape_helper(cmd) - - def _make_path_tuple(self, nameinfo_seq): - """ return a tuple of paths from a nameinfo-tuple sequence. - """ - #assert self.rev is not None, "revision of %s should not be None here" % self - res = [] - for name, info in nameinfo_seq: - child = self.join(name) - res.append(child) - return tuple(res) - - - def _childmaxrev(self): - """ return maximum revision number of childs (or self.rev if no childs) """ - rev = self.rev - for name, info in self._listdir_nameinfo(): - rev = max(rev, info.created_rev) - return rev - - #def _getlatestrevision(self): - # """ return latest repo-revision for this path. """ - # url = self.strpath - # path = self.__class__(url, None) - # - # # we need a long walk to find the root-repo and revision - # while 1: - # try: - # rev = max(rev, path._childmaxrev()) - # previous = path - # path = path.dirpath() - # except (IOError, process.cmdexec.Error): - # break - # if rev is None: - # raise IOError, "could not determine newest repo revision for %s" % self - # return rev - - class Checkers(common.Checkers): - def dir(self): - try: - return self.path.info().kind == 'dir' - except py.error.Error: - return self._listdirworks() - - def _listdirworks(self): - try: - self.path.listdir() - except py.error.ENOENT: - return False - else: - return True - - def file(self): - try: - return self.path.info().kind == 'file' - except py.error.ENOENT: - return False - - def exists(self): - try: - return self.path.info() - except py.error.ENOENT: - return self._listdirworks() - -def parse_apr_time(timestr): - i = timestr.rfind('.') - if i == -1: - raise ValueError, "could not parse %s" % timestr - timestr = timestr[:i] - parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S") - return time.mktime(parsedtime) - -class PropListDict(dict): - """ a Dictionary which fetches values (InfoSvnCommand instances) lazily""" - def __init__(self, path, keynames): - dict.__init__(self, [(x, None) for x in keynames]) - self.path = path - - def __getitem__(self, key): - value = dict.__getitem__(self, key) - if value is None: - value = self.path.propget(key) - dict.__setitem__(self, key, value) - return value - -def fixlocale(): - if sys.platform != 'win32': - return 'LC_ALL=C ' - return '' - -# some nasty chunk of code to solve path and url conversion and quoting issues -ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ') -if os.sep in ILLEGAL_CHARS: - ILLEGAL_CHARS.remove(os.sep) -ISWINDOWS = sys.platform == 'win32' -_reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I) -def _check_path(path): - illegal = ILLEGAL_CHARS[:] - sp = path.strpath - if ISWINDOWS: - illegal.remove(':') - if not _reg_allow_disk.match(sp): - raise ValueError('path may not contain a colon (:)') - for char in sp: - if char not in string.printable or char in illegal: - raise ValueError('illegal character %r in path' % (char,)) - -def path_to_fspath(path, addat=True): - _check_path(path) - sp = path.strpath - if addat and path.rev != -1: - sp = '%s@%s' % (sp, path.rev) - elif addat: - sp = '%s@HEAD' % (sp,) - return sp - -def url_from_path(path): - fspath = path_to_fspath(path, False) - quote = py.std.urllib.quote - if ISWINDOWS: - match = _reg_allow_disk.match(fspath) - fspath = fspath.replace('\\', '/') - if match.group(1): - fspath = '/%s%s' % (match.group(1).replace('\\', '/'), - quote(fspath[len(match.group(1)):])) - else: - fspath = quote(fspath) - else: - fspath = quote(fspath) - if path.rev != -1: - fspath = '%s@%s' % (fspath, path.rev) - else: - fspath = '%s@HEAD' % (fspath,) - return 'file://%s' % (fspath,) - -class SvnAuth(object): - """ container for auth information for Subversion """ - def __init__(self, username, password, cache_auth=True, interactive=True): - self.username = username - self.password = password - self.cache_auth = cache_auth - self.interactive = interactive - - def makecmdoptions(self): - uname = self.username.replace('"', '\\"') - passwd = self.password.replace('"', '\\"') - ret = [] - if uname: - ret.append('--username="%s"' % (uname,)) - if passwd: - ret.append('--password="%s"' % (passwd,)) - if not self.cache_auth: - ret.append('--no-auth-cache') - if not self.interactive: - ret.append('--non-interactive') - return ' '.join(ret) - - def __str__(self): - return "" %(self.username,) diff --git a/py/path/svn/testing/__init__.py b/py/path/svn/testing/__init__.py deleted file mode 100644 index 792d60054..000000000 --- a/py/path/svn/testing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/py/path/svn/testing/test_test_repo.py b/py/path/svn/testing/test_test_repo.py deleted file mode 100644 index 0425a3632..000000000 --- a/py/path/svn/testing/test_test_repo.py +++ /dev/null @@ -1,25 +0,0 @@ -import py -from py.__.path.svn.testing.svntestbase import make_test_repo, getsvnbin - - -class TestMakeRepo(object): - def setup_class(cls): - getsvnbin() - cls.repo = make_test_repo() - cls.wc = py.path.svnwc(py.test.ensuretemp("test-wc").join("wc")) - - def test_empty_checkout(self): - self.wc.checkout(self.repo) - assert len(self.wc.listdir()) == 0 - - def test_commit(self): - self.wc.checkout(self.repo) - p = self.wc.join("a_file") - p.write("test file") - p.add() - rev = self.wc.commit("some test") - assert p.info().rev == 1 - assert rev == 1 - rev = self.wc.commit() - assert rev is None - diff --git a/py/path/svn/urlcommand.py b/py/path/svnurl.py similarity index 89% rename from py/path/svn/urlcommand.py rename to py/path/svnurl.py index 8fea634ff..ae8f34bb0 100644 --- a/py/path/svn/urlcommand.py +++ b/py/path/svnurl.py @@ -1,16 +1,14 @@ """ - module defining a subversion path object based on the external command 'svn'. This modules aims to work with svn 1.3 and higher but might also interact well with earlier versions. - """ -import os, sys, time, re, calendar +import os, sys, time, re import py from py import path, process from py.__.path import common -from py.__.path.svn import svncommon +from py.__.path import svnwc as svncommon from py.__.misc.cache import BuildcostAccessCache, AgingCache DEBUG=False @@ -253,10 +251,10 @@ rev_end is the last revision (defaulting to HEAD). if verbose is True, then the LogEntry instances also know which files changed. """ assert self.check() #make it simpler for the pipe - rev_start = rev_start is None and _Head or rev_start - rev_end = rev_end is None and _Head or rev_end + rev_start = rev_start is None and "HEAD" or rev_start + rev_end = rev_end is None and "HEAD" or rev_end - if rev_start is _Head and rev_end == 1: + if rev_start == "HEAD" and rev_end == 1: rev_opt = "" else: rev_opt = "-r %s:%s" % (rev_start, rev_end) @@ -268,7 +266,7 @@ if verbose is True, then the LogEntry instances also know which files changed. result = [] for logentry in filter(None, tree.firstChild.childNodes): if logentry.nodeType == logentry.ELEMENT_NODE: - result.append(LogEntry(logentry)) + result.append(svncommon.LogEntry(logentry)) return result #01234567890123456789012345678901234567890123467 @@ -313,6 +311,7 @@ def parse_time_with_missing_year(timestr): the svn output doesn't show the year makes the 'timestr' ambigous. """ + import calendar t_now = time.gmtime() tparts = timestr.split() @@ -341,31 +340,3 @@ class PathEntry: if self.copyfrom_path: self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev')) -class LogEntry: - def __init__(self, logentry): - self.rev = int(logentry.getAttribute('revision')) - for lpart in filter(None, logentry.childNodes): - if lpart.nodeType == lpart.ELEMENT_NODE: - if lpart.nodeName == u'author': - self.author = lpart.firstChild.nodeValue.encode('UTF-8') - elif lpart.nodeName == u'msg': - if lpart.firstChild: - self.msg = lpart.firstChild.nodeValue.encode('UTF-8') - else: - self.msg = '' - elif lpart.nodeName == u'date': - #2003-07-29T20:05:11.598637Z - timestr = lpart.firstChild.nodeValue.encode('UTF-8') - self.date = svncommon.parse_apr_time(timestr) - elif lpart.nodeName == u'paths': - self.strpaths = [] - for ppart in filter(None, lpart.childNodes): - if ppart.nodeType == ppart.ELEMENT_NODE: - self.strpaths.append(PathEntry(ppart)) - def __repr__(self): - return '' % ( - self.rev, self.author, self.date) - - -_Head = "HEAD" - diff --git a/py/path/svn/wccommand.py b/py/path/svnwc.py similarity index 64% rename from py/path/svn/wccommand.py rename to py/path/svnwc.py index aaf0ec070..f73f791cc 100644 --- a/py/path/svn/wccommand.py +++ b/py/path/svnwc.py @@ -1,20 +1,442 @@ """ - svn-Command based Implementation of a Subversion WorkingCopy Path. SvnWCCommandPath is the main class. - SvnWC is an alias to this class. - """ import os, sys, time, re, calendar import py from py.__.path import common -from py.__.path.svn import cache -from py.__.path.svn import svncommon -DEBUG = 0 +#----------------------------------------------------------- +# Caching latest repository revision and repo-paths +# (getting them is slow with the current implementations) +# +# XXX make mt-safe +#----------------------------------------------------------- + +class cache: + proplist = {} + info = {} + entries = {} + prop = {} + +class RepoEntry: + def __init__(self, url, rev, timestamp): + self.url = url + self.rev = rev + self.timestamp = timestamp + + def __str__(self): + return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp) + +class RepoCache: + """ The Repocache manages discovered repository paths + and their revisions. If inside a timeout the cache + will even return the revision of the root. + """ + timeout = 20 # seconds after which we forget that we know the last revision + + def __init__(self): + self.repos = [] + + def clear(self): + self.repos = [] + + def put(self, url, rev, timestamp=None): + if rev is None: + return + if timestamp is None: + timestamp = time.time() + + for entry in self.repos: + if url == entry.url: + entry.timestamp = timestamp + entry.rev = rev + #print "set repo", entry + break + else: + entry = RepoEntry(url, rev, timestamp) + self.repos.append(entry) + #print "appended repo", entry + + def get(self, url): + now = time.time() + for entry in self.repos: + if url.startswith(entry.url): + if now < entry.timestamp + self.timeout: + #print "returning immediate Etrny", entry + return entry.url, entry.rev + return entry.url, -1 + return url, -1 + +repositories = RepoCache() + + +# svn support code + +ALLOWED_CHARS = "_ -/\\=$.~+" #add characters as necessary when tested +if sys.platform == "win32": + ALLOWED_CHARS += ":" +ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:' + +def _getsvnversion(ver=[]): + try: + return ver[0] + except IndexError: + v = py.process.cmdexec("svn -q --version") + v.strip() + v = '.'.join(v.split('.')[:2]) + ver.append(v) + return v + +def _escape_helper(text): + text = str(text) + if py.std.sys.platform != 'win32': + text = str(text).replace('$', '\\$') + return text + +def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS): + for c in str(text): + if c.isalnum(): + continue + if c in allowed_chars: + continue + return True + return False + +def checkbadchars(url): + # (hpk) not quite sure about the exact purpose, guido w.? + proto, uri = url.split("://", 1) + if proto != "file": + host, uripath = uri.split('/', 1) + # only check for bad chars in the non-protocol parts + if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \ + or _check_for_bad_chars(uripath, ALLOWED_CHARS)): + raise ValueError("bad char in %r" % (url, )) + + +#_______________________________________________________________ + +class SvnPathBase(common.PathBase): + """ Base implementation for SvnPath implementations. """ + sep = '/' + + def _geturl(self): + return self.strpath + url = property(_geturl, None, None, "url of this svn-path.") + + def __str__(self): + """ return a string representation (including rev-number) """ + return self.strpath + + def __hash__(self): + return hash(self.strpath) + + def new(self, **kw): + """ create a modified version of this path. A 'rev' argument + indicates a new revision. + the following keyword arguments modify various path parts: + + http://host.com/repo/path/file.ext + |-----------------------| dirname + |------| basename + |--| purebasename + |--| ext + """ + obj = object.__new__(self.__class__) + obj.rev = kw.get('rev', self.rev) + obj.auth = kw.get('auth', self.auth) + dirname, basename, purebasename, ext = self._getbyspec( + "dirname,basename,purebasename,ext") + if 'basename' in kw: + if 'purebasename' in kw or 'ext' in kw: + raise ValueError("invalid specification %r" % kw) + else: + pb = kw.setdefault('purebasename', purebasename) + ext = kw.setdefault('ext', ext) + if ext and not ext.startswith('.'): + ext = '.' + ext + kw['basename'] = pb + ext + + kw.setdefault('dirname', dirname) + kw.setdefault('sep', self.sep) + if kw['basename']: + obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw + else: + obj.strpath = "%(dirname)s" % kw + return obj + + def _getbyspec(self, spec): + """ get specified parts of the path. 'arg' is a string + with comma separated path parts. The parts are returned + in exactly the order of the specification. + + you may specify the following parts: + + http://host.com/repo/path/file.ext + |-----------------------| dirname + |------| basename + |--| purebasename + |--| ext + """ + res = [] + parts = self.strpath.split(self.sep) + for name in spec.split(','): + name = name.strip() + if name == 'dirname': + res.append(self.sep.join(parts[:-1])) + elif name == 'basename': + res.append(parts[-1]) + else: + basename = parts[-1] + i = basename.rfind('.') + if i == -1: + purebasename, ext = basename, '' + else: + purebasename, ext = basename[:i], basename[i:] + if name == 'purebasename': + res.append(purebasename) + elif name == 'ext': + res.append(ext) + else: + raise NameError, "Don't know part %r" % name + return res + + def __eq__(self, other): + """ return true if path and rev attributes each match """ + return (str(self) == str(other) and + (self.rev == other.rev or self.rev == other.rev)) + + def __ne__(self, other): + return not self == other + + def join(self, *args): + """ return a new Path (with the same revision) which is composed + of the self Path followed by 'args' path components. + """ + if not args: + return self + + args = tuple([arg.strip(self.sep) for arg in args]) + parts = (self.strpath, ) + args + newpath = self.__class__(self.sep.join(parts), self.rev, self.auth) + return newpath + + def propget(self, name): + """ return the content of the given property. """ + value = self._propget(name) + return value + + def proplist(self): + """ list all property names. """ + content = self._proplist() + return content + + def listdir(self, fil=None, sort=None): + """ list directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if isinstance(fil, str): + fil = common.FNMatcher(fil) + nameinfo_seq = self._listdir_nameinfo() + if len(nameinfo_seq) == 1: + name, info = nameinfo_seq[0] + if name == self.basename and info.kind == 'file': + #if not self.check(dir=1): + raise py.error.ENOTDIR(self) + paths = self._make_path_tuple(nameinfo_seq) + + if fil or sort: + paths = filter(fil, paths) + paths = isinstance(paths, list) and paths or list(paths) + if callable(sort): + paths.sort(sort) + elif sort: + paths.sort() + return paths + + def info(self): + """ return an Info structure with svn-provided information. """ + parent = self.dirpath() + nameinfo_seq = parent._listdir_nameinfo() + bn = self.basename + for name, info in nameinfo_seq: + if name == bn: + return info + raise py.error.ENOENT(self) + + def size(self): + """ Return the size of the file content of the Path. """ + return self.info().size + + def mtime(self): + """ Return the last modification time of the file. """ + return self.info().mtime + + # shared help methods + + def _escape(self, cmd): + return _escape_helper(cmd) + + def _make_path_tuple(self, nameinfo_seq): + """ return a tuple of paths from a nameinfo-tuple sequence. + """ + #assert self.rev is not None, "revision of %s should not be None here" % self + res = [] + for name, info in nameinfo_seq: + child = self.join(name) + res.append(child) + return tuple(res) + + + def _childmaxrev(self): + """ return maximum revision number of childs (or self.rev if no childs) """ + rev = self.rev + for name, info in self._listdir_nameinfo(): + rev = max(rev, info.created_rev) + return rev + + #def _getlatestrevision(self): + # """ return latest repo-revision for this path. """ + # url = self.strpath + # path = self.__class__(url, None) + # + # # we need a long walk to find the root-repo and revision + # while 1: + # try: + # rev = max(rev, path._childmaxrev()) + # previous = path + # path = path.dirpath() + # except (IOError, process.cmdexec.Error): + # break + # if rev is None: + # raise IOError, "could not determine newest repo revision for %s" % self + # return rev + + class Checkers(common.Checkers): + def dir(self): + try: + return self.path.info().kind == 'dir' + except py.error.Error: + return self._listdirworks() + + def _listdirworks(self): + try: + self.path.listdir() + except py.error.ENOENT: + return False + else: + return True + + def file(self): + try: + return self.path.info().kind == 'file' + except py.error.ENOENT: + return False + + def exists(self): + try: + return self.path.info() + except py.error.ENOENT: + return self._listdirworks() + +def parse_apr_time(timestr): + i = timestr.rfind('.') + if i == -1: + raise ValueError, "could not parse %s" % timestr + timestr = timestr[:i] + parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S") + return time.mktime(parsedtime) + +class PropListDict(dict): + """ a Dictionary which fetches values (InfoSvnCommand instances) lazily""" + def __init__(self, path, keynames): + dict.__init__(self, [(x, None) for x in keynames]) + self.path = path + + def __getitem__(self, key): + value = dict.__getitem__(self, key) + if value is None: + value = self.path.propget(key) + dict.__setitem__(self, key, value) + return value + +def fixlocale(): + if sys.platform != 'win32': + return 'LC_ALL=C ' + return '' + +# some nasty chunk of code to solve path and url conversion and quoting issues +ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ') +if os.sep in ILLEGAL_CHARS: + ILLEGAL_CHARS.remove(os.sep) +ISWINDOWS = sys.platform == 'win32' +_reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I) +def _check_path(path): + illegal = ILLEGAL_CHARS[:] + sp = path.strpath + if ISWINDOWS: + illegal.remove(':') + if not _reg_allow_disk.match(sp): + raise ValueError('path may not contain a colon (:)') + for char in sp: + if char not in string.printable or char in illegal: + raise ValueError('illegal character %r in path' % (char,)) + +def path_to_fspath(path, addat=True): + _check_path(path) + sp = path.strpath + if addat and path.rev != -1: + sp = '%s@%s' % (sp, path.rev) + elif addat: + sp = '%s@HEAD' % (sp,) + return sp + +def url_from_path(path): + fspath = path_to_fspath(path, False) + quote = py.std.urllib.quote + if ISWINDOWS: + match = _reg_allow_disk.match(fspath) + fspath = fspath.replace('\\', '/') + if match.group(1): + fspath = '/%s%s' % (match.group(1).replace('\\', '/'), + quote(fspath[len(match.group(1)):])) + else: + fspath = quote(fspath) + else: + fspath = quote(fspath) + if path.rev != -1: + fspath = '%s@%s' % (fspath, path.rev) + else: + fspath = '%s@HEAD' % (fspath,) + return 'file://%s' % (fspath,) + +class SvnAuth(object): + """ container for auth information for Subversion """ + def __init__(self, username, password, cache_auth=True, interactive=True): + self.username = username + self.password = password + self.cache_auth = cache_auth + self.interactive = interactive + + def makecmdoptions(self): + uname = self.username.replace('"', '\\"') + passwd = self.password.replace('"', '\\"') + ret = [] + if uname: + ret.append('--username="%s"' % (uname,)) + if passwd: + ret.append('--password="%s"' % (passwd,)) + if not self.cache_auth: + ret.append('--no-auth-cache') + if not self.interactive: + ret.append('--non-interactive') + return ' '.join(ret) + + def __str__(self): + return "" %(self.username,) rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)') @@ -31,8 +453,8 @@ class SvnWCCommandPath(common.PathBase): if wcpath.__class__ == cls: return wcpath wcpath = wcpath.localpath - if svncommon._check_for_bad_chars(str(wcpath), - svncommon.ALLOWED_CHARS): + if _check_for_bad_chars(str(wcpath), + ALLOWED_CHARS): raise ValueError("bad char in wcpath %s" % (wcpath, )) self.localpath = py.path.local(wcpath) self.auth = auth @@ -53,7 +475,7 @@ class SvnWCCommandPath(common.PathBase): url = property(_geturl, None, None, "url of this WC item") def _escape(self, cmd): - return svncommon._escape_helper(cmd) + return _escape_helper(cmd) def dump(self, obj): """ pickle object into path location""" @@ -86,9 +508,7 @@ class SvnWCCommandPath(common.PathBase): l.extend(args) l.append('"%s"' % self._escape(self.strpath)) # try fixing the locale because we can't otherwise parse - string = svncommon.fixlocale() + " ".join(l) - if DEBUG: - print "execing", string + string = fixlocale() + " ".join(l) try: try: key = 'LC_MESSAGES' @@ -122,10 +542,10 @@ class SvnWCCommandPath(common.PathBase): url = self.url if rev is None or rev == -1: if (py.std.sys.platform != 'win32' and - svncommon._getsvnversion() == '1.3'): + _getsvnversion() == '1.3'): url += "@HEAD" else: - if svncommon._getsvnversion() == '1.3': + if _getsvnversion() == '1.3': url += "@%d" % rev else: args.append('-r' + str(rev)) @@ -280,7 +700,8 @@ class SvnWCCommandPath(common.PathBase): def blame(self): """ return a list of tuples of three elements: -(revision, commiter, line)""" + (revision, commiter, line) + """ out = self._svn('blame') result = [] blamelines = out.splitlines() @@ -298,7 +719,6 @@ class SvnWCCommandPath(common.PathBase): _rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL) def commit(self, msg='', rec=1): """ commit with support for non-recursive commits """ - from py.__.path.svn import cache # XXX i guess escaping should be done better here?!? cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) if not rec: @@ -343,7 +763,7 @@ If rec is True, then return a dictionary mapping sub-paths to such mappings. res = self._svn('proplist') lines = res.split('\n') lines = map(str.strip, lines[1:]) - return svncommon.PropListDict(self, lines) + return PropListDict(self, lines) def revert(self, rec=0): """ revert the local changes of this path. if rec is True, do so @@ -466,17 +886,15 @@ rev_start is the starting revision (defaulting to the first one). rev_end is the last revision (defaulting to HEAD). if verbose is True, then the LogEntry instances also know which files changed. """ - from py.__.path.svn.urlcommand import _Head, LogEntry assert self.check() # make it simpler for the pipe - rev_start = rev_start is None and _Head or rev_start - rev_end = rev_end is None and _Head or rev_end - - if rev_start is _Head and rev_end == 1: + rev_start = rev_start is None and "HEAD" or rev_start + rev_end = rev_end is None and "HEAD" or rev_end + if rev_start == "HEAD" and rev_end == 1: rev_opt = "" else: rev_opt = "-r %s:%s" % (rev_start, rev_end) verbose_opt = verbose and "-v" or "" - locale_env = svncommon.fixlocale() + locale_env = fixlocale() # some blather on stderr auth_opt = self._makeauthoptions() stdin, stdout, stderr = os.popen3(locale_env + @@ -807,7 +1225,7 @@ def make_recursive_propdict(wcroot, propname = lines.pop(0).strip() propnames.append(propname) assert propnames, "must have found properties!" - pdict[wcpath] = svncommon.PropListDict(wcpath, propnames) + pdict[wcpath] = PropListDict(wcpath, propnames) return pdict def error_enhance((cls, error, tb)): @@ -820,3 +1238,30 @@ def importxml(cache=[]): from xml.parsers.expat import ExpatError cache.extend([minidom, ExpatError]) return cache + +class LogEntry: + def __init__(self, logentry): + self.rev = int(logentry.getAttribute('revision')) + for lpart in filter(None, logentry.childNodes): + if lpart.nodeType == lpart.ELEMENT_NODE: + if lpart.nodeName == u'author': + self.author = lpart.firstChild.nodeValue.encode('UTF-8') + elif lpart.nodeName == u'msg': + if lpart.firstChild: + self.msg = lpart.firstChild.nodeValue.encode('UTF-8') + else: + self.msg = '' + elif lpart.nodeName == u'date': + #2003-07-29T20:05:11.598637Z + timestr = lpart.firstChild.nodeValue.encode('UTF-8') + self.date = parse_apr_time(timestr) + elif lpart.nodeName == u'paths': + self.strpaths = [] + for ppart in filter(None, lpart.childNodes): + if ppart.nodeType == ppart.ELEMENT_NODE: + self.strpaths.append(PathEntry(ppart)) + def __repr__(self): + return '' % ( + self.rev, self.author, self.date) + + diff --git a/py/path/svn/testing/repotest.dump b/py/path/testing/repotest.dump similarity index 100% rename from py/path/svn/testing/repotest.dump rename to py/path/testing/repotest.dump diff --git a/py/path/svn/testing/svntestbase.py b/py/path/testing/svntestbase.py similarity index 96% rename from py/path/svn/testing/svntestbase.py rename to py/path/testing/svntestbase.py index 410b32c0a..7b3cbaf5f 100644 --- a/py/path/svn/testing/svntestbase.py +++ b/py/path/testing/svntestbase.py @@ -2,7 +2,7 @@ import sys import py from py import path, test, process from py.__.path.testing.fscommon import CommonFSTests, setuptestfs -from py.__.path.svn import cache, svncommon +from py.__.path import svnwc as svncommon mypath = py.magic.autopath() repodump = mypath.dirpath('repotest.dump') @@ -66,6 +66,7 @@ def restore_repowc((savedrepo, savedwc)): # create an empty repository for testing purposes and return the url to it def make_test_repo(name="test-repository"): + getsvnbin() repo = py.test.ensuretemp(name) try: py.process.cmdexec('svnadmin create %s' % repo) @@ -149,14 +150,14 @@ class CommonCommandAndBindingTests(CommonSvnTests): # the following tests are easier if we have a path class def test_repocache_simple(self): - repocache = cache.RepoCache() + repocache = svncommon.RepoCache() repocache.put(self.root.strpath, 42) url, rev = repocache.get(self.root.join('test').strpath) assert rev == 42 assert url == self.root.strpath def test_repocache_notimeout(self): - repocache = cache.RepoCache() + repocache = svncommon.RepoCache() repocache.timeout = 0 repocache.put(self.root.strpath, self.root.rev) url, rev = repocache.get(self.root.strpath) @@ -164,7 +165,7 @@ class CommonCommandAndBindingTests(CommonSvnTests): assert url == self.root.strpath def test_repocache_outdated(self): - repocache = cache.RepoCache() + repocache = svncommon.RepoCache() repocache.put(self.root.strpath, 42, timestamp=0) url, rev = repocache.get(self.root.join('test').strpath) assert rev == -1 @@ -172,7 +173,7 @@ class CommonCommandAndBindingTests(CommonSvnTests): def _test_getreporev(self): """ this test runs so slow it's usually disabled """ - old = cache.repositories.repos + old = svncommon.repositories.repos try: _repocache.clear() root = self.root.new(rev=-1) diff --git a/py/path/svn/testing/test_auth.py b/py/path/testing/test_svnauth.py similarity index 100% rename from py/path/svn/testing/test_auth.py rename to py/path/testing/test_svnauth.py diff --git a/py/path/svn/testing/test_urlcommand.py b/py/path/testing/test_svnurl.py similarity index 97% rename from py/path/svn/testing/test_urlcommand.py rename to py/path/testing/test_svnurl.py index 966d6c738..72cb93b4b 100644 --- a/py/path/svn/testing/test_urlcommand.py +++ b/py/path/testing/test_svnurl.py @@ -1,6 +1,6 @@ import py -from py.__.path.svn.urlcommand import InfoSvnCommand -from py.__.path.svn.testing.svntestbase import CommonCommandAndBindingTests, \ +from py.__.path.svnurl import InfoSvnCommand +from py.__.path.testing.svntestbase import CommonCommandAndBindingTests, \ getrepowc, getsvnbin import datetime import time diff --git a/py/path/svn/testing/test_wccommand.py b/py/path/testing/test_svnwc.py similarity index 95% rename from py/path/svn/testing/test_wccommand.py rename to py/path/testing/test_svnwc.py index 5fe58a656..1e57b04c7 100644 --- a/py/path/svn/testing/test_wccommand.py +++ b/py/path/testing/test_svnwc.py @@ -1,9 +1,8 @@ import py import sys -from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc, getsvnbin -from py.__.path.svn.wccommand import InfoSvnWCCommand, XMLWCStatus -from py.__.path.svn.wccommand import parse_wcinfotime -from py.__.path.svn import svncommon +from py.__.path.testing.svntestbase import CommonSvnTests, getrepowc, getsvnbin, make_test_repo +from py.__.path.svnwc import InfoSvnWCCommand, XMLWCStatus, parse_wcinfotime +from py.__.path import svnwc as svncommon if sys.platform != 'win32': def normpath(p): @@ -23,6 +22,27 @@ else: def setup_module(mod): getsvnbin() +class TestMakeRepo(object): + def setup_class(cls): + cls.repo = make_test_repo() + cls.wc = py.path.svnwc(py.test.ensuretemp("test-wc").join("wc")) + + def test_empty_checkout(self): + self.wc.checkout(self.repo) + assert len(self.wc.listdir()) == 0 + + def test_commit(self): + self.wc.checkout(self.repo) + p = self.wc.join("a_file") + p.write("test file") + p.add() + rev = self.wc.commit("some test") + assert p.info().rev == 1 + assert rev == 1 + rev = self.wc.commit() + assert rev is None + + class TestWCSvnCommandPath(CommonSvnTests): def setup_class(cls): repo, cls.root = getrepowc() @@ -427,8 +447,8 @@ class TestInfoSvnWCCommand: def test_svn_1_2(self): output = """ - Path: test_wccommand.py - Name: test_wccommand.py + Path: test_svnwc.py + Name: test_svnwc.py URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada Revision: 28137 @@ -454,8 +474,8 @@ class TestInfoSvnWCCommand: def test_svn_1_3(self): output = """ - Path: test_wccommand.py - Name: test_wccommand.py + Path: test_svnwc.py + Name: test_svnwc.py URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py Repository Root: http://codespeak.net/svn Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada