# blob: d6603b9b600aea6a19e77972ffed8a9bb7863e2b [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Monitor jobs and abort them as necessary.
This daemon does a number of upkeep tasks:
* When a process owning a job crashes, job_aborter will mark the job as
aborted in the database and clean up its lease files.
* When a job is marked aborted in the database, job_aborter will signal
the process owning the job to abort.
See also http://goto.google.com/monitor_db_per_job_refactor
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import datetime
import logging
import sys
import time
from lucifer import autotest
from lucifer import leasing
from lucifer import loglib
# Module-level logger, named after this module; used by every function below.
logger = logging.getLogger(__name__)
def main(args):
    """Parse the command line, configure logging and run the monitor loop.

    @param args: list of command line args
    @raises AssertionError: if the main loop ever returns, which is not
            supposed to happen.
    """
    parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
    parser.add_argument('--jobdir', required=True)
    loglib.add_logging_options(parser)
    # From here on, args is the parsed namespace rather than the raw list.
    args = parser.parse_args(args)
    loglib.configure_logging_with_args(parser, args)
    logger.info('Starting with args: %r', args)
    autotest.monkeypatch()
    _main_loop(jobdir=args.jobdir)
    # The main loop is an endless `while True`, so reaching this point
    # means something is badly wrong.  Raise explicitly instead of using
    # `assert False`: assert statements are removed under `python -O`,
    # which would turn this into a silent, successful exit.
    raise AssertionError('_main_loop returned; cannot exit normally')
def _main_loop(jobdir):
    """Run the upkeep pass forever, sleeping between passes.

    @param jobdir: job lease file directory
    """
    transaction = autotest.deps_load('django.db.transaction')

    @transaction.commit_manually
    def _commit_open_transaction():
        """Commit the open transaction.

        See https://stackoverflow.com/questions/3346124/
        """
        transaction.commit()

    seconds_between_ticks = 20
    while True:
        logger.debug('Tick')
        _main_loop_body(jobdir)
        # Commit after each pass, before sleeping until the next one.
        _commit_open_transaction()
        time.sleep(seconds_between_ticks)
def _main_loop_body(jobdir):
    """Run a single pass of every upkeep task.

    @param jobdir: job lease file directory
    """
    # Collect the leases that have not expired, keyed by lease id, so the
    # tasks below all work from the same view of which jobs are owned.
    active_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if lease.expired():
            continue
        active_leases[lease.id] = lease

    _mark_expired_jobs_failed(active_leases)
    _abort_timed_out_jobs(active_leases)
    _abort_jobs_marked_aborting(active_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into
    # lucifer_run_job
def _mark_expired_jobs_failed(active_leases):
    """Mark expired jobs failed.

    A job counts as expired when it has an incomplete JobHandoff but no
    active lease: it was handed off to a job_reporter, and that
    job_reporter has since crashed.  Such jobs are marked failed in the
    database.

    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    orphaned_job_ids = []
    for handoff in _incomplete_handoffs_queryset():
        job_id = handoff.job_id
        logger.debug('Found handoff: %d', job_id)
        if job_id in active_leases:
            # Still owned by a live process; leave it alone.
            continue
        logger.debug('Handoff %d is missing active lease', job_id)
        orphaned_job_ids.append(job_id)
    _mark_failed(orphaned_job_ids)
def _abort_timed_out_jobs(active_leases):
    """Send abort to timed out jobs.

    Timed out jobs that have no entry in active_leases are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    for timed_out_job in _timed_out_jobs_queryset():
        if timed_out_job.id not in active_leases:
            continue
        active_leases[timed_out_job.id].abort()
def _abort_jobs_marked_aborting(active_leases):
    """Send abort to jobs marked aborting in Autotest database.

    Aborting jobs that have no entry in active_leases are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    for aborting_job in _aborting_jobs_queryset():
        if aborting_job.id not in active_leases:
            continue
        active_leases[aborting_job.id].abort()
def _abort_special_tasks_marked_aborted():
# TODO(crbug.com/748234): Special tasks not implemented yet. This
# would abort jobs running on the behalf of special tasks and thus
# need to check a different database table.
pass
def _clean_up_expired_leases(jobdir):
    """Clean up files for expired leases.

    Only active leases are of interest, so the stale files left behind
    by expired leases can be removed.

    @param jobdir: job lease file directory
    """
    # Lazy generator: each lease is checked and cleaned up in turn.
    expired_leases = (
        lease for lease in leasing.leases_iter(jobdir) if lease.expired()
    )
    for expired_lease in expired_leases:
        expired_lease.cleanup()
# Grace period in seconds: a JobHandoff created more recently than this is
# not yet reported by _incomplete_handoffs_queryset().
_JOB_GRACE_SECS = 10
def _incomplete_handoffs_queryset():
    """Return a QuerySet of incomplete JobHandoffs.

    A JobHandoff created within the last _JOB_GRACE_SECS seconds is left
    out so that the job has a chance to acquire its lease file.  Any
    older incomplete job without an active lease is considered dead.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # Time ---*---------|---------*-------|--->
    #    incomplete   cutoff   newborn   now
    now = datetime.datetime.now()
    grace_period = datetime.timedelta(seconds=_JOB_GRACE_SECS)
    cutoff = now - grace_period
    handoffs = models.JobHandoff.objects
    return handoffs.filter(completed=False, created__lt=cutoff)
def _timed_out_jobs_queryset():
    """Return a QuerySet of timed out Jobs.

    Selects distinct Jobs filtered on hostqueueentry__complete=False
    whose created_on plus timeout_mins minutes is earlier than the
    database's NOW().

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    incomplete_jobs = models.Job.objects.filter(hostqueueentry__complete=False)
    # Raw SQL condition; the comparison is evaluated by the database.
    overdue_jobs = incomplete_jobs.extra(
        where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    return overdue_jobs.distinct()
def _aborting_jobs_queryset():
    """Return a QuerySet of aborting Jobs.

    Selects distinct Jobs filtered on hostqueueentry__aborted=True and
    then on hostqueueentry__complete=False.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # Deliberately two separate filter() calls.  In Django, chained
    # filters across a multi-valued relation are not equivalent to one
    # combined filter(), so do not merge them.
    aborted_jobs = models.Job.objects.filter(hostqueueentry__aborted=True)
    unfinished_jobs = aborted_jobs.filter(hostqueueentry__complete=False)
    return unfinished_jobs.distinct()
def _filter_leased(jobdir, dbjobs):
"""Filter Job models for leased jobs.
Yields pairs of Job model and Lease instances.
@param jobdir: job lease file directory
@param dbjobs: iterable of Django model Job instances
@returns: iterator of Leases
"""
our_jobs = {job.id: job for job in leasing.leases_iter(jobdir)}
for dbjob in dbjobs:
if dbjob.id in our_jobs:
yield dbjob, our_jobs[dbjob.id]
def _mark_failed(job_ids):
"""Mark jobs failed in database.
This also marks the corresponding JobHandoffs as completed.
"""
if not job_ids:
return
models = autotest.load('frontend.afe.models')
logger.info('Marking jobs failed: %r', job_ids)
hqes = models.HostQueueEntry.objects.filter(job_id__in=job_ids)
hqes.update(complete=True,
active=False,
status=models.HostQueueEntry.Status.FAILED)
(hqes.exclude(started_on=None)
.update(finished_on=datetime.datetime.now()))
(models.JobHandoff.objects
.filter(job_id__in=job_ids)
.update(completed=True))
# Script entry point: run main() with the command line arguments, minus
# the program name.
if __name__ == '__main__':
    main(sys.argv[1:])