5db23410d0
Don't put datamanager base dirs inside the root of the ost but use a base path. Let's do it now before releasing since this is a breaking change that requires moving the ost data to the new path
270 lines
8.5 KiB
Go
270 lines
8.5 KiB
Go
// Copyright 2019 Sorint.lab
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package runservice
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"net/http"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
scommon "agola.io/agola/internal/common"
|
|
"agola.io/agola/internal/datamanager"
|
|
"agola.io/agola/internal/etcd"
|
|
"agola.io/agola/internal/objectstorage"
|
|
"agola.io/agola/internal/services/config"
|
|
"agola.io/agola/internal/services/runservice/action"
|
|
"agola.io/agola/internal/services/runservice/api"
|
|
"agola.io/agola/internal/services/runservice/common"
|
|
"agola.io/agola/internal/services/runservice/readdb"
|
|
"agola.io/agola/internal/services/runservice/types"
|
|
"agola.io/agola/internal/util"
|
|
|
|
ghandlers "github.com/gorilla/handlers"
|
|
"github.com/gorilla/mux"
|
|
etcdclientv3 "go.etcd.io/etcd/clientv3"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
// etcdPingerLoop periodically updates a key.
|
|
// This is used by watchers to inform the client of the current revision
|
|
// this is needed since if other users are updating other unwatched keys on
|
|
// etcd we won't be notified, not updating the known revisions
|
|
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
|
|
func (s *Runservice) etcdPingerLoop(ctx context.Context) {
|
|
for {
|
|
if err := s.etcdPinger(ctx); err != nil {
|
|
log.Errorf("err: %+v", err)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
func (s *Runservice) etcdPinger(ctx context.Context) error {
|
|
if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type Runservice struct {
|
|
c *config.Runservice
|
|
e *etcd.Store
|
|
ost *objectstorage.ObjStorage
|
|
dm *datamanager.DataManager
|
|
readDB *readdb.ReadDB
|
|
ah *action.ActionHandler
|
|
}
|
|
|
|
func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) {
|
|
if c.Debug {
|
|
level.SetLevel(zapcore.DebugLevel)
|
|
}
|
|
|
|
ost, err := scommon.NewObjectStorage(&c.ObjectStorage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
e, err := scommon.NewEtcd(&c.Etcd, logger, "runservice")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := &Runservice{
|
|
c: c,
|
|
e: e,
|
|
ost: ost,
|
|
}
|
|
|
|
dmConf := &datamanager.DataManagerConfig{
|
|
BasePath: "rundata",
|
|
E: e,
|
|
OST: ost,
|
|
DataTypes: []string{
|
|
string(common.DataTypeRun),
|
|
string(common.DataTypeRunConfig),
|
|
string(common.DataTypeRunCounter),
|
|
},
|
|
}
|
|
dm, err := datamanager.NewDataManager(ctx, logger, dmConf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.dm = dm
|
|
|
|
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.readDB = readDB
|
|
|
|
ah := action.NewActionHandler(logger, e, readDB, ost, dm)
|
|
s.ah = ah
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (s *Runservice) InitEtcd(ctx context.Context) error {
|
|
// Create changegroup min revision if it doesn't exists
|
|
cmp := []etcdclientv3.Cmp{}
|
|
then := []etcdclientv3.Op{}
|
|
|
|
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0))
|
|
then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, ""))
|
|
txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...)
|
|
if _, err := txn.Commit(); err != nil {
|
|
return etcd.FromEtcdError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Runservice) Run(ctx context.Context) error {
|
|
errCh := make(chan error)
|
|
dmReadyCh := make(chan struct{})
|
|
|
|
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
|
|
|
|
// wait for dm to be ready
|
|
<-dmReadyCh
|
|
|
|
for {
|
|
err := s.InitEtcd(ctx)
|
|
if err == nil {
|
|
break
|
|
}
|
|
log.Errorf("failed to initialize etcd: %+v", err)
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
go func() { errCh <- s.readDB.Run(ctx) }()
|
|
|
|
ch := make(chan *types.ExecutorTask)
|
|
|
|
corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"})
|
|
corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"})
|
|
corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"})
|
|
corsHandler := ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)
|
|
|
|
// executor dedicated api, only calls from executor should happen on these handlers
|
|
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
|
|
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
|
|
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
|
|
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
|
|
archivesHandler := api.NewArchivesHandler(logger, s.ost)
|
|
cacheHandler := api.NewCacheHandler(logger, s.ost)
|
|
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)
|
|
|
|
// api from clients
|
|
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah)
|
|
|
|
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)
|
|
|
|
runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
|
|
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah)
|
|
runsHandler := api.NewRunsHandler(logger, s.readDB)
|
|
runActionsHandler := api.NewRunActionsHandler(logger, s.ah)
|
|
runCreateHandler := api.NewRunCreateHandler(logger, s.ah)
|
|
runEventsHandler := api.NewRunEventsHandler(logger, s.e, s.ost, s.dm)
|
|
|
|
changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB)
|
|
|
|
router := mux.NewRouter()
|
|
apirouter := router.PathPrefix("/api/v1alpha").Subrouter()
|
|
|
|
// don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method
|
|
apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
|
|
|
|
apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST")
|
|
apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE")
|
|
apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET")
|
|
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET")
|
|
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST")
|
|
apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
|
|
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD")
|
|
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET")
|
|
apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST")
|
|
|
|
apirouter.Handle("/logs", logsHandler).Methods("GET")
|
|
|
|
apirouter.Handle("/runs/events", runEventsHandler).Methods("GET")
|
|
apirouter.Handle("/runs/{runid}", runHandler).Methods("GET")
|
|
apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT")
|
|
apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT")
|
|
apirouter.Handle("/runs", runsHandler).Methods("GET")
|
|
apirouter.Handle("/runs", runCreateHandler).Methods("POST")
|
|
|
|
apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET")
|
|
|
|
mainrouter := mux.NewRouter()
|
|
mainrouter.PathPrefix("/").Handler(corsHandler(router))
|
|
|
|
// Return a bad request when it doesn't match any route
|
|
mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
|
|
|
|
go s.executorTasksCleanerLoop(ctx)
|
|
go s.runsSchedulerLoop(ctx)
|
|
go s.runTasksUpdaterLoop(ctx)
|
|
go s.fetcherLoop(ctx)
|
|
go s.finishedRunsArchiverLoop(ctx)
|
|
go s.compactChangeGroupsLoop(ctx)
|
|
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
|
|
go s.executorTaskUpdateHandler(ctx, ch)
|
|
|
|
go s.etcdPingerLoop(ctx)
|
|
|
|
var tlsConfig *tls.Config
|
|
if s.c.Web.TLS {
|
|
var err error
|
|
tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false)
|
|
if err != nil {
|
|
log.Errorf("err: %+v")
|
|
return err
|
|
}
|
|
}
|
|
|
|
httpServer := http.Server{
|
|
Addr: s.c.Web.ListenAddress,
|
|
Handler: mainrouter,
|
|
TLSConfig: tlsConfig,
|
|
}
|
|
|
|
lerrCh := make(chan error)
|
|
go func() {
|
|
lerrCh <- httpServer.ListenAndServe()
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Infof("runservice scheduler exiting")
|
|
httpServer.Close()
|
|
return nil
|
|
case err := <-lerrCh:
|
|
log.Errorf("http server listen error: %v", err)
|
|
return err
|
|
case err := <-errCh:
|
|
log.Errorf("error: %+v", err)
|
|
return err
|
|
}
|
|
}
|