377 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
| """
 | |
| module defining a subversion path object based on the external
 | |
| command 'svn'. This modules aims to work with svn 1.3 and higher
 | |
| but might also interact well with earlier versions. 
 | |
| """
 | |
| 
 | |
| import os, sys, time, re
 | |
| import py
 | |
| from py import path, process
 | |
| from py.impl.path import common
 | |
| from py.impl.path import svnwc as svncommon
 | |
| from py.impl.path.cacheutil import BuildcostAccessCache, AgingCache
 | |
| 
 | |
| DEBUG=False 
 | |
| 
 | |
| class SvnCommandPath(svncommon.SvnPathBase):
 | |
|     """ path implementation that offers access to (possibly remote) subversion
 | |
|     repositories. """
 | |
| 
 | |
|     _lsrevcache = BuildcostAccessCache(maxentries=128)
 | |
|     _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0)
 | |
| 
 | |
|     def __new__(cls, path, rev=None, auth=None):
 | |
|         self = object.__new__(cls)
 | |
|         if isinstance(path, cls): 
 | |
|             rev = path.rev 
 | |
|             auth = path.auth
 | |
|             path = path.strpath 
 | |
|         svncommon.checkbadchars(path)
 | |
|         path = path.rstrip('/')
 | |
|         self.strpath = path
 | |
|         self.rev = rev
 | |
|         self.auth = auth
 | |
|         return self
 | |
| 
 | |
|     def __repr__(self):
 | |
|         if self.rev == -1:
 | |
|             return 'svnurl(%r)' % self.strpath
 | |
|         else:
 | |
|             return 'svnurl(%r, %r)' % (self.strpath, self.rev)
 | |
| 
 | |
|     def _svnwithrev(self, cmd, *args):
 | |
|         """ execute an svn command, append our own url and revision """
 | |
|         if self.rev is None:
 | |
|             return self._svnwrite(cmd, *args)
 | |
|         else:
 | |
|             args = ['-r', self.rev] + list(args)
 | |
|             return self._svnwrite(cmd, *args)
 | |
| 
 | |
|     def _svnwrite(self, cmd, *args):
 | |
|         """ execute an svn command, append our own url """
 | |
|         l = ['svn %s' % cmd]
 | |
|         args = ['"%s"' % self._escape(item) for item in args]
 | |
|         l.extend(args)
 | |
|         l.append('"%s"' % self._encodedurl())
 | |
|         # fixing the locale because we can't otherwise parse
 | |
|         string = " ".join(l)
 | |
|         if DEBUG:
 | |
|             print("execing %s" % string)
 | |
|         out = self._svncmdexecauth(string)
 | |
|         return out
 | |
| 
 | |
|     def _svncmdexecauth(self, cmd):
 | |
|         """ execute an svn command 'as is' """
 | |
|         cmd = svncommon.fixlocale() + cmd
 | |
|         if self.auth is not None:
 | |
|             cmd += ' ' + self.auth.makecmdoptions()
 | |
|         return self._cmdexec(cmd)
 | |
| 
 | |
|     def _cmdexec(self, cmd):
 | |
|         try:
 | |
|             out = process.cmdexec(cmd)
 | |
|         except py.process.cmdexec.Error:
 | |
|             e = sys.exc_info()[1]
 | |
|             if (e.err.find('File Exists') != -1 or
 | |
|                             e.err.find('File already exists') != -1):
 | |
|                 raise py.error.EEXIST(self)
 | |
|             raise
 | |
|         return out
 | |
| 
 | |
|     def _svnpopenauth(self, cmd):
 | |
|         """ execute an svn command, return a pipe for reading stdin """
 | |
|         cmd = svncommon.fixlocale() + cmd
 | |
|         if self.auth is not None:
 | |
|             cmd += ' ' + self.auth.makecmdoptions()
 | |
|         return self._popen(cmd)
 | |
| 
 | |
|     def _popen(self, cmd):
 | |
|         return os.popen(cmd)
 | |
| 
 | |
|     def _encodedurl(self):
 | |
|         return self._escape(self.strpath)
 | |
| 
 | |
|     def _norev_delentry(self, path):
 | |
|         auth = self.auth and self.auth.makecmdoptions() or None
 | |
|         self._lsnorevcache.delentry((str(path), auth))
 | |
| 
 | |
|     def open(self, mode='r'):
 | |
|         """ return an opened file with the given mode. """
 | |
|         if mode not in ("r", "rU",): 
 | |
|             raise ValueError("mode %r not supported" % (mode,))
 | |
|         assert self.check(file=1) # svn cat returns an empty file otherwise
 | |
