# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
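
"""API endpoint for fetching timeseries data.

Serves the units, improvement direction, and requested columns (statistics,
revisions, timestamps, alerts, histograms, and sparse diagnostics) for a
single timeseries identified by a Descriptor.
"""
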
from google.appengine.ext import ndb

from dashboard import alerts
from dashboard.api import api_request_handler
from dashboard.api import utils as api_utils
from dashboard.common import descriptor
from dashboard.common import timing
from dashboard.common import utils
from dashboard.models import anomaly
from dashboard.models import graph_data
from dashboard.models import histogram

# These limits should prevent DeadlineExceededErrors.
# TODO(benjhayden): Find a better strategy for staying under the deadline.
DIAGNOSTICS_QUERY_LIMIT = 10000
HISTOGRAMS_QUERY_LIMIT = 1000
ROWS_QUERY_LIMIT = 20000

# Fetching any of these columns requires fetching Row entities.
COLUMNS_REQUIRING_ROWS = {'timestamp', 'revisions'}.union(descriptor.STATISTICS)

# Allow responses to be cached for a week.
CACHE_SECONDS = 60 * 60 * 24 * 7


class Timeseries2Handler(api_request_handler.ApiRequestHandler):
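  """Serves timeseries data for a single measurement as a row-major table."""
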
  def _CheckUser(self):
    # Unauthenticated users may request public data, so don't require
    # credentials here. Internal-only data is guarded by FetchSync(), which
    # raises AssertionError for unauthorized callers.
    pass

def Post(self):
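    """Fetches a timeseries and formats it as a row-major table.

    Request parameters:
      test_suite, measurement, bot, test_case, build_type: strings that
          identify the timeseries via a Descriptor.
      columns: comma-separated list of column names to return per data point.
      min_revision, max_revision: optional revision range.
      min_timestamp, max_timestamp: optional ISO 8601 timestamp range.

    Returns:
      A dict with 'units', 'improvement_direction', and 'data' keys, as
      produced by TimeseriesQuery.
    """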
desc = descriptor.Descriptor(
test_suite=self.request.get('test_suite'),
measurement=self.request.get('measurement'),
bot=self.request.get('bot'),
test_case=self.request.get('test_case'),
statistic=None,
build_type=self.request.get('build_type'))
min_revision = self.request.get('min_revision')
min_revision = int(min_revision) if min_revision else None
max_revision = self.request.get('max_revision')
max_revision = int(max_revision) if max_revision else None
query = TimeseriesQuery(
desc, self.request.get('columns').split(','),
min_revision, max_revision,
api_utils.ParseISO8601(self.request.get('min_timestamp', None)),
api_utils.ParseISO8601(self.request.get('max_timestamp', None)))
try:
result = query.FetchSync()
except AssertionError:
# The caller has requested internal-only data but is not authorized.
raise api_request_handler.NotFoundError
self.response.headers['Cache-Control'] = '%s, max-age=%d' % (
'private' if query.private else 'public', CACHE_SECONDS)
    return result


class TimeseriesQuery(object):
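  """Fetches timeseries data and merges it into a table keyed by revision.

  TestMetadata, Row, Anomaly, Histogram, and SparseDiagnostic entities are
  fetched in parallel using tasklets, and their data is merged into a single
  row-major table.

  Example:
    query = TimeseriesQuery(desc, ['revision', 'avg', 'std'])
    result = query.FetchSync()
  """
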
def __init__(self, desc, columns, min_revision=None, max_revision=None,
min_timestamp=None, max_timestamp=None):
self._descriptor = desc
self._columns = columns
self._min_revision = min_revision
self._max_revision = max_revision
self._min_timestamp = min_timestamp
self._max_timestamp = max_timestamp
self._statistic_columns = []
self._unsuffixed_test_metadata_keys = []
self._test_keys = []
self._units = None
self._improvement_direction = None
self._data = {}
self._private = False

  @property
  def private(self):
    # True if any fetched entity is internal-only, in which case the response
    # must not be publicly cached.
    return self._private

@timing.TimeWall('fetch')
@timing.TimeCpu('fetch')
def FetchSync(self):
    return self.FetchAsync().get_result()

@ndb.tasklet
def FetchAsync(self):
"""Returns {units, improvement_direction, data}.
Raises:
api_request_handler.NotFoundError when the timeseries cannot be found.
AssertionError when an external user requests internal-only data.
"""
    # Always query TestMetadata, even if the caller doesn't need units and
    # improvement_direction, because Row entities don't store internal_only.
    # Use tasklets so the data can be processed as it arrives.
self._CreateTestKeys()
futures = [self._FetchTests()]
if COLUMNS_REQUIRING_ROWS.intersection(self._columns):
futures.append(self._FetchRows())
elif 'histogram' in self._columns:
futures.append(self._FetchHistograms())
if 'alert' in self._columns:
self._ResolveTimestamps()
futures.append(self._FetchAlerts())
if 'diagnostics' in self._columns:
self._ResolveTimestamps()
futures.append(self._FetchDiagnostics())
yield futures
raise ndb.Return({
'units': self._units,
'improvement_direction': self._improvement_direction,
'data': [[datum.get(col) for col in self._columns]
for _, datum in sorted(self._data.iteritems())],
    })

def _ResolveTimestamps(self):
if self._min_timestamp and self._min_revision is None:
self._min_revision = self._ResolveTimestamp(self._min_timestamp)
if self._max_timestamp and self._max_revision is None:
      self._max_revision = self._ResolveTimestamp(self._max_timestamp)

def _ResolveTimestamp(self, timestamp):
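    """Returns the revision of the most recent Row at or before timestamp."""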
query = graph_data.Row.query(
graph_data.Row.parent_test.IN(self._test_keys),
graph_data.Row.timestamp <= timestamp)
query = query.order(-graph_data.Row.timestamp)
row_keys = query.fetch(1, keys_only=True)
if not row_keys:
return None
    return row_keys[0].integer_id()

def _CreateTestKeys(self):
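    """Builds the list of test keys to fetch.

    Includes both TestMetadata and old-style keys, for the unsuffixed test
    paths as well as for each statistic suffix requested in columns.
    """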
desc = self._descriptor.Clone()
desc.statistic = None
unsuffixed_test_paths = desc.ToTestPathsSync()
self._unsuffixed_test_metadata_keys = [
utils.TestMetadataKey(path) for path in unsuffixed_test_paths]
self._statistic_columns = [
col for col in self._columns if col in descriptor.STATISTICS]
test_paths = []
for statistic in self._statistic_columns:
desc.statistic = statistic
test_paths.extend(desc.ToTestPathsSync())
test_metadata_keys = [utils.TestMetadataKey(path) for path in test_paths]
test_metadata_keys.extend(self._unsuffixed_test_metadata_keys)
test_paths.extend(unsuffixed_test_paths)
test_old_keys = [utils.OldStyleTestKey(path) for path in test_paths]
    self._test_keys = test_old_keys + test_metadata_keys

@ndb.tasklet
def _FetchTests(self):
with timing.WallTimeLogger('fetch_tests'):
tests = yield [key.get_async() for key in self._test_keys]
tests = [test for test in tests if test]
if not tests:
raise api_request_handler.NotFoundError
improvement_direction = None
for test in tests:
if test.internal_only:
self._private = True
test_desc = yield descriptor.Descriptor.FromTestPathAsync(
utils.TestPath(test.key))
      # The unit for 'count' statistics is trivially always 'count'. Callers
      # almost certainly want the units of the measurement itself, which are
      # the same as the units of the 'avg' and 'std' statistics.
if self._units is None or test_desc.statistic != 'count':
self._units = test.units
improvement_direction = test.improvement_direction
if improvement_direction == anomaly.DOWN:
self._improvement_direction = 'down'
elif improvement_direction == anomaly.UP:
self._improvement_direction = 'up'
else:
      self._improvement_direction = None

  def _Datum(self, revision):
    # Returns the dict of column values for |revision|, creating it if needed.
    return self._data.setdefault(revision, {'revision': revision})

@ndb.tasklet
def _FetchRows(self):
yield [self._FetchRowsForTest(test_key) for test_key in self._test_keys]

  @staticmethod
  def Round(x):
    # Round to 6 decimal places (presumably to keep responses compact).
    return round(x, 6)

def _RowQueryProjection(self, statistic):
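    """Returns (projection, limit) for a Row query for the given statistic.

    Projection queries can only return indexed properties, but they return
    far less data than full Row entities, so they run without
    ROWS_QUERY_LIMIT.
    """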
limit = ROWS_QUERY_LIMIT
projection = None
# 'r_'-prefixed revisions are not in any index, so a projection query can't
# get them.
if 'revisions' in self._columns:
return projection, limit
    # There is no index like (parent_test, -timestamp, revision, value), so
    # timestamp ranges must be resolved into revision ranges before running
    # a projection query.
    self._ResolveTimestamps()
# If statistic is not None, then project value. _CreateTestKeys will
# generate test keys for the other statistics in columns if there are any.
# If statistic is None and the only statistic is avg, then project value.
# If statistic is None and there are multiple statistics, then fetch full
# Row entities because we might need their 'error' aka 'std' or
# 'd_'-prefixed statistics.
if ((statistic is not None) or
(','.join(self._statistic_columns) == 'avg')):
projection = ['revision', 'value']
if 'timestamp' in self._columns:
projection.append('timestamp')
limit = None
    return projection, limit

@ndb.tasklet
def _FetchRowsForTest(self, test_key):
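    """Fetches Rows for one test key and merges them into self._data."""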
test_desc = yield descriptor.Descriptor.FromTestPathAsync(
utils.TestPath(test_key))
projection, limit = self._RowQueryProjection(test_desc.statistic)
query = graph_data.Row.query(projection=projection)
query = query.filter(graph_data.Row.parent_test == test_key)
query = self._FilterRowQuery(query)
with timing.WallTimeLogger('fetch_test'):
rows = yield query.fetch_async(limit)
with timing.CpuTimeLogger('rows'):
for row in rows:
        # The datastore in the dev environment sometimes ignores filters, so
        # re-check the revision and timestamp bounds here.
if self._min_revision and row.revision < self._min_revision:
continue
if self._min_timestamp and row.timestamp < self._min_timestamp:
continue
if self._max_revision and row.revision > self._max_revision:
continue
if self._max_timestamp and row.timestamp > self._max_timestamp:
continue
datum = self._Datum(row.revision)
if test_desc.statistic is None:
datum['avg'] = self.Round(row.value)
if hasattr(row, 'error') and row.error:
datum['std'] = self.Round(row.error)
else:
datum[test_desc.statistic] = self.Round(row.value)
for stat in self._statistic_columns:
if hasattr(row, 'd_' + stat):
datum[stat] = self.Round(getattr(row, 'd_' + stat))
if 'timestamp' in self._columns:
datum['timestamp'] = row.timestamp.isoformat()
if 'revisions' in self._columns:
datum['revisions'] = {
attr: value for attr, value in row.to_dict().iteritems()
if attr.startswith('r_')}
    if 'histogram' in self._columns and test_desc.statistic is None:
with timing.WallTimeLogger('fetch_histograms'):
        yield [self._FetchHistogram(test_key, row.revision) for row in rows]

def _FilterRowQuery(self, query):
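    """Applies revision or timestamp range filters and a descending order.

    Revision filters take precedence over timestamp filters; the datastore
    only supports inequality filters on a single property per query.
    """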
if self._min_revision or self._max_revision:
if self._min_revision:
query = query.filter(graph_data.Row.revision >= self._min_revision)
if self._max_revision:
query = query.filter(graph_data.Row.revision <= self._max_revision)
query = query.order(-graph_data.Row.revision)
elif self._min_timestamp or self._max_timestamp:
if self._min_timestamp:
query = query.filter(graph_data.Row.timestamp >= self._min_timestamp)
if self._max_timestamp:
query = query.filter(graph_data.Row.timestamp <= self._max_timestamp)
query = query.order(-graph_data.Row.timestamp)
else:
query = query.order(-graph_data.Row.revision)
    return query

@ndb.tasklet
def _FetchAlerts(self):
anomalies, _, _ = yield anomaly.Anomaly.QueryAsync(
test_keys=self._test_keys,
max_start_revision=self._max_revision,
min_end_revision=self._min_revision)
for alert in anomalies:
if alert.internal_only:
self._private = True
datum = self._Datum(alert.end_revision)
      # TODO(benjhayden): Include bisect_status.
      datum['alert'] = alerts.GetAnomalyDict(alert)

@ndb.tasklet
def _FetchHistograms(self):
    yield [self._FetchHistogramsForTest(test) for test in self._test_keys]

@ndb.tasklet
def _FetchHistogramsForTest(self, test):
query = graph_data.Row.query(graph_data.Row.parent_test == test)
query = self._FilterRowQuery(query)
with timing.WallTimeLogger('fetch_histograms'):
row_keys = yield query.fetch_async(HISTOGRAMS_QUERY_LIMIT, keys_only=True)
yield [self._FetchHistogram(test, row_key.integer_id())
           for row_key in row_keys]

@ndb.tasklet
def _FetchHistogram(self, test, revision):
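    """Fetches the Histogram for the given test and revision, if any."""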
query = histogram.Histogram.query(
histogram.Histogram.test == utils.TestMetadataKey(test),
histogram.Histogram.revision == revision)
hist = yield query.get_async()
if hist is None:
return
if hist.internal_only:
self._private = True
    self._Datum(hist.revision)['histogram'] = hist.data

@ndb.tasklet
def _FetchDiagnostics(self):
    with timing.WallTimeLogger('fetch_diagnostics'):
yield [self._FetchDiagnosticsForTest(test)
             for test in self._unsuffixed_test_metadata_keys]

@ndb.tasklet
def _FetchDiagnosticsForTest(self, test):
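    """Fetches SparseDiagnostics for one test and merges them into the data.

    Diagnostics are keyed by name under the 'diagnostics' column of the datum
    at their start_revision.
    """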
query = histogram.SparseDiagnostic.query(
histogram.SparseDiagnostic.test == test)
if self._min_revision:
query = query.filter(
histogram.SparseDiagnostic.start_revision >= self._min_revision)
if self._max_revision:
query = query.filter(
histogram.SparseDiagnostic.start_revision <= self._max_revision)
query = query.order(-histogram.SparseDiagnostic.start_revision)
diagnostics = yield query.fetch_async(DIAGNOSTICS_QUERY_LIMIT)
for diag in diagnostics:
if diag.internal_only:
self._private = True
datum = self._Datum(diag.start_revision)
datum_diags = datum.setdefault('diagnostics', {})
datum_diags[diag.name] = diag.data