blob: 54ef36ab5a97a8540621e52f97c4cdb1f377ea1e [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/model_impl/model_type_store_impl.h"
#include <map>
#include <utility>
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/optional.h"
#include "base/task_runner_util.h"
#include "base/task_scheduler/post_task.h"
#include "base/task_scheduler/task_traits.h"
#include "base/threading/thread_task_runner_handle.h"
#include "components/sync/model/model_error.h"
#include "components/sync/model_impl/model_type_store_backend.h"
#include "components/sync/protocol/entity_metadata.pb.h"
#include "components/sync/protocol/model_type_state.pb.h"
#include "third_party/leveldatabase/src/include/leveldb/env.h"
#include "third_party/leveldatabase/src/include/leveldb/write_batch.h"
namespace syncer {
namespace {
// Key prefix for data/metadata records.
const char kDataPrefix[] = "-dt-";
const char kMetadataPrefix[] = "-md-";
// Key for global metadata record.
const char kGlobalMetadataKey[] = "-GlobalMetadata";
void NoOpForBackendDtor(scoped_refptr<ModelTypeStoreBackend> backend) {
// This function was intentionally left blank.
}
// Formats key prefix for data records of |type|.
std::string FormatDataPrefix(ModelType type) {
return std::string(GetModelTypeRootTag(type)) + kDataPrefix;
}
// Formats key prefix for metadata records of |type|.
std::string FormatMetaPrefix(ModelType type) {
return std::string(GetModelTypeRootTag(type)) + kMetadataPrefix;
}
// Formats key for global metadata record of |type|.
std::string FormatGlobalMetadataKey(ModelType type) {
return std::string(GetModelTypeRootTag(type)) + kGlobalMetadataKey;
}
// Holds a one to one mapping between profile path and SequencedTaskRunner. This
// class is expected to be accessed on any thread, and uses a lock to guarantee
// thread safety. The task runners are held onto by scoped_refptrs, and since
// this class is leaky, none of these task runners are ever destroyed.
class TaskRunnerMap {
public:
TaskRunnerMap() = default;
scoped_refptr<base::SequencedTaskRunner> GetOrCreateTaskRunner(
const std::string& path) {
base::AutoLock scoped_lock(lock_);
auto iter = task_runner_map_.find(path);
if (iter == task_runner_map_.end()) {
scoped_refptr<base::SequencedTaskRunner> task_runner =
base::CreateSequencedTaskRunnerWithTraits(
{base::MayBlock(), base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN});
task_runner_map_[path] = task_runner;
return task_runner;
} else {
return iter->second;
}
}
private:
mutable base::Lock lock_;
std::map<std::string, scoped_refptr<base::SequencedTaskRunner>>
task_runner_map_;
DISALLOW_COPY_AND_ASSIGN(TaskRunnerMap);
};
base::LazyInstance<TaskRunnerMap>::Leaky task_runner_map_singleton =
LAZY_INSTANCE_INITIALIZER;
} // namespace
// static
leveldb::WriteBatch* ModelTypeStoreImpl::GetLeveldbWriteBatch(
WriteBatch* write_batch) {
return static_cast<WriteBatchImpl*>(write_batch)->leveldb_write_batch_.get();
}
std::string ModelTypeStoreImpl::FormatDataKey(const std::string& id) {
return data_prefix_ + id;
}
std::string ModelTypeStoreImpl::FormatMetadataKey(const std::string& id) {
return metadata_prefix_ + id;
}
ModelTypeStoreImpl::ModelTypeStoreImpl(
ModelType type,
scoped_refptr<ModelTypeStoreBackend> backend,
scoped_refptr<base::SequencedTaskRunner> backend_task_runner)
: backend_(backend),
backend_task_runner_(backend_task_runner),
data_prefix_(FormatDataPrefix(type)),
metadata_prefix_(FormatMetaPrefix(type)),
global_metadata_key_(FormatGlobalMetadataKey(type)),
weak_ptr_factory_(this) {
DCHECK(backend_);
DCHECK(backend_task_runner_);
}
ModelTypeStoreImpl::~ModelTypeStoreImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
backend_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&NoOpForBackendDtor, std::move(backend_)));
}
// static
void ModelTypeStoreImpl::CreateStore(ModelType type,
const std::string& path,
InitCallback callback) {
DCHECK(!callback.is_null());
std::unique_ptr<leveldb::Env> env;
scoped_refptr<base::SequencedTaskRunner> task_runner =
task_runner_map_singleton.Get().GetOrCreateTaskRunner(path);
std::unique_ptr<Result> result(new Result());
auto task = base::BindOnce(&ModelTypeStoreBackend::GetOrCreateBackend, path,
std::move(env), result.get());
auto reply =
base::BindOnce(&ModelTypeStoreImpl::BackendInitDone, type,
std::move(result), task_runner, std::move(callback));
base::PostTaskAndReplyWithResult(task_runner.get(), FROM_HERE,
std::move(task), std::move(reply));
}
// static
void ModelTypeStoreImpl::CreateInMemoryStoreForTest(ModelType type,
InitCallback callback) {
DCHECK(!callback.is_null());
std::unique_ptr<leveldb::Env> env =
ModelTypeStoreBackend::CreateInMemoryEnv();
std::string path;
env->GetTestDirectory(&path);
path += "/in-memory";
// In-memory store backend works on the same thread as test.
scoped_refptr<base::SequencedTaskRunner> task_runner =
base::ThreadTaskRunnerHandle::Get();
std::unique_ptr<Result> result(new Result());
auto task = base::BindOnce(&ModelTypeStoreBackend::GetOrCreateBackend, path,
std::move(env), result.get());
auto reply =
base::BindOnce(&ModelTypeStoreImpl::BackendInitDone, type,
std::move(result), task_runner, std::move(callback));
base::PostTaskAndReplyWithResult(task_runner.get(), FROM_HERE,
std::move(task), std::move(reply));
}
// static
void ModelTypeStoreImpl::BackendInitDone(
ModelType type,
std::unique_ptr<Result> result,
scoped_refptr<base::SequencedTaskRunner> blocking_task_runner,
InitCallback callback,
scoped_refptr<ModelTypeStoreBackend> backend) {
std::unique_ptr<ModelTypeStoreImpl> store;
if (*result == Result::SUCCESS) {
store.reset(new ModelTypeStoreImpl(type, backend, blocking_task_runner));
}
std::move(callback).Run(*result, std::move(store));
}
// Note on pattern for communicating with backend:
// - API function (e.g. ReadData) allocates lists for output.
// - API function prepares two callbacks: task that will be posted on backend
// thread and reply which will be posted on model type thread once task
// finishes.
// - Task for backend thread takes raw pointers to output lists while reply
// takes ownership of those lists. This allows backend interface to be simple
// while ensuring proper objects' lifetime.
// - Function bound by reply calls consumer's callback and passes ownership of
// output lists to it.
void ModelTypeStoreImpl::ReadData(const IdList& id_list,
ReadDataCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!callback.is_null());
std::unique_ptr<RecordList> record_list(new RecordList());
std::unique_ptr<IdList> missing_id_list(new IdList());
auto task = base::BindOnce(&ModelTypeStoreBackend::ReadRecordsWithPrefix,
base::Unretained(backend_.get()), data_prefix_,
id_list, base::Unretained(record_list.get()),
base::Unretained(missing_id_list.get()));
auto reply = base::BindOnce(
&ModelTypeStoreImpl::ReadDataDone, weak_ptr_factory_.GetWeakPtr(),
std::move(callback), std::move(record_list), std::move(missing_id_list));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::ReadDataDone(ReadDataCallback callback,
std::unique_ptr<RecordList> record_list,
std::unique_ptr<IdList> missing_id_list,
Result result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(callback).Run(result, std::move(record_list),
std::move(missing_id_list));
}
void ModelTypeStoreImpl::ReadAllData(ReadAllDataCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!callback.is_null());
std::unique_ptr<RecordList> record_list(new RecordList());
auto task = base::BindOnce(&ModelTypeStoreBackend::ReadAllRecordsWithPrefix,
base::Unretained(backend_.get()), data_prefix_,
base::Unretained(record_list.get()));
auto reply = base::BindOnce(&ModelTypeStoreImpl::ReadAllDataDone,
weak_ptr_factory_.GetWeakPtr(),
std::move(callback), std::move(record_list));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::ReadAllDataDone(
ReadAllDataCallback callback,
std::unique_ptr<RecordList> record_list,
Result result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(callback).Run(result, std::move(record_list));
}
void ModelTypeStoreImpl::ReadAllMetadata(ReadMetadataCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!callback.is_null());
// ReadAllMetadata performs two steps sequentially: read all metadata records
// and then read global metadata record. Start reading metadata records here.
// When this read operation is done ReadMetadataRecordsDone callback will
// issue read operation for global metadata record.
std::unique_ptr<RecordList> metadata_records(new RecordList());
auto task = base::BindOnce(&ModelTypeStoreBackend::ReadAllRecordsWithPrefix,
base::Unretained(backend_.get()), metadata_prefix_,
base::Unretained(metadata_records.get()));
auto reply = base::BindOnce(&ModelTypeStoreImpl::ReadMetadataRecordsDone,
weak_ptr_factory_.GetWeakPtr(),
std::move(callback), std::move(metadata_records));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::ReadMetadataRecordsDone(
ReadMetadataCallback callback,
std::unique_ptr<RecordList> metadata_records,
Result result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (result != Result::SUCCESS) {
std::move(callback).Run(ModelError(FROM_HERE, "Reading metadata failed."),
std::make_unique<MetadataBatch>());
return;
}
IdList global_metadata_id;
global_metadata_id.push_back(global_metadata_key_);
std::unique_ptr<RecordList> global_metadata_records(new RecordList());
std::unique_ptr<IdList> missing_id_list(new IdList());
auto task = base::BindOnce(&ModelTypeStoreBackend::ReadRecordsWithPrefix,
base::Unretained(backend_.get()), std::string(),
global_metadata_id,
base::Unretained(global_metadata_records.get()),
base::Unretained(missing_id_list.get()));
auto reply = base::BindOnce(
&ModelTypeStoreImpl::ReadAllMetadataDone, weak_ptr_factory_.GetWeakPtr(),
std::move(callback), std::move(metadata_records),
std::move(global_metadata_records), std::move(missing_id_list));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::ReadAllMetadataDone(
ReadMetadataCallback callback,
std::unique_ptr<RecordList> metadata_records,
std::unique_ptr<RecordList> global_metadata_records,
std::unique_ptr<IdList> missing_id_list,
Result result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (result != Result::SUCCESS) {
std::move(callback).Run(ModelError(FROM_HERE, "Reading metadata failed."),
std::make_unique<MetadataBatch>());
return;
}
std::string global_metadata = "";
if (!missing_id_list->empty()) {
// Missing global metadata record is not an error; we can just return the
// default instance using the empty string above.
DCHECK_EQ(global_metadata_key_, (*missing_id_list)[0]);
DCHECK(global_metadata_records->empty());
} else {
DCHECK_EQ(1U, global_metadata_records->size());
DCHECK_EQ(global_metadata_key_, (*global_metadata_records)[0].id);
global_metadata = (*global_metadata_records)[0].value;
}
DeserializeMetadata(std::move(callback), global_metadata,
std::move(metadata_records));
}
void ModelTypeStoreImpl::DeserializeMetadata(
ReadMetadataCallback callback,
const std::string& global_metadata,
std::unique_ptr<RecordList> metadata_records) {
auto metadata_batch = std::make_unique<MetadataBatch>();
sync_pb::ModelTypeState state;
if (!state.ParseFromString(global_metadata)) {
std::move(callback).Run(
ModelError(FROM_HERE, "Failed to deserialize model type state."),
std::make_unique<MetadataBatch>());
return;
}
metadata_batch->SetModelTypeState(state);
for (const Record& r : *metadata_records.get()) {
sync_pb::EntityMetadata entity_metadata;
if (!entity_metadata.ParseFromString(r.value)) {
std::move(callback).Run(
ModelError(FROM_HERE, "Failed to deserialize entity metadata."),
std::make_unique<MetadataBatch>());
return;
}
metadata_batch->AddMetadata(r.id, entity_metadata);
}
std::move(callback).Run({}, std::move(metadata_batch));
}
std::unique_ptr<ModelTypeStore::WriteBatch>
ModelTypeStoreImpl::CreateWriteBatch() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return std::make_unique<WriteBatchImpl>(this);
}
void ModelTypeStoreImpl::CommitWriteBatch(
std::unique_ptr<WriteBatch> write_batch,
CallbackWithResult callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!callback.is_null());
WriteBatchImpl* write_batch_impl =
static_cast<WriteBatchImpl*>(write_batch.get());
auto task = base::BindOnce(&ModelTypeStoreBackend::WriteModifications,
base::Unretained(backend_.get()),
std::move(write_batch_impl->leveldb_write_batch_));
auto reply =
base::BindOnce(&ModelTypeStoreImpl::WriteModificationsDone,
weak_ptr_factory_.GetWeakPtr(), std::move(callback));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::WriteModificationsDone(CallbackWithResult callback,
Result result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(callback).Run(result);
}
void ModelTypeStoreImpl::WriteData(WriteBatch* write_batch,
const std::string& id,
const std::string& value) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Put(FormatDataKey(id), value);
}
void ModelTypeStoreImpl::WriteMetadata(WriteBatch* write_batch,
const std::string& id,
const std::string& value) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Put(FormatMetadataKey(id), value);
}
void ModelTypeStoreImpl::WriteGlobalMetadata(WriteBatch* write_batch,
const std::string& value) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Put(global_metadata_key_, value);
}
void ModelTypeStoreImpl::DeleteData(WriteBatch* write_batch,
const std::string& id) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Delete(FormatDataKey(id));
}
void ModelTypeStoreImpl::DeleteMetadata(WriteBatch* write_batch,
const std::string& id) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Delete(FormatMetadataKey(id));
}
void ModelTypeStoreImpl::DeleteGlobalMetadata(WriteBatch* write_batch) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
GetLeveldbWriteBatch(write_batch)->Delete(global_metadata_key_);
}
ModelTypeStoreImpl::WriteBatchImpl::WriteBatchImpl(ModelTypeStore* store)
: WriteBatch(store) {
leveldb_write_batch_ = std::make_unique<leveldb::WriteBatch>();
}
ModelTypeStoreImpl::WriteBatchImpl::~WriteBatchImpl() {}
} // namespace syncer