blob: ea5bc6ed47890723d49fc51659fc0896ba2170e3 [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import sys
import dbus
from autotest_lib.client.cros import upstart
def _proto_to_blob(proto):
    """
    Converts a protobuf message into a byte array suitable for D-Bus.

    @param proto: Protobuf message object providing SerializeToString().

    @return dbus.ByteArray holding the serialized message.
    """
    serialized = proto.SerializeToString()
    return dbus.ByteArray(serialized)
class SmbProvider(object):
    """
    Wrapper for D-Bus calls to SmbProvider Daemon

    The SmbProvider daemon handles calling the libsmbclient to communicate with
    an SMB server. This class is a wrapper to the D-Bus interface to the daemon.
    """

    _DBUS_SERVICE_NAME = "org.chromium.SmbProvider"
    _DBUS_SERVICE_PATH = "/org/chromium/SmbProvider"
    _DBUS_INTERFACE_NAME = "org.chromium.SmbProvider"

    # Default timeout in seconds for D-Bus calls.
    _DEFAULT_TIMEOUT = 120

    # Chronos user ID.
    _CHRONOS_UID = 1000

    def __init__(self, bus_loop, proto_binding_location):
        """
        Constructor.

        Creates a D-Bus connection to smbproviderd.

        @param bus_loop: Glib main loop object
        @param proto_binding_location: The location of generated python bindings
                                       for smbprovider protobufs.
        """
        # The generated protobuf bindings are imported lazily inside the
        # methods below, so their directory has to be importable. Avoid
        # piling up duplicate entries when several wrappers are created.
        if proto_binding_location not in sys.path:
            sys.path.append(proto_binding_location)

        self._bus_loop = bus_loop
        self.restart()

    def restart(self):
        """
        Restarts smbproviderd and rebinds to D-Bus interface.
        """
        logging.info('restarting smbproviderd')
        upstart.restart_job('smbproviderd')

        try:
            # Get the interface as Chronos since only they are allowed to send
            # D-Bus messages to smbproviderd.
            os.setresuid(self._CHRONOS_UID, self._CHRONOS_UID, 0)

            bus = dbus.SystemBus(self._bus_loop)
            proxy = bus.get_object(self._DBUS_SERVICE_NAME,
                                   self._DBUS_SERVICE_PATH)
            self._smbproviderd = dbus.Interface(proxy,
                                                self._DBUS_INTERFACE_NAME)
        finally:
            # Always switch back to root, even if binding to the bus failed.
            os.setresuid(0, 0, 0)

    def stop(self):
        """
        Stops smbproviderd.

        The cached D-Bus interface is dropped even if stopping the job fails;
        call restart() before issuing further D-Bus calls.
        """
        logging.info('stopping smbproviderd')

        try:
            upstart.stop_job('smbproviderd')
        finally:
            self._smbproviderd = None

    def mount(self, mount_path, workgroup, username, password):
        """
        Mounts a share.

        @param mount_path: Path of the share to mount.
        @param workgroup: Workgroup for the mount.
        @param username: Username for the mount.
        @param password: Password for the mount.

        @return A tuple with the ErrorType and the mount id returned by the
                D-Bus call.
        """
        logging.info("Mounting: %s", mount_path)

        # Imported here because the bindings only become importable once the
        # constructor has added their location to sys.path.
        from directory_entry_pb2 import MountOptionsProto

        proto = MountOptionsProto()
        proto.path = mount_path
        proto.workgroup = workgroup
        proto.username = username

        # The password travels as a pipe file descriptor rather than inside
        # the options proto; the descriptor is closed when the block exits.
        with self.PasswordFd(password) as password_fd:
            return self._smbproviderd.Mount(_proto_to_blob(proto),
                                            dbus.types.UnixFd(password_fd),
                                            timeout=self._DEFAULT_TIMEOUT,
                                            byte_arrays=True)

    def unmount(self, mount_id):
        """
        Unmounts a share.

        @param mount_id: Mount ID to be unmounted.

        @return: ErrorType from the returned D-Bus call.
        """
        logging.info("Unmounting: %s", mount_id)

        from directory_entry_pb2 import UnmountOptionsProto

        proto = UnmountOptionsProto()
        proto.mount_id = mount_id

        # Use the same timeout as the other D-Bus calls of this wrapper.
        return self._smbproviderd.Unmount(_proto_to_blob(proto),
                                          timeout=self._DEFAULT_TIMEOUT)

    def read_directory(self, mount_id, directory_path):
        """
        Reads a directory.

        @param mount_id: Mount ID corresponding to the share.
        @param directory_path: Path of the directory to read.

        @return A tuple with the ErrorType returned by the D-Bus call and a
                DirectoryEntryListProto. The proto is parsed from the returned
                blob when the error is ERROR_OK and left empty otherwise.
        """
        logging.info("Reading directory: %s", directory_path)

        from directory_entry_pb2 import ReadDirectoryOptionsProto
        from directory_entry_pb2 import DirectoryEntryListProto
        from directory_entry_pb2 import ERROR_OK

        proto = ReadDirectoryOptionsProto()
        proto.mount_id = mount_id
        proto.directory_path = directory_path

        error, entries_blob = self._smbproviderd.ReadDirectory(
                _proto_to_blob(proto),
                timeout=self._DEFAULT_TIMEOUT,
                byte_arrays=True)

        entries = DirectoryEntryListProto()
        # Only parse the blob on success.
        if error == ERROR_OK:
            entries.ParseFromString(entries_blob)

        return error, entries

    class PasswordFd(object):
        """
        Writes password into a file descriptor.

        Use in a 'with' statement to automatically close the returned file
        descriptor.

        @param password: Plaintext password, either bytes or a text string
                         (text is encoded as UTF-8 before being written).

        @return A file descriptor (pipe) containing the password.
        """

        def __init__(self, password):
            self._password = password
            self._read_fd = None

        def __enter__(self):
            """Creates the password file descriptor."""
            data = self._password
            # os.write() only accepts bytes; encode text before any
            # descriptor is opened so a bad password type cannot leak a pipe.
            if not isinstance(data, bytes):
                data = data.encode('utf-8')

            read_fd, write_fd = os.pipe()
            try:
                try:
                    os.write(write_fd, data)
                finally:
                    # Closing the write end lets the reader see EOF after the
                    # password.
                    os.close(write_fd)
            except Exception:
                # __exit__ is not invoked when __enter__ raises, so release
                # the read end here.
                os.close(read_fd)
                raise

            self._read_fd = read_fd
            return read_fd

        def __exit__(self, mytype, value, traceback):
            """Closes the password file descriptor again."""
            # Compare against None: 0 is a valid file descriptor.
            if self._read_fd is not None:
                os.close(self._read_fd)
                self._read_fd = None