Bug 1334167: allow by-project for cron jobs' when property; r?Callek draft
author Dustin J. Mitchell <dustin@mozilla.com>
Wed, 01 Feb 2017 00:30:52 +0000
changeset 469655 753a78516ce4678500a11cabe4781496ee989b16
parent 469654 d79cd3fe68afcdd7f4165abe1a59e9d3e29f939d
child 469656 3e3d91d4dfed8701543e35a468233699bbe2c6f5
push id 43793
push user dmitchell@mozilla.com
push date Thu, 02 Feb 2017 14:41:57 +0000
reviewers Callek
bugs 1334167
milestone 54.0a1
Bug 1334167: allow by-project for cron jobs' when property; r?Callek

This requires moving the schema utilities to their own util module.

MozReview-Commit-ID: KR5xSJ9ak5Y
.cron.yml
taskcluster/taskgraph/cron/__init__.py
taskcluster/taskgraph/cron/schema.py
taskcluster/taskgraph/test/test_transforms_base.py
taskcluster/taskgraph/test/test_util_schema.py
taskcluster/taskgraph/transforms/balrog.py
taskcluster/taskgraph/transforms/base.py
taskcluster/taskgraph/transforms/beetmover.py
taskcluster/taskgraph/transforms/job/__init__.py
taskcluster/taskgraph/transforms/l10n.py
taskcluster/taskgraph/transforms/signing.py
taskcluster/taskgraph/transforms/task.py
taskcluster/taskgraph/transforms/tests.py
taskcluster/taskgraph/util/schema.py
--- a/.cron.yml
+++ b/.cron.yml
@@ -8,17 +8,19 @@ jobs:
           type: decision-task
           treeherder-symbol: Nd
           triggered-by: nightly
           target-tasks-method: nightly_linux
       run-on-projects:
           - mozilla-central
           - date
       when:
-          - {hour: 16, minute: 0}
+          by-project:
+              mozilla-central: [{hour: 16, minute: 0}]
+              date: [{hour: 13, minute: 0}]
 
     - name: nightly-android
       job:
           type: decision-task
           treeherder-symbol: Na
           triggered-by: nightly
           target-tasks-method: nightly_fennec
       run-on-projects:
--- a/taskcluster/taskgraph/cron/__init__.py
+++ b/taskcluster/taskgraph/cron/__init__.py
@@ -17,16 +17,17 @@ import yaml
 
 from . import decision, schema
 from .util import (
     match_utc,
     calculate_head_rev
 )
 from ..create import create_task
 from taskgraph.util.attributes import match_run_on_projects
+from taskgraph.util.schema import resolve_keyed_by
 
 # Functions to handle each `job.type` in `.cron.yml`.  These are called with
 # the contents of the `job` property from `.cron.yml` and should return a
 # sequence of (taskId, task) tuples which will subsequently be fed to
 # createTask.
 JOB_TYPES = {
     'decision-task': decision.run_decision_task,
 }
@@ -38,21 +39,28 @@ logger = logging.getLogger(__name__)
 
 def get_session():
     global _session
     if not _session:
         _session = requests.Session()
     return _session
 
 
-def load_jobs():
+def load_jobs(params):
     with open(os.path.join(GECKO, '.cron.yml'), 'rb') as f:
         cron_yml = yaml.load(f)
     schema.validate(cron_yml)
-    return {j['name']: j for j in cron_yml['jobs']}
+
+    # resolve keyed_by fields in each job
+    jobs = cron_yml['jobs']
+    for job in jobs:
+        resolve_keyed_by(job, 'when', 'Cron job ' + job['name'],
+                         project=params['project'])
+
+    return {j['name']: j for j in jobs}
 
 
 def should_run(job, params):
     run_on_projects = job.get('run-on-projects', ['all'])
     if not match_run_on_projects(params['project'], run_on_projects):
         return False
     if not any(match_utc(params, hour=sched.get('hour'), minute=sched.get('minute'))
                for sched in job.get('when', [])):
