blob: 3f256df8822ee433bb95e00eae58b904f157d1b7 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import os
import shutil
import subprocess
import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib.cros import dbus_send
class platform_ImageLoader(test.test):
"""Tests the ImageLoader dbus service.
"""
version = 1
STORAGE = '/var/lib/imageloader'
BUS_NAME = 'org.chromium.ImageLoader'
BUS_PATH = '/org/chromium/ImageLoader'
BUS_INTERFACE = 'org.chromium.ImageLoaderInterface'
GET_COMPONENT_VERSION = 'GetComponentVersion'
REGISTER_COMPONENT = 'RegisterComponent'
LOAD_COMPONENT = 'LoadComponent'
LOAD_COMPONENT_AT_PATH = 'LoadComponentAtPath'
BAD_RESULT = ''
USER = 'chronos'
COMPONENT_NAME = 'TestFlashComponent'
CORRUPT_COMPONENT_NAME = 'CorruptTestFlashComponent'
CORRUPT_COMPONENT_PATH = '/tmp/CorruptTestFlashComponent'
OLD_VERSION = '23.0.0.207'
NEW_VERSION = '24.0.0.186'
def _get_component_version(self, name):
args = [dbus.String(name)]
return dbus_send.dbus_send(
self.BUS_NAME,
self.BUS_INTERFACE,
self.BUS_PATH,
self.GET_COMPONENT_VERSION,
user=self.USER,
args=args).response
def _register_component(self, name, version, path):
args = [dbus.String(name), dbus.String(version), dbus.String(path)]
return dbus_send.dbus_send(
self.BUS_NAME,
self.BUS_INTERFACE,
self.BUS_PATH,
self.REGISTER_COMPONENT,
timeout_seconds=20,
user=self.USER,
args=args).response
def _load_component(self, name):
args = [dbus.String(name)]
return dbus_send.dbus_send(
self.BUS_NAME,
self.BUS_INTERFACE,
self.BUS_PATH,
self.LOAD_COMPONENT,
timeout_seconds=20,
user=self.USER,
args=args).response
def _load_component_at_path(self, name, path):
args = [dbus.String(name), dbus.String(path)]
return dbus_send.dbus_send(
self.BUS_NAME,
self.BUS_INTERFACE,
self.BUS_PATH,
self.LOAD_COMPONENT_AT_PATH,
timeout_seconds=20,
user=self.USER,
args=args).response
def _corrupt_and_load_component(self, component, iteration, target, offset):
"""Registers a valid component and then corrupts it by writing
a random byte to the target file at the given offset.
It then attemps to load the component and returns whether or
not that succeeded.
@component The path to the component to register.
@iteration A prefix to append to the name of the component, so that
multiple registrations do not clash.
@target The name of the file in the component to corrupt.
@offset The offset in the file to corrupt.
"""
versioned_name = self.CORRUPT_COMPONENT_NAME + iteration
if not self._register_component(versioned_name, self.OLD_VERSION,
component):
raise error.TestError('Failed to register a valid component')
self._components_to_delete.append(versioned_name)
corrupt_path = os.path.join('/var/lib/imageloader', versioned_name,
self.OLD_VERSION)
os.system('printf \'\\xa1\' | dd conv=notrunc of=%s bs=1 seek=%s' %
(corrupt_path + '/' + target, offset))
return self._load_component(versioned_name)
def _get_unmount_all_paths(self):
"""Returns a set representing all the paths that would be unmounted by
imageloader.
"""
return subprocess.check_output([
'/usr/sbin/imageloader', '--dry_run', '--unmount_all',
]).splitlines()
def initialize(self):
"""Initialize the test variables."""
self._paths_to_unmount = []
self._components_to_delete = []
def run_once(self, component1=None, component2=None):
"""Executes the test cases."""
if component1 == None or component2 == None:
raise error.TestError('Must supply two versions of '
'a production signed component.')
paths_before = self._get_unmount_all_paths()
# Make sure there is no version returned at first.
if self._get_component_version(self.COMPONENT_NAME) != self.BAD_RESULT:
raise error.TestError('There should be no currently '
'registered component version')
# Register a component and fetch the version.
if not self._register_component(self.COMPONENT_NAME, self.OLD_VERSION,
component1):
raise error.TestError('The component failed to register')
self._components_to_delete.append(self.COMPONENT_NAME)
if self._get_component_version(self.COMPONENT_NAME) != '23.0.0.207':
raise error.TestError('The component version is incorrect')
# Make sure the same version cannot be re-registered.
if self._register_component(self.COMPONENT_NAME, self.OLD_VERSION,
component1):
raise error.TestError('ImageLoader allowed registration '
'of duplicate component version')
# Make sure that ImageLoader matches the reported version to the
# manifest.
if self._register_component(self.COMPONENT_NAME, self.NEW_VERSION,
component1):
raise error.TestError('ImageLoader allowed registration of a '
'mismatched component version')
# Register a newer component and fetch the version.
if not self._register_component(self.COMPONENT_NAME, self.NEW_VERSION,
component2):
raise error.TestError('Failed to register updated version')
if self._get_component_version(self.COMPONENT_NAME) != '24.0.0.186':
raise error.TestError('The component version is incorrect')
# Simulate a rollback.
if self._register_component(self.COMPONENT_NAME, self.OLD_VERSION,
component1):
raise error.TestError('ImageLoader allowed a rollback')
# Test loading a component that lives at an arbitrary path.
mnt_path = self._load_component_at_path('FlashLoadedAtPath', component1)
if mnt_path == self.BAD_RESULT:
raise error.TestError('LoadComponentAtPath failed')
self._paths_to_unmount.append(mnt_path)
known_mount_path = '/run/imageloader/TestFlashComponent_testing'
# Now test loading the component on the command line.
if subprocess.call([
'/usr/sbin/imageloader', '--mount',
'--mount_component=TestFlashComponent', '--mount_point=%s' %
(known_mount_path)
]) != 0:
raise error.TestError('Failed to mount component')
# If the component is already mounted, it should return the path again.
if subprocess.call([
'/usr/sbin/imageloader', '--mount',
'--mount_component=TestFlashComponent', '--mount_point=%s' %
(known_mount_path)
]) != 0:
raise error.TestError('Failed to remount component')
self._paths_to_unmount.append(known_mount_path)
if not os.path.exists(
os.path.join(known_mount_path, 'libpepflashplayer.so')):
raise error.TestError('Flash player file does not exist')
mount_path = self._load_component(self.COMPONENT_NAME)
if mount_path == self.BAD_RESULT:
raise error.TestError('Failed to mount component as dbus service.')
self._paths_to_unmount.append(mount_path)
if not os.path.exists(os.path.join(mount_path, 'libpepflashplayer.so')):
raise error.TestError('Flash player file does not exist ' +
'after loading as dbus service.')
# Now test some corrupt components.
shutil.copytree(component1, self.CORRUPT_COMPONENT_PATH)
# Corrupt the disk image file in the corrupt component.
os.system('printf \'\\xa1\' | dd conv=notrunc of=%s bs=1 seek=1000000' %
(self.CORRUPT_COMPONENT_PATH + '/image.squash'))
# Make sure registration fails.
if self._register_component(self.CORRUPT_COMPONENT_NAME,
self.OLD_VERSION,
self.CORRUPT_COMPONENT_PATH):
raise error.TestError('Registered a corrupt component')
# Now register a valid component, and then corrupt it.
mount_path = self._corrupt_and_load_component(component1, '1',
'image.squash', '1000000')
if mount_path == self.BAD_RESULT:
raise error.TestError('Failed to load component with corrupt image')
self._paths_to_unmount.append(mount_path)
corrupt_file = os.path.join(mount_path, 'libpepflashplayer.so')
if not os.path.exists(corrupt_file):
raise error.TestError('Flash player file does not exist ' +
'for corrupt image')
# Reading the files should fail.
# This is a critical test. We assume dm-verity works, but by default it
# panics the machine and forces a powerwash. For component updates,
# ImageLoader should configure the dm-verity table to just return an I/O
# error. If this test does not throw an exception at all, ImageLoader
# may not be attaching the dm-verity tree correctly.
try:
with open(corrupt_file, 'rb') as f:
byte = f.read(1)
while byte != '':
byte = f.read(1)
except IOError:
pass
# Catching an IOError once we read the corrupt block is the expected
# behavior.
else:
raise error.TestError(
'Did not receive an I/O error while reading the corrupt image')
# Modify the signature and make sure the component does not load.
if self._corrupt_and_load_component(
component1, '2', 'imageloader.sig.1', '50') != self.BAD_RESULT:
raise error.TestError('Mounted component with corrupt signature')
# Modify the manifest and make sure the component does not load.
if self._corrupt_and_load_component(component1, '3', 'imageloader.json',
'1') != self.BAD_RESULT:
raise error.TestError('Mounted component with corrupt manifest')
# Modify the table and make sure the component does not load.
if self._corrupt_and_load_component(component1, '4', 'table',
'1') != self.BAD_RESULT:
raise error.TestError('Mounted component with corrupt table')
# Determine which mount points were added during the test and verify
# that they match the expected mount points.
paths_after = self._get_unmount_all_paths()
measured_unmount = set(paths_after) - set(paths_before);
expected_unmount = set(self._paths_to_unmount);
if measured_unmount != expected_unmount:
raise error.TestError('Discrepency between --unmount_all and '
'paths_to_unmount "%s" and "%s"' %
(measured_unmount, expected_unmount))
# Clean up each mount point created by the test to verify the
# functionality of the --unmount flag.
for path in expected_unmount:
if subprocess.call([
'/usr/sbin/imageloader', '--unmount',
'--mount_point=%s' % (path,)]) != 0:
raise error.TestError('Failed to unmount component')
# Verify that the mount points were indeed cleaned up.
paths_cleanup = self._get_unmount_all_paths()
if paths_cleanup != paths_before:
raise error.TestError('--unmount failed.')
def cleanup(self):
"""Cleans up the files and mounts created by the test."""
shutil.rmtree(self.CORRUPT_COMPONENT_PATH, ignore_errors=True)
for name in self._components_to_delete:
shutil.rmtree(os.path.join(self.STORAGE, name), ignore_errors=True)
for path in self._paths_to_unmount:
utils.system('umount %s' % (path), ignore_status=True)
shutil.rmtree(path, ignore_errors=True)