|         if self.rev is None:
 | |
|             return self._svnpopenauth('svn cat "%s"' % (
 | |
|                                       self._escape(self.strpath), ))
 | |
|         else:
 | |
|             return self._svnpopenauth('svn cat -r %s "%s"' % (
 | |
|                                       self.rev, self._escape(self.strpath)))
 | |
| 
 | |
|     def dirpath(self, *args, **kwargs):
 | |
|         """ return the directory path of the current path joined
 | |
|             with any given path arguments.
 | |
|         """
 | |
|         l = self.strpath.split(self.sep) 
 | |
|         if len(l) < 4: 
 | |
|             raise py.error.EINVAL(self, "base is not valid") 
 | |
|         elif len(l) == 4: 
 | |
|             return self.join(*args, **kwargs) 
 | |
|         else: 
 | |
|             return self.new(basename='').join(*args, **kwargs)
 | |
| 
 | |
|     # modifying methods (cache must be invalidated)
 | |
|     def mkdir(self, *args, **kwargs):
 | |
|         """ create & return the directory joined with args. 
 | |
|         pass a 'msg' keyword argument to set the commit message.
 | |
|         """
 | |
|         commit_msg = kwargs.get('msg', "mkdir by py lib invocation")
 | |
|         createpath = self.join(*args)
 | |
|         createpath._svnwrite('mkdir', '-m', commit_msg)
 | |
|         self._norev_delentry(createpath.dirpath())
 | |
|         return createpath
 | |
| 
 | |
|     def copy(self, target, msg='copied by py lib invocation'):
 | |
|         """ copy path to target with checkin message msg."""
 | |
|         if getattr(target, 'rev', None) is not None:
 | |
|             raise py.error.EINVAL(target, "revisions are immutable")
 | |
|         self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg,
 | |
|                              self._escape(self), self._escape(target)))
 | |
|         self._norev_delentry(target.dirpath())
 | |
| 
 | |
|     def rename(self, target, msg="renamed by py lib invocation"):
 | |
|         """ rename this path to target with checkin message msg. """
 | |
|         if getattr(self, 'rev', None) is not None:
 | |
|             raise py.error.EINVAL(self, "revisions are immutable")
 | |
|         self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %(
 | |
|                              msg, self._escape(self), self._escape(target)))
 | |
|         self._norev_delentry(self.dirpath())
 | |
|         self._norev_delentry(self)
 | |
| 
 | |
|     def remove(self, rec=1, msg='removed by py lib invocation'):
 | |
|         """ remove a file or directory (or a directory tree if rec=1) with
 | |
| checkin message msg."""
 | |
|         if self.rev is not None:
 | |
|             raise py.error.EINVAL(self, "revisions are immutable")
 | |
|         self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
 | |
|         self._norev_delentry(self.dirpath())
 | |
| 
 | |
|     def export(self, topath):
 | |
|         """ export to a local path
 | |
| 
 | |
|             topath should not exist prior to calling this, returns a
 | |
|             py.path.local instance
 | |
|         """
 | |
|         topath = py.path.local(topath)
 | |
|         args = ['"%s"' % (self._escape(self),),
 | |
|                 '"%s"' % (self._escape(topath),)]
 | |
|         if self.rev is not None:
 | |
|             args = ['-r', str(self.rev)] + args
 | |
|         self._svncmdexecauth('svn export %s' % (' '.join(args),))
 | |
|         return topath
 | |
| 
 | |
|     def ensure(self, *args, **kwargs):
 | |
|         """ ensure that an args-joined path exists (by default as
 | |
|             a file). If you specify a keyword argument 'dir=True'
 | |
|             then the path is forced to be a directory path.
 | |
|         """
 | |
|         if getattr(self, 'rev', None) is not None:
 | |
|             raise py.error.EINVAL(self, "revisions are immutable")
 | |
|         target = self.join(*args)
 | |
|         dir = kwargs.get('dir', 0) 
 | |
|         for x in target.parts(reverse=True): 
 | |
|             if x.check(): 
 | |
|                 break 
 | |
|         else: 
 | |
|             raise py.error.ENOENT(target, "has not any valid base!") 
 | |
|         if x == target: 
 | |
|             if not x.check(dir=dir): 
 | |
|                 raise dir and py.error.ENOTDIR(x) or py.error.EISDIR(x) 
 | |
|             return x 
 | |
|         tocreate = target.relto(x) 
 | |
|         basename = tocreate.split(self.sep, 1)[0]
 | |
|         tempdir = py.path.local.mkdtemp()
 | |
|         try:    
 | |
|             tempdir.ensure(tocreate, dir=dir) 
 | |
