| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import datetime |
| import functools |
| import hashlib |
| import os.path |
| |
| from recipe_engine import recipe_api |
| from recipe_engine import util as recipe_util |
| |
| # Minimally supported version of swarming.py script (reported by --version). |
| MINIMAL_SWARMING_VERSION = (0, 8, 6) |
| |
| # The IMPLIED_*_BINARIES will be installed to the swarming task at this local |
| # path. |
| IMPLIED_BINARY_PATH = '.swarming_module' |
| |
| # This is the name and path for the cache used for the implied binaries |
| # (specifically vpython). The name of the implied cache will be prepended with |
| # IMPLIED_CACHE_NAME, and the path will be relative to IMPLIED_CACHE_BASE. |
| IMPLIED_CACHE_NAME = 'swarming_module_cache' |
| IMPLIED_CACHE_BASE = '.swarming_module_cache' |
| IMPLIED_CACHES = { |
| 'vpython': 'vpython', |
| } |
| IMPLIED_ENV_PREFIXES = { |
| 'VPYTHON_VIRTUALENV_ROOT': '/'.join((IMPLIED_CACHE_BASE, 'vpython')), |
| } |
| |
| # These CIPD packages will be automatically put on $PATH for all swarming tasks |
| # generated from this module. The first member of the tuple is the path relative |
| # to IMPLIED_BINARY_PATH which should be added to $PATH. |
| IMPLIED_CIPD_BINARIES = { |
| # Both vpython versions MUST be changed together. |
| 'infra/tools/luci/vpython/${platform}': |
| ('', 'git_revision:96f81e737868d43124b4661cf1c325296ca04944'), |
| 'infra/tools/luci/vpython-native/${platform}': |
| ('', 'git_revision:96f81e737868d43124b4661cf1c325296ca04944'), |
| |
| 'infra/tools/luci/logdog/butler/${platform}': |
| ('', 'git_revision:e1abc57be62d198b5c2f487bfb2fa2d2eb0e867c'), |
| # NOTE(crbug.com/812693): this isn't currently available on arm. See |
| # SwarmingApi.trigger_task for hack. |
| 'infra/python/cpython/${platform}': |
| ('bin', 'version:2.7.14.chromium14'), |
| } |
| |
| |
| def safe(f, *args, **kw): |
| try: |
| f(*args, **kw) |
| return True |
| except Exception: |
| return False |
| |
| |
| def filter_outdir(dumps, output_dir, text_files=('.txt', '.json', ''), |
| msize=1024): |
| """Create a summary of contents of a raw_io.output_dir.""" |
| outdir_json = {} |
| for filename in sorted(output_dir): |
| _, ext = os.path.splitext(filename) |
| |
| contents = output_dir[filename] |
| |
| # If a text file is small enough, just dump it |
| if ext in text_files and len(contents) < msize and safe(dumps, contents): |
| output = contents |
| |
| # Otherwise, just output some details |
| else: |
| output = { |
| 'sha1': hashlib.sha1(contents).hexdigest(), |
| 'size': len(contents), |
| } |
| if ext in text_files: |
| hsize = int(msize/2) |
| output['type'] = 'text' |
| if safe(dumps, contents[:hsize]): |
| # Space in the name so it sorts a[ :x],a[-x:] |
| output['contents[ :%s]' % hsize] = contents[:hsize] |
| if safe(dumps, contents[-hsize:]): |
| output['contents[-%s:]' % hsize] = contents[-hsize:] |
| else: |
| output['type'] = 'binary' |
| |
| outdir_json[filename] = output |
| |
| return outdir_json |
| |
| |
| def text_for_task(task): |
| lines = [] |
| |
| if task.dimensions.get('id'): |
| lines.append('Bot id: %r' % task.dimensions['id']) |
| if task.dimensions.get('os'): |
| lines.append('Run on OS: %r' % task.dimensions['os']) |
| |
| return '<br/>'.join(lines) |
| |
| |
| def parse_time(value): |
| """Converts serialized time from the API to datetime.datetime.""" |
| # When microseconds are 0, the '.123456' suffix is elided. This means the |
| # serialized format is not consistent, which confuses the hell out of python. |
| # TODO(maruel): Remove third format once we enforce version >=0.8.2. |
| for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'): |
| try: |
| return datetime.datetime.strptime(value, fmt) |
| except ValueError: # pragma: no cover |
| pass |
| raise ValueError('Failed to parse %s' % value) # pragma: no cover |
| |
| |
| def fmt_time(seconds): |
| """Formats some number of seconds into a string. If this is < 60, it will |
| render as `NNs`. If it's >= 60 seconds, it will render as 'hh:mm:ss'.""" |
| return ( |
| '%ds' % (seconds,) if seconds < 60 |
| else str(datetime.timedelta(seconds=seconds)) |
| ) |
| |
| |
| class ReadOnlyDict(dict): |
| def __setitem__(self, key, value): |
| raise TypeError('ReadOnlyDict is immutable') |
| |
| |
| class SwarmingApi(recipe_api.RecipeApi): |
| """Recipe module to use swarming.py tool to run tasks on Swarming. |
| |
| General usage: |
| 1. Tweak default task parameters applied to all swarming tasks (such as |
| default_dimensions and default_priority). |
| 2. Isolate some test using 'isolate' recipe module. Get isolated hash as |
| a result of that process. |
| 3. Create a task configuration using 'task(...)' method, providing |
| isolated hash obtained previously. |
| 4. Tweak the task parameters. This step is optional. |
| 5. Launch the task on swarming by calling 'trigger_task(...)'. |
| 6. Continue doing useful work locally while the task is running concurrently |
| on swarming. |
| 7. Wait for task to finish and collect its result (exit code, logs) |
| by calling 'collect_task(...)'. |
| |
| See also example.py for concrete code. |
| """ |
| |
| def _get_exit_code(self, shard): |
| if 'exit_code' in shard: |
| return int(shard['exit_code']) |
| |
| if shard.get('state') == 'COMPLETED': |
| # This case, task finished successfully. |
| return 0 |
| return None |
| |
| def __init__(self, **kwargs): |
| super(SwarmingApi, self).__init__(**kwargs) |
| # All tests default to a x86-64 bot running with no GPU. This simplifies |
| # management so that new tests are not executed on exotic bots by accidents |
| # even if misconfigured. |
| self._default_dimensions = { |
| 'cpu': 'x86-64', |
| 'gpu': 'none', |
| } |
| # Expirations are set to mildly good values and will be tightened soon. |
| self._default_expiration = 60*60 |
| self._default_env = {} |
| self._default_hard_timeout = 60*60 |
| self._default_idempotent = False |
| self._default_io_timeout = 20*60 |
| # The default priority is extremely low and should be increased dependending |
| # on the type of task. |
| self._default_priority = 200 |
| self._default_tags = set() |
| self._default_user = None |
| self._pending_tasks = set() |
| self._service_account_json = None |
| self._show_outputs_ref_in_collect_step = True |
| self._show_shards_in_collect_step = False |
| self._swarming_server = 'https://chromium-swarm.appspot.com' |
| self._verbose = False |
| |
| # Record all durations of shards for aggregation. |
| self._shards_durations = [] |
| |
| # Counter used to ensure test data task ids are unique across different |
| # triggers. |
| self._task_test_data_id_offset = 0 |
| |
| def initialize(self): |
| self.add_default_tag( |
| 'build_is_experimental:' + str(self.m.runtime.is_experimental).lower()) |
| |
| @recipe_util.returns_placeholder |
| def summary(self): |
| return self.m.json.output() |
| |
| @property |
| def service_account_json(self): |
| """Service account json to use for swarming.""" |
| return self._service_account_json |
| |
| @service_account_json.setter |
| def service_account_json(self, value): |
| """Service account json to use for swarming.""" |
| self._service_account_json = value |
| |
| @property |
| def swarming_server(self): |
| """URL of Swarming server to use, default is a production one.""" |
| return self._swarming_server |
| |
| @swarming_server.setter |
| def swarming_server(self, value): |
| """Changes URL of Swarming server to use.""" |
| self._swarming_server = value |
| |
| @property |
| def verbose(self): |
| """True to run swarming scripts with verbose output.""" |
| return self._verbose |
| |
| @verbose.setter |
| def verbose(self, value): |
| """Enables or disables verbose output in swarming scripts.""" |
| assert isinstance(value, bool), value |
| self._verbose = value |
| |
| @property |
| def default_expiration(self): |
| """Number of seconds that the server will wait to find a bot able to run the |
| task. |
| |
| If not bot runs the task by this number of seconds, the task is canceled as |
| EXPIRED. |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_expiration |
| |
| @default_expiration.setter |
| def default_expiration(self, value): |
| assert 30 <= value <= 24*60*60, value |
| self._default_expiration = value |
| |
| @property |
| def default_hard_timeout(self): |
| """Number of seconds in which the task must complete. |
| |
| If the task takes more than this amount of time, the process is assumed to |
| be hung. It forcibly killed via SIGTERM then SIGKILL after a grace period |
| (default: 30s). Then the task is marked as TIMED_OUT. |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_hard_timeout |
| |
| @default_hard_timeout.setter |
| def default_hard_timeout(self, value): |
| assert 30 <= value <= 6*60*60, value |
| self._default_hard_timeout = value |
| |
| @property |
| def default_io_timeout(self): |
| """Number of seconds at which interval the task must write to stdout or |
| stderr. |
| |
| If the task takes more than this amount of time between writes to stdout or |
| stderr, the process is assumed to be hung. It forcibly killed via SIGTERM |
| then SIGKILL after a grace period (default: 30s). Then the task is marked as |
| TIMED_OUT. |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_io_timeout |
| |
| @default_io_timeout.setter |
| def default_io_timeout(self, value): |
| assert 30 <= value <= 6*60*60, value |
| self._default_io_timeout = value |
| |
| @property |
| def default_idempotent(self): |
| """Bool to specify if task deduplication can be done. |
| |
| When set, the server will search for another task that ran in the last days |
| that had the exact same properties. If it finds one, the task will not be |
| run at all, the previous results will be returned as-is. |
| |
| For more infos, see: |
| https://github.com/luci/luci-py/blob/master/appengine/swarming/doc/User-Guide.md#task-idempotency |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_idempotent |
| |
| @default_idempotent.setter |
| def default_idempotent(self, value): |
| assert isinstance(value, bool), value |
| self._default_idempotent = value |
| |
| @property |
| def default_user(self): |
| """String to represent who triggered the task. |
| |
| The user should be an email address when someone requested testing via |
| pre-commit or manual testing. |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_user |
| |
| @default_user.setter |
| def default_user(self, value): |
| assert value is None or isinstance(value, basestring), value |
| self._default_user = value |
| |
| @property |
| def default_dimensions(self): |
| """Returns a copy of the default Swarming dimensions to run task on. |
| |
| The dimensions are what is used to filter which bots are able to run the |
| task successfully. This is particularly useful to discern between OS |
| versions, type of CPU, GPU card or VM, or preallocated pool. |
| |
| Example: |
| {'cpu': 'x86-64', 'os': 'Windows-XP-SP3'} |
| |
| This value can be changed per individual task. |
| """ |
| return ReadOnlyDict(self._default_dimensions) |
| |
| def set_default_dimension(self, key, value): |
| assert isinstance(key, basestring), key |
| assert isinstance(value, basestring) or value is None, value |
| if value is None: |
| self._default_dimensions.pop(key, None) |
| else: |
| self._default_dimensions[key] = value |
| |
| @property |
| def default_env(self): |
| """Returns a copy of the default environment variable to run tasks with. |
| |
| By default the environment variable is not modified. Additional environment |
| variables can be specified for each task. |
| |
| This value can be changed per individual task. |
| """ |
| return ReadOnlyDict(self._default_env) |
| |
| def set_default_env(self, key, value): |
| assert isinstance(key, basestring), key |
| assert isinstance(value, basestring), value |
| self._default_env[key] = value |
| |
| @property |
| def default_priority(self): |
| """Swarming task priority for tasks triggered from the recipe. |
| |
| Priority ranges from 1 to 255. The lower the value, the most important the |
| task is and will preempty any task with a lower priority. |
| |
| This value can be changed per individual task. |
| """ |
| return self._default_priority |
| |
| @default_priority.setter |
| def default_priority(self, value): |
| assert 1 <= value <= 255 |
| self._default_priority = value |
| |
| def add_default_tag(self, tag): |
| """Adds a tag to the Swarming tasks triggered. |
| |
| Tags are used for maintenance, they can be used to calculate the number of |
| tasks run for a day to calculate the cost of a type of type (CQ, ASAN, etc). |
| |
| Tags can be added per individual task. |
| """ |
| assert ':' in tag, tag |
| self._default_tags.add(tag) |
| |
| @property |
| def show_outputs_ref_in_collect_step(self): |
| """Show the shard's isolated out link in each collect step.""" |
| return self._show_outputs_ref_in_collect_step |
| |
| @show_outputs_ref_in_collect_step.setter |
| def show_outputs_ref_in_collect_step(self, value): |
| self._show_outputs_ref_in_collect_step = value |
| |
| @property |
| def show_shards_in_collect_step(self): |
| """Show the shard link in each collect step.""" |
| return self._show_shards_in_collect_step |
| |
| @show_shards_in_collect_step.setter |
| def show_shards_in_collect_step(self, value): |
| self._show_shards_in_collect_step = value |
| |
| @staticmethod |
| def prefered_os_dimension(platform): |
| """Given a platform name returns the prefered Swarming OS dimension. |
| |
| Platform name is usually provided by 'platform' recipe module, it's one |
| of 'win', 'linux', 'mac'. This function returns more concrete Swarming OS |
| dimension that represent this platform on Swarming by default. |
| |
| Recipes are free to use other OS dimension if there's a need for it. For |
| example WinXP try bot recipe may explicitly specify 'Windows-XP-SP3' |
| dimension. |
| """ |
| return { |
| 'linux': 'Ubuntu-14.04', |
| 'mac': 'Mac-10.13', |
| 'win': 'Windows-7-SP1', |
| }[platform] |
| |
| def task(self, title, isolated_hash, ignore_task_failure=False, shards=1, |
| task_output_dir=None, extra_args=None, idempotent=None, |
| cipd_packages=None, build_properties=None, merge=None, |
| trigger_script=None, named_caches=None, service_account=None, |
| raw_cmd=None, env_prefixes=None, env=None, optional_dimensions=None): |
| """Returns a new SwarmingTask instance to run an isolated executable on |
| Swarming. |
| |
| For google test executables, use gtest_task() instead. |
| |
| At the time of this writting, this code is used by V8, Skia and iOS. |
| |
| The return value can be customized if necessary (see SwarmingTask class |
| below). Pass it to 'trigger_task' to launch it on swarming. Later pass the |
| same instance to 'collect_task' to wait for the task to finish and fetch its |
| results. |
| |
| Args: |
| * title: name of the test, used as part of a task ID. |
| * isolated_hash: hash of isolated test on isolate server, the test should |
| be already isolated there, see 'isolate' recipe module. |
| * ignore_task_failure: whether to ignore the test failure of swarming |
| tasks. By default, this is set to False. |
| * shards: if defined, the number of shards to use for the task. By default |
| this value is either 1 or based on the title. |
| * task_output_dir: if defined, the directory where task results are |
| placed. The caller is responsible for removing this folder when |
| finished. |
| * extra_args: list of command line arguments to pass to isolated tasks. |
| * idempotent: whether this task is considered idempotent. Defaults |
| to self.default_idempotent if not specified. |
| * cipd_packages: list of 3-tuples corresponding to CIPD packages needed |
| for the task: ('path', 'package_name', 'version'), defined as |
| follows: |
| * path: Path relative to the Swarming root dir in which to install |
| the package. |
| * package_name: Name of the package to install, |
| eg. "infra/tools/luci-auth/${platform}" |
| * version: Version of the package, either a package instance ID, |
| ref, or tag key/value pair. |
| * build_properties: An optional dict containing various build properties. |
| These are typically but not necessarily the properties emitted by |
| bot_update. |
| * merge: An optional dict containing: |
| * "script": path to a script to call to post process and merge the |
| collected outputs from the tasks. The script should take one |
| named (but required) parameter, '-o' (for output), that represents |
| the path that the merged results should be written to, and accept |
| N additional paths to result files to merge. The merged results |
| should be in the JSON Results File Format |
| (https://www.chromium.org/developers/the-json-test-results-format) |
| and may optionally contain a top level "links" field that |
| may contain a dict mapping link text to URLs, for a set of |
| links that will be included in the buildbot output. |
| * "args": an optional list of additional arguments to pass to the |
| above script. |
| * trigger_script: An optional dict containing: |
| * "script": path to a script to call which will use custom logic to |
| trigger appropriate swarming jobs, using swarming.py. |
| * "args": an optional list of additional arguments to pass to the |
| script. |
| See SwarmingTask.__init__ docstring for more details. |
| * named_caches: a dict {name: relpath} requesting a cache named `name` |
| to be installed in `relpath` relative to the task root directory. |
| * service_account: (string) a service account email to run the task under. |
| * raw_cmd: Optional list of arguments to be used as raw command. Can be |
| used instead of extra args. |
| * env_prefixes: a dict {ENVVAR: [relative, paths]} which instructs |
| swarming to prepend the given relative paths to the PATH-style ENVVAR |
| specified. |
| * env: a dict {ENVVAR: ENVVALUE} which instructs swarming to set the |
| environment variables before invoking the command. These are applied |
| on top of the default environment variables. |
| * optional_dimensions: {expiration: [{key: value]} mapping with swarming |
| dimensions that specify on what Swarming slaves tasks can run. These |
| are similar to what is specified in dimensions but will create |
| additional 'fallback' task slice(s) with the optional dimensions. |
| """ |
| if idempotent is None: |
| idempotent = self.default_idempotent |
| |
| spec_name = '' |
| builder_id = self.m.buildbucket.build.builder |
| if builder_id.bucket and builder_id.project: |
| spec_name = '%s.%s:%s' % ( |
| builder_id.project, builder_id.bucket, builder_id.builder) |
| |
| init_env = dict(self.default_env) |
| if env: |
| init_env.update(env) |
| |
| return SwarmingTask( |
| title=title, |
| isolated_hash=isolated_hash, |
| dimensions=self._default_dimensions, |
| env=init_env, |
| priority=self.default_priority, |
| shards=shards, |
| spec_name=spec_name, |
| buildername=self.m.properties.get('buildername'), |
| buildnumber=self.m.properties.get('buildnumber'), |
| user=self.default_user, |
| expiration=self.default_expiration, |
| io_timeout=self.default_io_timeout, |
| hard_timeout=self.default_hard_timeout, |
| idempotent=idempotent, |
| ignore_task_failure=ignore_task_failure, |
| extra_args=extra_args, |
| collect_step=self._default_collect_step, |
| task_output_dir=task_output_dir, |
| cipd_packages=cipd_packages or [], |
| build_properties=build_properties, |
| merge=merge, |
| trigger_script=trigger_script, |
| named_caches=named_caches, |
| service_account=service_account, |
| raw_cmd=raw_cmd, |
| env_prefixes=env_prefixes, |
| optional_dimensions=optional_dimensions, |
| ) |
| |
| def gtest_task(self, title, isolated_hash, test_launcher_summary_output=None, |
| extra_args=None, cipd_packages=None, merge=None, **kwargs): |
| """Returns a new SwarmingTask instance to run an isolated gtest on Swarming. |
| |
| Swarming recipe module knows how collect and interpret JSON files with test |
| execution summary produced by chromium test launcher. It will combine JSON |
| results from multiple shards and place it in path provided by |
| |test_launcher_summary_output| placeholder. |
| |
| For meaning of the rest of the arguments see 'task' method. |
| """ |
| extra_args = list(extra_args or []) |
| |
| # Ensure --test-launcher-summary-output is not already passed. We are going |
| # to overwrite it. |
| bad_args = any( |
| x.startswith('--test-launcher-summary-output=') for x in extra_args) |
| if bad_args: # pragma: no cover |
| raise ValueError('--test-launcher-summary-output should not be used.') |
| |
| # Append it. output.json name is expected by collect_task.py. |
| extra_args.append( |
| '--test-launcher-summary-output=${ISOLATED_OUTDIR}/output.json') |
| |
| merge = merge or {'script': self.resource('standard_gtest_merge.py')} |
| |
| # Make a task, configure it to be collected through shim script. |
| task = self.task(title, isolated_hash, extra_args=extra_args, |
| cipd_packages=cipd_packages, merge=merge, **kwargs) |
| task.collect_step = lambda *args, **kw: ( |
| self._gtest_collect_step(test_launcher_summary_output, *args, **kw)) |
| return task |
| |
| def _check_and_set_output_flag(self, extra_args, flag, output_file_name): |
| extra_args = list(extra_args or []) |
| # Ensure flag is not already passed. We are going to overwrite it. |
| flag_value = '--%s=' % flag |
| bad_args = any(x.startswith(flag_value) for x in extra_args) |
| if bad_args: # pragma: no cover |
| raise ValueError('--%s should not be used' % flag) |
| |
| # Append it. |
| output_arg = '--%s=${ISOLATED_OUTDIR}/%s' % (flag, output_file_name) |
| extra_args.append(output_arg) |
| return extra_args |
| |
| def isolated_script_task(self, title, isolated_hash, extra_args=None, |
| idempotent=False, merge=None, **kwargs): |
| """Returns a new SwarmingTask to run an isolated script test on Swarming. |
| |
| At the time of this writting, this code is used by WebRTC and |
| "isolated_scripts" entries in Chromium's src/testing/buildbot/*.json. |
| |
| Swarming recipe module knows how collect JSON file with test execution |
| summary produced by isolated script tests launcher. A custom script |
| can be passed to merge the collected results and post-process them. |
| |
| For meaning of the rest of the arguments see 'task' method. |
| """ |
| |
| # Ensure output flags are not already passed. We are going |
| # to overwrite them. |
| # output.json name is expected by collect_task.py. |
| extra_args = self._check_and_set_output_flag( |
| extra_args, 'isolated-script-test-output', 'output.json') |
| # perftest-output.json name is expected by benchmarks generating chartjson |
| # or histogram output |
| extra_args = self._check_and_set_output_flag( |
| extra_args, |
| 'isolated-script-test-perf-output', |
| 'perftest-output.json') |
| |
| merge = merge or { |
| 'script': self.resource('standard_isolated_script_merge.py') |
| } |
| |
| task = self.task(title, isolated_hash, extra_args=extra_args, |
| idempotent=idempotent, merge=merge, **kwargs) |
| task.collect_step = self._isolated_script_collect_step |
| return task |
| |
| def check_client_version(self, step_test_data=None): |
| """Yields steps to verify compatibility with swarming_client version.""" |
| return self.m.swarming_client.ensure_script_version( |
| 'swarming.py', MINIMAL_SWARMING_VERSION, step_test_data) |
| |
| def trigger_task(self, task, **kwargs): |
| """Triggers one task. |
| |
| It the task is sharded, will trigger all shards. This steps justs posts |
| the task and immediately returns. Use 'collect_task' to wait for a task to |
| finish and grab its result. |
| |
| Behaves as a regular recipe step: returns StepData with step results |
| on success or raises StepFailure if step fails. |
| |
| Args: |
| task: SwarmingTask instance. |
| kwargs: passed to recipe step constructor as-is. |
| """ |
| assert isinstance(task, SwarmingTask) |
| assert task.task_name not in self._pending_tasks, ( |
| 'Triggered same task twice: %s' % task.task_name) |
| assert 'os' in task.dimensions, task.dimensions |
| self._pending_tasks.add(task.task_name) |
| |
| # Mix in standard infra packages 'vpython' and 'logdog' so that the task can |
| # always access them on $PATH. |
| cipd_packages = list(task.cipd_packages or ()) |
| for pkg in cipd_packages: |
| assert not pkg[0].startswith(IMPLIED_BINARY_PATH), \ |
| 'cipd_packages may not be installed to %r' % (IMPLIED_BINARY_PATH,) |
| |
| to_add = dict(IMPLIED_CIPD_BINARIES) |
| |
| # HACK(crbug.com/812693) - we don't support cpython on arm yet, so remove |
| # it from packages to inject. |
| # HACK(crbug.com/842234): We also don't support CPython on mips. |
| cpu_dimension = task.dimensions.get('cpu', '') |
| if 'arm' in cpu_dimension or 'mips' in cpu_dimension: |
| for k in to_add.keys(): |
| if 'cpython' in k: |
| to_add.pop(k) |
| |
| path_env_prefix = set() |
| for pkg, (subdir, vers) in sorted(to_add.items()): |
| path_env_prefix.add('/'.join((IMPLIED_BINARY_PATH, subdir)) if subdir |
| else IMPLIED_BINARY_PATH) |
| vers = 'TEST_VERSION' if self._test_data.enabled else vers |
| cipd_packages.append((IMPLIED_BINARY_PATH, pkg, vers)) |
| |
| # update implied caches |
| named_caches = dict(task.named_caches or {}) |
| named_caches.update({ |
| '_'.join((IMPLIED_CACHE_NAME, k)): '/'.join((IMPLIED_CACHE_BASE,v)) |
| for k, v in IMPLIED_CACHES.iteritems() |
| }) |
| |
| # update $PATH |
| env_prefixes = dict(task.env_prefixes or {}) # copy it |
| env_prefixes.setdefault('PATH', [])[:0] = sorted( # prepend stuff |
| path_env_prefix, key=lambda x: (len(x), x)) |
| for k, path in IMPLIED_ENV_PREFIXES.iteritems(): |
| env_prefixes.setdefault(k, [path]) |
| |
| # Trigger parameters. |
| args = [ |
| 'trigger', |
| '--swarming', self.swarming_server, |
| '--isolate-server', self.m.isolate.isolate_server, |
| '--priority', str(task.priority), |
| '--shards', str(task.shards), |
| '--task-name', task.task_name, |
| '--dump-json', self.m.json.output(), |
| '--expiration', str(task.expiration), |
| '--io-timeout', str(task.io_timeout), |
| '--hard-timeout', str(task.hard_timeout), |
| ] |
| |
| for name, value in sorted(task.dimensions.iteritems()): |
| assert isinstance(value, basestring), \ |
| 'dimension %s is not a string: %s' % (name, value) |
| args.extend(['--dimension', name, value]) |
| |
| if task.optional_dimensions: |
| for exp, dimensions in task.optional_dimensions.iteritems(): |
| for d in dimensions: |
| for name, value in d.iteritems(): |
| assert isinstance(value, basestring), \ |
| 'optional-dimension %s is not a string: %s' % (name, value) |
| args.extend(['--optional-dimension', name, value, exp]) |
| |
| for name, value in sorted(task.env.iteritems()): |
| assert isinstance(value, basestring), \ |
| 'env var %s is not a string: %s' % (name, value) |
| args.extend(['--env', name, value]) |
| |
| for name, relpath in sorted(named_caches.iteritems()): |
| args.extend(['--named-cache', name, relpath]) |
| |
| if task.service_account: |
| args.extend(['--service-account', task.service_account]) |
| |
| if self.service_account_json: |
| args.extend(['--auth-service-account-json', self.service_account_json]) |
| |
| if task.wait_for_capacity: |
| args.append('--wait-for-capacity') |
| |
| # Default tags. |
| tags = set(task.tags) |
| tags.update(self._default_tags) |
| tags.add('data:' + task.isolated_hash) |
| tags.add('name:' + task.title.split(' ')[0]) |
| mastername = self.m.properties.get('mastername') |
| if mastername: |
| tags.add('master:' + mastername) |
| if task.spec_name: |
| tags.add('spec_name:' + task.spec_name) |
| if task.buildername: |
| tags.add('buildername:' + task.buildername) |
| if task.buildnumber: |
| tags.add('buildnumber:%s' % task.buildnumber) |
| if self.m.properties.get('bot_id'): |
| tags.add('slavename:%s' % self.m.properties['bot_id']) |
| tags.add('stepname:%s' % self.get_step_name('', task)) |
| for cl in self.m.buildbucket.build.input.gerrit_changes: |
| tags.add('gerrit:https://%s/c/%s/%s' % (cl.host, cl.change, cl.patchset)) |
| for tag in sorted(tags): |
| assert ':' in tag, tag |
| args.extend(['--tag', tag]) |
| |
| if self.verbose: |
| args.append('--verbose') |
| if task.idempotent: |
| args.append('--idempotent') |
| if task.user: |
| args.extend(['--user', task.user]) |
| |
| if cipd_packages: |
| for path, pkg, version in cipd_packages: |
| args.extend(['--cipd-package', '%s:%s:%s' % (path, pkg, version)]) |
| |
| if env_prefixes: |
| for key, paths in sorted(env_prefixes.items()): |
| for path in paths: |
| args.extend(('--env-prefix', key, path)) |
| |
| # What isolated command to trigger. |
| args.extend(('--isolated', task.isolated_hash)) |
| |
| # Use a raw command as extra-args on tasks without command. |
| if task.raw_cmd: |
| # Allow using only one of raw_cmd or extra_args. |
| assert not task.extra_args |
| args.append('--raw-cmd') |
| |
| # Additional command line args for isolated command. |
| if task.extra_args or task.raw_cmd: |
| args.append('--') |
| args.extend(task.extra_args or task.raw_cmd) |
| |
| script = self.m.swarming_client.path.join('swarming.py') |
| if task.trigger_script: |
| script = task.trigger_script['script'] |
| if task.trigger_script.get('args'): |
| args = task.trigger_script['args'] + args |
| |
| # The step can fail only on infra failures, so mark it as 'infra_step'. |
| try: |
| return self.m.python( |
| name=self.get_step_name('trigger', task), |
| script=script, args=args, |
| step_test_data=functools.partial( |
| self._gen_trigger_step_test_data, task), |
| infra_step=True, |
| **kwargs) |
| finally: |
| # Store trigger output with the |task|, print links to triggered shards. |
| step_result = self.m.step.active_result |
| step_result.presentation.step_text += text_for_task(task) |
| |
| if step_result.presentation != self.m.step.FAILURE: |
| task._trigger_output = step_result.json.output |
| links = step_result.presentation.links |
| for index in xrange(task.shards): |
| url = task.get_shard_view_url(index) |
| if url: |
| links['shard #%d' % index] = url |
| assert not hasattr(step_result, 'swarming_task') |
| step_result.swarming_task = task |
| |
| def collect_task(self, task, **kwargs): |
| """Waits for a single triggered task to finish. |
| |
| If the task is sharded, will wait for all shards to finish. Behaves as |
| a regular recipe step: returns StepData with step results on success or |
| raises StepFailure if task fails. |
| |
| Args: |
| task: SwarmingTask instance, previously triggered with 'trigger' method. |
| kwargs: passed to recipe step constructor as-is. |
| """ |
| # TODO(vadimsh): Raise InfraFailure on Swarming failures. |
| assert isinstance(task, SwarmingTask) |
| assert task.task_name in self._pending_tasks, ( |
| 'Trying to collect a task that was not triggered: %s' % |
| task.task_name) |
| self._pending_tasks.remove(task.task_name) |
| |
| try: |
| return task.collect_step(task, **kwargs) |
| finally: |
| try: |
| self.m.step.active_result.swarming_task = task |
| except Exception: # pragma: no cover |
| # If we don't have an active_result, something failed very early, |
| # so we eat this exception and let that one propagate. |
| pass |
| |
| def report_stats(self): |
| """Report statistics on all tasks ran so far.""" |
| if not self._shards_durations: |
| return |
| stats = ['Total shards: %d' % len(self._shards_durations)] |
| total = sum(self._shards_durations) |
| mean = total / len(self._shards_durations) |
| stats.extend([ |
| 'Total runtime: %s ' % fmt_time(total), |
| ]) |
| detailed_stats = stats + [ |
| 'Min/mean/max: %s / %s / %s' % ( |
| fmt_time(min(self._shards_durations)), |
| fmt_time(mean), |
| fmt_time(max(self._shards_durations)), |
| ), |
| ] |
| step_text = self.m.test_utils.format_step_text([ |
| ('Stats', stats)]) |
| result = self.m.python.succeeding_step('Tests statistics', step_text) |
| result.presentation.logs['detailed stats'] = detailed_stats |
| |
| @staticmethod |
| def _display_pending(shards, step_presentation): |
| """Shows max pending time in seconds across all shards if it exceeds 10s, |
| and also displays the min and max shard duration accross all shards.""" |
| max_pending = (-1, None) |
| max_duration = (-1, None) |
| min_duration = (None, None) |
| for i, shard in enumerate(shards): |
| if not shard or not shard.get('started_ts'): |
| continue |
| |
| created = parse_time(shard['created_ts']) |
| started = parse_time(shard['started_ts']) |
| |
| pending = (started - created).total_seconds() |
| if pending > max_pending[0]: |
| max_pending = (pending, i) |
| |
| if shard.get('completed_ts'): |
| duration = (parse_time(shard['completed_ts']) - started).total_seconds() |
| if duration > max_duration[0]: |
| max_duration = (duration, i) |
| if min_duration[0] is None or duration < min_duration[0]: |
| min_duration = (duration, i) |
| |
| # Only display annotation when pending more than 10 seconds to reduce noise. |
| if max_pending[0] > 10: |
| prefix = 'P' if len(shards) <= 1 else 'Max p' |
| suffix = '' if len(shards) <= 1 else ' (shard #%d)' % max_pending[1] |
| step_presentation.step_text += ( |
| '<br>%sending time: %s%s' % (prefix, fmt_time(max_pending[0]), suffix)) |
| |
| if max_duration[0] > 0: |
| prefix = 'S' if len(shards) <= 1 else 'Max s' |
| suffix = '' if len(shards) <= 1 else ' (shard #%d)' % max_duration[1] |
| step_presentation.step_text += ( |
| '<br>%shard duration: %s%s' % ( |
| prefix, fmt_time(max_duration[0]), suffix)) |
| |
| if min_duration[0] is not None and len(shards) > 1: |
| step_presentation.step_text += ( |
| '<br>Min shard duration: %s (shard #%d)' % ( |
| fmt_time(min_duration[0]), min_duration[1])) |
| |
| def _default_collect_step( |
| self, task, merged_test_output=None, name=None, step_test_data=None, |
| **kwargs): |
| """Produces a step that collects a result of an arbitrary task.""" |
| task_output_dir = task.task_output_dir or self.m.raw_io.output_dir() |
| |
| # If we don't already have a Placeholder, wrap the task_output_dir in one |
| # so we can read out of it later w/ step_result.raw_io.output_dir. |
| if not isinstance(task_output_dir, recipe_util.Placeholder): |
| task_output_dir = self.m.raw_io.output_dir(leak_to=task_output_dir) |
| |
| task_args = [ |
| '-o', merged_test_output or self.m.json.output(), |
| '--task-output-dir', task_output_dir, |
| ] |
| |
| merge_script = (task.merge.get('script') |
| or self.resource('noop_merge.py')) |
| merge_args = (task.merge.get('args') or []) |
| |
| task_args.extend([ |
| '--merge-script', merge_script, |
| '--merge-script-stdout-file', |
| self.m.raw_io.output('merge_script_log'), |
| '--merge-additional-args', self.m.json.dumps(merge_args), |
| ]) |
| |
| if task.build_properties: |
| properties = dict(task.build_properties) |
| # exclude any recipe-engine-controlling properties (starting with $) |
| properties.update((k, v) for k, v in self.m.properties.thaw().iteritems() |
| if not k.startswith('$')) |
| task_args.extend([ |
| '--build-properties', self.m.json.dumps(properties), |
| ]) |
| |
| task_args.append('--') |
| # Arguments for the actual 'collect' command. |
| collect_cmd = [ |
| 'python', |
| '-u', |
| self.m.swarming_client.path.join('swarming.py'), |
| ] |
| collect_cmd.extend(self.get_collect_cmd_args(task)) |
| collect_cmd.extend([ |
| '--task-summary-json', self.summary(), |
| ]) |
| |
| task_args.extend(collect_cmd) |
| |
| allowed_return_codes = {0} |
| if task.ignore_task_failure: |
| allowed_return_codes = 'any' |
| |
| # The call to collect_task emits two JSON files and one text file: |
| # 1) a task summary JSON emitted by swarming |
| # 2) a gtest results JSON emitted by the task |
| # 3) a merge script stdout/stderr log emitted by the task |
| # This builds an instance of StepTestData that covers all of them. |
| step_test_data = step_test_data or ( |
| self.test_api.canned_summary_output(task.shards) + |
| self.m.json.test_api.output({}) + |
| self.m.raw_io.test_api.output( |
| 'Successfully merged all data')) |
| |
| try: |
| with self.m.context(cwd=self.m.path['start_dir']): |
| return self.m.build.python( |
| name=name or self.get_step_name('', task), |
| script=self.resource('collect_task.py'), |
| args=task_args, |
| ok_ret=allowed_return_codes, |
| step_test_data=lambda: step_test_data, |
| **kwargs) |
| finally: |
| step_result = None |
| try: |
| step_result = self.m.step.active_result |
| if step_result is not None: |
| step_result.presentation.step_text = text_for_task(task) |
| |
| step_result.presentation.logs['Merge script log'] = [ |
| step_result.raw_io.output] |
| |
| links = {} |
| if hasattr(step_result, 'json') and hasattr( |
| step_result.json, 'output'): |
| links = step_result.json.output.get('links', {}) |
| elif (hasattr(step_result, 'test_utils') and |
| hasattr(step_result.test_utils, 'gtest_results')): |
| links = step_result.test_utils.gtest_results.raw.get('links', {}) |
| for k, v in links.iteritems(): |
| step_result.presentation.links[k] = v |
| |
| summary_json = step_result.swarming.summary |
| self._handle_summary_json(task, summary_json, step_result) |
| |
| except self.m.step.StepFailure: |
| # Make sure that, if _handle_summary_json raises an StepFailure, it |
| # correctly propogates. |
| raise |
| except Exception as e: |
| if step_result is not None: |
| step_result.presentation.logs['no_results_exc'] = [ |
| str(e), '\n', self.m.traceback.format_exc()] |
| |
| def _gtest_collect_step(self, merged_test_output, task, **kwargs): |
| """Produces a step that collects and processes a result of google-test task. |
| """ |
| # Where to put combined summary to, consumed by recipes. Also emit |
| # test expectation only if |merged_test_output| is really used. |
| gtest_results_test_data = kwargs.pop('step_test_data', None) |
| if merged_test_output and not gtest_results_test_data: |
| gtest_results_test_data = ( |
| self.m.test_utils.test_api.canned_gtest_output(True)) |
| |
| # The call to collect_task emits two JSON files and a test file: |
| # 1) a task summary JSON emitted by swarming |
| # 2) a gtest results JSON emitted by the task |
| # 3) a log file that stores stdout/stderr of task |
| # This builds an instance of StepTestData that covers all three. |
| step_test_data = ( |
| self.test_api.canned_summary_output(shards=task.shards) + |
| gtest_results_test_data + |
| self.test_api.merge_script_log_file('Gtest merged successfully')) |
| try: |
| return self._default_collect_step( |
| task, |
| merged_test_output=merged_test_output, |
| step_test_data=step_test_data, |
| allow_subannotations=True, |
| **kwargs) |
| finally: |
| # HACK: it is assumed that caller used 'api.test_utils.gtest_results' |
| # placeholder for 'test_launcher_summary_output' parameter when calling |
| # gtest_task(...). It's not enforced in any way. |
| step_result = self.m.step.active_result |
| |
| gtest_results = self.m.test_utils.present_gtest_failures(step_result) |
| if gtest_results and gtest_results.valid: |
| p = step_result.presentation |
| missing_shards = gtest_results.raw.get('missing_shards') or [] |
| if missing_shards: |
| step_result.presentation.status = self.m.step.EXCEPTION |
| for index in missing_shards: |
| p.links['missing shard #%d' % index] = \ |
| task.get_shard_view_url(index) |
| |
| swarming_summary = step_result.swarming.summary |
| |
| # Show any remaining isolated outputs (such as logcats). |
| # Note that collect_task.py uses the default summary.json, which |
| # only has 'outputs_ref' instead of the deprecated 'isolated_out'. |
| for index, shard in enumerate(swarming_summary.get('shards', [])): |
| if not shard: |
| continue |
| |
| outputs_ref = shard.get('outputs_ref') |
| if outputs_ref: |
| link_name = 'shard #%d isolated out' % index |
| p.links[link_name] = '%s/browse?namespace=%s&hash=%s' % ( |
| outputs_ref['isolatedserver'], outputs_ref['namespace'], |
| outputs_ref['isolated']) |
| |
| def _merge_isolated_script_perftest_output_shards(self, task, step_result): |
| # Taken from third_party/catapult/telemetry/telemetry/internal/results/ |
| # chart_json_output_formatter.py, the json entries are as follows: |
| # result_dict = { |
| # 'format_version': '0.1', |
| # 'next_version': '0.2', |
| # 'benchmark_name': benchmark_metadata.name, |
| # 'benchmark_description': benchmark_metadata.description, |
| # 'trace_rerun_options': benchmark_metadata.rerun_options, |
| # 'benchmark_metadata': benchmark_metadata.AsDict(), |
| # 'charts': charts, |
| # } |
| # |
| # Therefore, all entries should be the same and we should only need to merge |
| # the chart from each shard. |
| collected_results = [] |
| for i in xrange(task.shards): |
| path = self.m.path.join(str(i), 'perftest-output.json') |
| if path not in step_result.raw_io.output_dir: |
| # perf test results were not written for this shard, not an error, |
| # just continue to the next shard |
| continue |
| results_raw = step_result.raw_io.output_dir[path] |
| try: |
| perf_results_json = self.m.json.loads(results_raw) |
| except Exception as e: |
| raise Exception( |
| 'error decoding chart JSON results from shard #%d\n%s\n%s' % ( |
| i, |
| str(e), |
| self.m.traceback.format_exc())) |
| collected_results.append(perf_results_json) |
| |
| if collected_results: |
| # If the first result is a dict, we assume that we're dealing with |
| # chart JSON. By contrast, HistogramSets are serialized as lists. |
| if isinstance(collected_results[0], dict): |
| return self._merge_chartjson_results(collected_results), False |
| elif isinstance(collected_results[0], list): |
| return self._merge_histogram_results(collected_results), True |
| |
| return {}, False |
| |
| def _merge_chartjson_results(self, chartjson_dicts): |
| merged_results = chartjson_dicts[0] |
| for chartjson_dict in chartjson_dicts[1:]: |
| for key in chartjson_dict: |
| if key == 'charts': |
| for add_key in chartjson_dict[key]: |
| merged_results[key][add_key] = chartjson_dict[key][add_key] |
| |
| return merged_results |
| |
| def _merge_histogram_results(self, histogram_lists): |
| merged_results = [] |
| for histogram_list in histogram_lists: |
| merged_results += histogram_list |
| |
| return merged_results |
| |
| def wait_for_finished_task_set(self, task_sets, suffix=None, attempts=0): |
| """Waits for a finished set of tasks. |
| |
| Args: |
| task_sets: A list of lists. Each item in task_sets is a set of tasks, |
| which should be collected together. |
| suffix: An optional name suffix. |
| attempts: How many times have we polled swarming for this data. Used |
| to retry at a slower rate, so we don't overload the server |
| with requests. |
| |
| Returns: |
| A tuple of two items: |
| 1. A list of task sets which have finished. |
| 2. How many attempts we've now made to get task data. |
| |
| Uses the 'get_states' endpoint on the swarming server.""" |
| args = [ |
| '--swarming-server', self.swarming_server, |
| '--swarming-py-path', self.m.swarming_client.path.join('swarming.py'), |
| '--output-json', self.m.json.output(), |
| '--input-json', self.m.json.input(data=task_sets), |
| '--attempts', attempts, |
| '--verbose', |
| ] |
| |
| if self.service_account_json: |
| args.extend(['--auth-service-account-json', self.service_account_json]) |
| |
| result = self.m.python( |
| 'wait for tasks%s' % (suffix or ''), |
| self.resource('wait_for_finished_task_set.py'), |
| step_test_data=lambda: self.m.json.test_api.output(data={ |
| 'attempts': 0, |
| 'sets': task_sets, |
| }), |
| args=args) |
| return [ |
| tuple(task_set) for task_set in result.json.output['sets'] |
| ], result.json.output['attempts'] |
| |
| def _isolated_script_collect_step(self, task, **kwargs): |
| """Collects results for a step that is *not* a googletest, like telemetry. |
| """ |
| isolated_script_results_test_data = kwargs.pop('step_test_data', None) |
| if not isolated_script_results_test_data: |
| isolated_script_results_test_data = ( |
| self.m.test_utils.test_api.canned_isolated_script_output( |
| passing=True, is_win=self.m.platform.is_win, swarming=True, |
| use_json_test_format=True, shards=task.shards)) |
| |
| # The call to collect_isolated_script_task emits two JSON files: |
| # 1) a task summary JSON emitted by swarming |
| # 2) a test results JSON emitted by the task |
| # This builds an instance of StepTestData that covers both. |
| step_test_data = ( |
| self.test_api.canned_summary_output(task.shards) + |
| isolated_script_results_test_data + |
| self.test_api.merge_script_log_file('Merged succesfully')) |
| |
| try: |
| return self._default_collect_step( |
| task, step_test_data=step_test_data, **kwargs) |
| finally: |
| # Regardless of the outcome of the test (pass or fail), we try to parse |
| # the results. If any error occurs while parsing results, then we set them |
| # to None, which caller should treat as invalid results. |
| # Note that try-except block below will not mask the |
| # recipe_api.StepFailure exception from the collect step above. Instead |
| # it is being allowed to propagate after the results have been parsed. |
| try: |
| step_result = self.m.step.active_result |
| |
| if step_result is not None: |
| outdir = filter_outdir( |
| self.m.json.dumps, step_result.raw_io.output_dir) |
| outdir_json = self.m.json.dumps(outdir, indent=2) |
| step_result.presentation.logs['outdir_json'] = ( |
| outdir_json.splitlines()) |
| |
| step_result.isolated_script_results = step_result.json.output |
| |
| # Obtain perftest results if present |
| perftest_results, is_histogramset = \ |
| self._merge_isolated_script_perftest_output_shards( |
| task, step_result) |
| step_result.isolated_script_perf_results = { |
| 'is_histogramset': is_histogramset, |
| 'data': perftest_results |
| } |
| |
| except Exception as e: |
| if self.m.step.active_result is not None: |
| self.m.step.active_result.presentation.logs[ |
| 'no_isolated_results_exc'] = [ |
| str(e), '\n', self.m.traceback.format_exc()] |
| self.m.step.active_result.isolated_script_results = None |
| |
| def get_step_name(self, prefix, task): |
| """SwarmingTask -> name of a step of a waterfall. |
| |
| Will take a task title (+ step name prefix) and append OS dimension to it. |
| |
| Args: |
| prefix: prefix to append to task title, like 'trigger'. |
| task: SwarmingTask instance. |
| |
| Returns: |
| '[<prefix>] <task title> on <OS>' |
| """ |
| prefix = '[%s] ' % prefix if prefix else '' |
| task_os = task.dimensions['os'] |
| |
| bot_os = self.prefered_os_dimension(self.m.platform.name) |
| suffix = ('' if ( |
| task_os == bot_os or task_os.lower() == self.m.platform.name.lower() or |
| task_os in task.title) |
| else ' on %s' % task_os) |
| # Note: properly detecting dimensions of the bot the recipe is running |
| # on is somewhat non-trivial. It is not safe to assume it uses default |
| # or preferred dimensions for its OS. For example, the version of the OS |
| # can differ. |
| return ''.join((prefix, task.title, suffix)) |
| |
| def _handle_summary_json(self, task, summary, step_result): |
| # We store this now, and add links to all shards first, before failing the |
| # build. Format is tuple of (error message, shard that failed) |
| infra_failures = [] |
| links = step_result.presentation.links |
| for index, shard in enumerate(summary['shards']): |
| url = task.get_shard_view_url(index) |
| duration = shard and shard.get('duration') |
| if duration is not None: |
| display_text = 'shard #%d (%.1f sec)' % (index, duration) |
| self._shards_durations.append(duration) |
| else: |
| display_text = 'shard #%d' % index |
| |
| if shard and shard.get('deduped_from'): |
| display_text += ' (deduped)' |
| |
| if not shard or shard.get('internal_failure'): |
| display_text = ( |
| 'shard #%d had an internal swarming failure' % index) |
| infra_failures.append((index, 'Internal swarming failure')) |
| elif shard.get('state') == 'EXPIRED': |
| display_text = ( |
| 'shard #%d expired, not enough capacity' % index) |
| infra_failures.append(( |
| index, 'There isn\'t enough capacity to run your test')) |
| elif shard.get('state') == 'TIMED_OUT': |
| if duration is not None: |
| display_text = ( |
| 'shard #%d timed out after %.1f sec' % (index, duration)) |
| else: # pragma: no cover |
| # TODO(tikuta): Add coverage for this code. |
| display_text = ( |
| 'shard #%d timed out, took too much time to complete' % index) |
| elif self._get_exit_code(shard) != 0: |
| # TODO(bpastene): Add coverage for this code. |
| if duration is not None: # pragma: no cover |
| display_text = 'shard #%d (failed) (%.1f sec)' % (index, duration) |
| else: |
| display_text = 'shard #%d (failed)' % index |
| |
| if shard and self.show_outputs_ref_in_collect_step: |
| outputs_ref = shard.get('outputs_ref') |
| if outputs_ref: |
| link_name = 'shard #%d isolated out' % index |
| links[link_name] = '%s/browse?namespace=%s&hash=%s' % ( |
| outputs_ref['isolatedserver'], outputs_ref['namespace'], |
| outputs_ref['isolated']) |
| |
| if url and self.show_shards_in_collect_step: |
| links[display_text] = url |
| |
| self._display_pending(summary.get('shards', []), step_result.presentation) |
| |
| if infra_failures: |
| template = 'Shard #%s failed: %s' |
| |
| step_result.presentation.status = self.m.step.EXCEPTION |
| raise recipe_api.StepFailure( |
| '\n'.join(template % f for f in infra_failures), result=step_result) |
| |
| def get_collect_cmd_args(self, task): |
| """SwarmingTask -> argument list for 'swarming.py' command.""" |
| args = [ |
| 'collect', |
| '--swarming', self.swarming_server, |
| '--decorate', |
| '--print-status-updates', |
| ] |
| if self.verbose: |
| args.append('--verbose') |
| args.extend(('--json', self.m.json.input(task.trigger_output))) |
| if self.service_account_json: |
| args.extend(['--auth-service-account-json', self.service_account_json]) |
| return args |
| |
| # TODO(tikuta): This is for recipe_modules/v8/testing.py. |
| # Remove after switch (crbug.com/894045). |
| def get_collect_cmd_args_for_python(self, task): |
| """SwarmingTask -> argument list for 'swarming.py' command.""" |
| args = [ |
| 'collect', |
| '--swarming', self.swarming_server, |
| '--decorate', |
| '--print-status-updates', |
| ] |
| args.extend(('--json', self.m.json.input(task.trigger_output))) |
| return args |
| |
| def _gen_trigger_step_test_data(self, task): |
| """Generates an expected value of --dump-json in 'trigger' step. |
| |
| Used when running recipes to generate test expectations. |
| """ |
| # Suffixes of shard subtask names. |
| subtasks = [] |
| if task.shards == 1: |
| subtasks = [''] |
| else: |
| subtasks = [':%d:%d' % (task.shards, i) for i in range(task.shards)] |
| self._task_test_data_id_offset += len(subtasks) |
| tid = lambda i: '1%02d00' % ( |
| i + 100*(self._task_test_data_id_offset - len(subtasks))) |
| return self.m.json.test_api.output({ |
| 'base_task_name': task.task_name, |
| 'tasks': { |
| '%s%s' % (task.task_name, suffix): { |
| 'task_id': tid(i), |
| 'shard_index': i, |
| 'view_url': '%s/user/task/%s' % (self.swarming_server, tid(i)), |
| } for i, suffix in enumerate(subtasks) |
| }, |
| }) |
| |
| |
| class SwarmingTask(object): |
| """Definition of a task to run on swarming.""" |
| |
| def __init__(self, title, isolated_hash, ignore_task_failure, dimensions, |
| env, priority, shards, spec_name, buildername, buildnumber, |
| expiration, user, io_timeout, hard_timeout, idempotent, |
| extra_args, collect_step, task_output_dir, cipd_packages=None, |
| build_properties=None, merge=None, trigger_script=None, |
| named_caches=None, service_account=None, raw_cmd=None, |
| env_prefixes=None, optional_dimensions=None): |
| """Configuration of a swarming task. |
| |
| Args: |
| * title: display name of the task, hints to what task is doing. Usually |
| corresponds to a name of a test executable. Doesn't have to be unique. |
| * isolated_hash: hash of isolated file that describes all files needed to |
| run the task as well as command line to launch. See 'isolate' recipe |
| module. |
| * ignore_task_failure: whether to ignore the test failure of swarming |
| tasks. |
| * cipd_packages: list of 3-tuples corresponding to CIPD packages needed |
| for the task: ('path', 'package_name', 'version'), defined as follows: |
| * path: Path relative to the Swarming root dir in which to install |
| the package. |
| * package_name: Name of the package to install, |
| eg. "infra/tools/luci-auth/${platform}" |
| * version: Version of the package, either a package instance ID, |
| ref, or tag key/value pair. |
| * collect_step: callback that will be called to collect and processes |
| results of task execution, signature is collect_step(task, **kwargs). |
| * dimensions: key-value mapping with swarming dimensions that specify |
| on what Swarming slaves task can run. One important dimension is 'os', |
| which defines platform flavor to run the task on. See Swarming doc. |
| * env: key-value mapping with additional environment variables to add to |
| environment before launching the task executable. |
| * priority: integer [0, 255] that defines how urgent the task is. |
| Lower value corresponds to higher priority. Swarming service executes |
| tasks with higher priority first. |
| * shards: how many concurrent shards to run, makes sense only for |
| isolated tests based on gtest. Swarming uses GTEST_SHARD_INDEX |
| and GTEST_TOTAL_SHARDS environment variables to tell the executable |
| what shard to run. |
| * spec_name: task spec name. Used in monitoring. |
| * buildername: buildbot builder this task was triggered from. |
| * buildnumber: build number of a build this task was triggered from. |
| * expiration: number of schedule until the task shouldn't even be run if |
| it hadn't started yet. |
| * user: user that requested this task, if applicable. |
| * io_timeout: number of seconds that the task is allowed to not emit any |
| stdout bytes, after which it is forcibly killed. |
| * hard_timeout: number of seconds for which the task is allowed to run, |
| after which it is forcibly killed. |
| * idempotent: True if the results from a previous task can be reused. E.g. |
| this task has no side-effects. |
| * extra_args: list of command line arguments to pass to isolated tasks. |
| * task_output_dir: if defined, the directory where task results are placed |
| during the collect step. |
| * build_properties: An optional dict containing various build properties. |
| These are typically but not necessarily the properties emitted by |
| bot_update. |
| * merge: An optional dict containing: |
| * "script": path to a script to call to post process and merge the |
| collected outputs from the tasks. |
| * "args": an optional list of additional arguments to pass to the |
| above script. |
| * trigger_script: An optional dict containing: |
| * "script": path to a script to call which will use custom logic to |
| trigger appropriate swarming jobs, using swarming.py. Required. |
| * "args": an optional list of additional arguments to pass to the |
| script. |
| The script will receive the exact same arguments that are normally |
| passed to calls to `swarming.py trigger`, along with any arguments |
| provided in the "args" entry. |
| |
| The script is required to output a json file to the location provided |
| by the --dump-json argument. This json file should describe the |
| swarming tasks it launched, as well as some information about the |
| request, which is used when swarming collects the tasks. |
| |
| If the script launches multiple swarming shards, it needs to pass the |
| appropriate environment variables to each shard (this is normally done |
| by swarming.py trigger). Specifically, each shard should receive |
| GTEST_SHARD_INDEX, which is its shard index, and |
| GTEST_TOTAL_SHARDS, which is the total number of shards. |
| This can be done by passing `--env GTEST_SHARD_INDEX [NUM]` and |
| `--env GTEST_SHARD_SHARDS [NUM]` when calling swarming.py trigger. |
| * named_caches: a dict {name: relpath} requesting a cache named `name` |
| to be installed in `relpath` relative to the task root directory. |
| * service_account: (string) a service account email to run the task under. |
| * raw_cmd: Optional list of arguments to be used as raw command. Can be |
| used instead of extra args. |
| * env_prefixes: a dict {ENVVAR: [relative, paths]} which instructs |
| swarming to prepend the given relative paths to the PATH-style ENVVAR |
| specified. |
| * optional_dimensions: {expiration: [{key: value]} mapping with swarming |
| dimensions that specify on what Swarming slaves tasks can run. These |
| are similar to what is specified in dimensions but will create |
| additional 'fallback' task slice(s) with the optional dimensions. |
| """ |
| |
| self._trigger_output = None |
| self.build_properties = build_properties |
| self.spec_name = spec_name |
| self.buildername = buildername |
| self.buildnumber = buildnumber |
| self.cipd_packages = cipd_packages |
| self.collect_step = collect_step |
| self.dimensions = dimensions.copy() |
| self.env = env.copy() |
| self.expiration = expiration |
| self.extra_args = tuple(extra_args or []) |
| self.hard_timeout = hard_timeout |
| self.idempotent = idempotent |
| self.ignore_task_failure = ignore_task_failure |
| self.io_timeout = io_timeout |
| self.isolated_hash = isolated_hash |
| self.merge = merge or {} |
| self.named_caches = named_caches or {} |
| self.service_account = service_account |
| self.trigger_script = trigger_script or {} |
| self.priority = priority |
| self.raw_cmd = tuple(raw_cmd or []) |
| self.shards = shards |
| self.tags = set() |
| self.task_output_dir = task_output_dir |
| self.title = title |
| self.user = user |
| self.env_prefixes = { |
| var: list(paths) for var, paths in (env_prefixes or {}).iteritems()} |
| if optional_dimensions: |
| self.optional_dimensions = optional_dimensions.copy() |
| else: |
| self.optional_dimensions = None |
| self.wait_for_capacity = False |
| |
| @property |
| def task_name(self): |
| """Name of this task, derived from its other properties. |
| |
| The task name is purely to make sense of the task and is not used in any |
| other way. |
| """ |
| out = '%s/%s/%s' % ( |
| self.title, self.dimensions['os'], self.isolated_hash[:10]) |
| if self.buildername: |
| out += '/%s/%s' % (self.buildername, self.buildnumber or -1) |
| return out |
| |
| @property |
| def trigger_output(self): |
| """JSON results of 'trigger' step or None if not triggered.""" |
| return self._trigger_output |
| |
| def get_task_shard_output_dirs(self): |
| """Return the directory of each task shard outputs.""" |
| return [str(i) for i in range(self.shards)] |
| |
| def get_shard_view_url(self, index): |
| """Returns URL of HTML page with shard details or None if not available. |
| |
| Works only after the task has been successfully triggered. |
| """ |
| if self._trigger_output and self._trigger_output.get('tasks'): |
| for shard_dict in self._trigger_output['tasks'].itervalues(): |
| if shard_dict['shard_index'] == index: |
| return shard_dict['view_url'] |
| |
| def get_task_ids(self): |
| """Returns task id of all shards. |
| |
| Works only after the task has been successfully triggered. |
| """ |
| task_ids = [] |
| if self._trigger_output and self._trigger_output.get('tasks'): |
| for shard_dict in self._trigger_output['tasks'].itervalues(): |
| task_ids.append(shard_dict['task_id']) |
| return task_ids |