From 1eb38bb8e0c63d02a4672d4beeccafe06dba3d04 Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Sat, 23 Oct 2021 14:06:01 -0500 Subject: [PATCH] Fully (and explicitly) close PackIdxLists And stop checking _mpi_count in __del__ since there are no guarantees about if/when it will run (and so could run after another has been opened). Signed-off-by: Rob Browning Tested-by: Rob Browning --- lib/bup/client.py | 11 ++-- lib/bup/cmd/margin.py | 70 ++++++++++++------------ lib/bup/cmd/memtest.py | 47 ++++++++-------- lib/bup/cmd/tag.py | 8 +-- lib/bup/git.py | 116 +++++++++++++++++++++++++--------------- lib/bup/helpers.py | 6 ++- test/int/test_client.py | 47 ++++++++-------- test/int/test_git.py | 74 ++++++++++++------------- 8 files changed, 208 insertions(+), 171 deletions(-) diff --git a/lib/bup/client.py b/lib/bup/client.py index 637737b..ed902b7 100644 --- a/lib/bup/client.py +++ b/lib/bup/client.py @@ -10,7 +10,7 @@ from bup import git, ssh, vfs from bup.compat import environ, pending_raise, range, reraise from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1, debug2, linereader, lines_until_sentinel, - mkdirp, progress, qprogress, DemuxConn) + mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn) from bup.io import path_msg from bup.vint import write_bvec @@ -515,13 +515,16 @@ class PackWriter_Remote(git.PackWriter): # Called by other PackWriter methods like breakpoint(). # Must not close the connection (self.file) assert(run_midx) # We don't support this via remote yet - if self._packopen and self.file: + self.objcache, objcache = None, self.objcache + with nullcontext_if_not(objcache): + if not (self._packopen and self.file): + return None self.file.write(b'\0\0\0\0') self._packopen = False self.onclose() # Unbusy - self.objcache = None + if objcache is not None: + objcache.close() return self.suggest_packs() # Returns last idx received - return None def close(self): # Called by inherited __exit__ diff --git a/lib/bup/cmd/margin.py b/lib/bup/cmd/margin.py index 07f2b0f..7836c71 100755 --- a/lib/bup/cmd/margin.py +++ b/lib/bup/cmd/margin.py @@ -24,44 +24,46 @@ def main(argv): git.check_repo_or_die() - mi = git.PackIdxList(git.repo(b'objects/pack'), ignore_midx=opt.ignore_midx) + with git.PackIdxList(git.repo(b'objects/pack'), + ignore_midx=opt.ignore_midx) as mi: - def do_predict(ix, out): - total = len(ix) - maxdiff = 0 - for count,i in enumerate(ix): - prefix = struct.unpack('!Q', i[:8])[0] - expected = prefix * total // (1 << 64) - diff = count - expected - maxdiff = max(maxdiff, abs(diff)) - out.write(b'%d of %d (%.3f%%) ' - % (maxdiff, len(ix), maxdiff * 100.0 / len(ix))) - out.flush() - assert(count+1 == len(ix)) + def do_predict(ix, out): + total = len(ix) + maxdiff = 0 + for count,i in enumerate(ix): + prefix = struct.unpack('!Q', i[:8])[0] + expected = prefix * total // (1 << 64) + diff = count - expected + maxdiff = max(maxdiff, abs(diff)) + out.write(b'%d of %d (%.3f%%) ' + % (maxdiff, len(ix), maxdiff * 100.0 / len(ix))) + out.flush() + assert(count+1 == len(ix)) - sys.stdout.flush() - out = byte_stream(sys.stdout) + sys.stdout.flush() + out = byte_stream(sys.stdout) - if opt.predict: - if opt.ignore_midx: - for pack in mi.packs: - do_predict(pack, out) + if opt.predict: + if opt.ignore_midx: + for pack in mi.packs: + do_predict(pack, out) + else: + do_predict(mi, out) else: - do_predict(mi, out) - else: - # default mode: find longest matching prefix - last = b'\0'*20 - longmatch = 0 - for i in mi: - if i == last: - continue - #assert(str(i) >= last) - pm = _helpers.bitmatch(last, i) - longmatch = max(longmatch, pm) - last = i - out.write(b'%d\n' % longmatch) - log('%d matching prefix bits\n' % longmatch) - doublings = math.log(len(mi), 2) + # default mode: find longest matching prefix + last = b'\0'*20 + longmatch = 0 + for i in mi: + if i == last: + continue + #assert(str(i) >= last) + pm = _helpers.bitmatch(last, i) + longmatch = max(longmatch, pm) + last = i + out.write(b'%d\n' % longmatch) + log('%d matching prefix bits\n' % longmatch) + doublings = math.log(len(mi), 2) + bpd = longmatch / doublings log('%.2f bits per doubling\n' % bpd) remain = 160 - longmatch diff --git a/lib/bup/cmd/memtest.py b/lib/bup/cmd/memtest.py index b2027c4..e20c1b6 100755 --- a/lib/bup/cmd/memtest.py +++ b/lib/bup/cmd/memtest.py @@ -79,8 +79,6 @@ def main(argv): o.fatal('no arguments expected') git.check_repo_or_die() - m = git.PackIdxList(git.repo(b'objects/pack'), ignore_midx=opt.ignore_midx) - sys.stdout.flush() out = byte_stream(sys.stdout) @@ -88,27 +86,30 @@ def main(argv): _helpers.random_sha() report(0, out) - if opt.existing: - def foreverit(mi): - while 1: - for e in mi: - yield e - objit = iter(foreverit(m)) - - for c in range(opt.cycles): - for n in range(opt.number): - if opt.existing: - bin = next(objit) - assert(m.exists(bin)) - else: - bin = _helpers.random_sha() - - # technically, a randomly generated object id might exist. - # but the likelihood of that is the likelihood of finding - # a collision in sha-1 by accident, which is so unlikely that - # we don't care. - assert(not m.exists(bin)) - report((c+1)*opt.number, out) + with git.PackIdxList(git.repo(b'objects/pack'), + ignore_midx=opt.ignore_midx) as m: + + if opt.existing: + def foreverit(mi): + while 1: + for e in mi: + yield e + objit = iter(foreverit(m)) + + for c in range(opt.cycles): + for n in range(opt.number): + if opt.existing: + bin = next(objit) + assert(m.exists(bin)) + else: + bin = _helpers.random_sha() + + # technically, a randomly generated object id might exist. + # but the likelihood of that is the likelihood of finding + # a collision in sha-1 by accident, which is so unlikely that + # we don't care. + assert(not m.exists(bin)) + report((c+1)*opt.number, out) if bloom._total_searches: out.write(b'bloom: %d objects searched in %d steps: avg %.3f steps/object\n' diff --git a/lib/bup/cmd/tag.py b/lib/bup/cmd/tag.py index 0f5475d..3bb1b40 100755 --- a/lib/bup/cmd/tag.py +++ b/lib/bup/cmd/tag.py @@ -74,10 +74,10 @@ def main(argv): log("bup: error: commit %s not found.\n" % commit.decode('ascii')) sys.exit(2) - pL = git.PackIdxList(git.repo(b'objects/pack')) - if not pL.exists(hash): - log("bup: error: commit %s not found.\n" % commit.decode('ascii')) - sys.exit(2) + with git.PackIdxList(git.repo(b'objects/pack')) as pL: + if not pL.exists(hash): + log("bup: error: commit %s not found.\n" % commit.decode('ascii')) + sys.exit(2) tag_file = git.repo(b'refs/tags/' + tag_name) try: diff --git a/lib/bup/git.py b/lib/bup/git.py index 0dc6e72..e3b235c 100644 --- a/lib/bup/git.py +++ b/lib/bup/git.py @@ -14,6 +14,7 @@ from bup import _helpers, hashsplit, path, midx, bloom, xstat from bup.compat import (buffer, byte_int, bytes_from_byte, bytes_from_uint, environ, + ExitStack, items, pending_raise, range, @@ -27,6 +28,7 @@ from bup.helpers import (Sha1, add_error, chunkyreader, debug1, debug2, merge_dict, merge_iter, mmap_read, mmap_readwrite, + nullcontext_if_not, progress, qprogress, stat_if_exists, unlink, utc_offset_str) @@ -517,8 +519,10 @@ _mpi_count = 0 class PackIdxList: def __init__(self, dir, ignore_midx=False): global _mpi_count + # Q: was this also intended to prevent opening multiple repos? assert(_mpi_count == 0) # these things suck tons of VM; don't waste it _mpi_count += 1 + self.open = True self.dir = dir self.also = set() self.packs = [] @@ -527,10 +531,32 @@ class PackIdxList: self.ignore_midx = ignore_midx self.refresh() - def __del__(self): + def close(self): global _mpi_count + if not self.open: + assert _mpi_count == 0 + return _mpi_count -= 1 - assert(_mpi_count == 0) + assert _mpi_count == 0 + self.also = None + self.bloom, bloom = None, self.bloom + self.packs, packs = None, self.packs + self.open = False + with ExitStack() as stack: + for pack in packs: + stack.enter_context(pack) + if bloom: + bloom.close() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + with pending_raise(value, rethrow=False): + self.close() + + def __del__(self): + assert not self.open def __iter__(self): return iter(idxmerge(self.packs)) @@ -721,7 +747,6 @@ def create_commit_blob(tree, parent, l.append(msg) return b'\n'.join(l) - def _make_objcache(): return PackIdxList(repo(b'objects/pack')) @@ -878,45 +903,49 @@ class PackWriter: self.file, f = None, self.file self.idx, idx = None, self.idx self.parentfd, pfd, = None, self.parentfd - self.objcache = None - - with finalized(pfd, lambda x: x is not None and os.close(x)), \ - f: - if abort: - os.unlink(self.filename + b'.pack') - return None + try: + with nullcontext_if_not(self.objcache), \ + finalized(pfd, lambda x: x is not None and os.close(x)), \ + f: + + if abort: + os.unlink(self.filename + b'.pack') + return None + + # update object count + f.seek(8) + cp = struct.pack('!i', self.count) + assert len(cp) == 4 + f.write(cp) + + # calculate the pack sha1sum + f.seek(0) + sum = Sha1() + for b in chunkyreader(f): + sum.update(b) + packbin = sum.digest() + f.write(packbin) + f.flush() + fdatasync(f.fileno()) + f.close() - # update object count - f.seek(8) - cp = struct.pack('!i', self.count) - assert len(cp) == 4 - f.write(cp) - - # calculate the pack sha1sum - f.seek(0) - sum = Sha1() - for b in chunkyreader(f): - sum.update(b) - packbin = sum.digest() - f.write(packbin) - f.flush() - fdatasync(f.fileno()) - f.close() - - idx.write(self.filename + b'.idx', packbin) - nameprefix = os.path.join(self.repo_dir, - b'objects/pack/pack-' + hexlify(packbin)) - if os.path.exists(self.filename + b'.map'): - os.unlink(self.filename + b'.map') - os.rename(self.filename + b'.pack', nameprefix + b'.pack') - os.rename(self.filename + b'.idx', nameprefix + b'.idx') - os.fsync(pfd) - if run_midx: - auto_midx(os.path.join(self.repo_dir, b'objects/pack')) - if self.on_pack_finish: - self.on_pack_finish(nameprefix) - return nameprefix + idx.write(self.filename + b'.idx', packbin) + nameprefix = os.path.join(self.repo_dir, + b'objects/pack/pack-' + hexlify(packbin)) + if os.path.exists(self.filename + b'.map'): + os.unlink(self.filename + b'.map') + os.rename(self.filename + b'.pack', nameprefix + b'.pack') + os.rename(self.filename + b'.idx', nameprefix + b'.idx') + os.fsync(pfd) + if run_midx: + auto_midx(os.path.join(self.repo_dir, b'objects/pack')) + if self.on_pack_finish: + self.on_pack_finish(nameprefix) + return nameprefix + finally: + # Must be last -- some of the code above depends on it + self.objcache = None def abort(self): """Remove the pack file from disk.""" @@ -1090,16 +1119,15 @@ def rev_parse(committish, repo_dir=None): debug2("resolved from ref: commit = %s\n" % hexlify(head)) return head - pL = PackIdxList(repo(b'objects/pack', repo_dir=repo_dir)) - if len(committish) == 40: try: hash = unhexlify(committish) except TypeError: return None - if pL.exists(hash): - return hash + with PackIdxList(repo(b'objects/pack', repo_dir=repo_dir)) as pL: + if pL.exists(hash): + return hash return None diff --git a/lib/bup/helpers.py b/lib/bup/helpers.py index 3b37a2b..0530b51 100644 --- a/lib/bup/helpers.py +++ b/lib/bup/helpers.py @@ -12,7 +12,7 @@ import hashlib, heapq, math, operator, time, tempfile from bup import _helpers from bup import compat -from bup.compat import argv_bytes, byte_int, pending_raise +from bup.compat import argv_bytes, byte_int, nullcontext, pending_raise from bup.io import byte_stream, path_msg # This function should really be in helpers, not in bup.options. But we # want options.py to be standalone so people can include it in other projects. @@ -27,6 +27,10 @@ class Nonlocal: pass +def nullcontext_if_not(manager): + return manager if manager is not None else nullcontext() + + @contextmanager def finalized(enter_result=None, finalize=None): assert finalize diff --git a/test/int/test_client.py b/test/int/test_client.py index 757c3ab..de17ece 100644 --- a/test/int/test_client.py +++ b/test/int/test_client.py @@ -23,12 +23,10 @@ IDX_PAT = b'/*.idx' def test_server_split_with_indexes(tmpdir): environ[b'BUP_DIR'] = bupdir = tmpdir git.init_repo(bupdir) - with git.PackWriter() as lw, \ - client.Client(bupdir, create=True) as c, \ - c.new_packwriter() as rw: + with git.PackWriter() as lw: lw.new_blob(s1) - lw.close() - + with client.Client(bupdir, create=True) as c, \ + c.new_packwriter() as rw: rw.new_blob(s2) rw.breakpoint() rw.new_blob(s1) @@ -122,25 +120,26 @@ def test_midx_refreshing(tmpdir): p2base = rw.close() p2name = os.path.join(c.cachedir, p2base) - pi = git.PackIdxList(bupdir + b'/objects/pack') - assert len(pi.packs) == 2 - pi.refresh() - assert len(pi.packs) == 2 - assert sorted([os.path.basename(i.name) for i in pi.packs]) == sorted([p1base, p2base]) - - with git.open_idx(p1name) as p1, \ - git.open_idx(p2name) as p2: - assert p1.exists(s1sha) - assert not p2.exists(s1sha) - assert p2.exists(s2sha) - - subprocess.call([path.exe(), b'midx', b'-f']) - pi.refresh() - assert len(pi.packs) == 1 - pi.refresh(skip_midx=True) - assert len(pi.packs) == 2 - pi.refresh(skip_midx=False) - assert len(pi.packs) == 1 + with git.PackIdxList(bupdir + b'/objects/pack') as pi: + assert len(pi.packs) == 2 + pi.refresh() + assert len(pi.packs) == 2 + assert sorted([os.path.basename(i.name) for i in pi.packs]) \ + == sorted([p1base, p2base]) + + with git.open_idx(p1name) as p1, \ + git.open_idx(p2name) as p2: + assert p1.exists(s1sha) + assert not p2.exists(s1sha) + assert p2.exists(s2sha) + + subprocess.call([path.exe(), b'midx', b'-f']) + pi.refresh() + assert len(pi.packs) == 1 + pi.refresh(skip_midx=True) + assert len(pi.packs) == 2 + pi.refresh(skip_midx=False) + assert len(pi.packs) == 1 def test_remote_parsing(): diff --git a/test/int/test_git.py b/test/int/test_git.py index cae56ca..616fc6e 100644 --- a/test/int/test_git.py +++ b/test/int/test_git.py @@ -147,10 +147,10 @@ def test_packs(tmpdir): WVFAIL(r.find_offset(b'\0'*20)) - r = git.PackIdxList(bupdir + b'/objects/pack') - WVPASS(r.exists(hashes[5])) - WVPASS(r.exists(hashes[6])) - WVFAIL(r.exists(b'\0'*20)) + with git.PackIdxList(bupdir + b'/objects/pack') as r: + WVPASS(r.exists(hashes[5])) + WVPASS(r.exists(hashes[6])) + WVFAIL(r.exists(b'\0'*20)) def test_pack_name_lookup(tmpdir): @@ -169,11 +169,11 @@ def test_pack_name_lookup(tmpdir): log('\n') idxnames.append(os.path.basename(w.close() + b'.idx')) - r = git.PackIdxList(packdir) - WVPASSEQ(len(r.packs), 2) - for e,idxname in enumerate(idxnames): - for i in range(e*2, (e+1)*2): - WVPASSEQ(idxname, r.exists(hashes[i], want_source=True)) + with git.PackIdxList(packdir) as r: + WVPASSEQ(len(r.packs), 2) + for e,idxname in enumerate(idxnames): + for i in range(e*2, (e+1)*2): + WVPASSEQ(idxname, r.exists(hashes[i], want_source=True)) def test_long_index(tmpdir): @@ -511,35 +511,35 @@ def test_midx_close(tmpdir): for i in range(10): _create_idx(tmpdir, i) git.auto_midx(tmpdir) - l = git.PackIdxList(tmpdir) + with git.PackIdxList(tmpdir) as l: # this doesn't exist (yet) - WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0))) - for i in range(10, 15): - _create_idx(tmpdir, i) - # delete the midx ... - # TODO: why do we need to? git.auto_midx() below doesn't?! - for fn in os.listdir(tmpdir): - if fn.endswith(b'.midx'): - os.unlink(os.path.join(tmpdir, fn)) - # and make a new one - git.auto_midx(tmpdir) - # check it still doesn't exist - we haven't refreshed - WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0))) - # check that we still have the midx open, this really - # just checks more for the kernel API ('deleted' string) - for fn in openfiles(): - if not b'midx-' in fn: - continue - WVPASSEQ(True, b'deleted' in fn) - # refresh the PackIdxList - l.refresh() - # and check that an object in pack 10 exists now - WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0))) - for fn in openfiles(): - if not b'midx-' in fn: - continue - # check that we don't have it open anymore - WVPASSEQ(False, b'deleted' in fn) + WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0))) + for i in range(10, 15): + _create_idx(tmpdir, i) + # delete the midx ... + # TODO: why do we need to? git.auto_midx() below doesn't?! + for fn in os.listdir(tmpdir): + if fn.endswith(b'.midx'): + os.unlink(os.path.join(tmpdir, fn)) + # and make a new one + git.auto_midx(tmpdir) + # check it still doesn't exist - we haven't refreshed + WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0))) + # check that we still have the midx open, this really + # just checks more for the kernel API ('deleted' string) + for fn in openfiles(): + if not b'midx-' in fn: + continue + WVPASSEQ(True, b'deleted' in fn) + # refresh the PackIdxList + l.refresh() + # and check that an object in pack 10 exists now + WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0))) + for fn in openfiles(): + if not b'midx-' in fn: + continue + # check that we don't have it open anymore + WVPASSEQ(False, b'deleted' in fn) def test_config(): cfg_file = os.path.join(os.path.dirname(__file__), 'sample.conf') -- 2.39.2