Bug 1460475 - Use stream_reader API; r?dustin
python-zstandard 0.9 has an API that exposes a file object interface
for compression and decompression. This means we can remove our
stream wrapper in order to consume a zstandard compressed tar file.
MozReview-Commit-ID: DeWWKnigJVa
--- a/taskcluster/taskgraph/docker.py
+++ b/taskcluster/taskgraph/docker.py
@@ -88,39 +88,16 @@ def build_image(name, tag, args=None):
print('*' * 50)
print('WARNING: no VERSION file found in image directory.')
print('Image is not suitable for deploying/pushing.')
print('Create an image suitable for deploying/pushing by creating')
print('a VERSION file in the image directory.')
print('*' * 50)
-# The zstandard library doesn't expose a file-like interface for its
-# decompressor, but an iterator. Support for a file-like interface is due in
-# next release. In the meanwhile, we use this proxy class to turn the iterator
-# into a file-like.
-class IteratorReader(object):
- def __init__(self, iterator):
- self._iterator = iterator
- self._buf = b''
-
- def read(self, size):
- result = b''
- while len(result) < size:
- wanted = min(size - len(result), len(self._buf))
- if not self._buf:
- try:
- self._buf = memoryview(next(self._iterator))
- except StopIteration:
- break
- result += self._buf[:wanted].tobytes()
- self._buf = self._buf[wanted:]
- return result
-
-
def load_image(url, imageName=None, imageTag=None):
"""
Load docker image from URL as imageName:tag, if no imageName or tag is given
it will use whatever is inside the zstd compressed tarball.
Returns an object with properties 'image', 'tag' and 'layer'.
"""
import zstandard as zstd
@@ -138,67 +115,72 @@ def load_image(url, imageName=None, imag
def download_and_modify_image():
# This function downloads and edits the downloaded tar file on the fly.
# It emits chunked buffers of the editted tar file, as a generator.
print("Downloading from {}".format(url))
# get_session() gets us a requests.Session set to retry several times.
req = get_session().get(url, stream=True)
req.raise_for_status()
- decompressed_reader = IteratorReader(zstd.ZstdDecompressor().read_from(req.raw))
- tarin = tarfile.open(
- mode='r|',
- fileobj=decompressed_reader,
- bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
+
+ with zstd.ZstdDecompressor().stream_reader(req.raw) as ifh:
+
+ tarin = tarfile.open(
+ mode='r|',
+ fileobj=ifh,
+ bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
- # Stream through each member of the downloaded tar file individually.
- for member in tarin:
- # Non-file members only need a tar header. Emit one.
- if not member.isfile():
- yield member.tobuf(tarfile.GNU_FORMAT)
- continue
+ # Stream through each member of the downloaded tar file individually.
+ for member in tarin:
+ # Non-file members only need a tar header. Emit one.
+ if not member.isfile():
+ yield member.tobuf(tarfile.GNU_FORMAT)
+ continue
- # Open stream reader for the member
- reader = tarin.extractfile(member)
+ # Open stream reader for the member
+ reader = tarin.extractfile(member)
- # If member is `repositories`, we parse and possibly rewrite the image tags
- if member.name == 'repositories':
- # Read and parse repositories
- repos = json.loads(reader.read())
- reader.close()
+ # If member is `repositories`, we parse and possibly rewrite the
+ # image tags.
+ if member.name == 'repositories':
+ # Read and parse repositories
+ repos = json.loads(reader.read())
+ reader.close()
- # If there is more than one image or tag, we can't handle it here
- if len(repos.keys()) > 1:
- raise Exception('file contains more than one image')
- info['image'] = image = repos.keys()[0]
- if len(repos[image].keys()) > 1:
- raise Exception('file contains more than one tag')
- info['tag'] = tag = repos[image].keys()[0]
- info['layer'] = layer = repos[image][tag]
+ # If there is more than one image or tag, we can't handle it
+ # here.
+ if len(repos.keys()) > 1:
+ raise Exception('file contains more than one image')
+ info['image'] = image = repos.keys()[0]
+ if len(repos[image].keys()) > 1:
+ raise Exception('file contains more than one tag')
+ info['tag'] = tag = repos[image].keys()[0]
+ info['layer'] = layer = repos[image][tag]
- # Rewrite the repositories file
- data = json.dumps({imageName or image: {imageTag or tag: layer}})
- reader = BytesIO(data)
- member.size = len(data)
+ # Rewrite the repositories file
+ data = json.dumps({imageName or image: {imageTag or tag: layer}})
+ reader = BytesIO(data)
+ member.size = len(data)
- # Emit the tar header for this member.
- yield member.tobuf(tarfile.GNU_FORMAT)
- # Then emit its content.
- remaining = member.size
- while remaining:
- length = min(remaining, zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
- buf = reader.read(length)
- remaining -= len(buf)
- yield buf
- # Pad to fill a 512 bytes block, per tar format.
- remainder = member.size % 512
- if remainder:
- yield '\0' * (512 - remainder)
+ # Emit the tar header for this member.
+ yield member.tobuf(tarfile.GNU_FORMAT)
+ # Then emit its content.
+ remaining = member.size
+ while remaining:
+ length = min(remaining,
+ zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
+ buf = reader.read(length)
+ remaining -= len(buf)
+ yield buf
+ # Pad to fill a 512 bytes block, per tar format.
+ remainder = member.size % 512
+ if remainder:
+ yield '\0' * (512 - remainder)
- reader.close()
+ reader.close()
docker.post_to_docker(download_and_modify_image(), '/images/load', quiet=0)
# Check that we found a repositories file
if not info.get('image') or not info.get('tag') or not info.get('layer'):
raise Exception('No repositories file found!')
return info