@@ -77,33 +85,36 @@ def run_job(job_name, job, params):
             for task_id, task in tasks:
                 create_task(get_session(), task_id, params['job_name'], task)
 
     except Exception:
         # report the exception, but don't fail the whole cron task, as that
         # would leave other jobs un-run.  NOTE: we could report job failure to
         # a responsible person here via tc-notify
         traceback.print_exc()
-        logger.error("cron job {} run failed; continuing to next job".format(params['job_name']))
+        logger.error("cron job {} run failed; continuing to next job".format(
+            params['job_name']))
 
 
 def calculate_time(options):
     if 'TASK_ID' not in os.environ:
         # running in a development environment, so look for CRON_TIME or use
         # the current time
         if 'CRON_TIME' in os.environ:
             logger.warning("setting params['time'] based on $CRON_TIME")
-            time = datetime.datetime.utcfromtimestamp(int(os.environ['CRON_TIME']))
+            time = datetime.datetime.utcfromtimestamp(
+                int(os.environ['CRON_TIME']))
         else:
             logger.warning("using current time for params['time']; try setting $CRON_TIME "
                            "to a timestamp")
             time = datetime.datetime.utcnow()
     else:
         # fetch this task from the queue
-        res = get_session().get('http://taskcluster/queue/v1/task/' + os.environ['TASK_ID'])
+        res = get_session().get(
+            'http://taskcluster/queue/v1/task/' + os.environ['TASK_ID'])
         if res.status_code != 200:
             try:
                 logger.error(res.json()['message'])
             except:
                 logger.error(res.text)
             res.raise_for_status()
         # the task's `created` time is close to when the hook ran, although that
         # may be some time ago if task execution was delayed
@@ -140,17 +151,17 @@ def taskgraph_cron(options):
 
         # if true, tasks will not actually be created
         'no_create': options['no_create'],
 
         # the time that this cron task was created (as a UTC datetime object)
         'time': calculate_time(options),
     }
 
-    jobs = load_jobs()
+    jobs = load_jobs(params)
 
     if options['force_run']:
         job_name = options['force_run']
         logger.info("force-running cron job {}".format(job_name))
         run_job(job_name, jobs[job_name], params)
         return
 
     for job_name, job in sorted(jobs.items()):
