blob: 267c236f6709367c96f46c7b0212ca2f6c6f8c29 [file] [log] [blame]
#!/usr/bin/python2
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for running platform2 tests.
This handles the fun details like running against the right sysroot, via
qemu, bind mounts, etc...
"""
from __future__ import print_function
import argparse
import contextlib
import ctypes
import ctypes.util
import errno
import os
import psutil
import re
import signal
import sys
import tempfile
from chromite.lib import cros_build_lib
from chromite.lib import namespaces
from chromite.lib import osutils
from chromite.lib import process_util
from chromite.lib import proctitle
from chromite.lib import qemu
from chromite.lib import retry_util
from chromite.lib import signals
PR_SET_CHILD_SUBREAPER = 0x24
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
def _MakeProcessSubreaper():
"""Marks the current process as a subreaper.
This causes all orphaned processes to be reparented to this process instead
of the init process.
"""
if libc.prctl(ctypes.c_int(PR_SET_CHILD_SUBREAPER), ctypes.c_int(1)) != 0:
e = ctypes.get_errno()
raise OSError(e, os.strerror(e))
def _ReapUntilProcessExits(monitored_pid):
"""Reap processes until |monitored_pid| exits, then return its exit status.
This will also reap any other processes ready to be reaped immediately after
|monitored_pid| is reaped.
"""
pid_status = None
options = 0
while True:
try:
(pid, status, _) = os.wait3(options)
# Capture status of monitored_pid so we can return it.
if pid == monitored_pid:
pid_status = status
# Switch to nohang so we can churn through the zombies w/out getting
# stuck on live orphaned processes.
options = os.WNOHANG
# There may be some more child processes still running, but none of them
# have exited/finished. Don't wait for those as we'll throw an error in
# the caller.
if pid_status != None and pid == 0 and status == 0:
break
except OSError as e:
if e.errno == errno.ECHILD:
break
elif e.errno != errno.EINTR:
raise
return pid_status
# Compiled regular expressions for determining what environment variables to
# let through to the test env when we do sudo. If any character at the
# beginning of an environment variable matches one of the regular expression
# patterns (i.e. matching via re.match), the environment variable is let
# through.
ENV_PASSTHRU_REGEX_LIST = map(re.compile, [
# Used by various sanitizers.
'[AL]SAN_OPTIONS$',
# Used by QEMU.
'QEMU_',
# Used to select profiling output location for gcov.
'GCOV_',
])
class Platform2Test(object):
"""Framework for running platform2 tests"""
_BIND_MOUNT_PATHS = ('dev', 'dev/pts', 'proc', 'mnt/host/source', 'sys')
def __init__(self, test_bin, board, host, framework,
run_as_root, gtest_filter, user_gtest_filter,
sysroot, test_bin_args):
if not test_bin_args:
test_bin_args = [test_bin]
if not test_bin:
test_bin = test_bin_args[0]
self.bin = test_bin
self.args = test_bin_args
self.board = board
self.host = host
self.run_as_root = run_as_root
(self.gtest_filter, self.user_gtest_filter) = \
self.generateGtestFilter(gtest_filter, user_gtest_filter)
if sysroot:
self.sysroot = sysroot
else:
self.sysroot = cros_build_lib.GetSysroot(self.board)
self.framework = framework
if self.framework == 'auto':
qemu_arch = qemu.Qemu.DetectArch(self.bin, self.sysroot)
if qemu_arch is None:
self.framework = 'ldso'
else:
self.framework = 'qemu'
if self.framework == 'qemu':
self.qemu = qemu.Qemu(self.sysroot, arch=qemu_arch)
@classmethod
def generateGtestSubfilter(cls, gtest_filter):
"""Split a gtest_filter down into positive and negative filters.
Args:
gtest_filter: A filter string as normally passed to --gtest_filter.
Returns:
A tuple of format (positive_filters, negative_filters).
"""
filters = gtest_filter.split('-', 1)
positive_filters = [x for x in filters[0].split(':') if x]
if len(filters) > 1:
negative_filters = [x for x in filters[1].split(':') if x]
else:
negative_filters = []
return (positive_filters, negative_filters)
@classmethod
def generateGtestFilter(cls, filters, user_filters):
"""Merge internal gtest filters and user-supplied gtest filters.
Returns:
A string that can be passed to --gtest_filter.
"""
gtest_filter = cls.generateGtestSubfilter(filters)
user_gtest_filter = []
if user_filters:
filters = user_filters.split('::')[-1]
user_gtest_filter = cls.generateGtestSubfilter(filters)
return (gtest_filter, user_gtest_filter)
def removeSysrootPrefix(self, path):
"""Returns the given path with any sysroot prefix removed."""
# If the sysroot is /, then the paths are already normalized.
if self.sysroot != '/' and path.startswith(self.sysroot):
path = path.replace(self.sysroot, '', 1)
return path
@staticmethod
def GetNonRootAccount():
"""Return details about the non-root account we want to use.
Returns:
A tuple of (username, uid, gid, home).
"""
return (
os.environ.get('SUDO_USER', 'nobody'),
int(os.environ.get('SUDO_UID', '65534')),
int(os.environ.get('SUDO_GID', '65534')),
# Should we find a better home?
'/tmp/portage',
)
@staticmethod
@contextlib.contextmanager
def LockDb(db):
"""Lock an account database.
We use the same algorithm as shadow/user.eclass. This way we don't race
and corrupt things in parallel.
"""
lock = '%s.lock' % db
_, tmplock = tempfile.mkstemp(prefix='%s.platform.' % lock)
# First try forever to grab the lock.
retry = lambda e: e.errno == errno.EEXIST
# Retry quickly at first, but slow down over time.
try:
retry_util.GenericRetry(retry, 60, os.link, tmplock, lock, sleep=0.1)
except Exception:
print('error: could not grab lock %s' % lock)
raise
# Yield while holding the lock, but try to clean it no matter what.
try:
os.unlink(tmplock)
yield lock
finally:
os.unlink(lock)
def SetupUser(self):
"""Propogate the user name<->id mapping from outside the chroot.
Some unittests use getpwnam($USER), as does bash. If the account
is not registered in the sysroot, they get back errors.
"""
MAGIC_GECOS = 'Added by your friendly platform test helper; do not modify'
# This is kept in sync with what sdk_lib/make_chroot.sh generates.
SDK_GECOS = 'ChromeOS Developer'
user, uid, gid, home = self.GetNonRootAccount()
if user == 'nobody':
return
passwd_db = os.path.join(self.sysroot, 'etc', 'passwd')
with self.LockDb(passwd_db):
data = osutils.ReadFile(passwd_db)
accts = data.splitlines()
for acct in accts:
passwd = acct.split(':')
if passwd[0] == user:
# Did the sdk make this account?
if passwd[4] == SDK_GECOS:
# Don't modify it (see below) since we didn't create it.
return
# Did we make this account?
if passwd[4] != MAGIC_GECOS:
raise RuntimeError('your passwd db (%s) has unmanaged acct %s' %
(passwd_db, user))
# Maybe we should see if it needs to be updated? Like if they
# changed UIDs? But we don't really check that elsewhere ...
return
acct = '%(name)s:x:%(uid)s:%(gid)s:%(gecos)s:%(homedir)s:%(shell)s' % {
'name': user,
'uid': uid,
'gid': gid,
'gecos': MAGIC_GECOS,
'homedir': home,
'shell': '/bin/bash',
}
with open(passwd_db, 'a') as f:
if data[-1] != '\n':
f.write('\n')
f.write('%s\n' % acct)
def pre_test(self):
"""Runs pre-test environment setup.
Sets up any required mounts and copying any required files to run tests
(not those specific to tests) into the sysroot.
"""
if not self.run_as_root:
self.SetupUser()
if self.framework == 'qemu':
self.qemu.Install()
self.qemu.RegisterBinfmt()
def post_test(self):
"""Runs post-test teardown, removes mounts/files copied during pre-test."""
def run(self):
"""Runs the test in a proper environment (e.g. qemu)."""
# We know these pre-tests are fast (especially if they've already been run
# once), so run them automatically for the user if they test by hand.
self.pre_test()
for mount in self._BIND_MOUNT_PATHS:
path = os.path.join(self.sysroot, mount)
osutils.SafeMakedirs(path)
osutils.Mount('/' + mount, path, 'none', osutils.MS_BIND)
positive_filters = self.gtest_filter[0]
negative_filters = self.gtest_filter[1]
if self.user_gtest_filter:
positive_filters += self.user_gtest_filter[0]
negative_filters += self.user_gtest_filter[1]
filters = (':'.join(positive_filters), ':'.join(negative_filters))
gtest_filter = '%s-%s' % filters
cmd = self.removeSysrootPrefix(self.bin)
argv = self.args[:]
argv[0] = self.removeSysrootPrefix(argv[0])
if gtest_filter != '-':
argv.append('--gtest_filter=' + gtest_filter)
# Some programs expect to find data files via $CWD, so doing a chroot
# and dropping them into / would make them fail.
cwd = self.removeSysrootPrefix(os.getcwd())
# Make orphaned child processes reparent to this process instead of the init
# process. This allows us to kill them if they do not terminate after the
# test has finished running.
_MakeProcessSubreaper()
# Fork off a child to run the test. This way we can make tweaks to the
# env that only affect the child (gid/uid/chroot/cwd/etc...). We have
# to fork anyways to run the test, so might as well do it all ourselves
# to avoid (slow) chaining through programs like:
# sudo -u $SUDO_UID -g $SUDO_GID chroot $SYSROOT bash -c 'cd $CWD; $BIN'
child = os.fork()
if child == 0:
print('chroot: %s' % self.sysroot)
print('cwd: %s' % cwd)
print('cmd: {%s} %s' % (cmd, ' '.join(map(repr, argv))))
os.chroot(self.sysroot)
os.chdir(cwd)
# Set the child's pgid to its pid, so we can kill any processes that the
# child creates after the child terminates.
os.setpgid(0, 0)
# The TERM the user is leveraging might not exist in the sysroot.
# Force a sane default that supports standard color sequences.
os.environ['TERM'] = 'ansi'
# Some progs want this like bash else they get super confused.
os.environ['PWD'] = cwd
os.environ['GTEST_COLOR'] = 'yes'
if not self.run_as_root:
user, uid, gid, home = self.GetNonRootAccount()
os.setgid(gid)
os.setuid(uid)
os.environ['HOME'] = home
os.environ['USER'] = user
sys.exit(os.execvp(cmd, argv))
proctitle.settitle('sysroot watcher', cmd)
# Mask SIGINT with the assumption that the child will catch & process it.
# We'll pass that back up below.
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Reap any processes that were reparented to us until the child exits.
status = _ReapUntilProcessExits(child)
leaked_children = psutil.Process().get_children(recursive=True)
if leaked_children:
# It's possible the child forked and the forked processes are still
# running. Kill the forked processes.
try:
os.killpg(child, signal.SIGTERM)
except OSError as e:
if e.errno != errno.ESRCH:
print('Warning: while trying to kill pgid %s caught exception\n%s' %
(child, e), file=sys.stderr)
# Kill any orphaned processes originally created by the test that were in
# a different process group. This will also kill any processes that did
# not respond to the SIGTERM.
for child in leaked_children:
try:
child.kill()
except psutil.NoSuchProcess:
pass
failmsg = None
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
failmsg = 'signal %s(%i)' % (signals.StrSignal(sig), sig)
else:
exit_status = os.WEXITSTATUS(status)
if exit_status:
failmsg = 'exit code %i' % exit_status
if failmsg:
print('Error: %s: failed with %s' % (cmd, failmsg), file=sys.stderr)
if leaked_children:
for p in leaked_children:
print('Error: the test leaked process %s with pid %s (it was forcefully'
' killed)' % (p.name(), p.pid), file=sys.stderr)
# TODO(vapier): Make this an error. We need to track down some scenarios
# where processes do leak though before we can make this fatal :(.
#sys.exit(100)
process_util.ExitAsStatus(status)
def _SudoCommand():
"""Get the 'sudo' command, along with all needed environment variables."""
cmd = ['sudo']
for key, value in os.environ.iteritems():
for pattern in ENV_PASSTHRU_REGEX_LIST:
if pattern.match(key):
cmd += ['%s=%s' % (key, value)]
break
return cmd
def _ReExecuteIfNeeded(argv, ns_net=True, ns_pid=True):
"""Re-execute tests as root.
We often need to do things as root, so make sure we're that. Like chroot
for proper library environment or do bind mounts.
Also unshare the mount namespace so as to ensure that doing bind mounts for
tests don't leak out to the normal chroot. Also unshare the UTS namespace
so changes to `hostname` do not impact the host.
"""
# Disable the Gentoo sandbox if it's active to avoid warnings/errors.
if os.environ.get('SANDBOX_ON') == '1':
os.environ['SANDBOX_ON'] = '0'
os.execvp(argv[0], argv)
elif os.geteuid() != 0:
# Clear the LD_PRELOAD var since it won't be usable w/sudo (and the Gentoo
# sandbox normally sets it for us).
os.environ.pop('LD_PRELOAD', None)
cmd = _SudoCommand() + ['--'] + argv
os.execvp(cmd[0], cmd)
else:
namespaces.SimpleUnshare(net=ns_net, pid=ns_pid)
class _ParseStringSetAction(argparse.Action):
"""Support flags that store into a set (vs a list)."""
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, set(values.split()))
def main(argv):
actions = ['pre_test', 'post_test', 'run']
parser = argparse.ArgumentParser()
group = parser.add_argument_group('Namespaces')
group.add_argument('--no-ns-net', dest='ns_net',
default=True, action='store_false',
help='Do not create a new network namespace')
group.add_argument('--no-ns-pid', dest='ns_pid',
default=True, action='store_false',
help='Do not create a new PID namespace')
parser.add_argument('--action', default='run',
choices=actions, help='action to perform')
parser.add_argument('--bin',
help='test binary to run')
parser.add_argument('--board', default=None,
help='board to build for')
parser.add_argument('--sysroot', default=None,
help='sysroot to run tests inside')
parser.add_argument('--framework', default='auto',
choices=('auto', 'ldso', 'qemu'),
help='framework to be used to run tests')
parser.add_argument('--gtest_filter', default='',
help='args to pass to gtest/test binary')
parser.add_argument('--host', action='store_true', default=False,
help='specify that we\'re testing for the host')
parser.add_argument('--run_as_root', action='store_true',
help='should the test be run as root')
parser.add_argument('--user_gtest_filter', default='',
help=argparse.SUPPRESS)
parser.add_argument('cmdline', nargs='*')
options = parser.parse_args(argv)
if options.action == 'run' and ((not options.bin or len(options.bin) == 0)
and not options.cmdline):
raise AssertionError('You must specify a binary for the "run" action')
if options.host and options.board:
raise AssertionError('You must provide only one of --board or --host')
elif not options.host and not options.board and not options.sysroot:
raise AssertionError('You must provide --board or --host or --sysroot')
if options.sysroot:
# Normalize the value so we can assume certain aspects.
options.sysroot = osutils.ExpandPath(options.sysroot)
if not os.path.isdir(options.sysroot):
raise AssertionError('Sysroot does not exist: %s' % options.sysroot)
# Once we've finished sanity checking args, make sure we're root.
_ReExecuteIfNeeded([sys.argv[0]] + argv, ns_net=options.ns_net,
ns_pid=options.ns_pid)
p2test = Platform2Test(options.bin, options.board, options.host,
options.framework,
options.run_as_root, options.gtest_filter,
options.user_gtest_filter,
options.sysroot, options.cmdline)
getattr(p2test, options.action)()
if __name__ == '__main__':
main(sys.argv[1:])