# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""URL endpoint for adding new histograms to the dashboard."""
import cloudstorage
import json
import logging
import sys
import uuid
import zlib
from google.appengine.api import taskqueue
from dashboard.api import api_request_handler
from dashboard.common import datastore_hooks
from dashboard.common import histogram_helpers
from dashboard.common import request_handler
from dashboard.common import timing
from dashboard.common import utils
from dashboard.models import graph_data
from dashboard.models import histogram
from tracing.value import histogram_set
from tracing.value.diagnostics import diagnostic
from tracing.value.diagnostics import reserved_infos
SUITE_LEVEL_SPARSE_DIAGNOSTIC_NAMES = set([
reserved_infos.ARCHITECTURES.name,
reserved_infos.BENCHMARKS.name,
reserved_infos.BENCHMARK_DESCRIPTIONS.name,
reserved_infos.BOTS.name,
reserved_infos.BUG_COMPONENTS.name,
reserved_infos.DOCUMENTATION_URLS.name,
reserved_infos.GPUS.name,
reserved_infos.MASTERS.name,
reserved_infos.MEMORY_AMOUNTS.name,
reserved_infos.OS_NAMES.name,
reserved_infos.OS_VERSIONS.name,
reserved_infos.OWNERS.name,
reserved_infos.PRODUCT_VERSIONS.name,
reserved_infos.TAG_MAP.name,
])

HISTOGRAM_LEVEL_SPARSE_DIAGNOSTIC_NAMES = set([
reserved_infos.DEVICE_IDS.name,
reserved_infos.RELATED_NAMES.name,
reserved_infos.STORIES.name,
reserved_infos.STORYSET_REPEATS.name,
reserved_infos.STORY_TAGS.name,
])

SPARSE_DIAGNOSTIC_NAMES = SUITE_LEVEL_SPARSE_DIAGNOSTIC_NAMES.union(
    HISTOGRAM_LEVEL_SPARSE_DIAGNOSTIC_NAMES)

TASK_QUEUE_NAME = 'histograms-queue'

_RETRY_PARAMS = cloudstorage.RetryParams(backoff_factor=1.1)
_TASK_RETRY_LIMIT = 4


def _CheckRequest(condition, msg):
  if not condition:
    raise api_request_handler.BadRequestError(msg)


class AddHistogramsProcessHandler(request_handler.RequestHandler):

def post(self):
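    """Reads a staged HistogramSet from Cloud Storage and processes it.

    The task payload names a Cloud Storage file containing zlib-compressed
    HistogramSet JSON. The file is read, decompressed, parsed, and passed
    to ProcessHistogramSet; it is deleted even if reading fails, and any
    processing error is logged and reported in the JSON response.
    """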
datastore_hooks.SetPrivilegedRequest()
try:
params = json.loads(self.request.body)
gcs_file_path = params['gcs_file_path']
try:
gcs_file = cloudstorage.open(
gcs_file_path, 'r', retry_params=_RETRY_PARAMS)
contents = gcs_file.read()
data_str = zlib.decompress(contents)
gcs_file.close()
finally:
cloudstorage.delete(gcs_file_path, retry_params=_RETRY_PARAMS)
with timing.WallTimeLogger('json.loads'):
histogram_dicts = json.loads(data_str)
ProcessHistogramSet(histogram_dicts)
    except Exception as e:  # pylint: disable=broad-except
      logging.error('Error processing histograms: %r', e.message)
      self.response.out.write(json.dumps({'error': e.message}))


class AddHistogramsHandler(api_request_handler.ApiRequestHandler):

  def _CheckUser(self):
    self._CheckIsInternalUser()

def Post(self):
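    """Stages an uploaded HistogramSet in Cloud Storage and enqueues it.

    The request body may be zlib-compressed HistogramSet JSON; if it is
    not, the JSON is read uncompressed from the 'data' parameter and
    compressed here. The compressed payload is written to a uniquely
    named file under /add-histograms-cache/, and a task pointing at that
    file is queued for AddHistogramsProcessHandler.
    """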
with timing.WallTimeLogger('decompress'):
try:
        # Decompressing here only validates that the body is zlib data;
        # the compressed bytes are what get staged in Cloud Storage.
        data_str = self.request.body
        zlib.decompress(data_str)
        logging.info('Received compressed data.')
except zlib.error:
data_str = self.request.get('data')
if not data_str:
raise api_request_handler.BadRequestError(
'Missing or uncompressed data.')
data_str = zlib.compress(data_str)
        logging.info('Received uncompressed data.')
if not data_str:
raise api_request_handler.BadRequestError('Missing "data" parameter')
filename = uuid.uuid4()
params = {'gcs_file_path': '/add-histograms-cache/%s' % filename}
gcs_file = cloudstorage.open(
params['gcs_file_path'], 'w',
content_type='application/octet-stream',
retry_params=_RETRY_PARAMS)
gcs_file.write(data_str)
gcs_file.close()
retry_options = taskqueue.TaskRetryOptions(
task_retry_limit=_TASK_RETRY_LIMIT)
queue = taskqueue.Queue('default')
queue.add(
taskqueue.Task(
url='/add_histograms/process', payload=json.dumps(params),
            retry_options=retry_options))


def _LogDebugInfo(histograms):
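  """Logs the LOG_URLS and BUILD_URLS diagnostics of the first histogram."""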
hist = histograms.GetFirstHistogram()
if not hist:
logging.info('No histograms in data.')
return
log_urls = hist.diagnostics.get(reserved_infos.LOG_URLS.name)
if log_urls:
log_urls = list(log_urls)
msg = 'Buildbot URL: %s' % str(log_urls)
logging.info(msg)
else:
logging.info('No LOG_URLS in data.')
build_urls = hist.diagnostics.get(reserved_infos.BUILD_URLS.name)
if build_urls:
build_urls = list(build_urls)
msg = 'Build URL: %s' % str(build_urls)
logging.info(msg)
else:
    logging.info('No BUILD_URLS in data.')


def ProcessHistogramSet(histogram_dicts):
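  """Validates an uploaded HistogramSet and fans it out to the task queue.

  Imports the dicts into a HistogramSet, deduplicates shared diagnostics,
  resolves the master/bot/benchmark identity and upload revision, persists
  suite-level sparse diagnostics, and batches the histograms into
  histograms-queue tasks for further processing.

  Raises:
    BadRequestError: if the JSON is not a list or contains no histograms.
  """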
if not isinstance(histogram_dicts, list):
    raise api_request_handler.BadRequestError(
        'HistogramSet JSON must be a list of dicts')
histograms = histogram_set.HistogramSet()
with timing.WallTimeLogger('hs.ImportDicts'):
histograms.ImportDicts(histogram_dicts)
with timing.WallTimeLogger('hs.ResolveRelatedHistograms'):
histograms.ResolveRelatedHistograms()
with timing.WallTimeLogger('hs.DeduplicateDiagnostics'):
histograms.DeduplicateDiagnostics()
if len(histograms) == 0:
raise api_request_handler.BadRequestError(
'HistogramSet JSON must contain at least one histogram.')
with timing.WallTimeLogger('hs._LogDebugInfo'):
_LogDebugInfo(histograms)
with timing.WallTimeLogger('InlineDenseSharedDiagnostics'):
InlineDenseSharedDiagnostics(histograms)
# TODO(#4242): Get rid of this.
# https://github.com/catapult-project/catapult/issues/4242
with timing.WallTimeLogger('_PurgeHistogramBinData'):
_PurgeHistogramBinData(histograms)
with timing.WallTimeLogger('_GetDiagnosticValue calls'):
master = _GetDiagnosticValue(
reserved_infos.MASTERS.name, histograms.GetFirstHistogram())
bot = _GetDiagnosticValue(
reserved_infos.BOTS.name, histograms.GetFirstHistogram())
benchmark = _GetDiagnosticValue(
reserved_infos.BENCHMARKS.name, histograms.GetFirstHistogram())
benchmark_description = _GetDiagnosticValue(
reserved_infos.BENCHMARK_DESCRIPTIONS.name,
histograms.GetFirstHistogram(), optional=True)
with timing.WallTimeLogger('_ValidateMasterBotBenchmarkName'):
_ValidateMasterBotBenchmarkName(master, bot, benchmark)
with timing.WallTimeLogger('ComputeRevision'):
suite_key = utils.TestKey('%s/%s/%s' % (master, bot, benchmark))
logging.info('Suite: %s', suite_key.id())
revision = ComputeRevision(histograms)
internal_only = graph_data.Bot.GetInternalOnlySync(master, bot)
revision_record = histogram.HistogramRevisionRecord.GetOrCreate(
suite_key, revision)
revision_record.put()
last_added = histogram.HistogramRevisionRecord.GetLatest(
suite_key).get_result()
# On first upload, a query immediately following a put may return nothing.
if not last_added:
last_added = revision_record
_CheckRequest(last_added, 'No last revision')
  # We skip histogram-level sparse diagnostics here; they are handled along
  # with the histograms below so that test paths can be assigned properly.
with timing.WallTimeLogger('FindSuiteLevelSparseDiagnostics'):
suite_level_sparse_diagnostic_entities = FindSuiteLevelSparseDiagnostics(
histograms, suite_key, revision, internal_only)
# TODO(896856): Refactor master/bot computation to happen above this line
# so that we can replace with a DiagnosticRef rather than a full diagnostic.
with timing.WallTimeLogger('DeduplicateAndPut'):
new_guids_to_old_diagnostics = (
histogram.SparseDiagnostic.FindOrInsertDiagnostics(
suite_level_sparse_diagnostic_entities, suite_key,
revision, last_added.revision).get_result())
with timing.WallTimeLogger('ReplaceSharedDiagnostic calls'):
for new_guid, old_diagnostic in new_guids_to_old_diagnostics.iteritems():
histograms.ReplaceSharedDiagnostic(
new_guid, diagnostic.Diagnostic.FromDict(old_diagnostic))
with timing.WallTimeLogger('_BatchHistogramsIntoTasks'):
tasks = _BatchHistogramsIntoTasks(
suite_key.id(), histograms, revision, benchmark_description)
with timing.WallTimeLogger('_QueueHistogramTasks'):
    _QueueHistogramTasks(tasks)


def _ValidateMasterBotBenchmarkName(master, bot, benchmark):
  for n in (master, bot, benchmark):
    if '/' in n:
      raise api_request_handler.BadRequestError('Illegal slash in %s' % n)


def _QueueHistogramTasks(tasks):
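  """Adds the tasks to the histograms queue asynchronously.

  taskqueue caps each add call at MAX_TASKS_PER_ADD tasks, so the tasks
  are added in slices of that size and the async calls are then awaited.
  """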
queue = taskqueue.Queue(TASK_QUEUE_NAME)
futures = []
for i in xrange(0, len(tasks), taskqueue.MAX_TASKS_PER_ADD):
f = queue.add_async(tasks[i:i + taskqueue.MAX_TASKS_PER_ADD])
futures.append(f)
for f in futures:
    f.get_result()


def _MakeTask(params):
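  """Creates an /add_histograms_queue task, skipping the size check.

  _size_check=False lets _BatchHistogramsIntoTasks rely on its own size
  estimate rather than a check at Task construction time.
  """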
return taskqueue.Task(
url='/add_histograms_queue', payload=json.dumps(params),
      _size_check=False)


def _BatchHistogramsIntoTasks(
suite_path, histograms, revision, benchmark_description):
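  """Packs the histograms into as few tasks as the size limit allows.

  Builds one params dict per histogram and accumulates dicts into a task
  until the estimated payload would exceed taskqueue.MAX_TASK_SIZE_BYTES,
  then starts a new task. Duplicate test paths in one upload raise
  BadRequestError.
  """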
params = []
tasks = []
base_size = _MakeTask([]).size
estimated_size = 0
duplicate_check = set()
for hist in histograms:
diagnostics = FindHistogramLevelSparseDiagnostics(hist)
    # TODO(896856): Don't compute full diagnostics, because we need to call
    # GetOrCreate here and in the queue anyway.
test_path = '%s/%s' % (suite_path, histogram_helpers.ComputeTestPath(hist))
if test_path in duplicate_check:
raise api_request_handler.BadRequestError(
'Duplicate histogram detected: %s' % test_path)
duplicate_check.add(test_path)
# TODO(#4135): Batch these better than one per task.
task_dict = _MakeTaskDict(
hist, test_path, revision, benchmark_description, diagnostics)
estimated_size_dict = len(json.dumps(task_dict))
estimated_size += estimated_size_dict
    # Creating the task directly and getting its size back is slow, so we
    # just keep a running total of estimated task size. A bit hand-wavy, but
    # the number of histograms per task doesn't need to be optimal; the
    # total just has to stay under the max task size.
estimated_total_size = estimated_size * 1.05 + base_size + 1024
if estimated_total_size > taskqueue.MAX_TASK_SIZE_BYTES:
t = _MakeTask(params)
tasks.append(t)
params = []
estimated_size = estimated_size_dict
params.append(task_dict)
if params:
t = _MakeTask(params)
tasks.append(t)
  return tasks


def _MakeTaskDict(
hist, test_path, revision, benchmark_description, diagnostics):
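  """Builds the payload dict for one histogram's queue task."""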
# TODO(simonhatch): "revision" is common to all tasks, as is the majority of
# the test path
params = {
'test_path': test_path,
'revision': revision,
'benchmark_description': benchmark_description
}
  # By changing the GUID just before serializing the task, we're making it
  # unique for each histogram. This avoids each histogram trying to write
  # the same diagnostic out (datastore contention), at the cost of copying
  # the data. These are sparsely written to datastore anyway, so the extra
  # storage should be minimal.
for d in diagnostics.itervalues():
d.ResetGuid()
diagnostics = {k: d.AsDict() for k, d in diagnostics.iteritems()}
params['diagnostics'] = diagnostics
params['data'] = hist.AsDict()
  return params


def FindSuiteLevelSparseDiagnostics(
histograms, suite_key, revision, internal_only):
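  """Builds SparseDiagnostic entities for the suite-level diagnostics.

  Returns one unsaved entity per suite-level diagnostic name found in the
  set. All histograms must share the same diagnostic (by GUID) for a given
  name; a mismatch raises ValueError.
  """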
diagnostics = {}
for hist in histograms:
for name, diag in hist.diagnostics.iteritems():
if name in SUITE_LEVEL_SPARSE_DIAGNOSTIC_NAMES:
existing_entity = diagnostics.get(name)
if existing_entity is None:
diagnostics[name] = histogram.SparseDiagnostic(
id=diag.guid, data=diag.AsDict(), test=suite_key,
start_revision=revision, end_revision=sys.maxint, name=name,
internal_only=internal_only)
elif existing_entity.key.id() != diag.guid:
raise ValueError(
name + ' diagnostics must be the same for all histograms')
  return diagnostics.values()


def FindHistogramLevelSparseDiagnostics(hist):
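  """Returns a dict of hist's histogram-level sparse diagnostics by name."""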
diagnostics = {}
for name, diag in hist.diagnostics.iteritems():
if name in HISTOGRAM_LEVEL_SPARSE_DIAGNOSTIC_NAMES:
diagnostics[name] = diag
  return diagnostics


def _GetDiagnosticValue(name, hist, optional=False):
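  """Returns the sole value of the named diagnostic on hist.

  Returns None if the diagnostic is missing and optional is True;
  otherwise a missing diagnostic, or one without exactly one element,
  raises BadRequestError.
  """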
if optional:
if name not in hist.diagnostics:
return None
_CheckRequest(
name in hist.diagnostics,
'Histogram [%s] missing "%s" diagnostic' % (hist.name, name))
value = hist.diagnostics[name]
_CheckRequest(
len(value) == 1,
'Histograms must have exactly 1 "%s"' % name)
  return value.GetOnlyElement()


def ComputeRevision(histograms):
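  """Derives the upload revision from the first histogram's diagnostics.

  Prefers POINT_ID, then CHROMIUM_COMMIT_POSITIONS, then the maximum
  REVISION_TIMESTAMPS value. Raises BadRequestError if none is present
  or the result is not an integer.
  """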
_CheckRequest(len(histograms) > 0, 'Must upload at least one histogram')
rev = _GetDiagnosticValue(
reserved_infos.POINT_ID.name,
histograms.GetFirstHistogram(), optional=True)
if rev is None:
rev = _GetDiagnosticValue(
reserved_infos.CHROMIUM_COMMIT_POSITIONS.name,
histograms.GetFirstHistogram(), optional=True)
if rev is None:
revision_timestamps = histograms.GetFirstHistogram().diagnostics.get(
reserved_infos.REVISION_TIMESTAMPS.name)
_CheckRequest(revision_timestamps is not None,
'Must specify REVISION_TIMESTAMPS, CHROMIUM_COMMIT_POSITIONS,'
' or POINT_ID')
rev = revision_timestamps.max_timestamp
if not isinstance(rev, (long, int)):
raise api_request_handler.BadRequestError(
'Point ID must be an integer.')
  return rev


def InlineDenseSharedDiagnostics(histograms):
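  """Inlines every diagnostic whose name is not in SPARSE_DIAGNOSTIC_NAMES."""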
# TODO(896856): Delete inlined diagnostics from the set
for hist in histograms:
diagnostics = hist.diagnostics
for name, diag in diagnostics.iteritems():
if name not in SPARSE_DIAGNOSTIC_NAMES:
        diag.Inline()


def _PurgeHistogramBinData(histograms):
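  """Deletes all per-bin diagnostics from every histogram."""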
  # We do this because RelatedEventSet and Breakdown data in bins are
  # enormous in their current implementation.
for cur_hist in histograms:
for cur_bin in cur_hist.bins:
for dm in cur_bin.diagnostic_maps:
keys = dm.keys()
for k in keys:
del dm[k]