278 lines
7.0 KiB
Go
278 lines
7.0 KiB
Go
|
// Copyright 2019 Sorint.lab
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package command
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"time"
|
||
|
|
||
|
"github.com/sorintlab/agola/internal/etcd"
|
||
|
"github.com/sorintlab/agola/internal/objectstorage"
|
||
|
"github.com/sorintlab/agola/internal/runconfig"
|
||
|
"github.com/sorintlab/agola/internal/sequence"
|
||
|
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
|
||
|
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
|
||
|
"github.com/sorintlab/agola/internal/services/runservice/types"
|
||
|
"github.com/sorintlab/agola/internal/util"
|
||
|
"github.com/sorintlab/agola/internal/wal"
|
||
|
|
||
|
"github.com/pkg/errors"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
type CommandHandler struct {
|
||
|
log *zap.SugaredLogger
|
||
|
e *etcd.Store
|
||
|
lts *objectstorage.ObjStorage
|
||
|
wal *wal.WalManager
|
||
|
}
|
||
|
|
||
|
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler {
|
||
|
return &CommandHandler{
|
||
|
log: logger.Sugar(),
|
||
|
e: e,
|
||
|
lts: lts,
|
||
|
wal: wal,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type RunChangePhaseRequest struct {
|
||
|
RunID string
|
||
|
Phase types.RunPhase
|
||
|
ChangeGroupsUpdateToken string
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhaseRequest) error {
|
||
|
cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
r, _, err := store.GetRun(ctx, s.e, req.RunID)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
switch req.Phase {
|
||
|
case types.RunPhaseRunning:
|
||
|
if r.Phase != types.RunPhaseQueued {
|
||
|
return errors.Errorf("run %s is not queued but in %q phase", r.ID, r.Phase)
|
||
|
}
|
||
|
r.ChangePhase(types.RunPhaseRunning)
|
||
|
}
|
||
|
|
||
|
_, err = store.AtomicPutRun(ctx, s.e, r, "", cgt)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
type RunCreateRequest struct {
|
||
|
RunConfig *types.RunConfig
|
||
|
Group string
|
||
|
Environment map[string]string
|
||
|
Annotations map[string]string
|
||
|
ChangeGroupsUpdateToken string
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) error {
|
||
|
runcgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rc := req.RunConfig
|
||
|
|
||
|
// generate a new run sequence that will be the same for the run, runconfig and rundata
|
||
|
seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
id := seq.String()
|
||
|
|
||
|
// TODO(sgotti) validate run config
|
||
|
if err := runconfig.CheckRunConfig(rc); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// set the run config ID
|
||
|
rc.ID = id
|
||
|
|
||
|
// generate tasks levels
|
||
|
if err := runconfig.GenTasksLevels(rc); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rd := &types.RunData{
|
||
|
ID: id,
|
||
|
Group: req.Group,
|
||
|
Environment: req.Environment,
|
||
|
Annotations: req.Annotations,
|
||
|
}
|
||
|
|
||
|
run, err := s.genRun(ctx, rc, rd)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
s.log.Debugf("created run: %s", util.Dump(run))
|
||
|
|
||
|
c, cgt, err := store.LTSGetRunCounter(s.wal, run.Group)
|
||
|
s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt))
|
||
|
if err != nil && err != objectstorage.ErrNotExist {
|
||
|
return err
|
||
|
}
|
||
|
c++
|
||
|
run.Counter = c
|
||
|
|
||
|
actions := []*wal.Action{}
|
||
|
|
||
|
// persist group counter
|
||
|
rca, err := store.LTSUpdateRunCounterAction(ctx, c, run.Group)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
actions = append(actions, rca)
|
||
|
|
||
|
// persist run config
|
||
|
rca, err = store.LTSSaveRunConfigAction(rc)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
actions = append(actions, rca)
|
||
|
|
||
|
// persist run data
|
||
|
rda, err := store.LTSSaveRunDataAction(rd)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
actions = append(actions, rda)
|
||
|
|
||
|
if _, err = s.wal.WriteWal(ctx, actions, cgt); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if _, err := store.AtomicPutRun(ctx, s.e, run, common.RunEventTypeQueued, runcgt); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) genRunTask(ctx context.Context, rct *types.RunConfigTask) *types.RunTask {
|
||
|
rt := &types.RunTask{
|
||
|
ID: rct.ID,
|
||
|
Status: types.RunTaskStatusNotStarted,
|
||
|
Steps: make([]*types.RunTaskStep, len(rct.Steps)),
|
||
|
WorkspaceArchives: []int{},
|
||
|
}
|
||
|
for i := range rt.Steps {
|
||
|
s := &types.RunTaskStep{
|
||
|
Phase: types.ExecutorTaskPhaseNotStarted,
|
||
|
LogPhase: types.RunTaskFetchPhaseNotStarted,
|
||
|
}
|
||
|
rt.Steps[i] = s
|
||
|
}
|
||
|
for i, ps := range rct.Steps {
|
||
|
switch ps.(type) {
|
||
|
case *types.SaveToWorkspaceStep:
|
||
|
rt.WorkspaceArchives = append(rt.WorkspaceArchives, i)
|
||
|
}
|
||
|
}
|
||
|
rt.WorkspaceArchivesPhase = make([]types.RunTaskFetchPhase, len(rt.WorkspaceArchives))
|
||
|
for i := range rt.WorkspaceArchivesPhase {
|
||
|
rt.WorkspaceArchivesPhase[i] = types.RunTaskFetchPhaseNotStarted
|
||
|
}
|
||
|
|
||
|
return rt
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) genRun(ctx context.Context, rc *types.RunConfig, rd *types.RunData) (*types.Run, error) {
|
||
|
r := &types.Run{
|
||
|
ID: rc.ID,
|
||
|
Name: rc.Name,
|
||
|
Group: rd.Group,
|
||
|
Annotations: rd.Annotations,
|
||
|
Phase: types.RunPhaseQueued,
|
||
|
Result: types.RunResultUnknown,
|
||
|
RunTasks: make(map[string]*types.RunTask),
|
||
|
EnqueueTime: util.TimePtr(time.Now()),
|
||
|
}
|
||
|
|
||
|
for _, rct := range rc.Tasks {
|
||
|
rt := s.genRunTask(ctx, rct)
|
||
|
r.RunTasks[rt.ID] = rt
|
||
|
}
|
||
|
|
||
|
return r, nil
|
||
|
}
|
||
|
|
||
|
type RunTaskApproveRequest struct {
|
||
|
RunID string
|
||
|
TaskID string
|
||
|
ApprovalAnnotations map[string]string
|
||
|
ChangeGroupsUpdateToken string
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveRequest) error {
|
||
|
cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
r, _, err := store.GetRun(ctx, s.e, req.RunID)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
task, ok := r.RunTasks[req.TaskID]
|
||
|
if !ok {
|
||
|
return errors.Errorf("run %q doesn't have task %q", r.ID, req.TaskID)
|
||
|
}
|
||
|
|
||
|
if !task.WaitingApproval {
|
||
|
return errors.Errorf("run %q, task %q is not in waiting approval state", r.ID, req.TaskID)
|
||
|
}
|
||
|
|
||
|
if !task.Approved {
|
||
|
return errors.Errorf("run %q, task %q is already approved", r.ID, req.TaskID)
|
||
|
}
|
||
|
|
||
|
task.Approved = true
|
||
|
task.ApprovalAnnotations = req.ApprovalAnnotations
|
||
|
|
||
|
_, err = store.AtomicPutRun(ctx, s.e, r, "", cgt)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) error {
|
||
|
// mark all executor tasks as failed
|
||
|
ets, err := store.GetExecutorTasks(ctx, s.e, executorID)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, et := range ets {
|
||
|
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
||
|
et.FailError = "executor deleted"
|
||
|
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// delete the executor
|
||
|
if err := store.DeleteExecutor(ctx, s.e, executorID); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|