|             cmd = 'svn import -m "%s" "%s" "%s"' % (
 | |
|                     "ensure %s" % self._escape(tocreate), 
 | |
|                     self._escape(tempdir.join(basename)), 
 | |
|                     x.join(basename)._encodedurl())
 | |
|             self._svncmdexecauth(cmd) 
 | |
|             self._norev_delentry(x)
 | |
|         finally:    
 | |
|             tempdir.remove() 
 | |
|         return target
 | |
| 
 | |
|     # end of modifying methods
 | |
|     def _propget(self, name):
 | |
|         res = self._svnwithrev('propget', name)
 | |
|         return res[:-1] # strip trailing newline
 | |
| 
 | |
|     def _proplist(self):
 | |
|         res = self._svnwithrev('proplist')
 | |
|         lines = res.split('\n')
 | |
|         lines = [x.strip() for x in lines[1:]]
 | |
|         return svncommon.PropListDict(self, lines)
 | |
| 
 | |
|     def info(self):
 | |
|         """ return an Info structure with svn-provided information. """
 | |
|         parent = self.dirpath()
 | |
|         nameinfo_seq = parent._listdir_nameinfo()
 | |
|         bn = self.basename
 | |
|         for name, info in nameinfo_seq:
 | |
|             if name == bn:
 | |
|                 return info
 | |
|         raise py.error.ENOENT(self)
 | |
| 
 | |
| 
 | |
|     def _listdir_nameinfo(self):
 | |
|         """ return sequence of name-info directory entries of self """
 | |
|         def builder():
 | |
|             try:
 | |
|                 res = self._svnwithrev('ls', '-v')
 | |
|             except process.cmdexec.Error:
 | |
|                 e = sys.exc_info()[1]
 | |
|                 if e.err.find('non-existent in that revision') != -1:
 | |
|                     raise py.error.ENOENT(self, e.err)
 | |
|                 elif e.err.find('File not found') != -1:
 | |
|                     raise py.error.ENOENT(self, e.err)
 | |
|                 elif e.err.find('not part of a repository')!=-1:
 | |
|                     raise py.error.ENOENT(self, e.err)
 | |
|                 elif e.err.find('Unable to open')!=-1:
 | |
|                     raise py.error.ENOENT(self, e.err)
 | |
|                 elif e.err.lower().find('method not allowed')!=-1:
 | |
|                     raise py.error.EACCES(self, e.err)
 | |
|                 raise py.error.Error(e.err)
 | |
|             lines = res.split('\n')
 | |
|             nameinfo_seq = []
 | |
|             for lsline in lines:
 | |
|                 if lsline:
 | |
|                     info = InfoSvnCommand(lsline)
 | |
|                     if info._name != '.':  # svn 1.5 produces '.' dirs, 
 | |
|                         nameinfo_seq.append((info._name, info))
 | |
|             nameinfo_seq.sort()
 | |
|             return nameinfo_seq
 | |
|         auth = self.auth and self.auth.makecmdoptions() or None
 | |
|         if self.rev is not None:
 | |
|             return self._lsrevcache.getorbuild((self.strpath, self.rev, auth),
 | |
|                                                builder)
 | |
|         else:
 | |
|             return self._lsnorevcache.getorbuild((self.strpath, auth),
 | |
|                                                  builder)
 | |
| 
 | |
|     def listdir(self, fil=None, sort=None):
 | |
|         """ list directory contents, possibly filter by the given fil func
 | |
|             and possibly sorted.
 | |
|         """
 | |
|         if isinstance(fil, str):
 | |
|             fil = common.FNMatcher(fil)
 | |
|         nameinfo_seq = self._listdir_nameinfo()
 | |
|         if len(nameinfo_seq) == 1:
 | |
|             name, info = nameinfo_seq[0]
 | |
|             if name == self.basename and info.kind == 'file':
 | |
|                 #if not self.check(dir=1):
 | |
|                 raise py.error.ENOTDIR(self)
 | |
|         paths = [self.join(name) for (name, info) in nameinfo_seq]
 | |
|         if fil:
 | |
|             paths = [x for x in paths if fil(x)]
 | |
|         self._sortlist(paths, sort)
 | |
|         return paths
 | |
| 
 | |
| 
 | |
|     def log(self, rev_start=None, rev_end=1, verbose=False):
 | |
|         """ return a list of LogEntry instances for this path.
 | |
| rev_start is the starting revision (defaulting to the first one).
 | |
| rev_end is the last revision (defaulting to HEAD).
 | |
| if verbose is True, then the LogEntry instances also know which files changed.
 | |
| """
 | |
