[PATCH RFC] bundle2: add a part for pointers to changegroups

Mike Hommey mh at glandium.org
Tue Sep 2 00:21:49 CDT 2014


# HG changeset patch
# User Mike Hommey <mh at glandium.org>
# Date 1409632006 -32400
#      Tue Sep 02 13:26:46 2014 +0900
# Node ID 7a91b38219fb3f775eb5a710436a0ab9baafe650
# Parent  34de1241da516e7896394893e66c7ac8ed9f9787
bundle2: add a part for pointers to changegroups

Bundle2 opens doors to advanced features allowing to reduce load on
mercurial servers, and improve clone experience for users on unstable or
slow networks.

For instance, it could be possible to pre-generate a bundle of a
repository, and give a pointer to it to clients cloning the repository,
followed by another changegroup with the remainder. For significantly
big repositories, this could come as several base bundles with e.g. 10k
changesets, which, combined with checkpoints, would prevent users with
flaky networks from starting over any time their connection fails.

While the server-side support for those features doesn't exist yet, it
is preferable to have client-side support for this early-on.

Note the patch to the unit test doesn't apply without fuzz unless you
apply "bundle2: Add tests for multiple changegroup parts" first.

There are a few details to figure before this can be final:
- The current patch unconditionally uses hashlib for digest computation.
  I think it's sensible to have support for multiple digests (and to
  check all of them if that's what the server advertizes). The obvious
  problem is that hashlib is not supported on python 2.4. Python 2.4
  only supports md5 and sha1 out-of-the box, so we probably should just
  stick with those two, at least until python 2.4 support is phased out.
- Relatedly, it might be worthy adding the changegroup size along the
  checksums.
- Bikeshed on the part name. I'm not entirely fond of "changegroupurl",
  and see next point.
