blob: 633e18d495a0e77e2155b87b2875bc817c8194b0 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/user_events/user_event_sync_bridge.h"
#include <set>
#include <utility>
#include <vector>
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "base/strings/string_number_conversions.h"
#include "components/sync/model/entity_change.h"
#include "components/sync/model/metadata_batch.h"
#include "components/sync/model/mutable_data_batch.h"
#include "components/sync/protocol/sync.pb.h"
namespace syncer {
using sync_pb::UserEventSpecifics;
using sync_pb::ModelTypeState;
using IdList = ModelTypeStore::IdList;
using Record = ModelTypeStore::Record;
using RecordList = ModelTypeStore::RecordList;
using Result = ModelTypeStore::Result;
using WriteBatch = ModelTypeStore::WriteBatch;
namespace {
// Derives the store key for |specifics| from its |event_time_usec| field.
std::string GetStorageKeyFromSpecifics(const UserEventSpecifics& specifics) {
  // Number of bytes reserved for the encoded timestamp.
  constexpr size_t kStorageKeyLength = 8;
  // The timestamp is written big-endian so that newly created keys come last
  // in sort order, which lets leveldb append new writes, which it is best at.
  // TODO(skym): Until we force |event_time_usec| to never conflict, this has
  // the potential for errors.
  std::string storage_key(kStorageKeyLength, '\0');
  base::WriteBigEndian(&storage_key[0], specifics.event_time_usec());
  return storage_key;
}
int64_t GetEventTimeFromStorageKey(const std::string& storage_key) {
int64_t event_time;
base::ReadBigEndian(&storage_key[0], &event_time);
return event_time;
}
std::unique_ptr<EntityData> MoveToEntityData(
std::unique_ptr<UserEventSpecifics> specifics) {
auto entity_data = std::make_unique<EntityData>();
entity_data->non_unique_name =
base::Int64ToString(specifics->event_time_usec());
entity_data->specifics.set_allocated_user_event(specifics.release());
return entity_data;
}
// Builds a new EntityData that holds a copy of |specifics|. The entity's
// |non_unique_name| is the decimal string form of the event's
// |event_time_usec|.
//
// |specifics| is taken by const reference: the only copy that is needed is the
// one made into the entity below, so passing by value would copy the proto
// twice per call.
std::unique_ptr<EntityData> CopyToEntityData(
    const UserEventSpecifics& specifics) {
  auto entity_data = std::make_unique<EntityData>();
  entity_data->non_unique_name =
      base::Int64ToString(specifics.event_time_usec());
  *entity_data->specifics.mutable_user_event() = specifics;
  return entity_data;
}
} // namespace
// Requests creation of the USER_EVENTS store (the result is delivered to
// OnStoreCreated()) and registers HandleGlobalIdChange() as a global id change
// observer. Both callbacks are bound to a weak pointer to |this|.
// |global_id_mapper| must not be null.
UserEventSyncBridge::UserEventSyncBridge(
    OnceModelTypeStoreFactory store_factory,
    const ChangeProcessorFactory& change_processor_factory,
    GlobalIdMapper* global_id_mapper)
    : ModelTypeSyncBridge(change_processor_factory, USER_EVENTS),
      global_id_mapper_(global_id_mapper) {
  DCHECK(global_id_mapper_);

  auto on_store_created = base::BindOnce(&UserEventSyncBridge::OnStoreCreated,
                                         base::AsWeakPtr(this));
  std::move(store_factory).Run(USER_EVENTS, std::move(on_store_created));

  auto on_global_id_change = base::Bind(
      &UserEventSyncBridge::HandleGlobalIdChange, base::AsWeakPtr(this));
  global_id_mapper_->AddGlobalIdChangeObserver(std::move(on_global_id_change));
}
// No explicit teardown is performed here.
UserEventSyncBridge::~UserEventSyncBridge() = default;
std::unique_ptr<MetadataChangeList>
UserEventSyncBridge::CreateMetadataChangeList() {
return WriteBatch::CreateMetadataChangeList();
}
// Not expected to be called for this bridge: hits NOTREACHED() and reports no
// error. Both arguments are ignored.
base::Optional<ModelError> UserEventSyncBridge::MergeSyncData(
    std::unique_ptr<MetadataChangeList> metadata_change_list,
    EntityChangeList entity_data) {
  NOTREACHED();
  return base::Optional<ModelError>();
}
// Applies changes received from sync. Every change is expected to be a
// deletion (DCHECKed). The deleted keys are removed from the store, in-flight
// navigation-linked events whose event time matches a deleted key are dropped,
// and the metadata changes are committed as part of the same write batch.
//
// Always returns an empty Optional; a failed commit is reported later through
// OnCommit().
base::Optional<ModelError> UserEventSyncBridge::ApplySyncChanges(
    std::unique_ptr<MetadataChangeList> metadata_change_list,
    EntityChangeList entity_changes) {
  std::unique_ptr<WriteBatch> batch = store_->CreateWriteBatch();
  std::set<int64_t> deleted_event_times;
  for (EntityChange& change : entity_changes) {
    DCHECK_EQ(EntityChange::ACTION_DELETE, change.type());
    batch->DeleteData(change.storage_key());
    deleted_event_times.insert(
        GetEventTimeFromStorageKey(change.storage_key()));
  }

  // Because we receive ApplySyncChanges with deletions when our commits are
  // confirmed, this is the perfect time to cleanup our in flight objects which
  // are no longer in flight.
  //
  // The entry is taken as |const auto&| so that it binds directly to the
  // container's own element type. Taking a std::pair<int64_t, ...> by value
  // copies a whole UserEventSpecifics for every entry inspected.
  base::EraseIf(in_flight_nav_linked_events_,
                [&deleted_event_times](const auto& kv) {
                  return base::ContainsKey(deleted_event_times,
                                           kv.second.event_time_usec());
                });

  batch->TransferMetadataChanges(std::move(metadata_change_list));
  store_->CommitWriteBatch(
      std::move(batch),
      base::Bind(&UserEventSyncBridge::OnCommit, base::AsWeakPtr(this)));
  return {};
}
// Asks the store for the records named by |storage_keys|. The outcome is
// delivered to OnReadData(), which receives |callback| as its first argument.
void UserEventSyncBridge::GetData(StorageKeyList storage_keys,
                                  DataCallback callback) {
  auto on_read_data = base::Bind(&UserEventSyncBridge::OnReadData,
                                 base::AsWeakPtr(this), callback);
  store_->ReadData(storage_keys, std::move(on_read_data));
}
// Asks the store for every record. The outcome is delivered to
// OnReadAllData(), which receives |callback| as its first argument.
void UserEventSyncBridge::GetAllData(DataCallback callback) {
  auto on_read_all_data = base::Bind(&UserEventSyncBridge::OnReadAllData,
                                     base::AsWeakPtr(this), callback);
  store_->ReadAllData(std::move(on_read_all_data));
}
// The client tag of an entity is identical to its storage key.
std::string UserEventSyncBridge::GetClientTag(const EntityData& entity_data) {
  std::string client_tag = GetStorageKey(entity_data);
  return client_tag;
}
std::string UserEventSyncBridge::GetStorageKey(const EntityData& entity_data) {
return GetStorageKeyFromSpecifics(entity_data.specifics.user_event());
}
// No data should be retained through sign out: read back every record so that
// OnReadAllDataToDelete() can delete them.
void UserEventSyncBridge::DisableSync() {
  auto on_read_all = base::Bind(&UserEventSyncBridge::OnReadAllDataToDelete,
                                base::AsWeakPtr(this));
  store_->ReadAllData(std::move(on_read_all));
}
// Writes |specifics| to the store and hands it to the change processor. The
// event is silently dropped if the store does not exist yet or the processor
// is not tracking metadata.
void UserEventSyncBridge::RecordUserEvent(
    std::unique_ptr<UserEventSpecifics> specifics) {
  // TODO(skym): Remove this when ModelTypeStore synchronously returns a
  // partially initialized reference, see crbug.com/709094.
  if (!store_) {
    return;
  }

  // TODO(skym): Remove this when the processor can handle Put() calls before
  // being given metadata, see crbug.com/761485. Dropping data on the floor here
  // is better than just writing to the store, because it will be lost if sent
  // to just the store, and bloat persistent storage indefinitely.
  if (!change_processor()->IsTrackingMetadata()) {
    return;
  }

  const std::string storage_key = GetStorageKeyFromSpecifics(*specifics);

  // Two scenarios need guarding against for navigation-linked events:
  //  1. The given user event may have been read from an old global_id
  //     timestamp off of a navigation which has already been re-written. The
  //     latest/best global_id can be looked up right now and used instead.
  //  2. The navigation is going to be updated in the future, so the current
  //     global_id, while valid for now, is never going to make it to the
  //     server and will need to be fixed. For this case a copy of the
  //     specifics is kept in |in_flight_nav_linked_events_| and re-recorded
  //     from HandleGlobalIdChange().
  if (specifics->has_navigation_id()) {
    const int64_t current_global_id =
        global_id_mapper_->GetLatestGlobalId(specifics->navigation_id());
    specifics->set_navigation_id(current_global_id);
    in_flight_nav_linked_events_.insert(
        std::make_pair(current_global_id, *specifics));
  }

  std::unique_ptr<WriteBatch> write_batch = store_->CreateWriteBatch();
  // Serialize before |specifics| is moved into the entity data below.
  write_batch->WriteData(storage_key, specifics->SerializeAsString());
  change_processor()->Put(storage_key,
                          MoveToEntityData(std::move(specifics)),
                          write_batch->GetMetadataChangeList());

  auto on_commit =
      base::Bind(&UserEventSyncBridge::OnCommit, base::AsWeakPtr(this));
  store_->CommitWriteBatch(std::move(write_batch), std::move(on_commit));
}
// Completion callback for store creation. On success the store is adopted and
// a read of all metadata is started; on failure the error is reported to the
// change processor and |store| is left untouched.
void UserEventSyncBridge::OnStoreCreated(
    Result result,
    std::unique_ptr<ModelTypeStore> store) {
  if (result != Result::SUCCESS) {
    change_processor()->ReportError(
        {FROM_HERE, "ModelTypeStore creation failed."});
    return;
  }

  store_.swap(store);
  auto on_metadata_read = base::Bind(&UserEventSyncBridge::OnReadAllMetadata,
                                     base::AsWeakPtr(this));
  store_->ReadAllMetadata(std::move(on_metadata_read));
}
// Completion callback for the metadata read. Reports |error| if one is set;
// otherwise passes |metadata_batch| to the change processor, first marking the
// initial sync as done if the stored state does not say so yet.
void UserEventSyncBridge::OnReadAllMetadata(
    base::Optional<ModelError> error,
    std::unique_ptr<MetadataBatch> metadata_batch) {
  if (error) {
    change_processor()->ReportError(*error);
    return;
  }

  if (!metadata_batch->GetModelTypeState().initial_sync_done()) {
    // We have never initialized before, force it to true. We are not going to
    // ever have a GetUpdates because our type is commit only.
    // TODO(skym): Do we need to worry about saving this back ourselves? Or
    // does that get taken care for us?
    ModelTypeState patched_state = metadata_batch->GetModelTypeState();
    patched_state.set_initial_sync_done(true);
    metadata_batch->SetModelTypeState(patched_state);
  }

  change_processor()->ModelReadyToSync(std::move(metadata_batch));
}
// Completion callback for store write batches; a failed write is reported to
// the change processor, a successful one needs no further action.
void UserEventSyncBridge::OnCommit(Result result) {
  if (result == Result::SUCCESS) {
    return;
  }
  change_processor()->ReportError({FROM_HERE, "Failed writing user events."});
}
// Completion callback for keyed reads. Handled exactly like a read of all
// data; |missing_id_list| is ignored.
void UserEventSyncBridge::OnReadData(DataCallback callback,
                                     Result result,
                                     std::unique_ptr<RecordList> data_records,
                                     std::unique_ptr<IdList> missing_id_list) {
  std::unique_ptr<RecordList> records = std::move(data_records);
  OnReadAllData(callback, result, std::move(records));
}
// Completion callback for store reads. Parses every record into a
// UserEventSpecifics and runs |callback| with the resulting batch. A failed
// read or a record that does not parse is reported to the change processor,
// and |callback| is then not run.
void UserEventSyncBridge::OnReadAllData(
    DataCallback callback,
    Result result,
    std::unique_ptr<RecordList> data_records) {
  if (result != Result::SUCCESS) {
    change_processor()->ReportError({FROM_HERE, "Failed reading user events."});
    return;
  }

  auto data_batch = std::make_unique<MutableDataBatch>();
  // A single proto instance is reused for every record.
  UserEventSpecifics parsed;
  for (const Record& record : *data_records) {
    if (!parsed.ParseFromString(record.value)) {
      change_processor()->ReportError(
          {FROM_HERE, "Failed deserializing user events."});
      return;
    }
    // The key a record is stored under must match the key its contents yield.
    DCHECK_EQ(record.id, GetStorageKeyFromSpecifics(parsed));
    data_batch->Put(record.id, CopyToEntityData(parsed));
  }

  callback.Run(std::move(data_batch));
}
// Completion callback for the read started by DisableSync(): deletes every
// record that was read, in a single write batch. A failed read is reported to
// the change processor and nothing is deleted.
void UserEventSyncBridge::OnReadAllDataToDelete(
    Result result,
    std::unique_ptr<RecordList> data_records) {
  if (result != Result::SUCCESS) {
    change_processor()->ReportError({FROM_HERE, "Failed reading user events."});
    return;
  }

  std::unique_ptr<WriteBatch> deletions = store_->CreateWriteBatch();
  for (const Record& record : *data_records) {
    deletions->DeleteData(record.id);
  }

  auto on_commit =
      base::Bind(&UserEventSyncBridge::OnCommit, base::AsWeakPtr(this));
  store_->CommitWriteBatch(std::move(deletions), std::move(on_commit));
}
void UserEventSyncBridge::HandleGlobalIdChange(int64_t old_global_id,
int64_t new_global_id) {
DCHECK_NE(old_global_id, new_global_id);
// Store specifics in a temp vector while erasing, as |RecordUserEvent()| will
// insert new values into |in_flight_nav_linked_events_|. While insert should
// not invalidate a std::multimap's iterator, and the updated global_id should
// not be within our given range, this approach seems less error prone.
std::vector<std::unique_ptr<UserEventSpecifics>> affected;
auto range = in_flight_nav_linked_events_.equal_range(old_global_id);
for (auto iter = range.first; iter != range.second;) {
DCHECK_EQ(old_global_id, iter->second.navigation_id());
affected.emplace_back(std::make_unique<UserEventSpecifics>(iter->second));
iter = in_flight_nav_linked_events_.erase(iter);
}
for (std::unique_ptr<UserEventSpecifics>& specifics : affected) {
specifics->set_navigation_id(new_global_id);
RecordUserEvent(std::move(specifics));
}
}
} // namespace syncer