--- a/taskcluster/taskgraph/cron/schema.py
+++ b/taskcluster/taskgraph/cron/schema.py
@@ -2,17 +2,21 @@
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-from voluptuous import Schema, Any, Required, All, MultipleInvalid
+from voluptuous import Schema, Any, Required, All
+from taskgraph.util.schema import (
+    optionally_keyed_by,
+    validate_schema,
+)
 
 
 def even_15_minutes(minutes):
     if minutes % 15 != 0:
         raise ValueError("minutes must be evenly divisible by 15")
 
 cron_yml_schema = Schema({
     'jobs': [{
@@ -38,23 +42,21 @@ cron_yml_schema = Schema({
         # when to run it
 
         # Optional set of projects on which this job should run; if omitted, this will
         # run on all projects for which cron tasks are set up.  This works just like the
         # `run_on_projects` attribute, where strings like "release" and "integration" are
         # expanded to cover multiple repositories.  (taskcluster/docs/attributes.rst)
         'run-on-projects': [basestring],
 
-        # Array of times at which this task should run.  These *must* be a multiple of
-        # 15 minutes, the minimum scheduling interval.
-        'when': [{'hour': int, 'minute': All(int, even_15_minutes)}],
+        # Array of times at which this task should run.  These *must* be a
+        # multiple of 15 minutes, the minimum scheduling interval.  This field
+        # can be keyed by project so that each project has a different schedule
+        # for the same job.
+        'when': optionally_keyed_by(
+            'project',
+            [{'hour': int, 'minute': All(int, even_15_minutes)}]),
     }],
 })
 
 
 def validate(cron_yml):
-    try:
-        cron_yml_schema(cron_yml)
-    except MultipleInvalid as exc:
-        msg = ["Invalid .cron.yml:"]
-        for error in exc.errors:
-            msg.append(str(error))
-        raise Exception('\n'.join(msg))
+    validate_schema(cron_yml_schema, cron_yml, "Invalid .cron.yml:")
--- a/taskcluster/taskgraph/test/test_transforms_base.py
+++ b/taskcluster/taskgraph/test/test_transforms_base.py
@@ -2,26 +2,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import unittest
 from mozunit import main
 from taskgraph.transforms.base import (
-    validate_schema,
-    resolve_keyed_by,
     TransformSequence
 )
-from voluptuous import Schema
-
-schema = Schema({
-    'x': int,
-    'y': basestring,
-})
 
 transforms = TransformSequence()
 
 
 @transforms.add
 def trans1(config, tests):
     for test in tests:
         test['one'] = 1
@@ -41,94 +33,10 @@ class TestTransformSequence(unittest.Tes
         tests = [{}, {'two': 1, 'second': True}]
         res = list(transforms({}, tests))
         self.assertEqual(res, [
             {u'two': 2, u'one': 1},
             {u'second': True, u'two': 2, u'one': 1},
         ])
 
 
-class TestValidateSchema(unittest.TestCase):
-
-    def test_valid(self):
-        validate_schema(schema, {'x': 10, 'y': 'foo'}, "pfx")
-
-    def test_invalid(self):
-        try:
-            validate_schema(schema, {'x': 'not-int'}, "pfx")
-            self.fail("no exception raised")
-        except Exception, e:
-            self.failUnless(str(e).startswith("pfx\n"))
-
-
-class TestResolveKeyedBy(unittest.TestCase):
-
-    def test_no_by(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': 10}, 'z', 'n'),
-            {'x': 10})
-
-    def test_no_by_dotted(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': {'y': 10}}, 'x.z', 'n'),
-            {'x': {'y': 10}})
-
-    def test_no_by_not_dict(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': 10}, 'x.y', 'n'),
-            {'x': 10})
-
-    def test_no_by_not_by(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': {'a': 10}}, 'x', 'n'),
-            {'x': {'a': 10}})
-
-    def test_no_by_empty_dict(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': {}}, 'x', 'n'),
-            {'x': {}})
-
-    def test_no_by_not_only_by(self):
-        self.assertEqual(
-            resolve_keyed_by({'x': {'by-y': True, 'a': 10}}, 'x', 'n'),
-            {'x': {'by-y': True, 'a': 10}})
-
-    def test_match_nested_exact(self):
-        self.assertEqual(
-            resolve_keyed_by(
-                {'f': 'shoes', 'x': {'y': {'by-f': {'shoes': 'feet', 'gloves': 'hands'}}}},
-                'x.y', 'n'),
-            {'f': 'shoes', 'x': {'y': 'feet'}})
-
-    def test_match_regexp(self):
-        self.assertEqual(
-            resolve_keyed_by(
-                {'f': 'shoes', 'x': {'by-f': {'s?[hH]oes?': 'feet', 'gloves': 'hands'}}},
-                'x', 'n'),
-            {'f': 'shoes', 'x': 'feet'})
-
-    def test_match_partial_regexp(self):
-        self.assertEqual(
-            resolve_keyed_by(
-                {'f': 'shoes', 'x': {'by-f': {'sh': 'feet', 'default': 'hands'}}},
-                'x', 'n'),
-            {'f': 'shoes', 'x': 'hands'})
-
-    def test_match_default(self):
-        self.assertEqual(
-            resolve_keyed_by(
-                {'f': 'shoes', 'x': {'by-f': {'hat': 'head', 'default': 'anywhere'}}},
-                'x', 'n'),
-            {'f': 'shoes', 'x': 'anywhere'})
-
-    def test_no_match(self):
-        self.assertRaises(
-            Exception, resolve_keyed_by,
-            {'f': 'shoes', 'x': {'by-f': {'hat': 'head'}}}, 'x', 'n')
-
-    def test_multiple_matches(self):
-        self.assertRaises(
-            Exception, resolve_keyed_by,
-            {'f': 'hats', 'x': {'by-f': {'hat.*': 'head', 'ha.*': 'hair'}}}, 'x', 'n')
-
-
 if __name__ == '__main__':
     main()
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/test/test_util_schema.py
@@ -0,0 +1,114 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import unittest
+from mozunit import main
+from taskgraph.util.schema import (
+    validate_schema,
+    resolve_keyed_by,
+)
+from voluptuous import Schema
+
+schema = Schema({
+    'x': int,
+    'y': basestring,
+})
+
+
+class TestValidateSchema(unittest.TestCase):
+
+    def test_valid(self):
+        validate_schema(schema, {'x': 10, 'y': 'foo'}, "pfx")
+
+    def test_invalid(self):
+        try:
+            validate_schema(schema, {'x': 'not-int'}, "pfx")
+            self.fail("no exception raised")
+        except Exception, e:
+            self.failUnless(str(e).startswith("pfx\n"))
+
+
+class TestResolveKeyedBy(unittest.TestCase):
+
+    def test_no_by(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': 10}, 'z', 'n'),
+            {'x': 10})
+
+    def test_no_by_dotted(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': {'y': 10}}, 'x.z', 'n'),
+            {'x': {'y': 10}})
+
+    def test_no_by_not_dict(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': 10}, 'x.y', 'n'),
+            {'x': 10})
+
+    def test_no_by_not_by(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': {'a': 10}}, 'x', 'n'),
+            {'x': {'a': 10}})
+
+    def test_no_by_empty_dict(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': {}}, 'x', 'n'),
+            {'x': {}})
+
+    def test_no_by_not_only_by(self):
+        self.assertEqual(
+            resolve_keyed_by({'x': {'by-y': True, 'a': 10}}, 'x', 'n'),
+            {'x': {'by-y': True, 'a': 10}})
+
+    def test_match_nested_exact(self):
+        self.assertEqual(
+            resolve_keyed_by(
+                {'f': 'shoes', 'x': {'y': {'by-f': {'shoes': 'feet', 'gloves': 'hands'}}}},
+                'x.y', 'n'),
+            {'f': 'shoes', 'x': {'y': 'feet'}})
+
+    def test_match_regexp(self):
+        self.assertEqual(
+            resolve_keyed_by(
+                {'f': 'shoes', 'x': {'by-f': {'s?[hH]oes?': 'feet', 'gloves': 'hands'}}},
+                'x', 'n'),
+            {'f': 'shoes', 'x': 'feet'})
+
+    def test_match_partial_regexp(self):
+        self.assertEqual(
+            resolve_keyed_by(
+                {'f': 'shoes', 'x': {'by-f': {'sh': 'feet', 'default': 'hands'}}},
+                'x', 'n'),
+            {'f': 'shoes', 'x': 'hands'})
+
+    def test_match_default(self):
+        self.assertEqual(
+            resolve_keyed_by(
+                {'f': 'shoes', 'x': {'by-f': {'hat': 'head', 'default': 'anywhere'}}},
+                'x', 'n'),
+            {'f': 'shoes', 'x': 'anywhere'})
+
+    def test_match_extra_value(self):
+        self.assertEqual(
+            resolve_keyed_by(
+                {'f': {'by-foo': {'x': 10, 'y': 20}}},
+                'f', 'n',
+                foo='y'),
+            {'f': 20})
+
+    def test_no_match(self):
+        self.assertRaises(
+            Exception, resolve_keyed_by,
+            {'f': 'shoes', 'x': {'by-f': {'hat': 'head'}}}, 'x', 'n')
+
+    def test_multiple_matches(self):
+        self.assertRaises(
+            Exception, resolve_keyed_by,
+            {'f': 'hats', 'x': {'by-f': {'hat.*': 'head', 'ha.*': 'hair'}}}, 'x', 'n')
+
+
+if __name__ == '__main__':
+    main()
--- a/taskcluster/taskgraph/transforms/balrog.py
+++ b/taskcluster/taskgraph/transforms/balrog.py
@@ -2,20 +2,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 """
 Transform the beetmover task into an actual task description.
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-from taskgraph.transforms.base import (
-    validate_schema,
-    TransformSequence
-)
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import validate_schema
 from taskgraph.transforms.task import task_description_schema
 from voluptuous import Schema, Any, Required, Optional
 
 
 # Voluptuous uses marker objects as dictionary *keys*, but they are not
 # comparable, so we cast all of the keys back to regular strings
 task_description_schema = {str(k): v for k, v in task_description_schema.schema.iteritems()}
 
--- a/taskcluster/taskgraph/transforms/base.py
+++ b/taskcluster/taskgraph/transforms/base.py
@@ -1,19 +1,14 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-import re
-import copy
-import pprint
-import voluptuous
-
 
 class TransformConfig(object):
     """A container for configuration affecting transforms.  The `config`
     argument to transforms is an instance of this class, possibly with
     additional kind-specific attributes beyond those set here."""
     def __init__(self, kind, path, config, params):
         # the name of the current kind
         self.kind = kind
@@ -53,110 +48,8 @@ class TransformSequence(object):
         return '\n'.join(
             ['TransformSequence(['] +
             [repr(x) for x in self.transforms] +
             ['])'])
 
     def add(self, func):
         self.transforms.append(func)
         return func
-
-
-def validate_schema(schema, obj, msg_prefix):
-    """
-    Validate that object satisfies schema.  If not, generate a useful exception
-    beginning with msg_prefix.
-    """
-    try:
-        # deep copy the result since it may include mutable defaults
-        return copy.deepcopy(schema(obj))
-    except voluptuous.MultipleInvalid as exc:
-        msg = [msg_prefix]
-        for error in exc.errors:
-            msg.append(str(error))
-        raise Exception('\n'.join(msg) + '\n' + pprint.pformat(obj))
-
-
-def optionally_keyed_by(*arguments):
-    """
-    Mark a schema value as optionally keyed by any of a number of fields.  The
-    schema is the last argument, and the remaining fields are taken to be the
-    field names.  For example:
-
-        'some-value': optionally_keyed_by(
-            'test-platform', 'build-platform',
-            Any('a', 'b', 'c'))
-    """
-    subschema = arguments[-1]
-    fields = arguments[:-1]
-    options = [subschema]
-    for field in fields:
-        options.append({'by-' + field: {basestring: subschema}})
-    return voluptuous.Any(*options)
-
-
-def resolve_keyed_by(item, field, item_name):
-    """
-    For values which can either accept a literal value, or be keyed by some
-    other attribute of the item, perform that lookup and replacement in-place
-    (modifying `item` directly).  The field is specified using dotted notation
-    to traverse dictionaries.
-
-    For example, given item
-
-        job:
-            chunks:
-                by-test-platform:
-                    macosx-10.11/debug: 13
-                    win.*: 6
-                    default: 12
-
-    a call to `resolve_keyed_by(item, 'job.chunks', item['thing-name'])
-    would mutate item in-place to
-
-        job:
-            chunks: 12
-
-    The `item_name` parameter is used to generate useful error messages.
-    """
-    # find the field, returning the item unchanged if anything goes wrong
-    container, subfield = item, field
-    while '.' in subfield:
-        f, subfield = subfield.split('.', 1)
-        if f not in container:
-            return item
-        container = container[f]
-        if not isinstance(container, dict):
-            return item
-
-    if subfield not in container:
-        return item
-    value = container[subfield]
-    if not isinstance(value, dict) or len(value) != 1 or not value.keys()[0].startswith('by-'):
-        return item
-
-    keyed_by = value.keys()[0][3:]  # strip off 'by-' prefix
-    key = item[keyed_by]
-    alternatives = value.values()[0]
-
-    # exact match
-    if key in alternatives:
-        container[subfield] = alternatives[key]
-        return item
-
-    # regular expression match
-    matches = [(k, v) for k, v in alternatives.iteritems() if re.match(k + '$', key)]
-    if len(matches) > 1:
-        raise Exception(
-            "Multiple matching values for {} {!r} found while determining item {} in {}".format(
-                keyed_by, key, field, item_name))
-    elif matches:
-        container[subfield] = matches[0][1]
-        return item
-
-    # default
-    if 'default' in alternatives:
-        container[subfield] = alternatives['default']
-        return item
-
-    raise Exception(
-        "No {} matching {!r} nor 'default' found while determining item {} in {}".format(
-            keyed_by, key, field, item_name))
--- a/taskcluster/taskgraph/transforms/beetmover.py
+++ b/taskcluster/taskgraph/transforms/beetmover.py
@@ -2,20 +2,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 """
 Transform the beetmover task into an actual task description.
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-from taskgraph.transforms.base import (
-    validate_schema,
-    TransformSequence
-)
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import validate_schema
 from taskgraph.transforms.task import task_description_schema
 from voluptuous import Schema, Any, Required, Optional
 
 
 _DESKTOP_UPSTREAM_ARTIFACTS_UNSIGNED_EN_US = [
     "balrog_props.json",
     "target.common.tests.zip",
     "target.cppunittest.tests.zip",
--- a/taskcluster/taskgraph/transforms/job/__init__.py
+++ b/taskcluster/taskgraph/transforms/job/__init__.py
@@ -10,17 +10,21 @@ run-using handlers in `taskcluster/taskg
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import copy
 import logging
 import os
 
-from taskgraph.transforms.base import resolve_keyed_by, validate_schema, TransformSequence
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import (
+    validate_schema,
+    resolve_keyed_by,
+)
 from taskgraph.transforms.task import task_description_schema
 from voluptuous import (
     Any,
     Extra,
     Optional,
     Required,
     Schema,
 )
--- a/taskcluster/taskgraph/transforms/l10n.py
+++ b/taskcluster/taskgraph/transforms/l10n.py
@@ -7,19 +7,21 @@ Do transforms specific to l10n kind
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import copy
 
 from mozbuild.chunkify import chunkify
 from taskgraph.transforms.base import (
     TransformSequence,
-    resolve_keyed_by,
+)
+from taskgraph.util.schema import (
+    validate_schema,
     optionally_keyed_by,
-    validate_schema
+    resolve_keyed_by,
 )
 from taskgraph.util.treeherder import split_symbol, join_symbol
 from voluptuous import (
     Any,
     Extra,
     Optional,
     Required,
     Schema,
--- a/taskcluster/taskgraph/transforms/signing.py
+++ b/taskcluster/taskgraph/transforms/signing.py
@@ -2,20 +2,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 """
 Transform the signing task into an actual task description.
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-from taskgraph.transforms.base import (
-    validate_schema,
-    TransformSequence
-)
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import validate_schema
 from taskgraph.transforms.task import task_description_schema
 from voluptuous import Schema, Any, Required, Optional
 
 
 ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/<{}>/artifacts/{}'
 
 
 # Voluptuous uses marker objects as dictionary *keys*, but they are not
--- a/taskcluster/taskgraph/transforms/task.py
+++ b/taskcluster/taskgraph/transforms/task.py
@@ -9,20 +9,18 @@ complexities of worker implementations, 
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import time
 
 from taskgraph.util.treeherder import split_symbol
-from taskgraph.transforms.base import (
-    validate_schema,
-    TransformSequence
-)
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import validate_schema
 from voluptuous import Schema, Any, Required, Optional, Extra
 
 from .gecko_v2_whitelist import JOB_NAME_WHITELIST, JOB_NAME_WHITELIST_ERROR
 
 
 # shortcut for a string where task references are allowed
 taskref_or_string = Any(
     basestring,
--- a/taskcluster/taskgraph/transforms/tests.py
+++ b/taskcluster/taskgraph/transforms/tests.py
@@ -14,22 +14,26 @@ This is a good place to translate a test
 The test description should be fully formed by the time it reaches these
 transforms, and these transforms should not embody any specific knowledge about
 what should run where. this is the wrong place for special-casing platforms,
 for example - use `all_tests.py` instead.
 """
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-from taskgraph.transforms.base import TransformSequence, resolve_keyed_by
+from taskgraph.transforms.base import TransformSequence
+from taskgraph.util.schema import resolve_keyed_by
 from taskgraph.util.treeherder import split_symbol, join_symbol
 from taskgraph.transforms.job.common import (
     docker_worker_support_vcs_checkout,
 )
-from taskgraph.transforms.base import validate_schema, optionally_keyed_by
+from taskgraph.util.schema import (
+    validate_schema,
+    optionally_keyed_by,
+)
 from voluptuous import (
     Any,
     Optional,
     Required,
     Schema,
 )
 
 import copy
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/util/schema.py
@@ -0,0 +1,116 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import re
+import copy
+import pprint
+import voluptuous
+
+
+def validate_schema(schema, obj, msg_prefix):
+    """
+    Validate that object satisfies schema.  If not, generate a useful exception
+    beginning with msg_prefix.
+    """
+    try:
+        # deep copy the result since it may include mutable defaults
+        return copy.deepcopy(schema(obj))
+    except voluptuous.MultipleInvalid as exc:
+        msg = [msg_prefix]
+        for error in exc.errors:
+            msg.append(str(error))
+        raise Exception('\n'.join(msg) + '\n' + pprint.pformat(obj))
+
+
+def optionally_keyed_by(*arguments):
+    """
+    Mark a schema value as optionally keyed by any of a number of fields.  The
+    schema is the last argument, and the remaining fields are taken to be the
+    field names.  For example:
+
+        'some-value': optionally_keyed_by(
+            'test-platform', 'build-platform',
+            Any('a', 'b', 'c'))
+    """
+    subschema = arguments[-1]
+    fields = arguments[:-1]
+    options = [subschema]
+    for field in fields:
+        options.append({'by-' + field: {basestring: subschema}})
+    return voluptuous.Any(*options)
+
+
+def resolve_keyed_by(item, field, item_name, **extra_values):
+    """
+    For values which can either accept a literal value, or be keyed by some
+    other attribute of the item, perform that lookup and replacement in-place
+    (modifying `item` directly).  The field is specified using dotted notation
+    to traverse dictionaries.
+
+    For example, given item
+
+        job:
+            test-platform: linux128
+            chunks:
+                by-test-platform:
+                    macosx-10.11/debug: 13
+                    win.*: 6
+                    default: 12
+
+    a call to `resolve_keyed_by(item, 'job.chunks', item['thing-name'])
+    would mutate item in-place to
+
+        job:
+            chunks: 12
+
+    The `item_name` parameter is used to generate useful error messages.
+
+    If extra_values are supplied, they represent additional values available
+    for reference from by-<field>.
+    """
+    # find the field, returning the item unchanged if anything goes wrong
+    container, subfield = item, field
+    while '.' in subfield:
+        f, subfield = subfield.split('.', 1)
+        if f not in container:
+            return item
+        container = container[f]
+        if not isinstance(container, dict):
+            return item
+
+    if subfield not in container:
+        return item
+    value = container[subfield]
+    if not isinstance(value, dict) or len(value) != 1 or not value.keys()[0].startswith('by-'):
+        return item
+
+    keyed_by = value.keys()[0][3:]  # strip off 'by-' prefix
+    key = extra_values.get(keyed_by) if keyed_by in extra_values else item[keyed_by]
+    alternatives = value.values()[0]
+
+    # exact match
+    if key in alternatives:
+        container[subfield] = alternatives[key]
+        return item
+
+    # regular expression match
+    matches = [(k, v) for k, v in alternatives.iteritems() if re.match(k + '$', key)]
+    if len(matches) > 1:
+        raise Exception(
+            "Multiple matching values for {} {!r} found while determining item {} in {}".format(
+                keyed_by, key, field, item_name))
+    elif matches:
+        container[subfield] = matches[0][1]
+        return item
+
+    # default
+    if 'default' in alternatives:
+        container[subfield] = alternatives['default']
+        return item
+
+    raise Exception(
+        "No {} matching {!r} nor 'default' found while determining item {} in {}".format(
+            keyed_by, key, field, item_name))