- I can think of another non-simple-url way to point to some other
  changegroup, that might be worth adding support for: a pointer to one
  or more changesets in a given repository. So instead of downloading a
  pre-generated bundle via http, the client would negociate a getbundle
  with that other repo. It's not entirely obvious how this could be
  useful, but it would allow us (Mozilla) to store, on S3, bundles of
  "patch sets" for our try server or review board, with a reference to
  the base it needs, which then can be applied on an existing clone of
  e.g. mozilla-central without having to use a full bundle (it would
  only have to get what it doesn't have) The question then becomes
  whether to create a new part for that, or to use the same part for
  both use cases and deal with the difference in the format.

Thoughts?

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -137,22 +137,25 @@ contains any uppercase char it is consid
 known for a Mandatory part, the process is aborted and an exception is raised.
 If the part is advisory and no handler is known, the part is ignored. When the
 process is aborted, the full bundle is still read from the stream to keep the
 channel usable. But none of the part read from an abort are processed. In the
 future, dropping the stream may become an option for channel we do not care to
 preserve.
 """
 
+import exchange
 import util
 import struct
 import urllib
 import string
 import obsolete
 import pushkey
+import urllib2
+import urlparse
 
 import changegroup, error
 from i18n import _
 
 _pack = struct.pack
 _unpack = struct.unpack
 
 _magicstring = 'HG2X'
@@ -815,35 +818,84 @@ def handlechangegroup(op, inpart):
     inflicted to any end-user.
     """
     # Make sure we trigger a transaction creation
     #
     # The addchangegroup function will get a transaction object by itself, but
     # we need to make sure we trigger the creation of a transaction object used
     # for the whole processing scope.
     op.gettransaction()
-    cg = changegroup.unbundle10(inpart, 'UN')
+    cg = exchange.readbundle(op.repo.ui, inpart, None)
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
     op.records.add('changegroup', {'return': ret})
     if op.reply is not None:
         # This is definitly not the final form of this
         # return. But one need to start somewhere.
         part = op.reply.newpart('b2x:reply:changegroup')
         part.addparam('in-reply-to', str(inpart.id), mandatory=False)
         part.addparam('return', '%i' % ret, mandatory=False)
     assert not inpart.read()
 
+class digestchecker(object):
+    """File handle wrapper that additionally checks content against hash
+    digests.
+
+        d = digestchecker(fh, {'md5': '...'})
+
+    When multiple digests are given, all of them are verified.
+    """
+    def __init__(self, fh, digests):
+        self._fh = fh
+        self._hashes = {}
+        self._digests = dict(digests)
+        for k in digests:
+            import hashlib
+            assert hasattr(hashlib, k)
+            self._hashes[k] = getattr(hashlib, k)()
+
+    def read(self, len=-1):
+        content = self._fh.read(len)
+        for h in self._hashes.values():
+            h.update(content)
+        return content
+
+    def validate(self):
+        for k, v in self._digests.items():
+            if v != self._hashes[k].hexdigest():
+                return False
+        return True
+
+ at parthandler('b2x:changegroupurl')
+def handlechangegroupurl(op, inpart):
+    """apply a changegroup on the repo, given a url to it
+    """
+    lines = inpart.read().splitlines()
+    url = lines.pop(0)
+    digests = {}
+    for line in lines:
+        assert ':' in line
+        k, v = line.split(':', 1)
+        digests[k.strip()] = v.strip()
+
+    parsed_url = urlparse.urlparse(url)
+    assert parsed_url.scheme in ('http', 'https')
+
+    real_part = digestchecker(urllib2.urlopen(url), digests)
+    handlechangegroup(op, real_part)
+    if not real_part.validate():
+        raise error.Abort('Changegroup at %s is corrupted' % url)
+
 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
-def handlechangegroup(op, inpart):
+def handlereplychangegroup(op, inpart):
     ret = int(inpart.params['return'])
     replyto = int(inpart.params['in-reply-to'])
     op.records.add('changegroup', {'return': ret}, replyto)
 
 @parthandler('b2x:check:heads')
-def handlechangegroup(op, inpart):
+def handlecheckheads(op, inpart):
     """check that head of the repo did not change
 
     This is used to detect a push race when using unbundle.
     This replaces the "heads" argument of unbundle."""
     h = inpart.read(20)
     heads = []
     while len(h) == 20:
         heads.append(h)
diff --git a/tests/test-bundle2.t b/tests/test-bundle2.t
--- a/tests/test-bundle2.t
+++ b/tests/test-bundle2.t
@@ -71,16 +71,17 @@ Create an extension to test bundle2 API
   > 
   > @command('bundle2',
   >          [('', 'param', [], 'stream level parameter'),
   >           ('', 'unknown', False, 'include an unknown mandatory part in the bundle'),
   >           ('', 'unknownparams', False, 'include an unknown part parameters in the bundle'),
   >           ('', 'parts', False, 'include some arbitrary parts to the bundle'),
   >           ('', 'reply', False, 'produce a reply bundle'),
   >           ('', 'pushrace', False, 'includes a check:head part with unknown nodes'),
+  >           ('', 'changegroupurl', '', 'includes a b2x:changegroupurl part'),
   >           ('r', 'rev', [], 'includes those changeset in the bundle'),],
   >          '[OUTPUTFILE]')
   > def cmdbundle2(ui, repo, path=None, **opts):
   >     """write a bundle2 container on standard ouput"""
   >     bundler = bundle2.bundle20(ui)
   >     for p in opts['param']:
   >         p = p.split('=', 1)
   >         try:
@@ -126,16 +127,18 @@ Create an extension to test bundle2 API
   >        # advisory known part with unknown mandatory param
   >        bundler.newpart('test:song', [('randomparam','')])
   >     if opts['unknown']:
   >        bundler.newpart('test:UNKNOWN', data='some random content')
   >     if opts['unknownparams']:
   >        bundler.newpart('test:SONG', [('randomparams', '')])
   >     if opts['parts']:
   >        bundler.newpart('test:ping')
+  >     if opts['changegroupurl']:
+  >        bundler.newpart('b2x:changegroupurl', data=opts['changegroupurl'])
   > 
   >     if path is None:
   >        file = sys.stdout
   >     else:
   >         file = open(path, 'wb')
   > 
   >     for chunk in bundler.getchunks():
   >         file.write(chunk)
@@ -857,17 +860,91 @@ with reply
   added 0 changesets with 0 changes to 2 files
   \x00\x00\x00\x00\x003\x15b2x:reply:changegroup\x00\x00\x00\x02\x00\x02\x0b\x01\x06\x01in-reply-to2return1\x00\x00\x00\x00\x00\x1f (esc)
   b2x:output\x00\x00\x00\x03\x00\x01\x0b\x01in-reply-to2\x00\x00\x00dadding changesets (esc)
   adding manifests
   adding file changes
   added 0 changesets with 0 changes to 2 files
   \x00\x00\x00\x00\x00\x00 (no-eol) (esc)
 
+Test changegroup urls
+
+  $ hg bundle --rev '8+7+5' --base 6 ../changegroup.hg
+  3 changesets found
+
   $ cd ..
+  $ cat > httpserver.py << EOF
+  > from SocketServer import ThreadingMixIn
+  > import BaseHTTPServer
+  > import os
+  > import SimpleHTTPServer
+  > import threading
+  > class HTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+  >     def finish(self):
+  >         self.server.shutdown()
+  > class HTTPServer(ThreadingMixIn, BaseHTTPServer.HTTPServer): pass
+  > def serve_one():
+  >     server = HTTPServer(('localhost', $HGPORT), HTTPRequestHandler)
+  >     server.serve_forever()
+  > open('httpserver.pid', 'w').write('%d\n' % os.getpid())
+  > serve_one()
+  > EOF
+  $ cat > wait.py << EOF
+  > import time
+  > while True:
+  >     try:
+  >         if '\n' in open('httpserver.pid', 'r').read():
+  >             break
+  >     except IOError:
+  >         pass
+  >     time.sleep(0.2)
+  > EOF
+  $ (cd main && hg bundle2 --changegroupurl http://localhost:$HGPORT/changegroup.hg ../changegroupurl.hg2)
+  $ python httpserver.py 2> /dev/null &
+  $ python wait.py
+  $ (cd main && hg unbundle2 < ../changegroupurl.hg2)
+  adding changesets
+  adding manifests
+  adding file changes
+  added 0 changesets with 0 changes to 2 files
+  0 unread bytes
+  addchangegroup return: 1
+  $ "$TESTDIR/killdaemons.py" httpserver.pid
+  $ rm httpserver.pid
+
+Test with corruption check
+
+  $ (cd main && hg bundle2 --changegroupurl "`echo http://localhost:$HGPORT/changegroup.hg; echo md5: 44e5b1c7b57916f97f2bcc6b4bb55d0e; echo sha1: 51d32383164403798628b8bf3e066cbce451f694`" ../changegroupurl.hg2)
+  $ python httpserver.py 2> /dev/null &
+  $ python wait.py
+  $ (cd main && hg unbundle2 < ../changegroupurl.hg2)
+  adding changesets
+  adding manifests
+  adding file changes
+  added 0 changesets with 0 changes to 2 files
+  0 unread bytes
+  addchangegroup return: 1
+  $ "$TESTDIR/killdaemons.py" httpserver.pid
+
+Test with failed corruption check
+
+  $ (cd main && hg bundle2 --changegroupurl "`echo http://localhost:$HGPORT/changegroup.hg; echo md5: 00000000000000000000000000000000`" ../changegroupurl.hg2)
+  $ python httpserver.py 2> /dev/null &
+  $ python wait.py
+  $ (cd main && hg unbundle2 < ../changegroupurl.hg2)
+  adding changesets
+  adding manifests
+  adding file changes
+  added 0 changesets with 0 changes to 2 files
+  transaction abort!
+  rollback completed
+  0 unread bytes
+  abort: Changegroup at http://localhost:$HGPORT/changegroup.hg is corrupted
+  [255]
+  $ "$TESTDIR/killdaemons.py" httpserver.pid
 
 Real world exchange
 =====================
 
 Add more obsolescence information
 
   $ hg -R main debugobsolete -d '0 0' 1111111111111111111111111111111111111111 `getmainid 9520eea781bc`
   $ hg -R main debugobsolete -d '0 0' 2222222222222222222222222222222222222222 `getmainid 24b6387c8c8c`


More information about the Mercurial-devel mailing list