Bug 1432390 - Directly call the docker API over its unix socket instead of calling docker load. r?dustin
While spawning `docker load` is likely to work on developer machines,
on automation, it requires a docker client that is the exact same
version as the server running on the taskcluster worker for
docker-in-docker, which is not convenient. The API required for `docker
load` is rather simple, though, and can be mimicked quite easily.
While this change in itself is not necessary for developer machines,
it will allow re-using the same command for the image-builder to
load a parent docker image when deriving one from another. We could
keep a code branch using `docker load` but it seems wasteful to maintain
two branches when one can work for both use cases.
--- a/taskcluster/taskgraph/docker.py
+++ b/taskcluster/taskgraph/docker.py
@@ -7,17 +7,16 @@
from __future__ import absolute_import, print_function, unicode_literals
import json
import os
import subprocess
import tarfile
import tempfile
import which
-from subprocess import Popen, PIPE
from io import BytesIO
from taskgraph.util import docker
from taskgraph.util.taskcluster import (
find_task_id,
get_artifact_url,
get_session,
)
@@ -150,70 +149,76 @@ def load_image(url, imageName=None, imag
# we parse out the imageTag from imageName, or default it to 'latest'
# if no imageName and no imageTag is given, 'repositories' won't be rewritten
if imageName and not imageTag:
if ':' in imageName:
imageName, imageTag = imageName.split(':', 1)
else:
imageTag = 'latest'
- docker = None
- image, tag, layer = None, None, None
- try:
+ info = {}
+
+ def download_and_modify_image():
+ # This function downloads and edits the downloaded tar file on the fly.
+        # It emits chunked buffers of the edited tar file, as a generator.
print("Downloading from {}".format(url))
# get_session() gets us a requests.Session set to retry several times.
req = get_session().get(url, stream=True)
req.raise_for_status()
decompressed_reader = IteratorReader(zstd.ZstdDecompressor().read_from(req.raw))
tarin = tarfile.open(
mode='r|',
fileobj=decompressed_reader,
bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
- # Seutp piping: tarout | docker
- docker = Popen(['docker', 'load'], stdin=PIPE)
- tarout = tarfile.open(mode='w|', fileobj=docker.stdin, format=tarfile.GNU_FORMAT)
-
- # Read from tarin and write to tarout
+ # Stream through each member of the downloaded tar file individually.
for member in tarin:
- # Write non-file members directly (don't use extractfile on links)
+ # Non-file members only need a tar header. Emit one.
if not member.isfile():
- tarout.addfile(member)
+ yield member.tobuf(tarfile.GNU_FORMAT)
continue
- # Open reader for the member
+ # Open stream reader for the member
reader = tarin.extractfile(member)
- # If member is repository, we parse and possibly rewrite the image tags
+ # If member is `repositories`, we parse and possibly rewrite the image tags
if member.name == 'repositories':
# Read and parse repositories
repos = json.loads(reader.read())
reader.close()
# If there is more than one image or tag, we can't handle it here
if len(repos.keys()) > 1:
raise Exception('file contains more than one image')
- image = repos.keys()[0]
+ info['image'] = image = repos.keys()[0]
if len(repos[image].keys()) > 1:
raise Exception('file contains more than one tag')
- tag = repos[image].keys()[0]
- layer = repos[image][tag]
+ info['tag'] = tag = repos[image].keys()[0]
+ info['layer'] = layer = repos[image][tag]
# Rewrite the repositories file
data = json.dumps({imageName or image: {imageTag or tag: layer}})
reader = BytesIO(data)
member.size = len(data)
- # Add member and reader
- tarout.addfile(member, reader)
+ # Emit the tar header for this member.
+ yield member.tobuf(tarfile.GNU_FORMAT)
+ # Then emit its content.
+ remaining = member.size
+ while remaining:
+ length = min(remaining, zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
+ buf = reader.read(length)
+ remaining -= len(buf)
+ yield buf
+ # Pad to fill a 512 bytes block, per tar format.
+ remainder = member.size % 512
+ if remainder:
+ yield '\0' * (512 - remainder)
+
reader.close()
- tarout.close()
- finally:
- if docker:
- docker.stdin.close()
- if docker and docker.wait() != 0:
- raise Exception('loading into docker failed')
+
+ docker.post_to_docker(download_and_modify_image(), '/images/load', quiet=0)
# Check that we found a repositories file
- if not image or not tag or not layer:
+ if not info.get('image') or not info.get('tag') or not info.get('layer'):
raise Exception('No repositories file found!')
- return {'image': image, 'tag': tag, 'layer': layer}
+ return info
--- a/taskcluster/taskgraph/util/docker.py
+++ b/taskcluster/taskgraph/util/docker.py
@@ -1,34 +1,124 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import hashlib
+import json
import os
import re
+import requests_unixsocket
import shutil
import subprocess
+import sys
import tarfile
import tempfile
+import urllib
+import urlparse
import yaml
from mozbuild.util import memoize
from mozpack.files import GeneratedFile
from mozpack.archive import (
create_tar_gz_from_files,
)
from .. import GECKO
IMAGE_DIR = os.path.join(GECKO, 'taskcluster', 'docker')
+def docker_url(path, **kwargs):
+ docker_socket = os.environ.get('DOCKER_SOCKET', '/var/run/docker.sock')
+ return urlparse.urlunparse((
+ 'http+unix',
+ urllib.quote(docker_socket, safe=''),
+ path,
+ '',
+ urllib.urlencode(kwargs),
+ ''))
+
+
+def post_to_docker(tar, api_path, **kwargs):
+ """POSTs a tar file to a given docker API path.
+
+ The tar argument can be anything that can be passed to requests.post()
+ as data (e.g. iterator or file object).
+ The extra keyword arguments are passed as arguments to the docker API.
+ """
+ req = requests_unixsocket.Session().post(
+ docker_url(api_path, **kwargs),
+ data=tar,
+ stream=True,
+ headers={'Content-Type': 'application/x-tar'},
+ )
+ if req.status_code != 200:
+ message = req.json().get('message')
+ if not message:
+ message = 'docker API returned HTTP code {}'.format(
+ req.status_code)
+ raise Exception(message)
+ status_line = {}
+
+ buf = b''
+ for content in req.iter_content(chunk_size=None):
+ if not content:
+ continue
+ # Sometimes, a chunk of content is not a complete json, so we cumulate
+ # with leftovers from previous iterations.
+ buf += content
+ try:
+ data = json.loads(buf)
+ except Exception:
+ continue
+ buf = b''
+ # data is sometimes an empty dict.
+ if not data:
+ continue
+        # Mimic how docker itself presents the output. This code was tested
+ # with API version 1.18 and 1.26.
+ if 'status' in data:
+ if 'id' in data:
+ if sys.stderr.isatty():
+ total_lines = len(status_line)
+ line = status_line.setdefault(data['id'], total_lines)
+ n = total_lines - line
+ if n > 0:
+ # Move the cursor up n lines.
+ sys.stderr.write('\033[{}A'.format(n))
+ # Clear line and move the cursor to the beginning of it.
+ sys.stderr.write('\033[2K\r')
+ sys.stderr.write('{}: {} {}\n'.format(
+ data['id'], data['status'], data.get('progress', '')))
+ if n > 1:
+ # Move the cursor down n - 1 lines, which, considering
+ # the carriage return on the last write, gets us back
+ # where we started.
+ sys.stderr.write('\033[{}B'.format(n - 1))
+ else:
+ status = status_line.get(data['id'])
+ # Only print status changes.
+ if status != data['status']:
+ sys.stderr.write('{}: {}\n'.format(data['id'], data['status']))
+ status_line[data['id']] = data['status']
+ else:
+ status_line = {}
+ sys.stderr.write('{}\n'.format(data['status']))
+ elif 'stream' in data:
+ sys.stderr.write(data['stream'])
+ elif 'error' in data:
+ raise Exception(data['error'])
+ else:
+ raise NotImplementedError(repr(data))
+ sys.stderr.flush()
+
+
def docker_image(name, by_tag=False):
'''
Resolve in-tree prebuilt docker image to ``<registry>/<repository>@sha256:<digest>``,
or ``<registry>/<repository>:<tag>`` if `by_tag` is `True`.
'''
try:
with open(os.path.join(IMAGE_DIR, name, 'REGISTRY')) as f:
registry = f.read().strip()