agola/internal/services/runservice/runservice.go

274 lines
8.7 KiB
Go

// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runservice
import (
"context"
"crypto/tls"
"net/http"
"path/filepath"
"time"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/runservice/action"
"github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/readdb"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
ghandlers "github.com/gorilla/handlers"
"github.com/gorilla/mux"
etcdclientv3 "go.etcd.io/etcd/clientv3"
"go.uber.org/zap/zapcore"
)
// etcdPingerLoop periodically updates a key on etcd until ctx is cancelled.
// This is used by watchers to inform the client of the current revision.
// It is needed because, if other users are only updating unwatched keys on
// etcd, the watchers won't be notified and the known revisions won't be
// updated.
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
func (s *Runservice) etcdPingerLoop(ctx context.Context) {
	for {
		if err := s.etcdPinger(ctx); err != nil {
			log.Errorf("err: %+v", err)
		}

		// Wait before the next ping, but return as soon as ctx is cancelled
		// instead of sleeping through the whole interval.
		select {
		case <-ctx.Done():
			return
		case <-time.After(1 * time.Second):
		}
	}
}
// etcdPinger writes an empty value to common.EtcdPingKey and returns the
// error reported by the put, if any.
func (s *Runservice) etcdPinger(ctx context.Context) error {
	_, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil)
	if err != nil {
		return err
	}

	return nil
}
// Runservice bundles the configuration and the components used by the
// runservice methods in this file. It is built by NewRunservice.
type Runservice struct {
// c is the runservice configuration passed to NewRunservice.
c *config.Runservice
// e is the etcd store.
e *etcd.Store
// ost is the object storage.
ost *objectstorage.ObjStorage
// dm is the data manager, configured with e and ost.
dm *datamanager.DataManager
// readDB is the read db, created with the "readdb" subdirectory of the
// configured data directory.
readDB *readdb.ReadDB
// ah is the action handler handed to the API handlers that perform actions.
ah *action.ActionHandler
}
// NewRunservice builds a Runservice from the provided configuration.
//
// It enables debug logging when c.Debug is set and then creates, in order,
// the object storage, the etcd store, the data manager, the read db and the
// action handler. The first error returned by one of these constructors is
// returned unchanged together with a nil Runservice.
func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) {
	if c.Debug {
		level.SetLevel(zapcore.DebugLevel)
	}

	objStorage, err := scommon.NewObjectStorage(&c.ObjectStorage)
	if err != nil {
		return nil, err
	}

	etcdStore, err := scommon.NewEtcd(&c.Etcd, logger, "runservice")
	if err != nil {
		return nil, err
	}

	dataManager, err := datamanager.NewDataManager(ctx, logger, &datamanager.DataManagerConfig{
		E:   etcdStore,
		OST: objStorage,
		DataTypes: []string{
			string(common.DataTypeRun),
			string(common.DataTypeRunConfig),
			string(common.DataTypeRunCounter),
		},
	})
	if err != nil {
		return nil, err
	}

	readDBDir := filepath.Join(c.DataDir, "readdb")
	rdb, err := readdb.NewReadDB(ctx, logger, readDBDir, etcdStore, objStorage, dataManager)
	if err != nil {
		return nil, err
	}

	return &Runservice{
		c:      c,
		e:      etcdStore,
		ost:    objStorage,
		dm:     dataManager,
		readDB: rdb,
		ah:     action.NewActionHandler(logger, etcdStore, rdb, objStorage, dataManager),
	}, nil
}
// InitEtcd creates the changegroup min revision key, with an empty value, if
// it doesn't exist yet. The put is executed inside a transaction guarded by a
// "create revision = 0" comparison on the same key. Errors from the commit are
// converted with etcd.FromEtcdError.
func (s *Runservice) InitEtcd(ctx context.Context) error {
	key := common.EtcdChangeGroupMinRevisionKey

	// Only put the key when it has not been created yet.
	notCreated := etcdclientv3.Compare(etcdclientv3.CreateRevision(key), "=", 0)
	createKey := etcdclientv3.OpPut(key, "")

	_, err := s.e.Client().Txn(ctx).If(notCreated).Then(createKey).Commit()
	if err != nil {
		return etcd.FromEtcdError(err)
	}

	return nil
}
// Run starts the runservice and blocks until ctx is cancelled or a fatal
// error happens.
//
// It starts the data manager and waits for it to be ready, initializes etcd
// (retrying every second until it succeeds), starts the read db and the
// background loops, and finally serves the HTTP API on the configured listen
// address, using TLS when it is enabled in the configuration.
//
// It returns nil when ctx is cancelled, otherwise the error reported by the
// data manager, by the TLS configuration setup or by the HTTP server.
func (s *Runservice) Run(ctx context.Context) error {
	// Buffered so the sending goroutine doesn't block forever when Run has
	// already returned for another reason.
	errCh := make(chan error, 1)
	dmReadyCh := make(chan struct{})

	go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()

	// wait for dm to be ready, giving up if it stops or ctx is cancelled
	// before that happens
	select {
	case <-dmReadyCh:
	case err := <-errCh:
		log.Errorf("error: %+v", err)
		return err
	case <-ctx.Done():
		log.Infof("runservice scheduler exiting")
		return nil
	}

	for {
		err := s.InitEtcd(ctx)
		if err == nil {
			break
		}
		log.Errorf("failed to initialize etcd: %+v", err)

		// retry after a pause, unless ctx is cancelled in the meantime
		select {
		case <-ctx.Done():
			log.Infof("runservice scheduler exiting")
			return nil
		case <-time.After(1 * time.Second):
		}
	}

	go s.readDB.Run(ctx)

	ch := make(chan *types.ExecutorTask)

	corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"})
	corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token"})
	corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"})
	corsHandler := ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)

	// executor dedicated api, only calls from executor should happen on these handlers
	executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
	executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
	executorTaskHandler := api.NewExecutorTaskHandler(s.e)
	executorTasksHandler := api.NewExecutorTasksHandler(s.e)
	archivesHandler := api.NewArchivesHandler(logger, s.ost)
	cacheHandler := api.NewCacheHandler(logger, s.ost)
	cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)

	// api from clients
	executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah)

	logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)

	runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
	runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah)
	runsHandler := api.NewRunsHandler(logger, s.readDB)
	runActionsHandler := api.NewRunActionsHandler(logger, s.ah)
	runCreateHandler := api.NewRunCreateHandler(logger, s.ah)
	runEventsHandler := api.NewRunEventsHandler(logger, s.e, s.ost, s.dm)
	changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB)

	router := mux.NewRouter()
	apirouter := router.PathPrefix("/api/v1alpha").Subrouter()

	// don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method
	apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })

	apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST")
	apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE")
	apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET")
	apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET")
	apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST")
	apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
	apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD")
	apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET")
	apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST")

	apirouter.Handle("/logs", logsHandler).Methods("GET")

	apirouter.Handle("/runs/events", runEventsHandler).Methods("GET")
	apirouter.Handle("/runs/{runid}", runHandler).Methods("GET")
	apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT")
	apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT")
	apirouter.Handle("/runs", runsHandler).Methods("GET")
	apirouter.Handle("/runs", runCreateHandler).Methods("POST")

	apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET")

	mainrouter := mux.NewRouter()
	mainrouter.PathPrefix("/").Handler(corsHandler(router))

	// Return a bad request when it doesn't match any route
	mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })

	go s.executorTasksCleanerLoop(ctx)
	go s.runsSchedulerLoop(ctx)
	go s.runTasksUpdaterLoop(ctx)
	go s.fetcherLoop(ctx)
	go s.finishedRunsArchiverLoop(ctx)
	go s.compactChangeGroupsLoop(ctx)
	go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
	go s.executorTaskUpdateHandler(ctx, ch)
	go s.etcdPingerLoop(ctx)

	var tlsConfig *tls.Config
	if s.c.Web.TLS {
		var err error
		tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false)
		if err != nil {
			log.Errorf("err: %+v", err)
			return err
		}
	}

	httpServer := http.Server{
		Addr:      s.c.Web.ListenAddress,
		Handler:   mainrouter,
		TLSConfig: tlsConfig,
	}

	// Buffered for the same reason as errCh: the listener goroutine must be
	// able to report its error and exit even if nobody is receiving anymore.
	lerrCh := make(chan error, 1)
	go func() {
		if tlsConfig != nil {
			// Setting TLSConfig alone doesn't enable TLS: the server must be
			// started with ListenAndServeTLS to serve HTTPS.
			lerrCh <- httpServer.ListenAndServeTLS(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile)
			return
		}
		lerrCh <- httpServer.ListenAndServe()
	}()

	select {
	case <-ctx.Done():
		log.Infof("runservice scheduler exiting")
		if err := httpServer.Close(); err != nil {
			log.Errorf("http server close error: %v", err)
		}
		return nil
	case err := <-lerrCh:
		log.Errorf("http server listen error: %v", err)
		return err
	case err := <-errCh:
		log.Errorf("error: %+v", err)
		return err
	}
}