blob: 3318c9898e8390d739a57f590e961b51334a47fe [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/proto/google"
logdog "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
)
// ErrStreamArchived is returned by ArchivalParams' PublishTask if the supplied
// LogStream is already archived.
var ErrStreamArchived = errors.New("stream is already archived")
// ArchivalParams is the archival configuration.
type ArchivalParams struct {
// RequestID is the unique request ID to use as a random base for the
// archival key.
RequestID string
// PreviousKey (rescheduled tasks only) is the archive key of the previous archive task.
PreviousKey []byte
// SettleDelay is the amount of settle delay to attach to this request.
SettleDelay time.Duration
// CompletePeriod is the amount of time after the initial archival task is
// executed when the task should fail if the stream is incomplete. After this
// period has expired, the archival may complete successfully even if the
// stream is missing log entries.
CompletePeriod time.Duration
}
// PublishTask creates and dispatches a task queue task for the supplied
// LogStream. PublishTask is goroutine-safe.
//
// This should be run within a transaction on lst. On success, lst's state will
// be updated to reflect the archival tasking. This will NOT update lst's
// datastore entity; the caller must make sure to call Put within the same
// transaction for transactional safety.
//
// If the task is created successfully, this will return nil. If the LogStream
// already had a task dispatched, it will return ErrStreamArchived.
func (p *ArchivalParams) PublishTask(c context.Context, ap ArchivalPublisher, lst *LogStreamState) error {
if lst.ArchivalState().Archived() {
// An archival task has already been dispatched for this log stream.
return ErrStreamArchived
}
id := lst.ID()
if len(lst.ArchivalKey) > 0 {
// This is a rescheduled task. Check to see if the key matches.
if !bytes.Equal(p.PreviousKey, lst.ArchivalKey) {
logging.Warningf(c, "Key does not match, this is probably a duplicate request, discarding.")
return nil
}
lst.ArchiveRetryCount++
}
msg := logdog.ArchiveTask{
Project: string(Project(c)),
Id: string(id),
Key: p.createArchivalKey(id, ap.NewPublishIndex()),
DispatchedAt: google.NewTimestamp(clock.Now(c)),
}
if p.SettleDelay > 0 {
msg.SettleDelay = google.NewDuration(p.SettleDelay)
}
if p.CompletePeriod > 0 {
msg.CompletePeriod = google.NewDuration(p.CompletePeriod)
}
// Publish an archival request.
if err := ap.Publish(c, &msg); err != nil {
return err
}
// Update our LogStream's ArchiveState to reflect that an archival task has
// been dispatched.
lst.ArchivalKey = msg.Key
return nil
}
// createArchivalKey returns a unique archival request key.
//
// The uniqueness is ensured by folding several components into a hash:
// - The request ID, which is unique per HTTP request.
// - The stream path.
// - An atomically-incrementing key index, which is unique per ArchivalParams
// instance.
//
// The first two should be sufficient for a unique value, since a given request
// will only be handed to a single instance, and the atomic value is unique
// within the instance.
func (p *ArchivalParams) createArchivalKey(id HashID, pidx uint64) []byte {
hash := sha256.New()
if _, err := fmt.Fprintf(hash, "%s\x00%s\x00%d", p.RequestID, id, pidx); err != nil {
panic(err)
}
return hash.Sum(nil)
}