|         assert self.check() #make it simpler for the pipe
 | |
|         rev_start = rev_start is None and "HEAD" or rev_start
 | |
|         rev_end = rev_end is None and "HEAD" or rev_end
 | |
| 
 | |
|         if rev_start == "HEAD" and rev_end == 1:
 | |
|             rev_opt = ""
 | |
|         else:
 | |
|             rev_opt = "-r %s:%s" % (rev_start, rev_end)
 | |
|         verbose_opt = verbose and "-v" or ""
 | |
|         xmlpipe =  self._svnpopenauth('svn log --xml %s %s "%s"' %
 | |
|                                       (rev_opt, verbose_opt, self.strpath))
 | |
|         from xml.dom import minidom
 | |
|         tree = minidom.parse(xmlpipe)
 | |
|         result = []
 | |
|         for logentry in filter(None, tree.firstChild.childNodes):
 | |
|             if logentry.nodeType == logentry.ELEMENT_NODE:
 | |
|                 result.append(svncommon.LogEntry(logentry))
 | |
|         return result
 | |
| 
 | |
| #01234567890123456789012345678901234567890123467
 | |
| #   2256      hpk        165 Nov 24 17:55 __init__.py
 | |
| # XXX spotted by Guido, SVN 1.3.0 has different aligning, breaks the code!!!
 | |
| #   1312 johnny           1627 May 05 14:32 test_decorators.py
 | |
| #
 | |
| class InfoSvnCommand:
 | |
|     # the '0?' part in the middle is an indication of whether the resource is
 | |
|     # locked, see 'svn help ls'
 | |
|     lspattern = re.compile(
 | |
|         r'^ *(?P<rev>\d+) +(?P<author>.+?) +(0? *(?P<size>\d+))? '
 | |
|             '*(?P<date>\w+ +\d{2} +[\d:]+) +(?P<file>.*)$')
 | |
|     def __init__(self, line):
 | |
|         # this is a typical line from 'svn ls http://...'
 | |
|         #_    1127      jum        0 Jul 13 15:28 branch/
 | |
|         match = self.lspattern.match(line)
 | |
|         data = match.groupdict()
 | |
|         self._name = data['file']
 | |
|         if self._name[-1] == '/':
 | |
|             self._name = self._name[:-1]
 | |
|             self.kind = 'dir'
 | |
|         else:
 | |
|             self.kind = 'file'
 | |
|         #self.has_props = l.pop(0) == 'P'
 | |
|         self.created_rev = int(data['rev'])
 | |
|         self.last_author = data['author']
 | |
|         self.size = data['size'] and int(data['size']) or 0
 | |
|         self.mtime = parse_time_with_missing_year(data['date'])
 | |
|         self.time = self.mtime * 1000000
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return self.__dict__ == other.__dict__
 | |
| 
 | |
| 
 | |
| #____________________________________________________
 | |
| #
 | |
| # helper functions
 | |
| #____________________________________________________
 | |
| def parse_time_with_missing_year(timestr):
 | |
|     """ analyze the time part from a single line of "svn ls -v"
 | |
|     the svn output doesn't show the year makes the 'timestr'
 | |
|     ambigous.
 | |
|     """
 | |
|     import calendar
 | |
|     t_now = time.gmtime()
 | |
| 
 | |
|     tparts = timestr.split()
 | |
|     month = time.strptime(tparts.pop(0), '%b')[1]
 | |
|     day = time.strptime(tparts.pop(0), '%d')[2]
 | |
|     last = tparts.pop(0) # year or hour:minute
 | |
|     try:
 | |
|         year = time.strptime(last, '%Y')[0]
 | |
|         hour = minute = 0
 | |
|     except ValueError:
 | |
|         hour, minute = time.strptime(last, '%H:%M')[3:5]
 | |
|         year = t_now[0]
 | |
| 
 | |
|         t_result = (year, month, day, hour, minute, 0,0,0,0)
 | |
|         if t_result > t_now:
 | |
|             year -= 1
 | |
|     t_result = (year, month, day, hour, minute, 0,0,0,0)
 | |
|     return calendar.timegm(t_result)
 | |
| 
 | |
| class PathEntry:
 | |
|     def __init__(self, ppart):
 | |
|         self.strpath = ppart.firstChild.nodeValue.encode('UTF-8')
 | |
|         self.action = ppart.getAttribute('action').encode('UTF-8')
 | |
|         if self.action == 'A':
 | |
|             self.copyfrom_path = ppart.getAttribute('copyfrom-path').encode('UTF-8')
 | |
|             if self.copyfrom_path:
 | |
|                 self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev'))
 | |
| 
 |